bitwarden_ipc/traits/
communication_backend.rs1use std::fmt::Debug;
2
3use crate::{
4 error::IpcErrorKind,
5 message::{IncomingMessage, OutgoingMessage},
6};
7
8pub trait CommunicationBackend: Send + Sync + 'static {
12 type SendError: Debug + Send + Sync + 'static + IpcErrorKind;
13 type Receiver: CommunicationBackendReceiver;
14
15 fn send(
26 &self,
27 message: OutgoingMessage,
28 ) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send + Sync;
29
30 fn subscribe(&self) -> impl std::future::Future<Output = Self::Receiver> + Send + Sync;
38}
39
40pub trait CommunicationBackendReceiver: Send + Sync + 'static {
47 type ReceiveError: Debug + Send + Sync + 'static + IpcErrorKind;
48
49 fn receive(
59 &self,
60 ) -> impl std::future::Future<Output = Result<IncomingMessage, Self::ReceiveError>> + Send + Sync;
61}
62
63pub(crate) mod noop {
64 use super::*;
65
66 pub struct NoopCommunicationBackend;
71
72 pub struct NoopCommunicationBackendReceiver;
74
75 impl CommunicationBackend for NoopCommunicationBackend {
76 type SendError = std::convert::Infallible;
77 type Receiver = NoopCommunicationBackendReceiver;
78
79 async fn send(&self, _message: OutgoingMessage) -> Result<(), Self::SendError> {
80 Ok(())
81 }
82
83 async fn subscribe(&self) -> Self::Receiver {
84 NoopCommunicationBackendReceiver
85 }
86 }
87
88 impl CommunicationBackendReceiver for NoopCommunicationBackendReceiver {
89 type ReceiveError = std::convert::Infallible;
90
91 async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
92 std::future::pending().await
93 }
94 }
95}
96
97#[cfg(any(test, feature = "test-support"))]
98pub(crate) mod test_support {
99 use std::sync::Arc;
100
101 use tokio::sync::{
102 Mutex, RwLock,
103 broadcast::{self, Receiver, Sender},
104 };
105
106 use super::*;
107
108 #[derive(Debug)]
111 pub struct TestCommunicationBackend {
112 outgoing_tx: Sender<OutgoingMessage>,
113 outgoing_rx: Receiver<OutgoingMessage>,
114 outgoing: Arc<RwLock<Vec<OutgoingMessage>>>,
115 incoming_tx: Sender<IncomingMessage>,
116 incoming_rx: Receiver<IncomingMessage>,
117 }
118
119 impl Clone for TestCommunicationBackend {
120 fn clone(&self) -> Self {
121 TestCommunicationBackend {
122 outgoing_tx: self.outgoing_tx.clone(),
123 outgoing_rx: self.outgoing_rx.resubscribe(),
124 outgoing: self.outgoing.clone(),
125 incoming_tx: self.incoming_tx.clone(),
126 incoming_rx: self.incoming_rx.resubscribe(),
127 }
128 }
129 }
130
131 impl Default for TestCommunicationBackend {
132 fn default() -> Self {
133 Self::new()
134 }
135 }
136
137 #[derive(Debug)]
139 pub struct TestCommunicationBackendReceiver(RwLock<Receiver<IncomingMessage>>);
140
141 impl TestCommunicationBackend {
142 pub fn new() -> Self {
144 let (outgoing_tx, outgoing_rx) = broadcast::channel(10);
145 let (incoming_tx, incoming_rx) = broadcast::channel(10);
146 TestCommunicationBackend {
147 outgoing_tx,
148 outgoing_rx,
149 outgoing: Arc::new(RwLock::new(Vec::new())),
150 incoming_tx,
151 incoming_rx,
152 }
153 }
154
155 pub fn push_incoming(&self, message: IncomingMessage) {
157 self.incoming_tx
158 .send(message)
159 .expect("Failed to send incoming message");
160 }
161
162 pub async fn outgoing(&self) -> Vec<OutgoingMessage> {
164 self.outgoing.read().await.clone()
165 }
166
167 pub async fn drain_outgoing(&self) -> Vec<OutgoingMessage> {
169 self.outgoing.write().await.drain(..).collect()
170 }
171 }
172
173 impl CommunicationBackend for TestCommunicationBackend {
174 type SendError = ();
175 type Receiver = TestCommunicationBackendReceiver;
176
177 async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
178 self.outgoing.write().await.push(message);
179 Ok(())
180 }
181
182 async fn subscribe(&self) -> Self::Receiver {
183 TestCommunicationBackendReceiver(RwLock::new(self.incoming_rx.resubscribe()))
184 }
185 }
186
187 impl CommunicationBackendReceiver for TestCommunicationBackendReceiver {
188 type ReceiveError = ();
189
190 async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
191 Ok(self
192 .0
193 .write()
194 .await
195 .recv()
196 .await
197 .expect("Failed to receive incoming message"))
198 }
199 }
200
201 #[allow(unused)]
202 #[derive(Clone)]
203 pub struct TestTwoWayCommunicationBackend {
204 outgoing: broadcast::Sender<OutgoingMessage>,
205 receiver: TestTwoWayCommunicationBackendReceiver,
206 }
207
208 #[allow(unused)]
209 #[derive(Clone)]
210 pub struct TestTwoWayCommunicationBackendReceiver {
211 incoming: Arc<Mutex<broadcast::Receiver<OutgoingMessage>>>,
212 }
213
214 impl CommunicationBackendReceiver for TestTwoWayCommunicationBackendReceiver {
215 type ReceiveError = ();
216
217 async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
218 let mut incoming = self.incoming.lock().await;
219 let message = incoming
220 .recv()
221 .await
222 .expect("Failed to receive incoming message");
223 Ok(IncomingMessage {
224 payload: message.payload,
225 destination: message.destination,
226 source: crate::endpoint::Source::DesktopMain,
227 topic: message.topic,
228 })
229 }
230 }
231
232 impl TestTwoWayCommunicationBackend {
233 #[allow(unused)]
234 pub fn new() -> (Self, Self) {
235 let (outgoing0, incoming0) = broadcast::channel(128);
236 let (outgoing1, incoming1) = broadcast::channel(128);
237 let one = TestTwoWayCommunicationBackend {
238 outgoing: outgoing0,
239 receiver: TestTwoWayCommunicationBackendReceiver {
240 incoming: Arc::new(Mutex::new(incoming1)),
241 },
242 };
243 let two = TestTwoWayCommunicationBackend {
244 outgoing: outgoing1,
245 receiver: TestTwoWayCommunicationBackendReceiver {
246 incoming: Arc::new(Mutex::new(incoming0)),
247 },
248 };
249 (one, two)
250 }
251 }
252
253 impl CommunicationBackend for TestTwoWayCommunicationBackend {
254 type SendError = ();
255 type Receiver = TestTwoWayCommunicationBackendReceiver;
256
257 async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
258 self.outgoing
259 .send(message)
260 .expect("Failed to send outgoing message");
261 Ok(())
262 }
263
264 async fn subscribe(&self) -> Self::Receiver {
265 TestTwoWayCommunicationBackendReceiver {
266 incoming: Arc::new(Mutex::new(
267 self.receiver.incoming.lock().await.resubscribe(),
268 )),
269 }
270 }
271 }
272}