Skip to main content

bitwarden_ipc/traits/
communication_backend.rs

1use std::fmt::Debug;
2
3use crate::{
4    error::IpcErrorKind,
5    message::{IncomingMessage, OutgoingMessage},
6};
7
8/// This trait defines the interface that will be used to send and receive messages over IPC.
9/// It is up to the platform to implement this trait and any necessary thread synchronization and
10/// broadcasting.
11pub trait CommunicationBackend: Send + Sync + 'static {
12    type SendError: Debug + Send + Sync + 'static + IpcErrorKind;
13    type Receiver: CommunicationBackendReceiver;
14
15    /// Send a message to the destination specified in the message. This function may be called
16    /// from any thread at any time.
17    ///
18    /// Both recoverable and fatal errors may be returned, classified via
19    /// [`IpcErrorKind::is_fatal()`]. A recoverable error (e.g. a transient transport failure) is
20    /// logged and the IPC client keeps running; a fatal error stops the client from processing any
21    /// further messages. Ambiguous cases should be classified as recoverable.
22    ///
23    /// The implementation of this trait needs to guarantee that:
24    ///     - Multiple concurrent receivers and senders can coexist.
25    fn send(
26        &self,
27        message: OutgoingMessage,
28    ) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send + Sync;
29
30    /// Subscribe to receive messages. This function will return a receiver that can be used to
31    /// receive messages asynchronously.
32    ///
33    /// The implementation of this trait needs to guarantee that:
34    ///     - Multiple concurrent receivers may be created.
35    ///     - All concurrent receivers will receive the same messages.
36    ///      - Multiple concurrent receivers and senders can coexist.
37    fn subscribe(&self) -> impl std::future::Future<Output = Self::Receiver> + Send + Sync;
38}
39
40/// This trait defines the interface for receiving messages from the communication backend.
41///
42/// The implementation of this trait needs to guarantee that:
43///     - The receiver buffers messages from the creation of the receiver until the first call to
44///       receive().
45///     - The receiver buffers messages between calls to receive().
46pub trait CommunicationBackendReceiver: Send + Sync + 'static {
47    type ReceiveError: Debug + Send + Sync + 'static + IpcErrorKind;
48
49    /// Receive a message. This function will block asynchronously until a message is received.
50    ///
51    /// Both recoverable and fatal errors may be returned, classified via
52    /// [`IpcErrorKind::is_fatal()`]. A recoverable error (e.g. the receiver lagging) is logged and
53    /// the IPC client's processing loop continues; a fatal error (e.g. the channel being closed)
54    /// stops the loop. Ambiguous cases should be classified as recoverable.
55    ///
56    /// Do not call this function from multiple threads at the same time. Use the subscribe function
57    /// to create one receiver per thread.
58    fn receive(
59        &self,
60    ) -> impl std::future::Future<Output = Result<IncomingMessage, Self::ReceiveError>> + Send + Sync;
61}
62
63pub(crate) mod noop {
64    use super::*;
65
66    /// A no-op implementation of the `CommunicationBackend` trait.
67    ///
68    /// Sending discards messages silently and receiving blocks forever (the future never resolves).
69    /// This is useful as a default backend for platforms that do not need IPC communication.
70    pub struct NoopCommunicationBackend;
71
72    /// Receiver for [`NoopCommunicationBackend`] that never yields a message.
73    pub struct NoopCommunicationBackendReceiver;
74
75    impl CommunicationBackend for NoopCommunicationBackend {
76        type SendError = std::convert::Infallible;
77        type Receiver = NoopCommunicationBackendReceiver;
78
79        async fn send(&self, _message: OutgoingMessage) -> Result<(), Self::SendError> {
80            Ok(())
81        }
82
83        async fn subscribe(&self) -> Self::Receiver {
84            NoopCommunicationBackendReceiver
85        }
86    }
87
88    impl CommunicationBackendReceiver for NoopCommunicationBackendReceiver {
89        type ReceiveError = std::convert::Infallible;
90
91        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
92            std::future::pending().await
93        }
94    }
95}
96
97#[cfg(any(test, feature = "test-support"))]
98pub(crate) mod test_support {
99    use std::sync::Arc;
100
101    use tokio::sync::{
102        Mutex, RwLock,
103        broadcast::{self, Receiver, Sender},
104    };
105
106    use super::*;
107
108    /// A test implementation of the `CommunicationBackend` trait. Provides methods to inject
109    /// incoming messages and inspect outgoing messages.
110    #[derive(Debug)]
111    pub struct TestCommunicationBackend {
112        outgoing_tx: Sender<OutgoingMessage>,
113        outgoing_rx: Receiver<OutgoingMessage>,
114        outgoing: Arc<RwLock<Vec<OutgoingMessage>>>,
115        incoming_tx: Sender<IncomingMessage>,
116        incoming_rx: Receiver<IncomingMessage>,
117    }
118
119    impl Clone for TestCommunicationBackend {
120        fn clone(&self) -> Self {
121            TestCommunicationBackend {
122                outgoing_tx: self.outgoing_tx.clone(),
123                outgoing_rx: self.outgoing_rx.resubscribe(),
124                outgoing: self.outgoing.clone(),
125                incoming_tx: self.incoming_tx.clone(),
126                incoming_rx: self.incoming_rx.resubscribe(),
127            }
128        }
129    }
130
131    impl Default for TestCommunicationBackend {
132        fn default() -> Self {
133            Self::new()
134        }
135    }
136
137    /// Receiver for [`TestCommunicationBackend`].
138    #[derive(Debug)]
139    pub struct TestCommunicationBackendReceiver(RwLock<Receiver<IncomingMessage>>);
140
141    impl TestCommunicationBackend {
142        /// Create a new test communication backend.
143        pub fn new() -> Self {
144            let (outgoing_tx, outgoing_rx) = broadcast::channel(10);
145            let (incoming_tx, incoming_rx) = broadcast::channel(10);
146            TestCommunicationBackend {
147                outgoing_tx,
148                outgoing_rx,
149                outgoing: Arc::new(RwLock::new(Vec::new())),
150                incoming_tx,
151                incoming_rx,
152            }
153        }
154
155        /// Inject a message as if it were received from a remote endpoint.
156        pub fn push_incoming(&self, message: IncomingMessage) {
157            self.incoming_tx
158                .send(message)
159                .expect("Failed to send incoming message");
160        }
161
162        /// Get a copy of all the outgoing messages that have been sent.
163        pub async fn outgoing(&self) -> Vec<OutgoingMessage> {
164            self.outgoing.read().await.clone()
165        }
166
167        /// Drain all outgoing messages, returning them and clearing the internal buffer.
168        pub async fn drain_outgoing(&self) -> Vec<OutgoingMessage> {
169            self.outgoing.write().await.drain(..).collect()
170        }
171    }
172
173    impl CommunicationBackend for TestCommunicationBackend {
174        type SendError = ();
175        type Receiver = TestCommunicationBackendReceiver;
176
177        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
178            self.outgoing.write().await.push(message);
179            Ok(())
180        }
181
182        async fn subscribe(&self) -> Self::Receiver {
183            TestCommunicationBackendReceiver(RwLock::new(self.incoming_rx.resubscribe()))
184        }
185    }
186
187    impl CommunicationBackendReceiver for TestCommunicationBackendReceiver {
188        type ReceiveError = ();
189
190        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
191            Ok(self
192                .0
193                .write()
194                .await
195                .recv()
196                .await
197                .expect("Failed to receive incoming message"))
198        }
199    }
200
201    #[allow(unused)]
202    #[derive(Clone)]
203    pub struct TestTwoWayCommunicationBackend {
204        outgoing: broadcast::Sender<OutgoingMessage>,
205        receiver: TestTwoWayCommunicationBackendReceiver,
206    }
207
208    #[allow(unused)]
209    #[derive(Clone)]
210    pub struct TestTwoWayCommunicationBackendReceiver {
211        incoming: Arc<Mutex<broadcast::Receiver<OutgoingMessage>>>,
212    }
213
214    impl CommunicationBackendReceiver for TestTwoWayCommunicationBackendReceiver {
215        type ReceiveError = ();
216
217        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
218            let mut incoming = self.incoming.lock().await;
219            let message = incoming
220                .recv()
221                .await
222                .expect("Failed to receive incoming message");
223            Ok(IncomingMessage {
224                payload: message.payload,
225                destination: message.destination,
226                source: crate::endpoint::Source::DesktopMain,
227                topic: message.topic,
228            })
229        }
230    }
231
232    impl TestTwoWayCommunicationBackend {
233        #[allow(unused)]
234        pub fn new() -> (Self, Self) {
235            let (outgoing0, incoming0) = broadcast::channel(128);
236            let (outgoing1, incoming1) = broadcast::channel(128);
237            let one = TestTwoWayCommunicationBackend {
238                outgoing: outgoing0,
239                receiver: TestTwoWayCommunicationBackendReceiver {
240                    incoming: Arc::new(Mutex::new(incoming1)),
241                },
242            };
243            let two = TestTwoWayCommunicationBackend {
244                outgoing: outgoing1,
245                receiver: TestTwoWayCommunicationBackendReceiver {
246                    incoming: Arc::new(Mutex::new(incoming0)),
247                },
248            };
249            (one, two)
250        }
251    }
252
253    impl CommunicationBackend for TestTwoWayCommunicationBackend {
254        type SendError = ();
255        type Receiver = TestTwoWayCommunicationBackendReceiver;
256
257        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
258            self.outgoing
259                .send(message)
260                .expect("Failed to send outgoing message");
261            Ok(())
262        }
263
264        async fn subscribe(&self) -> Self::Receiver {
265            TestTwoWayCommunicationBackendReceiver {
266                incoming: Arc::new(Mutex::new(
267                    self.receiver.incoming.lock().await.resubscribe(),
268                )),
269            }
270        }
271    }
272}