Skip to main content

bitwarden_shared_unlock/
leader.rs

1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4    collections::HashMap,
5    ops::Sub,
6    sync::{Arc, Mutex},
7    time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::{cancellation_token, time::sleep};
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
19
20const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
21
22/// Error type for failure to start the shared unlock leader.
23#[bitwarden_error(basic)]
24#[derive(Debug, Error)]
25#[error("Could not start shared unlock leader: {0}")]
26pub struct LeaderStartError(#[from] SubscribeError);
27
28struct FollowerSession {
29    last_seen_at: Instant,
30}
31
32struct FollowerSessions {
33    sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
34}
35
36impl FollowerSessions {
37    fn new() -> Self {
38        Self {
39            sessions: Mutex::new(HashMap::new()),
40        }
41    }
42
43    fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
44        let mut sessions = self
45            .sessions
46            .lock()
47            .unwrap_or_else(|poisoned| poisoned.into_inner());
48
49        if !sessions.contains_key(&endpoint) {
50            info!("Shared-Unlock client connected {:?}", endpoint);
51        }
52
53        sessions.insert(
54            endpoint,
55            FollowerSession {
56                last_seen_at: Instant::now(),
57            },
58        );
59    }
60
61    fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
62        let sessions = self
63            .sessions
64            .lock()
65            .unwrap_or_else(|poisoned| poisoned.into_inner());
66
67        sessions.keys().cloned().collect()
68    }
69
70    fn prune_stale(&self, stale_after: Duration) {
71        let mut sessions = self
72            .sessions
73            .lock()
74            .unwrap_or_else(|poisoned| poisoned.into_inner());
75
76        let now = Instant::now();
77        for (endpoint, session) in sessions.iter() {
78            if now.sub(session.last_seen_at) > stale_after {
79                info!("Shared-Unlock client {:?} disconnected", endpoint);
80            }
81        }
82        sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
83    }
84}
85
86/// Coordinates shared unlock state as the authoritative endpoint for followers.
87pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
88
89impl<L: SharedUnlockDriver> Clone for Leader<L> {
90    fn clone(&self) -> Self {
91        Self(self.0.clone())
92    }
93}
94
95/// Inner implementation of the shared unlock leader, containing the actual state and logic. The
96/// outer `Leader` struct is a thin wrapper around an `Arc` to allow for shared ownership across
97/// async tasks.
98struct InnerLeader<D: SharedUnlockDriver> {
99    driver: D,
100    follower_sessions: FollowerSessions,
101    ipc_client: Arc<dyn IpcClient>,
102}
103
104impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
105    /// Creates a leader instance for the shared unlock protocol.
106    pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
107        Self(Arc::new(InnerLeader {
108            driver: lock_system,
109            follower_sessions: FollowerSessions::new(),
110            ipc_client,
111        }))
112    }
113
114    /// Starts background processing for incoming follower messages.
115    pub async fn start(
116        &self,
117        cancellation_token: Option<cancellation_token::CancellationToken>,
118    ) -> Result<(), LeaderStartError> {
119        let cancellation_token = cancellation_token.unwrap_or_default();
120        let mut subscription = self
121            .0
122            .ipc_client
123            .subscribe_typed::<FollowerMessage>()
124            .await?;
125        let leader = self.clone();
126
127        let cancellation_token_clone = cancellation_token.clone();
128        let future = async move {
129            loop {
130                let result = subscription
131                    .receive(Some(cancellation_token_clone.clone()))
132                    .await;
133                match result {
134                    Ok(message) => {
135                        if let Err(error) = leader.receive_message(message).await {
136                            tracing::error!(
137                                ?error,
138                                "Failed to handle shared unlock leader message"
139                            );
140                        }
141                    }
142                    Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
143                        tracing::info!("Shared unlock leader stopped by cancellation");
144                        break;
145                    }
146                    // This is required because otherwise the browser may freeze in this loop
147                    Err(bitwarden_ipc::TypedReceiveError::Channel(
148                        tokio::sync::broadcast::error::RecvError::Closed,
149                    )) => {
150                        tracing::info!("Transport channel closed. Waiting for it to open");
151                        sleep(std::time::Duration::from_secs(1)).await;
152                    }
153                    Err(error) => {
154                        tracing::error!(?error, "Failed to receive shared unlock IPC message");
155                    }
156                }
157            }
158        };
159
160        #[cfg(not(target_arch = "wasm32"))]
161        tokio::spawn(future);
162
163        #[cfg(target_arch = "wasm32")]
164        wasm_bindgen_futures::spawn_local(future);
165
166        let cancellation_token = cancellation_token.clone();
167        let leader = self.clone();
168        let timer_future = async move {
169            loop {
170                tokio::select! {
171                    _ = cancellation_token.cancelled() => {
172                        tracing::debug!("Shared unlock leader timer cancelled");
173                        break;
174                    }
175                    _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
176                        leader.0
177                            .follower_sessions
178                            .prune_stale(FOLLOWER_STALE_AFTER);
179                    }
180                }
181            }
182        };
183
184        #[cfg(not(target_arch = "wasm32"))]
185        tokio::spawn(timer_future);
186
187        #[cfg(target_arch = "wasm32")]
188        wasm_bindgen_futures::spawn_local(timer_future);
189
190        Ok(())
191    }
192
193    async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
194        let endpoints = self.0.follower_sessions.active_endpoints();
195        for endpoint in endpoints {
196            self.send_message(message.clone(), endpoint).await;
197        }
198    }
199
200    /// Handles a message sent by a follower.
201    ///
202    /// This updates follower session liveness, validates web message origins against the
203    /// follower user's vault URL, and applies lock state changes when needed.
204    pub async fn receive_message(
205        &self,
206        incoming_message: TypedIncomingMessage<FollowerMessage>,
207    ) -> Result<(), ()> {
208        let message = incoming_message.payload;
209        let sender = incoming_message.source;
210        let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
211
212        // Validate the origin of web sources against the user's vault URL
213        if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
214            let user_id = message.user_id();
215            match self.0.driver.get_vault_url(user_id).await {
216                Some(user_vault_url) if origin == &user_vault_url => {}
217                Some(user_vault_url) => {
218                    warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
219                    return Ok(());
220                }
221                None => {
222                    warn!(%origin, "No vault URL found for user, ignoring message");
223                    return Ok(());
224                }
225            }
226        }
227
228        match message {
229            FollowerMessage::LockStateUpdate {
230                user_id,
231                lock_state: LockState::Locked,
232            } => {
233                self.0.follower_sessions.upsert(endpoint.clone());
234
235                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
236                if self_lock_state == LockState::Locked {
237                    return Ok(());
238                }
239
240                self.0
241                    .driver
242                    .lock_user(user_id)
243                    .await
244                    .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
245                Ok(())
246            }
247            FollowerMessage::LockStateUpdate {
248                user_id,
249                lock_state: LockState::Unlocked { user_key },
250            } => {
251                self.0.follower_sessions.upsert(endpoint.clone());
252
253                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
254                if let LockState::Unlocked { .. } = self_lock_state {
255                    return Ok(());
256                }
257
258                self.0
259                    .driver
260                    .unlock_user(user_id, user_key.clone())
261                    .await
262                    .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
263                Ok(())
264            }
265            FollowerMessage::StartSession {
266                user_id,
267                lock_state,
268            } => {
269                self.0.follower_sessions.upsert(endpoint.clone());
270                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
271
272                match (lock_state, self_lock_state.clone()) {
273                    (LockState::Unlocked { user_key }, LockState::Locked) => {
274                        self.0
275                            .driver
276                            .unlock_user(user_id, user_key.clone())
277                            .await
278                            .inspect_err(
279                                |_| warn!(%user_id, "Failed to unlock user during start session"),
280                            )?;
281                    }
282                    (LockState::Locked, LockState::Unlocked { .. }) => {
283                        let response = LeaderMessage::LockStateUpdate {
284                            user_id,
285                            lock_state: self_lock_state,
286                        };
287                        self.send_message(response, endpoint.clone()).await;
288                    }
289                    _ => {
290                        // States are already in sync, no action needed
291                    }
292                };
293
294                Ok(())
295            }
296            FollowerMessage::HeartBeat { user_id } => {
297                if self
298                    .0
299                    .follower_sessions
300                    .sessions
301                    .lock()
302                    .unwrap_or_else(|poisoned| poisoned.into_inner())
303                    .get(&endpoint)
304                    .is_none()
305                {
306                    tracing::info!(
307                        "Received heartbeat from unknown shared-unlock client {:?}, asking for session start",
308                        endpoint
309                    );
310                    let response = LeaderMessage::RequestSessionStart { user_id };
311                    self.send_message(response, endpoint.clone()).await;
312                    return Ok(());
313                }
314
315                self.0.follower_sessions.upsert(endpoint.clone());
316
317                // Echo back the heartbeat to confirm liveness
318                let response = LeaderMessage::HeartBeat { user_id };
319                self.send_message(response, endpoint.clone()).await;
320
321                let lock_state = self.0.driver.get_user_lock_state(user_id).await;
322                // Ensure that if somehow the lockstate is desynced, it syncs again
323                let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
324                    user_id,
325                    lock_state,
326                };
327                self.send_message(authoritative_lockstate_update, endpoint.clone())
328                    .await;
329                Ok(())
330            }
331        }
332    }
333
334    /// Handles local device events and propagates authoritative updates to followers.
335    ///
336    /// Lock and unlock events are broadcast to active followers. Timer events prune stale
337    /// follower sessions that have not sent recent heartbeats.
338    pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
339        match event {
340            DeviceEvent::ManualLock { user_id } => {
341                let message = LeaderMessage::LockStateUpdate {
342                    user_id,
343                    lock_state: LockState::Locked,
344                };
345                self.broadcast_to_active_followers(message).await;
346            }
347            DeviceEvent::ManualUnlock {
348                user_id,
349                ref user_key,
350            } => {
351                let message = LeaderMessage::LockStateUpdate {
352                    user_id,
353                    lock_state: LockState::Unlocked {
354                        user_key: user_key.to_owned(),
355                    },
356                };
357                self.broadcast_to_active_followers(message).await;
358            }
359        }
360
361        Ok(())
362    }
363
364    async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
365        if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
366            tracing::error!(?error, "Failed to send shared unlock IPC message");
367        }
368    }
369}