1use std::sync::Arc;
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_threading::cancellation_token::CancellationToken;
5use serde::de::DeserializeOwned;
6use thiserror::Error;
7use tokio::{select, sync::RwLock};
8
9use crate::{
10 constants::CHANNEL_BUFFER_CAPACITY,
11 endpoint::Endpoint,
12 message::{
13 IncomingMessage, OutgoingMessage, PayloadTypeName, TypedIncomingMessage,
14 TypedOutgoingMessage,
15 },
16 rpc::{
17 error::RpcError,
18 exec::handler_registry::RpcHandlerRegistry,
19 request::RpcRequest,
20 request_message::{RpcRequestMessage, RpcRequestPayload, RPC_REQUEST_PAYLOAD_TYPE_NAME},
21 response_message::{IncomingRpcResponseMessage, OutgoingRpcResponseMessage},
22 },
23 serde_utils,
24 traits::{CommunicationBackend, CryptoProvider, SessionRepository},
25 RpcHandler,
26};
27
28pub struct IpcClient<Crypto, Com, Ses>
32where
33 Crypto: CryptoProvider<Com, Ses>,
34 Com: CommunicationBackend,
35 Ses: SessionRepository<Crypto::Session>,
36{
37 crypto: Crypto,
38 communication: Com,
39 sessions: Ses,
40
41 handlers: RpcHandlerRegistry,
42 incoming: RwLock<Option<tokio::sync::broadcast::Receiver<IncomingMessage>>>,
43 cancellation_token: RwLock<Option<CancellationToken>>,
44}
45
46pub struct IpcClientSubscription {
51 receiver: tokio::sync::broadcast::Receiver<IncomingMessage>,
52 topic: Option<String>,
53}
54
55pub struct IpcClientTypedSubscription<Payload: DeserializeOwned + PayloadTypeName>(
60 IpcClientSubscription,
61 std::marker::PhantomData<Payload>,
62);
63
64#[derive(Debug, Error, Clone, PartialEq, Eq)]
65#[bitwarden_error(flat)]
66#[allow(missing_docs)]
67pub enum SubscribeError {
68 #[error("The IPC processing thread is not running")]
69 NotStarted,
70}
71
72#[derive(Debug, Error, PartialEq, Eq)]
73#[bitwarden_error(flat)]
74#[allow(missing_docs)]
75pub enum ReceiveError {
76 #[error("Failed to subscribe to the IPC channel: {0}")]
77 Channel(#[from] tokio::sync::broadcast::error::RecvError),
78
79 #[error("Timed out while waiting for a message: {0}")]
80 Timeout(#[from] tokio::time::error::Elapsed),
81
82 #[error("Cancelled while waiting for a message")]
83 Cancelled,
84}
85
86#[derive(Debug, Error, PartialEq, Eq)]
87#[bitwarden_error(flat)]
88#[allow(missing_docs)]
89pub enum TypedReceiveError {
90 #[error("Failed to subscribe to the IPC channel: {0}")]
91 Channel(#[from] tokio::sync::broadcast::error::RecvError),
92
93 #[error("Timed out while waiting for a message: {0}")]
94 Timeout(#[from] tokio::time::error::Elapsed),
95
96 #[error("Cancelled while waiting for a message")]
97 Cancelled,
98
99 #[error("Typing error: {0}")]
100 Typing(String),
101}
102
103impl From<ReceiveError> for TypedReceiveError {
104 fn from(value: ReceiveError) -> Self {
105 match value {
106 ReceiveError::Channel(e) => TypedReceiveError::Channel(e),
107 ReceiveError::Timeout(e) => TypedReceiveError::Timeout(e),
108 ReceiveError::Cancelled => TypedReceiveError::Cancelled,
109 }
110 }
111}
112
113#[derive(Debug, Error, PartialEq, Eq)]
114#[bitwarden_error(flat)]
115#[allow(missing_docs)]
116pub enum RequestError {
117 #[error(transparent)]
118 Subscribe(#[from] SubscribeError),
119
120 #[error(transparent)]
121 Receive(#[from] TypedReceiveError),
122
123 #[error("Timed out while waiting for a message: {0}")]
124 Timeout(#[from] tokio::time::error::Elapsed),
125
126 #[error("Failed to send message: {0}")]
127 Send(String),
128
129 #[error("Error occured on the remote target: {0}")]
130 RpcError(#[from] RpcError),
131}
132
133impl<Crypto, Com, Ses> IpcClient<Crypto, Com, Ses>
134where
135 Crypto: CryptoProvider<Com, Ses>,
136 Com: CommunicationBackend,
137 Ses: SessionRepository<Crypto::Session>,
138{
139 pub fn new(crypto: Crypto, communication: Com, sessions: Ses) -> Arc<Self> {
142 Arc::new(Self {
143 crypto,
144 communication,
145 sessions,
146
147 handlers: RpcHandlerRegistry::new(),
148 incoming: RwLock::new(None),
149 cancellation_token: RwLock::new(None),
150 })
151 }
152
153 pub async fn start(self: &Arc<Self>) {
161 let cancellation_token = CancellationToken::new();
162 self.cancellation_token
163 .write()
164 .await
165 .replace(cancellation_token.clone());
166
167 let com_receiver = self.communication.subscribe().await;
168 let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY);
169
170 self.incoming.write().await.replace(client_rx);
171
172 let client = self.clone();
173 let future = async move {
174 loop {
175 let rpc_topic = RPC_REQUEST_PAYLOAD_TYPE_NAME.to_owned();
176 select! {
177 _ = cancellation_token.cancelled() => {
178 log::debug!("Cancellation signal received, stopping IPC client");
179 break;
180 }
181 received = client.crypto.receive(&com_receiver, &client.communication, &client.sessions) => {
182 match received {
183 Ok(message) if message.topic == Some(rpc_topic) => {
184 client.handle_rpc_request(message)
185 }
186 Ok(message) => {
187 if client_tx.send(message).is_err() {
188 log::error!("Failed to save incoming message");
189 break;
190 };
191 }
192 Err(e) => {
193 log::error!("Error receiving message: {:?}", e);
194 break;
195 }
196 }
197 }
198 }
199 }
200 log::debug!("IPC client shutting down");
201 client.stop().await;
202 };
203
204 #[cfg(not(target_arch = "wasm32"))]
205 tokio::spawn(future);
206
207 #[cfg(target_arch = "wasm32")]
208 wasm_bindgen_futures::spawn_local(future);
209 }
210
211 pub async fn is_running(self: &Arc<Self>) -> bool {
213 let has_incoming = self.incoming.read().await.is_some();
214 let has_cancellation_token = self.cancellation_token.read().await.is_some();
215 has_incoming && has_cancellation_token
216 }
217
218 pub async fn stop(self: &Arc<Self>) {
220 let mut incoming = self.incoming.write().await;
221 let _ = incoming.take();
222
223 let mut cancellation_token = self.cancellation_token.write().await;
224 if let Some(cancellation_token) = cancellation_token.take() {
225 cancellation_token.cancel();
226 }
227 }
228
229 pub async fn register_rpc_handler<H>(self: &Arc<Self>, handler: H)
233 where
234 H: RpcHandler + Send + Sync + 'static,
235 {
236 self.handlers.register(handler).await;
237 }
238
239 pub async fn send(self: &Arc<Self>, message: OutgoingMessage) -> Result<(), Crypto::SendError> {
241 let result = self
242 .crypto
243 .send(&self.communication, &self.sessions, message)
244 .await;
245
246 if result.is_err() {
247 log::error!("Error sending message: {:?}", result);
248 self.stop().await;
249 }
250
251 result
252 }
253
254 pub async fn subscribe(
257 self: &Arc<Self>,
258 topic: Option<String>,
259 ) -> Result<IpcClientSubscription, SubscribeError> {
260 Ok(IpcClientSubscription {
261 receiver: self
262 .incoming
263 .read()
264 .await
265 .as_ref()
266 .ok_or(SubscribeError::NotStarted)?
267 .resubscribe(),
268 topic,
269 })
270 }
271
272 pub async fn subscribe_typed<Payload>(
275 self: &Arc<Self>,
276 ) -> Result<IpcClientTypedSubscription<Payload>, SubscribeError>
277 where
278 Payload: DeserializeOwned + PayloadTypeName,
279 {
280 Ok(IpcClientTypedSubscription(
281 self.subscribe(Some(Payload::PAYLOAD_TYPE_NAME.to_owned()))
282 .await?,
283 std::marker::PhantomData,
284 ))
285 }
286
287 pub async fn request<Request>(
291 self: &Arc<Self>,
292 request: Request,
293 destination: Endpoint,
294 cancellation_token: Option<CancellationToken>,
295 ) -> Result<Request::Response, RequestError>
296 where
297 Request: RpcRequest,
298 {
299 let request_id = uuid::Uuid::new_v4().to_string();
300 let mut response_subscription = self
301 .subscribe_typed::<IncomingRpcResponseMessage<_>>()
302 .await?;
303
304 let request_payload = RpcRequestMessage {
305 request,
306 request_id: request_id.clone(),
307 request_type: Request::NAME.to_owned(),
308 };
309
310 let message = TypedOutgoingMessage {
311 payload: request_payload,
312 destination,
313 }
314 .try_into()
315 .map_err(|e: serde_utils::DeserializeError| {
316 RequestError::RpcError(RpcError::RequestSerializationError(e.to_string()))
317 })?;
318
319 self.send(message)
320 .await
321 .map_err(|e| RequestError::Send(format!("{:?}", e)))?;
322
323 let response = loop {
324 let received = response_subscription
325 .receive(cancellation_token.clone())
326 .await
327 .map_err(RequestError::Receive)?;
328
329 if received.payload.request_id == request_id {
330 break received;
331 }
332 };
333
334 Ok(response.payload.result?)
335 }
336
337 fn handle_rpc_request(self: &Arc<Self>, incoming_message: IncomingMessage) {
338 let client = self.clone();
339 let future = async move {
340 let client = client.clone();
341
342 #[derive(Debug, Error)]
343 enum HandleError {
344 #[error("Failed to deserialize request message: {0}")]
345 Deserialize(String),
346
347 #[error("Failed to serialize response message: {0}")]
348 Serialize(String),
349 }
350
351 async fn handle(
352 incoming_message: IncomingMessage,
353 handlers: &RpcHandlerRegistry,
354 ) -> Result<OutgoingMessage, HandleError> {
355 let request = RpcRequestPayload::from_slice(incoming_message.payload.clone())
356 .map_err(|e: serde_utils::DeserializeError| {
357 HandleError::Deserialize(e.to_string())
358 })?;
359
360 let response = handlers.handle(&request).await;
361
362 let response_message = OutgoingRpcResponseMessage {
363 request_id: request.request_id(),
364 request_type: request.request_type(),
365 result: response,
366 };
367
368 let outgoing = TypedOutgoingMessage {
369 payload: response_message,
370 destination: incoming_message.source,
371 }
372 .try_into()
373 .map_err(|e: serde_utils::SerializeError| HandleError::Serialize(e.to_string()))?;
374
375 Ok(outgoing)
376 }
377
378 match handle(incoming_message, &client.handlers).await {
379 Ok(outgoing_message) => {
380 if client.send(outgoing_message).await.is_err() {
381 log::error!("Failed to send response message");
382 }
383 }
384 Err(e) => {
385 log::error!("Error handling RPC request: {:?}", e);
386 }
387 }
388 };
389
390 #[cfg(not(target_arch = "wasm32"))]
391 tokio::spawn(future);
392
393 #[cfg(target_arch = "wasm32")]
394 wasm_bindgen_futures::spawn_local(future);
395 }
396}
397
398impl IpcClientSubscription {
399 pub async fn receive(
402 &mut self,
403 cancellation_token: Option<CancellationToken>,
404 ) -> Result<IncomingMessage, ReceiveError> {
405 let cancellation_token = cancellation_token.unwrap_or_default();
406
407 loop {
408 select! {
409 _ = cancellation_token.cancelled() => {
410 return Err(ReceiveError::Cancelled)
411 }
412 result = self.receiver.recv() => {
413 let received = result?;
414 if self.topic.is_none() || received.topic == self.topic {
415 return Ok::<IncomingMessage, ReceiveError>(received);
416 }
417 }
418 }
419 }
420 }
421}
422
423impl<Payload> IpcClientTypedSubscription<Payload>
424where
425 Payload: DeserializeOwned + PayloadTypeName,
426{
427 pub async fn receive(
430 &mut self,
431 cancellation_token: Option<CancellationToken>,
432 ) -> Result<TypedIncomingMessage<Payload>, TypedReceiveError> {
433 let received = self.0.receive(cancellation_token).await?;
434 received
435 .try_into()
436 .map_err(|e: serde_utils::DeserializeError| TypedReceiveError::Typing(e.to_string()))
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use std::{collections::HashMap, time::Duration};
443
444 use bitwarden_threading::time::sleep;
445 use serde::{Deserialize, Serialize};
446
447 use super::*;
448 use crate::{
449 endpoint::Endpoint,
450 traits::{
451 tests::TestCommunicationBackend, InMemorySessionRepository, NoEncryptionCryptoProvider,
452 },
453 };
454
455 struct TestCryptoProvider {
456 send_result: Option<Result<(), String>>,
458 receive_result: Option<Result<IncomingMessage, String>>,
460 }
461
462 type TestSessionRepository = InMemorySessionRepository<String>;
463 impl CryptoProvider<TestCommunicationBackend, TestSessionRepository> for TestCryptoProvider {
464 type Session = String;
465 type SendError = String;
466 type ReceiveError = String;
467
468 async fn receive(
469 &self,
470 _receiver: &<TestCommunicationBackend as CommunicationBackend>::Receiver,
471 _communication: &TestCommunicationBackend,
472 _sessions: &TestSessionRepository,
473 ) -> Result<IncomingMessage, Self::ReceiveError> {
474 match &self.receive_result {
475 Some(result) => result.clone(),
476 None => {
477 sleep(Duration::from_secs(600)).await;
479 Err("Simulated timeout".to_string())
480 }
481 }
482 }
483
484 async fn send(
485 &self,
486 _communication: &TestCommunicationBackend,
487 _sessions: &TestSessionRepository,
488 _message: OutgoingMessage,
489 ) -> Result<(), Self::SendError> {
490 match &self.send_result {
491 Some(result) => result.clone(),
492 None => {
493 sleep(Duration::from_secs(600)).await;
495 Err("Simulated timeout".to_string())
496 }
497 }
498 }
499 }
500
501 #[tokio::test]
502 async fn returns_send_error_when_crypto_provider_returns_error() {
503 let message = OutgoingMessage {
504 payload: vec![],
505 destination: Endpoint::BrowserBackground,
506 topic: None,
507 };
508 let crypto_provider = TestCryptoProvider {
509 send_result: Some(Err("Crypto error".to_string())),
510 receive_result: Some(Err("Should not have be called".to_string())),
511 };
512 let communication_provider = TestCommunicationBackend::new();
513 let session_map = TestSessionRepository::new(HashMap::new());
514 let client = IpcClient::new(crypto_provider, communication_provider, session_map);
515 client.start().await;
516
517 let error = client.send(message).await.unwrap_err();
518
519 assert_eq!(error, "Crypto error".to_string());
520 }
521
522 #[tokio::test]
523 async fn communication_provider_has_outgoing_message_when_sending_through_ipc_client() {
524 let message = OutgoingMessage {
525 payload: vec![],
526 destination: Endpoint::BrowserBackground,
527 topic: None,
528 };
529 let crypto_provider = NoEncryptionCryptoProvider;
530 let communication_provider = TestCommunicationBackend::new();
531 let session_map = InMemorySessionRepository::new(HashMap::new());
532 let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
533 client.start().await;
534
535 client.send(message.clone()).await.unwrap();
536
537 let outgoing_messages = communication_provider.outgoing().await;
538 assert_eq!(outgoing_messages, vec![message]);
539 }
540
541 #[tokio::test]
542 async fn returns_received_message_when_received_from_backend() {
543 let message = IncomingMessage {
544 payload: vec![],
545 source: Endpoint::Web { id: 9001 },
546 destination: Endpoint::BrowserBackground,
547 topic: None,
548 };
549 let crypto_provider = NoEncryptionCryptoProvider;
550 let communication_provider = TestCommunicationBackend::new();
551 let session_map = InMemorySessionRepository::new(HashMap::new());
552 let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
553 client.start().await;
554
555 let mut subscription = client
556 .subscribe(None)
557 .await
558 .expect("Subscribing should not fail");
559 communication_provider.push_incoming(message.clone());
560 let received_message = subscription.receive(None).await.unwrap();
561
562 assert_eq!(received_message, message);
563 }
564
565 #[tokio::test]
566 async fn skips_non_matching_topics_and_returns_first_matching_message() {
567 let non_matching_message = IncomingMessage {
568 payload: vec![],
569 source: Endpoint::Web { id: 9001 },
570 destination: Endpoint::BrowserBackground,
571 topic: Some("non_matching_topic".to_owned()),
572 };
573 let matching_message = IncomingMessage {
574 payload: vec![109],
575 source: Endpoint::Web { id: 9001 },
576 destination: Endpoint::BrowserBackground,
577 topic: Some("matching_topic".to_owned()),
578 };
579
580 let crypto_provider = NoEncryptionCryptoProvider;
581 let communication_provider = TestCommunicationBackend::new();
582 let session_map = InMemorySessionRepository::new(HashMap::new());
583 let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
584 client.start().await;
585 let mut subscription = client
586 .subscribe(Some("matching_topic".to_owned()))
587 .await
588 .expect("Subscribing should not fail");
589 communication_provider.push_incoming(non_matching_message.clone());
590 communication_provider.push_incoming(non_matching_message.clone());
591 communication_provider.push_incoming(matching_message.clone());
592
593 let received_message: IncomingMessage = subscription.receive(None).await.unwrap();
594
595 assert_eq!(received_message, matching_message);
596 }
597
598 #[tokio::test]
599 async fn skips_unrelated_messages_and_returns_typed_message() {
600 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
601 struct TestPayload {
602 some_data: String,
603 }
604
605 impl PayloadTypeName for TestPayload {
606 const PAYLOAD_TYPE_NAME: &str = "TestPayload";
607 }
608
609 let unrelated = IncomingMessage {
610 payload: vec![],
611 source: Endpoint::Web { id: 9001 },
612 destination: Endpoint::BrowserBackground,
613 topic: None,
614 };
615 let typed_message = TypedIncomingMessage {
616 payload: TestPayload {
617 some_data: "Hello, world!".to_string(),
618 },
619 source: Endpoint::Web { id: 9001 },
620 destination: Endpoint::BrowserBackground,
621 };
622
623 let crypto_provider = NoEncryptionCryptoProvider;
624 let communication_provider = TestCommunicationBackend::new();
625 let session_map = InMemorySessionRepository::new(HashMap::new());
626 let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
627 client.start().await;
628 let mut subscription = client
629 .subscribe_typed::<TestPayload>()
630 .await
631 .expect("Subscribing should not fail");
632 communication_provider.push_incoming(unrelated.clone());
633 communication_provider.push_incoming(unrelated.clone());
634 communication_provider.push_incoming(
635 typed_message
636 .clone()
637 .try_into()
638 .expect("Serialization should not fail"),
639 );
640
641 let received_message = subscription.receive(None).await.unwrap();
642
643 assert_eq!(received_message, typed_message);
644 }
645
646 #[tokio::test]
647 async fn returns_error_if_related_message_was_not_deserializable() {
648 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
649 struct TestPayload {
650 some_data: String,
651 }
652
653 impl PayloadTypeName for TestPayload {
654 const PAYLOAD_TYPE_NAME: &str = "TestPayload";
655 }
656
657 let non_deserializable_message = IncomingMessage {
658 payload: vec![],
659 source: Endpoint::Web { id: 9001 },
660 destination: Endpoint::BrowserBackground,
661 topic: Some("TestPayload".to_owned()),
662 };
663
664 let crypto_provider = NoEncryptionCryptoProvider;
665 let communication_provider = TestCommunicationBackend::new();
666 let session_map = InMemorySessionRepository::new(HashMap::new());
667 let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
668 client.start().await;
669 let mut subscription = client
670 .subscribe_typed::<TestPayload>()
671 .await
672 .expect("Subscribing should not fail");
673 communication_provider.push_incoming(non_deserializable_message.clone());
674
675 let result = subscription.receive(None).await;
676 assert!(matches!(result, Err(TypedReceiveError::Typing(_))));
677 }
678
679 #[tokio::test]
680 async fn ipc_client_stops_if_crypto_returns_send_error() {
681 let message = OutgoingMessage {
682 payload: vec![],
683 destination: Endpoint::BrowserBackground,
684 topic: None,
685 };
686 let crypto_provider = TestCryptoProvider {
687 send_result: Some(Err("Crypto error".to_string())),
688 receive_result: None,
689 };
690 let communication_provider = TestCommunicationBackend::new();
691 let session_map = TestSessionRepository::new(HashMap::new());
692 let client = IpcClient::new(crypto_provider, communication_provider, session_map);
693 client.start().await;
694
695 let error = client.send(message).await.unwrap_err();
696 let is_running = client.is_running().await;
697
698 assert_eq!(error, "Crypto error".to_string());
699 assert!(!is_running);
700 }
701
702 #[tokio::test]
703 async fn ipc_client_stops_if_crypto_returns_receive_error() {
704 let crypto_provider = TestCryptoProvider {
705 send_result: None,
706 receive_result: Some(Err("Crypto error".to_string())),
707 };
708 let communication_provider = TestCommunicationBackend::new();
709 let session_map = TestSessionRepository::new(HashMap::new());
710 let client = IpcClient::new(crypto_provider, communication_provider, session_map);
711 client.start().await;
712
713 tokio::time::sleep(Duration::from_millis(100)).await;
715 let is_running = client.is_running().await;
716
717 assert!(!is_running);
718 }
719
720 #[tokio::test]
721 async fn ipc_client_is_running_if_no_errors_are_encountered() {
722 let crypto_provider = TestCryptoProvider {
723 send_result: None,
724 receive_result: None,
725 };
726 let communication_provider = TestCommunicationBackend::new();
727 let session_map = TestSessionRepository::new(HashMap::new());
728 let client = IpcClient::new(crypto_provider, communication_provider, session_map);
729 client.start().await;
730
731 tokio::time::sleep(Duration::from_millis(100)).await;
733 let is_running = client.is_running().await;
734
735 assert!(is_running);
736 }
737
738 mod request {
739 use super::*;
740 use crate::rpc::response_message::IncomingRpcResponseMessage;
741
742 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
743 struct TestRequest {
744 a: i32,
745 b: i32,
746 }
747
748 #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
749 struct TestResponse {
750 result: i32,
751 }
752
753 impl RpcRequest for TestRequest {
754 type Response = TestResponse;
755
756 const NAME: &str = "TestRequest";
757 }
758
759 struct TestHandler;
760
761 impl RpcHandler for TestHandler {
762 type Request = TestRequest;
763
764 async fn handle(&self, request: Self::Request) -> TestResponse {
765 TestResponse {
766 result: request.a + request.b,
767 }
768 }
769 }
770
771 #[tokio::test]
772 async fn request_sends_message_and_returns_response() {
773 let crypto_provider = NoEncryptionCryptoProvider;
774 let communication_provider = TestCommunicationBackend::new();
775 let session_map = InMemorySessionRepository::new(HashMap::new());
776 let client =
777 IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
778 client.start().await;
779 let request = TestRequest { a: 1, b: 2 };
780 let response = TestResponse { result: 3 };
781
782 let request_clone = request.clone();
784 let result_handle = tokio::spawn(async move {
785 let client = client.clone();
786 client
787 .request::<TestRequest>(request_clone, Endpoint::BrowserBackground, None)
788 .await
789 });
790 tokio::time::sleep(Duration::from_millis(100)).await;
791
792 let outgoing_messages = communication_provider.outgoing().await;
794 let outgoing_request: RpcRequestMessage<TestRequest> =
795 serde_utils::from_slice(&outgoing_messages[0].payload)
796 .expect("Deserialization should not fail");
797 assert_eq!(outgoing_request.request_type, "TestRequest");
798 assert_eq!(outgoing_request.request, request);
799
800 let simulated_response = IncomingRpcResponseMessage {
802 result: Ok(response),
803 request_id: outgoing_request.request_id.clone(),
804 request_type: outgoing_request.request_type.clone(),
805 };
806 let simulated_response = IncomingMessage {
807 payload: serde_utils::to_vec(&simulated_response)
808 .expect("Serialization should not fail"),
809 source: Endpoint::BrowserBackground,
810 destination: Endpoint::Web { id: 9001 },
811 topic: Some(
812 IncomingRpcResponseMessage::<TestRequest>::PAYLOAD_TYPE_NAME.to_owned(),
813 ),
814 };
815 communication_provider.push_incoming(simulated_response);
816
817 let result = result_handle.await.unwrap();
819 assert_eq!(result.unwrap().result, 3);
820 }
821
822 #[tokio::test]
823 async fn incoming_rpc_message_handles_request_and_returns_response() {
824 let crypto_provider = NoEncryptionCryptoProvider;
825 let communication_provider = TestCommunicationBackend::new();
826 let session_map = InMemorySessionRepository::new(HashMap::new());
827 let client =
828 IpcClient::new(crypto_provider, communication_provider.clone(), session_map);
829 client.start().await;
830 let request_id = uuid::Uuid::new_v4().to_string();
831 let request = TestRequest { a: 1, b: 2 };
832 let response = TestResponse { result: 3 };
833
834 client.register_rpc_handler(TestHandler).await;
836
837 let simulated_request = RpcRequestMessage {
839 request,
840 request_id: request_id.clone(),
841 request_type: "TestRequest".to_string(),
842 };
843 let simulated_request_message = IncomingMessage {
844 payload: serde_utils::to_vec(&simulated_request)
845 .expect("Serialization should not fail"),
846 source: Endpoint::Web { id: 9001 },
847 destination: Endpoint::BrowserBackground,
848 topic: Some(RPC_REQUEST_PAYLOAD_TYPE_NAME.to_owned()),
849 };
850 communication_provider.push_incoming(simulated_request_message);
851
852 tokio::time::sleep(Duration::from_millis(100)).await;
854
855 let outgoing_messages = communication_provider.outgoing().await;
857 let outgoing_response: IncomingRpcResponseMessage<TestResponse> =
858 serde_utils::from_slice(&outgoing_messages[0].payload)
859 .expect("Deserialization should not fail");
860
861 assert_eq!(
862 outgoing_messages[0].topic,
863 Some(IncomingRpcResponseMessage::<TestResponse>::PAYLOAD_TYPE_NAME.to_owned())
864 );
865 assert_eq!(outgoing_response.request_type, "TestRequest");
866 assert_eq!(outgoing_response.result, Ok(response));
867 }
868 }
869}