bitwarden_ipc/traits/
communication_backend.rs

1use std::fmt::Debug;
2
3use crate::message::{IncomingMessage, OutgoingMessage};
4
5/// This trait defines the interface that will be used to send and receive messages over IPC.
6/// It is up to the platform to implement this trait and any necessary thread synchronization and
7/// broadcasting.
8pub trait CommunicationBackend: Send + Sync + 'static {
9    type SendError: Debug + Send + Sync + 'static;
10    type Receiver: CommunicationBackendReceiver;
11
12    /// Send a message to the destination specified in the message. This function may be called
13    /// from any thread at any time.
14    ///
15    /// An error should only be returned for fatal and unrecoverable errors.
16    /// Returning an error will cause the IPC client to stop processing messages.
17    ///
18    /// The implementation of this trait needs to guarantee that:
19    ///     - Multiple concurrent receivers and senders can coexist.
20    fn send(
21        &self,
22        message: OutgoingMessage,
23    ) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send;
24
25    /// Subscribe to receive messages. This function will return a receiver that can be used to
26    /// receive messages asynchronously.
27    ///
28    /// The implementation of this trait needs to guarantee that:
29    ///     - Multiple concurrent receivers may be created.
30    ///     - All concurrent receivers will receive the same messages.
31    ///      - Multiple concurrent receivers and senders can coexist.
32    fn subscribe(&self) -> impl std::future::Future<Output = Self::Receiver> + Send + Sync;
33}
34
35/// This trait defines the interface for receiving messages from the communication backend.
36///
37/// The implementation of this trait needs to guarantee that:
38///     - The receiver buffers messages from the creation of the receiver until the first call to
39///       receive().
40///     - The receiver buffers messages between calls to receive().
41pub trait CommunicationBackendReceiver: Send + Sync + 'static {
42    type ReceiveError: Debug + Send + Sync + 'static;
43
44    /// Receive a message. This function will block asynchronously until a message is received.
45    ///
46    /// An error should only be returned for fatal and unrecoverable errors.
47    /// Returning an error will cause the IPC client to stop processing messages.
48    ///
49    /// Do not call this function from multiple threads at the same time. Use the subscribe function
50    /// to create one receiver per thread.
51    fn receive(
52        &self,
53    ) -> impl std::future::Future<Output = Result<IncomingMessage, Self::ReceiveError>> + Send + Sync;
54}
55
56#[cfg(test)]
57pub mod tests {
58    use std::sync::Arc;
59
60    use tokio::sync::{
61        broadcast::{self, Receiver, Sender},
62        RwLock,
63    };
64
65    use super::*;
66
67    /// A mock implementation of the CommunicationBackend trait that can be used for testing.
68    #[derive(Debug)]
69    pub struct TestCommunicationBackend {
70        outgoing_tx: Sender<OutgoingMessage>,
71        outgoing_rx: Receiver<OutgoingMessage>,
72        outgoing: Arc<RwLock<Vec<OutgoingMessage>>>,
73        incoming_tx: Sender<IncomingMessage>,
74        incoming_rx: Receiver<IncomingMessage>,
75    }
76
77    impl Clone for TestCommunicationBackend {
78        fn clone(&self) -> Self {
79            TestCommunicationBackend {
80                outgoing_tx: self.outgoing_tx.clone(),
81                outgoing_rx: self.outgoing_rx.resubscribe(),
82                outgoing: self.outgoing.clone(),
83                incoming_tx: self.incoming_tx.clone(),
84                incoming_rx: self.incoming_rx.resubscribe(),
85            }
86        }
87    }
88
89    #[derive(Debug)]
90    pub struct TestCommunicationBackendReceiver(RwLock<Receiver<IncomingMessage>>);
91
92    impl TestCommunicationBackend {
93        pub fn new() -> Self {
94            let (outgoing_tx, outgoing_rx) = broadcast::channel(10);
95            let (incoming_tx, incoming_rx) = broadcast::channel(10);
96            TestCommunicationBackend {
97                outgoing_tx,
98                outgoing_rx,
99                outgoing: Arc::new(RwLock::new(Vec::new())),
100                incoming_tx,
101                incoming_rx,
102            }
103        }
104
105        pub fn push_incoming(&self, message: IncomingMessage) {
106            self.incoming_tx
107                .send(message)
108                .expect("Failed to send incoming message");
109        }
110
111        /// Get a copy of all the outgoing messages that have been sent.
112        pub async fn outgoing(&self) -> Vec<OutgoingMessage> {
113            self.outgoing.read().await.clone()
114        }
115    }
116
117    impl CommunicationBackend for TestCommunicationBackend {
118        type SendError = ();
119        type Receiver = TestCommunicationBackendReceiver;
120
121        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
122            self.outgoing.write().await.push(message);
123            Ok(())
124        }
125
126        async fn subscribe(&self) -> Self::Receiver {
127            TestCommunicationBackendReceiver(RwLock::new(self.incoming_rx.resubscribe()))
128        }
129    }
130
131    impl CommunicationBackendReceiver for TestCommunicationBackendReceiver {
132        type ReceiveError = ();
133
134        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
135            Ok(self
136                .0
137                .write()
138                .await
139                .recv()
140                .await
141                .expect("Failed to receive incoming message"))
142        }
143    }
144}