Skip to main content

bitwarden_ipc/wasm/
communication_backend.rs

1use std::sync::Arc;
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_threading::ThreadBoundRunner;
5use thiserror::Error;
6use tokio::sync::RwLock;
7use wasm_bindgen::prelude::*;
8
9use crate::{
10    error::IpcErrorKind,
11    message::{IncomingMessage, OutgoingMessage},
12    traits::{CommunicationBackend, CommunicationBackendReceiver},
13};
14
15#[allow(missing_docs)]
16#[derive(Debug, Error)]
17#[bitwarden_error(basic)]
18#[error("Failed to deserialize incoming message: {0}")]
19pub struct DeserializeError(String);
20
21#[allow(missing_docs)]
22#[derive(Debug, Error)]
23#[bitwarden_error(basic)]
24#[error("Incoming message channel failed: {0}")]
25pub struct ChannelError(String);
26
27/// Error type for the WASM communication backend's send and receive operations.
28///
29/// Distinguishes recoverable failures (which leave the shared IPC client running) from the fatal
30/// closed-channel state. Without this distinction the client's processing loop would treat a
31/// permanently-closed broadcast channel as recoverable and busy-loop on it, since a closed channel
32/// returns an error immediately and forever without ever awaiting.
33#[derive(Debug, Error)]
34pub enum WasmCommunicationError {
35    /// An error returned by the JavaScript backend (e.g. a failed send). Recoverable: the IPC
36    /// client keeps running so future operations can succeed.
37    #[error("{0}")]
38    Js(String),
39
40    /// The incoming message receiver fell behind and `0` messages were dropped. Recoverable: the
41    /// next receive resumes normally.
42    #[error("incoming message channel lagged, {0} messages were dropped")]
43    Lagged(u64),
44
45    /// The communication channel was closed because all senders were dropped. This is fatal: the
46    /// IPC client's processing loop stops cleanly instead of busy-looping on the closed channel.
47    #[error("incoming message channel closed")]
48    Closed,
49}
50
51impl IpcErrorKind for WasmCommunicationError {
52    fn is_fatal(&self) -> bool {
53        matches!(self, WasmCommunicationError::Closed)
54    }
55}
56
57#[wasm_bindgen(typescript_custom_section)]
58const TS_CUSTOM_TYPES: &'static str = r#"
59export interface IpcCommunicationBackendSender {
60    send(message: OutgoingMessage): Promise<void>;
61}
62"#;
63
64#[wasm_bindgen]
65extern "C" {
66    /// JavaScript interface for handling outgoing messages from the IPC framework.
67    #[wasm_bindgen(js_name = IpcCommunicationBackendSender, typescript_type = "IpcCommunicationBackendSender")]
68    pub type JsCommunicationBackendSender;
69
70    /// Used by the IPC framework to send an outgoing message.
71    #[wasm_bindgen(catch, method, structural)]
72    pub async fn send(
73        this: &JsCommunicationBackendSender,
74        message: OutgoingMessage,
75    ) -> Result<(), JsValue>;
76
77    /// Used by JavaScript to provide an incoming message to the IPC framework.
78    #[wasm_bindgen(catch, method, structural)]
79    pub async fn receive(this: &JsCommunicationBackendSender) -> Result<JsValue, JsValue>;
80}
81
82/// JavaScript implementation of the `CommunicationBackend` trait for IPC communication.
83#[wasm_bindgen(js_name = IpcCommunicationBackend)]
84pub struct JsCommunicationBackend {
85    sender: Arc<ThreadBoundRunner<JsCommunicationBackendSender>>,
86    receive_rx: tokio::sync::broadcast::Receiver<IncomingMessage>,
87    receive_tx: tokio::sync::broadcast::Sender<IncomingMessage>,
88}
89
90impl Clone for JsCommunicationBackend {
91    fn clone(&self) -> Self {
92        Self {
93            sender: self.sender.clone(),
94            receive_rx: self.receive_rx.resubscribe(),
95            receive_tx: self.receive_tx.clone(),
96        }
97    }
98}
99
100#[wasm_bindgen(js_class = IpcCommunicationBackend)]
101impl JsCommunicationBackend {
102    /// Creates a new instance of the JavaScript communication backend.
103    #[wasm_bindgen(constructor)]
104    pub fn new(sender: JsCommunicationBackendSender) -> Self {
105        let (receive_tx, receive_rx) = tokio::sync::broadcast::channel(20);
106        Self {
107            sender: Arc::new(ThreadBoundRunner::new(sender)),
108            receive_rx,
109            receive_tx,
110        }
111    }
112
113    /// Used by JavaScript to provide an incoming message to the IPC framework.
114    pub fn receive(&self, message: IncomingMessage) -> Result<(), JsValue> {
115        self.receive_tx
116            .send(message)
117            .map_err(|e| ChannelError(e.to_string()))?;
118        Ok(())
119    }
120}
121
122impl CommunicationBackend for JsCommunicationBackend {
123    type SendError = WasmCommunicationError;
124    type Receiver = RwLock<tokio::sync::broadcast::Receiver<IncomingMessage>>;
125
126    async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> {
127        // Both the thread-runner failure and the JS-side send failure are treated as recoverable:
128        // a single failed send must not tear down the shared IPC client.
129        self.sender
130            .run_in_thread(|sender| async move {
131                sender.send(message).await.map_err(|e| format!("{e:?}"))
132            })
133            .await
134            .map_err(|e| WasmCommunicationError::Js(e.to_string()))?
135            .map_err(WasmCommunicationError::Js)
136    }
137
138    async fn subscribe(&self) -> Self::Receiver {
139        RwLock::new(self.receive_rx.resubscribe())
140    }
141}
142
143impl CommunicationBackendReceiver for RwLock<tokio::sync::broadcast::Receiver<IncomingMessage>> {
144    type ReceiveError = WasmCommunicationError;
145
146    async fn receive(&self) -> Result<IncomingMessage, Self::ReceiveError> {
147        use tokio::sync::broadcast::error::RecvError;
148
149        self.write().await.recv().await.map_err(|e| match e {
150            // All senders have been dropped; the channel can never produce another message and
151            // `recv()` would return immediately forever. Treat this as fatal so the processing
152            // loop stops instead of busy-looping.
153            RecvError::Closed => WasmCommunicationError::Closed,
154            // The receiver fell behind and missed `skipped` messages. The next `recv()` resumes
155            // normally, so this is recoverable.
156            RecvError::Lagged(skipped) => WasmCommunicationError::Lagged(skipped),
157        })
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use tokio::sync::{RwLock, broadcast};
164
165    use super::*;
166    use crate::{
167        endpoint::{Endpoint, HostId, Source},
168        error::IpcErrorKind,
169    };
170
171    fn test_message() -> IncomingMessage {
172        IncomingMessage {
173            payload: vec![],
174            source: Source::BrowserBackground { id: HostId::Own },
175            destination: Endpoint::BrowserBackground { id: HostId::Own },
176            topic: None,
177        }
178    }
179
180    #[tokio::test]
181    async fn receive_returns_fatal_closed_when_all_senders_are_dropped() {
182        let (tx, rx) = broadcast::channel::<IncomingMessage>(4);
183        let receiver = RwLock::new(rx);
184
185        // Dropping the only sender closes the channel permanently.
186        drop(tx);
187
188        let error = receiver.receive().await.unwrap_err();
189        assert!(matches!(error, WasmCommunicationError::Closed));
190        assert!(error.is_fatal());
191    }
192
193    #[tokio::test]
194    async fn receive_returns_recoverable_lagged_when_receiver_falls_behind() {
195        let (tx, rx) = broadcast::channel::<IncomingMessage>(1);
196        let receiver = RwLock::new(rx);
197
198        // Overflow the buffer without receiving so the next recv reports lag.
199        for _ in 0..3 {
200            tx.send(test_message()).expect("send should not fail");
201        }
202
203        let error = receiver.receive().await.unwrap_err();
204        assert!(matches!(error, WasmCommunicationError::Lagged(_)));
205        assert!(!error.is_fatal());
206    }
207}