Skip to main content

bitwarden_ipc/traits/
communication_backend.rs

1use std::fmt::Debug;
2
3use crate::message::{IncomingMessage, OutgoingMessage};
4
5/// This trait defines the interface that will be used to send and receive messages over IPC.
6/// It is up to the platform to implement this trait and any necessary thread synchronization and
7/// broadcasting.
8pub trait CommunicationBackend: Send + Sync + 'static {
9    type SendError: Debug + Send + Sync + 'static;
10    type Receiver: CommunicationBackendReceiver;
11
12    /// Send a message to the destination specified in the message. This function may be called
13    /// from any thread at any time.
14    ///
15    /// An error should only be returned for fatal and unrecoverable errors.
16    /// Returning an error will cause the IPC client to stop processing messages.
17    ///
18    /// The implementation of this trait needs to guarantee that:
19    ///     - Multiple concurrent receivers and senders can coexist.
20    fn send(
21        &self,
22        message: OutgoingMessage,
23    ) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send;
24
25    /// Subscribe to receive messages. This function will return a receiver that can be used to
26    /// receive messages asynchronously.
27    ///
28    /// The implementation of this trait needs to guarantee that:
29    ///     - Multiple concurrent receivers may be created.
30    ///     - All concurrent receivers will receive the same messages.
31    ///      - Multiple concurrent receivers and senders can coexist.
32    fn subscribe(&self) -> impl std::future::Future<Output = Self::Receiver> + Send + Sync;
33}
34
35/// This trait defines the interface for receiving messages from the communication backend.
36///
37/// The implementation of this trait needs to guarantee that:
38///     - The receiver buffers messages from the creation of the receiver until the first call to
39///       receive().
40///     - The receiver buffers messages between calls to receive().
41pub trait CommunicationBackendReceiver: Send + Sync + 'static {
42    type ReceiveError: Debug + Send + Sync + 'static;
43
44    /// Receive a message. This function will block asynchronously until a message is received.
45    ///
46    /// An error should only be returned for fatal and unrecoverable errors.
47    /// Returning an error will cause the IPC client to stop processing messages.
48    ///
49    /// Do not call this function from multiple threads at the same time. Use the subscribe function
50    /// to create one receiver per thread.
51    fn receive(
52        &self,
53    ) -> impl std::future::Future<Output = Result<IncomingMessage, Self::ReceiveError>> + Send + Sync;
54}
55
56pub(crate) mod noop {
57    use super::*;
58
59    /// A no-op implementation of the `CommunicationBackend` trait.
60    ///
61    /// Sending discards messages silently and receiving blocks forever (the future never resolves).
62    /// This is useful as a default backend for platforms that do not need IPC communication.
63    pub struct NoopCommunicationBackend;
64
65    /// Receiver for [`NoopCommunicationBackend`] that never yields a message.
66    pub struct NoopCommunicationBackendReceiver;
67
68    impl CommunicationBackend for NoopCommunicationBackend {
69        type SendError = std::convert::Infallible;
70        type Receiver = NoopCommunicationBackendReceiver;
71
72        async fn send(&self, _message: OutgoingMessage) -> Result<(), Self::SendError> {
73            Ok(())
74        }
75
76        async fn subscribe(&self) -> Self::Receiver {
77            NoopCommunicationBackendReceiver
78        }
79    }
80
81    impl CommunicationBackendReceiver for NoopCommunicationBackendReceiver {
82        type ReceiveError = std::convert::Infallible;
83
84        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
85            std::future::pending().await
86        }
87    }
88}
89
90#[cfg(any(test, feature = "test-support"))]
91pub(crate) mod test_support {
92    use std::sync::Arc;
93
94    use tokio::sync::{
95        RwLock,
96        broadcast::{self, Receiver, Sender},
97    };
98
99    use super::*;
100
101    /// A test implementation of the `CommunicationBackend` trait. Provides methods to inject
102    /// incoming messages and inspect outgoing messages.
103    #[derive(Debug)]
104    pub struct TestCommunicationBackend {
105        outgoing_tx: Sender<OutgoingMessage>,
106        outgoing_rx: Receiver<OutgoingMessage>,
107        outgoing: Arc<RwLock<Vec<OutgoingMessage>>>,
108        incoming_tx: Sender<IncomingMessage>,
109        incoming_rx: Receiver<IncomingMessage>,
110    }
111
112    impl Clone for TestCommunicationBackend {
113        fn clone(&self) -> Self {
114            TestCommunicationBackend {
115                outgoing_tx: self.outgoing_tx.clone(),
116                outgoing_rx: self.outgoing_rx.resubscribe(),
117                outgoing: self.outgoing.clone(),
118                incoming_tx: self.incoming_tx.clone(),
119                incoming_rx: self.incoming_rx.resubscribe(),
120            }
121        }
122    }
123
124    impl Default for TestCommunicationBackend {
125        fn default() -> Self {
126            Self::new()
127        }
128    }
129
130    /// Receiver for [`TestCommunicationBackend`].
131    #[derive(Debug)]
132    pub struct TestCommunicationBackendReceiver(RwLock<Receiver<IncomingMessage>>);
133
134    impl TestCommunicationBackend {
135        /// Create a new test communication backend.
136        pub fn new() -> Self {
137            let (outgoing_tx, outgoing_rx) = broadcast::channel(10);
138            let (incoming_tx, incoming_rx) = broadcast::channel(10);
139            TestCommunicationBackend {
140                outgoing_tx,
141                outgoing_rx,
142                outgoing: Arc::new(RwLock::new(Vec::new())),
143                incoming_tx,
144                incoming_rx,
145            }
146        }
147
148        /// Inject a message as if it were received from a remote endpoint.
149        pub fn push_incoming(&self, message: IncomingMessage) {
150            self.incoming_tx
151                .send(message)
152                .expect("Failed to send incoming message");
153        }
154
155        /// Get a copy of all the outgoing messages that have been sent.
156        pub async fn outgoing(&self) -> Vec<OutgoingMessage> {
157            self.outgoing.read().await.clone()
158        }
159
160        /// Drain all outgoing messages, returning them and clearing the internal buffer.
161        pub async fn drain_outgoing(&self) -> Vec<OutgoingMessage> {
162            self.outgoing.write().await.drain(..).collect()
163        }
164    }
165
166    impl CommunicationBackend for TestCommunicationBackend {
167        type SendError = ();
168        type Receiver = TestCommunicationBackendReceiver;
169
170        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
171            self.outgoing.write().await.push(message);
172            Ok(())
173        }
174
175        async fn subscribe(&self) -> Self::Receiver {
176            TestCommunicationBackendReceiver(RwLock::new(self.incoming_rx.resubscribe()))
177        }
178    }
179
180    impl CommunicationBackendReceiver for TestCommunicationBackendReceiver {
181        type ReceiveError = ();
182
183        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
184            Ok(self
185                .0
186                .write()
187                .await
188                .recv()
189                .await
190                .expect("Failed to receive incoming message"))
191        }
192    }
193}