bitwarden_ipc/traits/
communication_backend.rs1use std::fmt::Debug;
2
3use crate::message::{IncomingMessage, OutgoingMessage};
4
5pub trait CommunicationBackend: Send + Sync + 'static {
9 type SendError: Debug + Send + Sync + 'static;
10 type Receiver: CommunicationBackendReceiver;
11
12 fn send(
21 &self,
22 message: OutgoingMessage,
23 ) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send + Sync;
24
25 fn subscribe(&self) -> impl std::future::Future<Output = Self::Receiver> + Send + Sync;
33}
34
35pub trait CommunicationBackendReceiver: Send + Sync + 'static {
42 type ReceiveError: Debug + Send + Sync + 'static;
43
44 fn receive(
52 &self,
53 ) -> impl std::future::Future<Output = Result<IncomingMessage, Self::ReceiveError>> + Send + Sync;
54}
55
56pub(crate) mod noop {
57 use super::*;
58
59 pub struct NoopCommunicationBackend;
64
65 pub struct NoopCommunicationBackendReceiver;
67
68 impl CommunicationBackend for NoopCommunicationBackend {
69 type SendError = std::convert::Infallible;
70 type Receiver = NoopCommunicationBackendReceiver;
71
72 async fn send(&self, _message: OutgoingMessage) -> Result<(), Self::SendError> {
73 Ok(())
74 }
75
76 async fn subscribe(&self) -> Self::Receiver {
77 NoopCommunicationBackendReceiver
78 }
79 }
80
81 impl CommunicationBackendReceiver for NoopCommunicationBackendReceiver {
82 type ReceiveError = std::convert::Infallible;
83
84 async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
85 std::future::pending().await
86 }
87 }
88}
89
/// In-memory backends for tests, compiled only for `test` builds or with the `test-support`
/// feature.
#[cfg(any(test, feature = "test-support"))]
pub(crate) mod test_support {
    use std::sync::Arc;

    use tokio::sync::{
        Mutex, RwLock,
        broadcast::{self, Receiver, Sender},
    };

    use super::*;

    /// Backend that records every sent message and lets the test inject incoming ones.
    ///
    /// Sent messages are appended to a shared `Vec`; incoming messages travel over a broadcast
    /// channel that receivers created by `subscribe` listen on.
    #[derive(Debug)]
    pub struct TestCommunicationBackend {
        outgoing_tx: Sender<OutgoingMessage>,
        outgoing_rx: Receiver<OutgoingMessage>,
        outgoing: Arc<RwLock<Vec<OutgoingMessage>>>,
        incoming_tx: Sender<IncomingMessage>,
        incoming_rx: Receiver<IncomingMessage>,
    }

    impl Clone for TestCommunicationBackend {
        /// Clones the backend so that the copy shares the recorded-message list and both
        /// broadcast channels with the original.
        ///
        /// Broadcast receivers are re-created with `resubscribe`, which is why this impl is
        /// written by hand instead of derived.
        fn clone(&self) -> Self {
            Self {
                outgoing_tx: self.outgoing_tx.clone(),
                outgoing_rx: self.outgoing_rx.resubscribe(),
                outgoing: Arc::clone(&self.outgoing),
                incoming_tx: self.incoming_tx.clone(),
                incoming_rx: self.incoming_rx.resubscribe(),
            }
        }
    }

    impl Default for TestCommunicationBackend {
        /// Equivalent to [`TestCommunicationBackend::new`].
        fn default() -> Self {
            TestCommunicationBackend::new()
        }
    }

    /// Receiver handed out by [`TestCommunicationBackend`]; wraps a broadcast receiver in a lock
    /// so it can be polled through `&self`.
    #[derive(Debug)]
    pub struct TestCommunicationBackendReceiver(RwLock<Receiver<IncomingMessage>>);

    impl TestCommunicationBackend {
        /// Creates a backend with an empty sent-message list and two broadcast channels of
        /// capacity 10.
        pub fn new() -> Self {
            let (outgoing_tx, outgoing_rx) = broadcast::channel(10);
            let (incoming_tx, incoming_rx) = broadcast::channel(10);
            let outgoing = Arc::new(RwLock::new(Vec::new()));

            Self {
                outgoing_tx,
                outgoing_rx,
                outgoing,
                incoming_tx,
                incoming_rx,
            }
        }

        /// Injects `message` into the incoming broadcast channel.
        ///
        /// # Panics
        ///
        /// Panics if the broadcast send returns an error.
        pub fn push_incoming(&self, message: IncomingMessage) {
            let delivery = self.incoming_tx.send(message);
            delivery.expect("Failed to send incoming message");
        }

