bitwarden_ipc/wasm/
communication_backend.rs1use std::sync::Arc;
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_threading::ThreadBoundRunner;
5use thiserror::Error;
6use tokio::sync::RwLock;
7use wasm_bindgen::prelude::*;
8
9use crate::{
10 error::IpcErrorKind,
11 message::{IncomingMessage, OutgoingMessage},
12 traits::{CommunicationBackend, CommunicationBackendReceiver},
13};
14
15#[allow(missing_docs)]
16#[derive(Debug, Error)]
17#[bitwarden_error(basic)]
18#[error("Failed to deserialize incoming message: {0}")]
19pub struct DeserializeError(String);
20
21#[allow(missing_docs)]
22#[derive(Debug, Error)]
23#[bitwarden_error(basic)]
24#[error("Incoming message channel failed: {0}")]
25pub struct ChannelError(String);
26
27#[derive(Debug, Error)]
34pub enum WasmCommunicationError {
35 #[error("{0}")]
38 Js(String),
39
40 #[error("incoming message channel lagged, {0} messages were dropped")]
43 Lagged(u64),
44
45 #[error("incoming message channel closed")]
48 Closed,
49}
50
51impl IpcErrorKind for WasmCommunicationError {
52 fn is_fatal(&self) -> bool {
53 matches!(self, WasmCommunicationError::Closed)
54 }
55}
56
57#[wasm_bindgen(typescript_custom_section)]
58const TS_CUSTOM_TYPES: &'static str = r#"
59export interface IpcCommunicationBackendSender {
60 send(message: OutgoingMessage): Promise<void>;
61}
62"#;
63
64#[wasm_bindgen]
65extern "C" {
66 #[wasm_bindgen(js_name = IpcCommunicationBackendSender, typescript_type = "IpcCommunicationBackendSender")]
68 pub type JsCommunicationBackendSender;
69
70 #[wasm_bindgen(catch, method, structural)]
72 pub async fn send(
73 this: &JsCommunicationBackendSender,
74 message: OutgoingMessage,
75 ) -> Result<(), JsValue>;
76
77 #[wasm_bindgen(catch, method, structural)]
79 pub async fn receive(this: &JsCommunicationBackendSender) -> Result<JsValue, JsValue>;
80}
81
82#[wasm_bindgen(js_name = IpcCommunicationBackend)]
84pub struct JsCommunicationBackend {
85 sender: Arc<ThreadBoundRunner<JsCommunicationBackendSender>>,
86 receive_rx: tokio::sync::broadcast::Receiver<IncomingMessage>,
87 receive_tx: tokio::sync::broadcast::Sender<IncomingMessage>,
88}
89
90impl Clone for JsCommunicationBackend {
91 fn clone(&self) -> Self {
92 Self {
93 sender: self.sender.clone(),
94 receive_rx: self.receive_rx.resubscribe(),
95 receive_tx: self.receive_tx.clone(),
96 }
97 }
98}
99
100#[wasm_bindgen(js_class = IpcCommunicationBackend)]
101impl JsCommunicationBackend {
102 #[wasm_bindgen(constructor)]
104 pub fn new(sender: JsCommunicationBackendSender) -> Self {
105 let (receive_tx, receive_rx) = tokio::sync::broadcast::channel(20);
106 Self {
107 sender: Arc::new(ThreadBoundRunner::new(sender)),
108 receive_rx,
109 receive_tx,
110 }
111 }
112
113 pub fn receive(&self, message: IncomingMessage) -> Result<(), JsValue> {
115 self.receive_tx
116 .send(message)
117 .map_err(|e| ChannelError(e.to_string()))?;
118 Ok(())
119 }
120}
121
122impl CommunicationBackend for JsCommunicationBackend {
123 type SendError = WasmCommunicationError;
124 type Receiver = RwLock<tokio::sync::broadcast::Receiver<IncomingMessage>>;
125
126 async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
127 self.sender
130 .run_in_thread(|sender| async move {
131 sender.send(message).await.map_err(|e| format!("{e:?}"))
132 })
133 .await
134 .map_err(|e| WasmCommunicationError::Js(e.to_string()))?
135 .map_err(WasmCommunicationError::Js)
136 }
137
138 async fn subscribe(&self) -> Self::Receiver {
139 RwLock::new(self.receive_rx.resubscribe())
140 }
141}
142
143impl CommunicationBackendReceiver for RwLock<tokio::sync::broadcast::Receiver<IncomingMessage>> {
144 type ReceiveError = WasmCommunicationError;
145
146 async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
147 use tokio::sync::broadcast::error::RecvError;
148
149 self.write().await.recv().await.map_err(|e| match e {
150 RecvError::Closed => WasmCommunicationError::Closed,
154 RecvError::Lagged(skipped) => WasmCommunicationError::Lagged(skipped),
157 })
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use tokio::sync::{RwLock, broadcast};
164
165 use super::*;
166 use crate::{
167 endpoint::{Endpoint, HostId, Source},
168 error::IpcErrorKind,
169 };
170
171 fn test_message() -> IncomingMessage {
172 IncomingMessage {
173 payload: vec![],
174 source: Source::BrowserBackground { id: HostId::Own },
175 destination: Endpoint::BrowserBackground { id: HostId::Own },
176 topic: None,
177 }
178 }
179
180 #[tokio::test]
181 async fn receive_returns_fatal_closed_when_all_senders_are_dropped() {
182 let (tx, rx) = broadcast::channel::<IncomingMessage>(4);
183 let receiver = RwLock::new(rx);
184
185 drop(tx);
187
188 let error = receiver.receive().await.unwrap_err();
189 assert!(matches!(error, WasmCommunicationError::Closed));
190 assert!(error.is_fatal());
191 }
192
193 #[tokio::test]
194 async fn receive_returns_recoverable_lagged_when_receiver_falls_behind() {
195 let (tx, rx) = broadcast::channel::<IncomingMessage>(1);
196 let receiver = RwLock::new(rx);
197
198 for _ in 0..3 {
200 tx.send(test_message()).expect("send should not fail");
201 }
202
203 let error = receiver.receive().await.unwrap_err();
204 assert!(matches!(error, WasmCommunicationError::Lagged(_)));
205 assert!(!error.is_fatal());
206 }
207}