bitwarden_ipc/
ipc_client.rs

1use std::sync::Arc;
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_threading::cancellation_token::CancellationToken;
5use serde::de::DeserializeOwned;
6use thiserror::Error;
7use tokio::{select, sync::RwLock};
8
9use crate::{
10    constants::CHANNEL_BUFFER_CAPACITY,
11    endpoint::Endpoint,
12    message::{
13        IncomingMessage, OutgoingMessage, PayloadTypeName, TypedIncomingMessage,
14        TypedOutgoingMessage,
15    },
16    rpc::{
17        error::RpcError,
18        exec::handler_registry::RpcHandlerRegistry,
19        request::RpcRequest,
20        request_message::{RpcRequestMessage, RpcRequestPayload, RPC_REQUEST_PAYLOAD_TYPE_NAME},
21        response_message::{IncomingRpcResponseMessage, OutgoingRpcResponseMessage},
22    },
23    serde_utils,
24    traits::{CommunicationBackend, CryptoProvider, SessionRepository},
25    RpcHandler,
26};
27
/// An IPC client that handles communication between different components and clients.
/// It uses a crypto provider to encrypt and decrypt messages, a communication backend to send and
/// receive messages, and a session repository to persist sessions.
///
/// Instances are created through `IpcClient::new`, which hands the client back wrapped in an
/// `Arc`; all instance methods take `self: &Arc<Self>`.
pub struct IpcClient<Crypto, Com, Ses>
where
    Crypto: CryptoProvider<Com, Ses>,
    Com: CommunicationBackend,
    Ses: SessionRepository<Crypto::Session>,
{
    // The three collaborators supplied by the caller in `new`.
    crypto: Crypto,
    communication: Com,
    sessions: Ses,

    // Registry consulted by the background task to answer incoming RPC requests.
    handlers: RpcHandlerRegistry,
    // Receiving half of the internal broadcast channel. It is set by `start()` and cleared by
    // `stop()`; subscriptions are created by resubscribing to it.
    incoming: RwLock<Option<tokio::sync::broadcast::Receiver<IncomingMessage>>>,
    // Token that stops the background task. It is set by `start()` and taken (and cancelled)
    // by `stop()`.
    cancellation_token: RwLock<Option<CancellationToken>>,
}
45
/// A subscription to receive messages over IPC.
/// The subscription will start buffering messages after its creation and return them
/// when receive() is called. Messages received before the subscription was created will not be
/// returned.
pub struct IpcClientSubscription {
    // Broadcast receiver this subscription reads from.
    receiver: tokio::sync::broadcast::Receiver<IncomingMessage>,
    // Topic filter applied in `receive()`; `None` lets every message through.
    topic: Option<String>,
}
54
/// A typed subscription to receive messages over IPC.
/// It wraps an [`IpcClientSubscription`] and converts every received message into
/// a `TypedIncomingMessage<Payload>`.
/// The subscription will start buffering messages after its creation and return them
/// when receive() is called. Messages received before the subscription was created will not be
/// returned.
pub struct IpcClientTypedSubscription<Payload: DeserializeOwned + PayloadTypeName>(
    // The underlying untyped subscription.
    IpcClientSubscription,
    // Ties the subscription to `Payload` without storing a value of that type.
    std::marker::PhantomData<Payload>,
);
63
/// Error returned when a subscription cannot be created.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
#[bitwarden_error(flat)]
#[allow(missing_docs)]
pub enum SubscribeError {
    // Produced by `IpcClient::subscribe` when the client currently holds no incoming channel,
    // i.e. it was never started or has been stopped.
    #[error("The IPC processing thread is not running")]
    NotStarted,
}
71
/// Errors returned by `IpcClientSubscription::receive`.
#[derive(Debug, Error, PartialEq, Eq)]
#[bitwarden_error(flat)]
#[allow(missing_docs)]
pub enum ReceiveError {
    // Wraps the error returned by the broadcast receiver's `recv()`.
    // NOTE(review): the message says "subscribe" although the failure happens while receiving.
    #[error("Failed to subscribe to the IPC channel: {0}")]
    Channel(#[from] tokio::sync::broadcast::error::RecvError),

    // NOTE(review): never constructed in this file; presumably meant for callers that wrap
    // `receive` in a timeout -- confirm.
    #[error("Timed out while waiting for a message: {0}")]
    Timeout(#[from] tokio::time::error::Elapsed),

    // The cancellation token passed to `receive` was triggered before a message arrived.
    #[error("Cancelled while waiting for a message")]
    Cancelled,
}
85
/// Errors returned by `IpcClientTypedSubscription::receive`.
/// Mirrors [`ReceiveError`] and adds a variant for payload conversion failures.
#[derive(Debug, Error, PartialEq, Eq)]
#[bitwarden_error(flat)]
#[allow(missing_docs)]
pub enum TypedReceiveError {
    // Wraps the error returned by the broadcast receiver's `recv()`.
    #[error("Failed to subscribe to the IPC channel: {0}")]
    Channel(#[from] tokio::sync::broadcast::error::RecvError),

    // NOTE(review): never constructed directly in this file -- confirm intended producer.
    #[error("Timed out while waiting for a message: {0}")]
    Timeout(#[from] tokio::time::error::Elapsed),

    // The cancellation token passed to `receive` was triggered before a message arrived.
    #[error("Cancelled while waiting for a message")]
    Cancelled,

    // The received message could not be converted into the requested payload type; the string
    // is the rendered conversion error.
    #[error("Typing error: {0}")]
    Typing(String),
}
102
103impl From<ReceiveError> for TypedReceiveError {
104    fn from(value: ReceiveError) -> Self {
105        match value {
106            ReceiveError::Channel(e) => TypedReceiveError::Channel(e),
107            ReceiveError::Timeout(e) => TypedReceiveError::Timeout(e),
108            ReceiveError::Cancelled => TypedReceiveError::Cancelled,
109        }
110    }
111}
112
/// Errors returned by `IpcClient::request`.
#[derive(Debug, Error, PartialEq, Eq)]
#[bitwarden_error(flat)]
#[allow(missing_docs)]
pub enum RequestError {
    // The response subscription could not be created (client not running).
    #[error(transparent)]
    Subscribe(#[from] SubscribeError),

    // Waiting for the response failed (channel error, cancellation or undecodable payload).
    #[error(transparent)]
    Receive(#[from] TypedReceiveError),

    // NOTE(review): never constructed in this file -- confirm intended producer.
    #[error("Timed out while waiting for a message: {0}")]
    Timeout(#[from] tokio::time::error::Elapsed),

    // Sending the request failed; the string is the `Debug` rendering of the send error.
    #[error("Failed to send message: {0}")]
    Send(String),

    // Either the request could not be serialized locally or the response carried an error.
    #[error("Error occured on the remote target: {0}")]
    RpcError(#[from] RpcError),
}
132
133impl<Crypto, Com, Ses> IpcClient<Crypto, Com, Ses>
134where
135    Crypto: CryptoProvider<Com, Ses>,
136    Com: CommunicationBackend,
137    Ses: SessionRepository<Crypto::Session>,
138{
139    /// Create a new IPC client with the provided crypto provider, communication backend, and
140    /// session repository.
141    pub fn new(crypto: Crypto, communication: Com, sessions: Ses) -> Arc<Self> {
142        Arc::new(Self {
143            crypto,
144            communication,
145            sessions,
146
147            handlers: RpcHandlerRegistry::new(),
148            incoming: RwLock::new(None),
149            cancellation_token: RwLock::new(None),
150        })
151    }
152
153    /// Start the IPC client, which will begin listening for incoming messages and processing them.
154    /// This will be done in a separate task, allowing the client to receive messages
155    /// asynchronously. The client will stop automatically if an error occurs during message
156    /// processing or if the cancellation token is triggered.
157    ///
158    /// Note: The client can and will send messages in the background while it is running, even if
159    /// no active subscriptions are present.
160    pub async fn start(self: &Arc<Self>) {
161        let cancellation_token = CancellationToken::new();
162        self.cancellation_token
163            .write()
164            .await
165            .replace(cancellation_token.clone());
166
167        let com_receiver = self.communication.subscribe().await;
168        let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY);
169
170        self.incoming.write().await.replace(client_rx);
171
172        let client = self.clone();
173        let future = async move {
174            loop {
175                let rpc_topic = RPC_REQUEST_PAYLOAD_TYPE_NAME.to_owned();
176                select! {
177                    _ = cancellation_token.cancelled() => {
178                        log::debug!("Cancellation signal received, stopping IPC client");
179                        break;
180                    }
181                    received = client.crypto.receive(&com_receiver, &client.communication, &client.sessions) => {
182                        match received {
183                            Ok(message) if message.topic == Some(rpc_topic) => {
184                                client.handle_rpc_request(message)
185                            }
186                            Ok(message) => {
187                                if client_tx.send(message).is_err() {
188                                    log::error!("Failed to save incoming message");
189                                    break;
190                                };
191                            }
192                            Err(e) => {
193                                log::error!("Error receiving message: {:?}", e);
194                                break;
195                            }
196                        }
197                    }
198                }
199            }
200            log::debug!("IPC client shutting down");
201            client.stop().await;
202        };
203
204        #[cfg(not(target_arch = "wasm32"))]
205        tokio::spawn(future);
206
207        #[cfg(target_arch = "wasm32")]
208        wasm_bindgen_futures::spawn_local(future);
209    }
210
211    /// Check if the IPC client task is currently running.
212    pub async fn is_running(self: &Arc<Self>) -> bool {
213        let has_incoming = self.incoming.read().await.is_some();
214        let has_cancellation_token = self.cancellation_token.read().await.is_some();
215        has_incoming && has_cancellation_token
216    }
217
218    /// Stop the IPC client task. This will stop listening for incoming messages.
219    pub async fn stop(self: &Arc<Self>) {
220        let mut incoming = self.incoming.write().await;
221        let _ = incoming.take();
222
223        let mut cancellation_token = self.cancellation_token.write().await;
224        if let Some(cancellation_token) = cancellation_token.take() {
225            cancellation_token.cancel();
226        }
227    }
228
229    /// Register a new RPC handler for processing incoming RPC requests.
230    /// The handler will be executed by the IPC client when an RPC request is received and
231    /// the response will be sent back over IPC.
232    pub async fn register_rpc_handler<H>(self: &Arc<Self>, handler: H)
233    where
234        H: RpcHandler + Send + Sync + 'static,
235    {
236        self.handlers.register(handler).await;
237    }
238
239    /// Send a message
240    pub async fn send(self: &Arc<Self>, message: OutgoingMessage) -> Result<(), Crypto::SendError> {
241        let result = self
242            .crypto
243            .send(&self.communication, &self.sessions, message)
244            .await;
245
246        if result.is_err() {
247            log::error!("Error sending message: {:?}", result);
248            self.stop().await;
249        }
250
251        result
252    }
253
254    /// Create a subscription to receive messages, optionally filtered by topic.
255    /// Setting the topic to `None` will receive all messages.
256    pub async fn subscribe(
257        self: &Arc<Self>,
258        topic: Option<String>,
259    ) -> Result<IpcClientSubscription, SubscribeError> {
260        Ok(IpcClientSubscription {
261            receiver: self
262                .incoming
263                .read()
264                .await
265                .as_ref()
266                .ok_or(SubscribeError::NotStarted)?
267                .resubscribe(),
268            topic,
269        })
270    }
271
272    /// Create a subscription to receive messages that can be deserialized into the provided payload
273    /// type.
274    pub async fn subscribe_typed<Payload>(
275        self: &Arc<Self>,
276    ) -> Result<IpcClientTypedSubscription<Payload>, SubscribeError>
277    where
278        Payload: DeserializeOwned + PayloadTypeName,
279    {
280        Ok(IpcClientTypedSubscription(
281            self.subscribe(Some(Payload::PAYLOAD_TYPE_NAME.to_owned()))
282                .await?,
283            std::marker::PhantomData,
284        ))
285    }
286
287    /// Send a request to the specified destination and wait for a response.
288    /// The destination must have a registered RPC handler for the request type, otherwise
289    /// an error will be returned by the remote endpoint.
290    pub async fn request<Request>(
291        self: &Arc<Self>,
292        request: Request,
293        destination: Endpoint,
294        cancellation_token: Option<CancellationToken>,
295    ) -> Result<Request::Response, RequestError>
296    where
297        Request: RpcRequest,
298    {
299        let request_id = uuid::Uuid::new_v4().to_string();
300        let mut response_subscription = self
301            .subscribe_typed::<IncomingRpcResponseMessage<_>>()
302            .await?;
303
304        let request_payload = RpcRequestMessage {
305            request,
306            request_id: request_id.clone(),
307            request_type: Request::NAME.to_owned(),
308        };
309
310        let message = TypedOutgoingMessage {
311            payload: request_payload,
312            destination,
313        }
314        .try_into()
315        .map_err(|e: serde_utils::DeserializeError| {
316            RequestError::RpcError(RpcError::RequestSerializationError(e.to_string()))
317        })?;
318
319        self.send(message)
320            .await
321            .map_err(|e| RequestError::Send(format!("{:?}", e)))?;
322
323        let response = loop {
324            let received = response_subscription
325                .receive(cancellation_token.clone())
326                .await
327                .map_err(RequestError::Receive)?;
328
329            if received.payload.request_id == request_id {
330                break received;
331            }
332        };
333
334        Ok(response.payload.result?)
335    }
336
337    fn handle_rpc_request(self: &Arc<Self>, incoming_message: IncomingMessage) {
338        let client = self.clone();
339        let future = async move {
340            let client = client.clone();
341
342            #[derive(Debug, Error)]
343            enum HandleError {
344                #[error("Failed to deserialize request message: {0}")]
345                Deserialize(String),
346
347                #[error("Failed to serialize response message: {0}")]
348                Serialize(String),
349            }
350
351            async fn handle(
352                incoming_message: IncomingMessage,
353                handlers: &RpcHandlerRegistry,
354            ) -> Result<OutgoingMessage, HandleError> {
355                let request = RpcRequestPayload::from_slice(incoming_message.payload.clone())
356                    .map_err(|e: serde_utils::DeserializeError| {
357                        HandleError::Deserialize(e.to_string())
358                    })?;
359
360                let response = handlers.handle(&request).await;
361
362                let response_message = OutgoingRpcResponseMessage {
363                    request_id: request.request_id(),
364                    request_type: request.request_type(),
365                    result: response,
366                };
367
368                let outgoing = TypedOutgoingMessage {
369                    payload: response_message,
370                    destination: incoming_message.source,
371                }
372                .try_into()
373                .map_err(|e: serde_utils::SerializeError| HandleError::Serialize(e.to_string()))?;
374
375                Ok(outgoing)
376            }
377
378            match handle(incoming_message, &client.handlers).await {
379                Ok(outgoing_message) => {
380                    if client.send(outgoing_message).await.is_err() {
381                        log::error!("Failed to send response message");
382                    }
383                }
384                Err(e) => {
385                    log::error!("Error handling RPC request: {:?}", e);
386                }
387            }
388        };
389
390        #[cfg(not(target_arch = "wasm32"))]
391        tokio::spawn(future);
392
393        #[cfg(target_arch = "wasm32")]
394        wasm_bindgen_futures::spawn_local(future);
395    }
396}
397
398impl IpcClientSubscription {
399    /// Receive a message, optionally filtering by topic.
400    /// Setting the cancellation_token to `None` will wait indefinitely.
401    pub async fn receive(
402        &mut self,
403        cancellation_token: Option<CancellationToken>,
404    ) -> Result<IncomingMessage, ReceiveError> {
405        let cancellation_token = cancellation_token.unwrap_or_default();
406
407        loop {
408            select! {
409                _ = cancellation_token.cancelled() => {
410                    return Err(ReceiveError::Cancelled)
411                }
412                result = self.receiver.recv() => {
413                    let received = result?;
414                    if self.topic.is_none() || received.topic == self.topic {
415                        return Ok::<IncomingMessage, ReceiveError>(received);
416                    }
417                }
418            }
419        }
420    }
421}
422
423impl<Payload> IpcClientTypedSubscription<Payload>
424where
425    Payload: DeserializeOwned + PayloadTypeName,
426{
427    /// Receive a message.
428    /// Setting the cancellation_token to `None` will wait indefinitely.
429    pub async fn receive(
430        &mut self,
431        cancellation_token: Option<CancellationToken>,
432    ) -> Result<TypedIncomingMessage<Payload>, TypedReceiveError> {
433        let received = self.0.receive(cancellation_token).await?;
434        received
435            .try_into()
436            .map_err(|e: serde_utils::DeserializeError| TypedReceiveError::Typing(e.to_string()))
437    }
438}
439
440#[cfg(test)]
441mod tests {
442    use std::{collections::HashMap, time::Duration};
443
444    use bitwarden_threading::time::sleep;
445    use serde::{Deserialize, Serialize};
446
447    use super::*;
448    use crate::{
449        endpoint::Endpoint,
450        traits::{
451            tests::TestCommunicationBackend, InMemorySessionRepository, NoEncryptionCryptoProvider,
452        },
453    };
454
    /// Crypto provider test double whose `send`/`receive` outcomes are fixed up front.
    struct TestCryptoProvider {
        /// Simulated send result. Set to `None` to make `send` wait for a long time instead.
        send_result: Option<Result<(), String>>,
        /// Simulated receive result. Set to `None` to make `receive` wait for a long time
        /// instead.
        receive_result: Option<Result<IncomingMessage, String>>,
    }
461
    // Session repository used together with `TestCryptoProvider` (sessions are plain strings).
    type TestSessionRepository = InMemorySessionRepository<String>;
    impl CryptoProvider<TestCommunicationBackend, TestSessionRepository> for TestCryptoProvider {
        type Session = String;
        type SendError = String;
        type ReceiveError = String;

        /// Returns a clone of the configured `receive_result`, ignoring all arguments.
        async fn receive(
            &self,
            _receiver: &<TestCommunicationBackend as CommunicationBackend>::Receiver,
            _communication: &TestCommunicationBackend,
            _sessions: &TestSessionRepository,
        ) -> Result<IncomingMessage, Self::ReceiveError> {
            match &self.receive_result {
                Some(result) => result.clone(),
                None => {
                    // Simulate a message that never arrives: sleep far longer than any test
                    // runs, then report an error.
                    sleep(Duration::from_secs(600)).await;
                    Err("Simulated timeout".to_string())
                }
            }
        }

        /// Returns a clone of the configured `send_result`, ignoring all arguments.
        async fn send(
            &self,
            _communication: &TestCommunicationBackend,
            _sessions: &TestSessionRepository,
            _message: OutgoingMessage,
        ) -> Result<(), Self::SendError> {
            match &self.send_result {
                Some(result) => result.clone(),
                None => {
                    // Simulate a message that is never sent: sleep far longer than any test
                    // runs, then report an error.
                    sleep(Duration::from_secs(600)).await;
                    Err("Simulated timeout".to_string())
                }
            }
        }
    }
500
    /// `send` hands the crypto provider's error back to the caller unchanged.
    #[tokio::test]
    async fn returns_send_error_when_crypto_provider_returns_error() {
        let message = OutgoingMessage {
            payload: vec![],
            destination: Endpoint::BrowserBackground,
            topic: None,
        };
        let crypto_provider = TestCryptoProvider {
            send_result: Some(Err("Crypto error".to_string())),
            receive_result: Some(Err("Should not have be called".to_string())),
        };
        let communication_provider = TestCommunicationBackend::new();
        let session_map = TestSessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider, session_map);
        client.start().await;

        let error = client.send(message).await.unwrap_err();

        assert_eq!(error, "Crypto error".to_string());
    }
521
    /// A message sent through the client shows up in the communication backend's outgoing list.
    #[tokio::test]
    async fn communication_provider_has_outgoing_message_when_sending_through_ipc_client() {
        let message = OutgoingMessage {
            payload: vec![],
            destination: Endpoint::BrowserBackground,
            topic: None,
        };
        let crypto_provider = NoEncryptionCryptoProvider;
        let communication_provider = TestCommunicationBackend::new();
        let session_map = InMemorySessionRepository::new(HashMap::new());
        // The backend is cloned so the test can inspect it after handing one copy to the client.
        let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
        client.start().await;

        client.send(message.clone()).await.unwrap();

        let outgoing_messages = communication_provider.outgoing().await;
        assert_eq!(outgoing_messages, vec![message]);
    }
540
    /// An unfiltered subscription returns a message pushed into the backend.
    #[tokio::test]
    async fn returns_received_message_when_received_from_backend() {
        let message = IncomingMessage {
            payload: vec![],
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
            topic: None,
        };
        let crypto_provider = NoEncryptionCryptoProvider;
        let communication_provider = TestCommunicationBackend::new();
        let session_map = InMemorySessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
        client.start().await;

        // Subscribe before pushing: subscriptions only see messages that arrive afterwards.
        let mut subscription = client
            .subscribe(None)
            .await
            .expect("Subscribing should not fail");
        communication_provider.push_incoming(message.clone());
        let received_message = subscription.receive(None).await.unwrap();

        assert_eq!(received_message, message);
    }
564
    /// A topic-filtered subscription skips messages with other topics.
    #[tokio::test]
    async fn skips_non_matching_topics_and_returns_first_matching_message() {
        let non_matching_message = IncomingMessage {
            payload: vec![],
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
            topic: Some("non_matching_topic".to_owned()),
        };
        let matching_message = IncomingMessage {
            payload: vec![109],
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
            topic: Some("matching_topic".to_owned()),
        };

        let crypto_provider = NoEncryptionCryptoProvider;
        let communication_provider = TestCommunicationBackend::new();
        let session_map = InMemorySessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
        client.start().await;
        let mut subscription = client
            .subscribe(Some("matching_topic".to_owned()))
            .await
            .expect("Subscribing should not fail");
        // Two messages that must be skipped, followed by the one that must be returned.
        communication_provider.push_incoming(non_matching_message.clone());
        communication_provider.push_incoming(non_matching_message.clone());
        communication_provider.push_incoming(matching_message.clone());

        let received_message: IncomingMessage = subscription.receive(None).await.unwrap();

        assert_eq!(received_message, matching_message);
    }
597
    /// A typed subscription skips messages without its topic and returns the typed payload.
    #[tokio::test]
    async fn skips_unrelated_messages_and_returns_typed_message() {
        #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
        struct TestPayload {
            some_data: String,
        }

        impl PayloadTypeName for TestPayload {
            const PAYLOAD_TYPE_NAME: &str = "TestPayload";
        }

        // Has no topic, so the typed subscription must not return it.
        let unrelated = IncomingMessage {
            payload: vec![],
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
            topic: None,
        };
        let typed_message = TypedIncomingMessage {
            payload: TestPayload {
                some_data: "Hello, world!".to_string(),
            },
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
        };

        let crypto_provider = NoEncryptionCryptoProvider;
        let communication_provider = TestCommunicationBackend::new();
        let session_map = InMemorySessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
        client.start().await;
        let mut subscription = client
            .subscribe_typed::<TestPayload>()
            .await
            .expect("Subscribing should not fail");
        communication_provider.push_incoming(unrelated.clone());
        communication_provider.push_incoming(unrelated.clone());
        // Convert the typed message into its untyped wire form before pushing it.
        communication_provider.push_incoming(
            typed_message
                .clone()
                .try_into()
                .expect("Serialization should not fail"),
        );

        let received_message = subscription.receive(None).await.unwrap();

        assert_eq!(received_message, typed_message);
    }
645
    /// A message with the right topic but an undecodable payload yields a typing error.
    #[tokio::test]
    async fn returns_error_if_related_message_was_not_deserializable() {
        #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
        struct TestPayload {
            some_data: String,
        }

        impl PayloadTypeName for TestPayload {
            const PAYLOAD_TYPE_NAME: &str = "TestPayload";
        }

        // Topic matches `TestPayload`, but the empty payload cannot be decoded into it.
        let non_deserializable_message = IncomingMessage {
            payload: vec![],
            source: Endpoint::Web { id: 9001 },
            destination: Endpoint::BrowserBackground,
            topic: Some("TestPayload".to_owned()),
        };

        let crypto_provider = NoEncryptionCryptoProvider;
        let communication_provider = TestCommunicationBackend::new();
        let session_map = InMemorySessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
        client.start().await;
        let mut subscription = client
            .subscribe_typed::<TestPayload>()
            .await
            .expect("Subscribing should not fail");
        communication_provider.push_incoming(non_deserializable_message.clone());

        let result = subscription.receive(None).await;
        assert!(matches!(result, Err(TypedReceiveError::Typing(_))));
    }
678
    /// A failing `send` stops the client.
    #[tokio::test]
    async fn ipc_client_stops_if_crypto_returns_send_error() {
        let message = OutgoingMessage {
            payload: vec![],
            destination: Endpoint::BrowserBackground,
            topic: None,
        };
        // `receive_result: None` keeps the background task waiting, so only the send error
        // can stop the client.
        let crypto_provider = TestCryptoProvider {
            send_result: Some(Err("Crypto error".to_string())),
            receive_result: None,
        };
        let communication_provider = TestCommunicationBackend::new();
        let session_map = TestSessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider, session_map);
        client.start().await;

        let error = client.send(message).await.unwrap_err();
        let is_running = client.is_running().await;

        assert_eq!(error, "Crypto error".to_string());
        assert!(!is_running);
    }
701
    /// A receive error in the background task stops the client.
    #[tokio::test]
    async fn ipc_client_stops_if_crypto_returns_receive_error() {
        let crypto_provider = TestCryptoProvider {
            send_result: None,
            receive_result: Some(Err("Crypto error".to_string())),
        };
        let communication_provider = TestCommunicationBackend::new();
        let session_map = TestSessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider, session_map);
        client.start().await;

        // Give the client some time to process the error
        tokio::time::sleep(Duration::from_millis(100)).await;
        let is_running = client.is_running().await;

        assert!(!is_running);
    }
719
    /// Without send or receive errors the client keeps reporting that it is running.
    #[tokio::test]
    async fn ipc_client_is_running_if_no_errors_are_encountered() {
        // Both results are `None`, so neither operation completes during the test.
        let crypto_provider = TestCryptoProvider {
            send_result: None,
            receive_result: None,
        };
        let communication_provider = TestCommunicationBackend::new();
        let session_map = TestSessionRepository::new(HashMap::new());
        let client = IpcClient::new(crypto_provider, communication_provider, session_map);
        client.start().await;

        // Give the client some time to process
        tokio::time::sleep(Duration::from_millis(100)).await;
        let is_running = client.is_running().await;

        assert!(is_running);
    }
737
738    mod request {
739        use super::*;
740        use crate::rpc::response_message::IncomingRpcResponseMessage;
741
        /// RPC request used by the tests in this module; `TestHandler` answers it with `a + b`.
        #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
        struct TestRequest {
            a: i32,
            b: i32,
        }
747
        /// Response type paired with `TestRequest`.
        #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
        struct TestResponse {
            result: i32,
        }
752
        // Registers `TestRequest` as an RPC request named "TestRequest" that yields a
        // `TestResponse`.
        impl RpcRequest for TestRequest {
            type Response = TestResponse;

            const NAME: &str = "TestRequest";
        }
758
        /// Stateless RPC handler for `TestRequest`.
        struct TestHandler;
760
761        impl RpcHandler for TestHandler {
762            type Request = TestRequest;
763
764            async fn handle(&self, request: Self::Request) -> TestResponse {
765                TestResponse {
766                    result: request.a + request.b,
767                }
768            }
769        }
770
        /// `request` sends a serialized RPC request and resolves once a response carrying the
        /// same request id is pushed into the backend.
        #[tokio::test]
        async fn request_sends_message_and_returns_response() {
            let crypto_provider = NoEncryptionCryptoProvider;
            let communication_provider = TestCommunicationBackend::new();
            let session_map = InMemorySessionRepository::new(HashMap::new());
            let client =
                IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
            client.start().await;
            let request = TestRequest { a: 1, b: 2 };
            let response = TestResponse { result: 3 };

            // Send the request
            // It runs in its own task because `request` only returns once a response arrives.
            let request_clone = request.clone();
            let result_handle = tokio::spawn(async move {
                let client = client.clone();
                client
                    .request::<TestRequest>(request_clone, Endpoint::BrowserBackground, None)
                    .await
            });
            // Give the spawned task time to send the request.
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Read and verify the outgoing message
            let outgoing_messages = communication_provider.outgoing().await;
            let outgoing_request: RpcRequestMessage<TestRequest> =
                serde_utils::from_slice(&outgoing_messages[0].payload)
                    .expect("Deserialization should not fail");
            assert_eq!(outgoing_request.request_type, "TestRequest");
            assert_eq!(outgoing_request.request, request);

            // Simulate receiving a response
            // It reuses the request id from the outgoing message so `request` accepts it.
            let simulated_response = IncomingRpcResponseMessage {
                result: Ok(response),
                request_id: outgoing_request.request_id.clone(),
                request_type: outgoing_request.request_type.clone(),
            };
            let simulated_response = IncomingMessage {
                payload: serde_utils::to_vec(&simulated_response)
                    .expect("Serialization should not fail"),
                source: Endpoint::BrowserBackground,
                destination: Endpoint::Web { id: 9001 },
                topic: Some(
                    IncomingRpcResponseMessage::<TestRequest>::PAYLOAD_TYPE_NAME.to_owned(),
                ),
            };
            communication_provider.push_incoming(simulated_response);

            // Wait for the response
            let result = result_handle.await.unwrap();
            assert_eq!(result.unwrap().result, 3);
        }
821
822        #[tokio::test]
823        async fn incoming_rpc_message_handles_request_and_returns_response() {
824            let crypto_provider = NoEncryptionCryptoProvider;
825            let communication_provider = TestCommunicationBackend::new();
826            let session_map = InMemorySessionRepository::new(HashMap::new());
827            let client =
828                IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
829            client.start().await;
830            let request_id = uuid::Uuid::new_v4().to_string();
831            let request = TestRequest { a: 1, b: 2 };
832            let response = TestResponse { result: 3 };
833
834            // Register the handler
835            client.register_rpc_handler(TestHandler).await;
836
837            // Simulate receiving a request
838            let simulated_request = RpcRequestMessage {
839                request,
840                request_id: request_id.clone(),
841                request_type: "TestRequest".to_string(),
842            };
843            let simulated_request_message = IncomingMessage {
844                payload: serde_utils::to_vec(&simulated_request)
845                    .expect("Serialization should not fail"),
846                source: Endpoint::Web { id: 9001 },
847                destination: Endpoint::BrowserBackground,
848                topic: Some(RPC_REQUEST_PAYLOAD_TYPE_NAME.to_owned()),
849            };
850            communication_provider.push_incoming(simulated_request_message);
851
852            // Give the client some time to process the request
853            tokio::time::sleep(Duration::from_millis(100)).await;
854
855            // Read and verify the outgoing message
856            let outgoing_messages = communication_provider.outgoing().await;
857            let outgoing_response: IncomingRpcResponseMessage<TestResponse> =
858                serde_utils::from_slice(&outgoing_messages[0].payload)
859                    .expect("Deserialization should not fail");
860
861            assert_eq!(
862                outgoing_messages[0].topic,
863                Some(IncomingRpcResponseMessage::<TestResponse>::PAYLOAD_TYPE_NAME.to_owned())
864            );
865            assert_eq!(outgoing_response.request_type, "TestRequest");
866            assert_eq!(outgoing_response.result, Ok(response));
867        }
868    }
869}