        /// Returns a copy of every message recorded by `send` so far.
        pub async fn outgoing(&self) -> Vec<OutgoingMessage> {
            let recorded = self.outgoing.read().await;
            recorded.clone()
        }

        /// Removes and returns every message recorded by `send` so far, leaving the list empty.
        pub async fn drain_outgoing(&self) -> Vec<OutgoingMessage> {
            let mut recorded = self.outgoing.write().await;
            std::mem::take(&mut *recorded)
        }
    }

    impl CommunicationBackend for TestCommunicationBackend {
        type Receiver = TestCommunicationBackendReceiver;
        type SendError = ();

        /// Appends `message` to the recorded list; always succeeds.
        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
            let mut recorded = self.outgoing.write().await;
            recorded.push(message);
            Ok(())
        }

        /// Creates a receiver by resubscribing to the incoming broadcast channel.
        async fn subscribe(&self) -> Self::Receiver {
            let fresh = self.incoming_rx.resubscribe();
            TestCommunicationBackendReceiver(RwLock::new(fresh))
        }
    }

    impl CommunicationBackendReceiver for TestCommunicationBackendReceiver {
        type ReceiveError = ();

        /// Waits for the next injected message.
        ///
        /// # Panics
        ///
        /// Panics if the underlying broadcast `recv` returns an error.
        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
            // The lock is held across the await so that only one caller polls the channel.
            let mut inbox = self.0.write().await;
            let received = inbox.recv().await;
            Ok(received.expect("Failed to receive incoming message"))
        }
    }

    /// One end of a pair of connected backends created by
    /// [`TestTwoWayCommunicationBackend::new`].
    // NOTE(review): `allow(unused)` presumably covers builds where no test uses this type —
    // confirm.
    #[allow(unused)]
    #[derive(Clone)]
    pub struct TestTwoWayCommunicationBackend {
        outgoing: broadcast::Sender<OutgoingMessage>,
        receiver: TestTwoWayCommunicationBackendReceiver,
    }

    /// Receiver side of a [`TestTwoWayCommunicationBackend`]; clones share the same underlying
    /// broadcast receiver through the `Arc<Mutex<_>>`.
    #[allow(unused)]
    #[derive(Clone)]
    pub struct TestTwoWayCommunicationBackendReceiver {
        incoming: Arc<Mutex<broadcast::Receiver<OutgoingMessage>>>,
    }

    impl CommunicationBackendReceiver for TestTwoWayCommunicationBackendReceiver {
        type ReceiveError = ();

        /// Waits for the next message and converts it into an [`IncomingMessage`].
        ///
        /// Payload, destination and topic are copied over unchanged; the source is always
        /// reported as `Source::DesktopMain`.
        ///
        /// # Panics
        ///
        /// Panics if the underlying broadcast `recv` returns an error.
        async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
            let mut channel = self.incoming.lock().await;
            let outbound = channel
                .recv()
                .await
                .expect("Failed to receive incoming message");

            let converted = IncomingMessage {
                payload: outbound.payload,
                destination: outbound.destination,
                source: crate::endpoint::Source::DesktopMain,
                topic: outbound.topic,
            };
            Ok(converted)
        }
    }

    impl TestTwoWayCommunicationBackend {
        /// Creates two cross-wired backends: whatever one sends, the other receives.
        ///
        /// Each direction uses its own broadcast channel of capacity 128.
        #[allow(unused)]
        pub fn new() -> (Self, Self) {
            let (first_tx, first_rx) = broadcast::channel(128);
            let (second_tx, second_rx) = broadcast::channel(128);

            // Each side sends on its own channel and listens on the other side's channel.
            let first = Self {
                outgoing: first_tx,
                receiver: TestTwoWayCommunicationBackendReceiver {
                    incoming: Arc::new(Mutex::new(second_rx)),
                },
            };
            let second = Self {
                outgoing: second_tx,
                receiver: TestTwoWayCommunicationBackendReceiver {
                    incoming: Arc::new(Mutex::new(first_rx)),
                },
            };

            (first, second)
        }
    }

    impl CommunicationBackend for TestTwoWayCommunicationBackend {
        type Receiver = TestTwoWayCommunicationBackendReceiver;
        type SendError = ();

        /// Broadcasts `message` on this side's outgoing channel.
        ///
        /// # Panics
        ///
        /// Panics if the broadcast send returns an error.
        async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
            let delivery = self.outgoing.send(message);
            delivery.expect("Failed to send outgoing message");
            Ok(())
        }

        /// Creates an independent receiver by resubscribing to this side's incoming channel.
        async fn subscribe(&self) -> Self::Receiver {
            let resubscribed = self.receiver.incoming.lock().await.resubscribe();
            TestTwoWayCommunicationBackendReceiver {
                incoming: Arc::new(Mutex::new(resubscribed)),
            }
        }
    }
}