Skip to main content

bitwarden_shared_unlock/
leader.rs

1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4    collections::HashMap,
5    ops::Sub,
6    sync::{Arc, Mutex},
7    time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::cancellation_token;
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
19
20const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
21
22/// Error type for failure to start the shared unlock leader.
23#[bitwarden_error(basic)]
24#[derive(Debug, Error)]
25#[error("Could not start shared unlock leader: {0}")]
26pub struct LeaderStartError(#[from] SubscribeError);
27
28struct FollowerSession {
29    last_seen_at: Instant,
30}
31
32struct FollowerSessions {
33    sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
34}
35
36impl FollowerSessions {
37    fn new() -> Self {
38        Self {
39            sessions: Mutex::new(HashMap::new()),
40        }
41    }
42
43    fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
44        let mut sessions = self
45            .sessions
46            .lock()
47            .unwrap_or_else(|poisoned| poisoned.into_inner());
48
49        if !sessions.contains_key(&endpoint) {
50            info!("Shared-Unlock client connected {:?}", endpoint);
51        }
52
53        sessions.insert(
54            endpoint,
55            FollowerSession {
56                last_seen_at: Instant::now(),
57            },
58        );
59    }
60
61    fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
62        let sessions = self
63            .sessions
64            .lock()
65            .unwrap_or_else(|poisoned| poisoned.into_inner());
66
67        sessions.keys().cloned().collect()
68    }
69
70    fn prune_stale(&self, stale_after: Duration) {
71        let mut sessions = self
72            .sessions
73            .lock()
74            .unwrap_or_else(|poisoned| poisoned.into_inner());
75
76        let now = Instant::now();
77        for (endpoint, session) in sessions.iter() {
78            if now.sub(session.last_seen_at) > stale_after {
79                info!("Shared-Unlock client {:?} disconnected", endpoint);
80            }
81        }
82        sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
83    }
84}
85
86/// Coordinates shared unlock state as the authoritative endpoint for followers.
87pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
88
89impl<L: SharedUnlockDriver> Clone for Leader<L> {
90    fn clone(&self) -> Self {
91        Self(self.0.clone())
92    }
93}
94
95/// Inner implementation of the shared unlock leader, containing the actual state and logic. The
96/// outer `Leader` struct is a thin wrapper around an `Arc` to allow for shared ownership across
97/// async tasks.
98struct InnerLeader<D: SharedUnlockDriver> {
99    driver: D,
100    follower_sessions: FollowerSessions,
101    ipc_client: Arc<dyn IpcClient>,
102}
103
104impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
105    /// Creates a leader instance for the shared unlock protocol.
106    pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
107        Self(Arc::new(InnerLeader {
108            driver: lock_system,
109            follower_sessions: FollowerSessions::new(),
110            ipc_client,
111        }))
112    }
113
114    /// Starts background processing for incoming follower messages.
115    pub async fn start(
116        &self,
117        cancellation_token: Option<cancellation_token::CancellationToken>,
118    ) -> Result<(), LeaderStartError> {
119        let cancellation_token = cancellation_token.unwrap_or_default();
120        let mut subscription = self
121            .0
122            .ipc_client
123            .subscribe_typed::<FollowerMessage>()
124            .await?;
125        let leader = self.clone();
126
127        let cancellation_token_clone = cancellation_token.clone();
128        let future = async move {
129            loop {
130                let result = subscription
131                    .receive(Some(cancellation_token_clone.clone()))
132                    .await;
133                match result {
134                    Ok(message) => {
135                        if let Err(error) = leader.receive_message(message).await {
136                            tracing::error!(
137                                ?error,
138                                "Failed to handle shared unlock leader message"
139                            );
140                        }
141                    }
142                    Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
143                        tracing::info!("Shared unlock leader stopped by cancellation");
144                        break;
145                    }
146                    Err(error) => {
147                        tracing::error!(?error, "Failed to receive shared unlock IPC message");
148                    }
149                }
150            }
151        };
152
153        #[cfg(not(target_arch = "wasm32"))]
154        tokio::spawn(future);
155
156        #[cfg(target_arch = "wasm32")]
157        wasm_bindgen_futures::spawn_local(future);
158
159        let cancellation_token = cancellation_token.clone();
160        let leader = self.clone();
161        let timer_future = async move {
162            loop {
163                tokio::select! {
164                    _ = cancellation_token.cancelled() => {
165                        tracing::debug!("Shared unlock leader timer cancelled");
166                        break;
167                    }
168                    _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
169                        leader.0
170                            .follower_sessions
171                            .prune_stale(FOLLOWER_STALE_AFTER);
172                    }
173                }
174            }
175        };
176
177        #[cfg(not(target_arch = "wasm32"))]
178        tokio::spawn(timer_future);
179
180        #[cfg(target_arch = "wasm32")]
181        wasm_bindgen_futures::spawn_local(timer_future);
182
183        Ok(())
184    }
185
186    async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
187        let endpoints = self.0.follower_sessions.active_endpoints();
188        for endpoint in endpoints {
189            self.send_message(message.clone(), endpoint).await;
190        }
191    }
192
193    /// Handles a message sent by a follower.
194    ///
195    /// This updates follower session liveness, validates web message origins against the
196    /// follower user's vault URL, and applies lock state changes when needed.
197    pub async fn receive_message(
198        &self,
199        incoming_message: TypedIncomingMessage<FollowerMessage>,
200    ) -> Result<(), ()> {
201        let message = incoming_message.payload;
202        let sender = incoming_message.source;
203        let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
204
205        // Validate the origin of web sources against the user's vault URL
206        if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
207            let user_id = message.user_id();
208            match self.0.driver.get_vault_url(user_id).await {
209                Some(user_vault_url) if origin == &user_vault_url => {}
210                Some(user_vault_url) => {
211                    warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
212                    return Ok(());
213                }
214                None => {
215                    warn!(%origin, "No vault URL found for user, ignoring message");
216                    return Ok(());
217                }
218            }
219        }
220
221        match message {
222            FollowerMessage::LockStateUpdate {
223                user_id,
224                lock_state: LockState::Locked,
225            } => {
226                self.0.follower_sessions.upsert(endpoint.clone());
227
228                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
229                if self_lock_state == LockState::Locked {
230                    return Ok(());
231                }
232
233                self.0
234                    .driver
235                    .lock_user(user_id)
236                    .await
237                    .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
238                Ok(())
239            }
240            FollowerMessage::LockStateUpdate {
241                user_id,
242                lock_state: LockState::Unlocked { user_key },
243            } => {
244                self.0.follower_sessions.upsert(endpoint.clone());
245
246                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
247                if let LockState::Unlocked { .. } = self_lock_state {
248                    return Ok(());
249                }
250
251                self.0
252                    .driver
253                    .unlock_user(user_id, user_key.clone())
254                    .await
255                    .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
256                Ok(())
257            }
258            FollowerMessage::StartSession {
259                user_id,
260                lock_state,
261            } => {
262                self.0.follower_sessions.upsert(endpoint.clone());
263                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
264
265                match (lock_state, self_lock_state.clone()) {
266                    (LockState::Unlocked { user_key }, LockState::Locked) => {
267                        self.0
268                            .driver
269                            .unlock_user(user_id, user_key.clone())
270                            .await
271                            .inspect_err(
272                                |_| warn!(%user_id, "Failed to unlock user during start session"),
273                            )?;
274                    }
275                    (LockState::Locked, LockState::Unlocked { .. }) => {
276                        let response = LeaderMessage::LockStateUpdate {
277                            user_id,
278                            lock_state: self_lock_state,
279                        };
280                        self.send_message(response, endpoint.clone()).await;
281                    }
282                    _ => {
283                        // States are already in sync, no action needed
284                    }
285                };
286
287                Ok(())
288            }
289            FollowerMessage::HeartBeat { user_id } => {
290                self.0.follower_sessions.upsert(endpoint.clone());
291
292                // Echo back the heartbeat to confirm liveness
293                let response = LeaderMessage::HeartBeat { user_id };
294                self.send_message(response, endpoint.clone()).await;
295
296                let lock_state = self.0.driver.get_user_lock_state(user_id).await;
297                // Ensure that if somehow the lockstate is desynced, it syncs again
298                let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
299                    user_id,
300                    lock_state,
301                };
302                self.send_message(authoritative_lockstate_update, endpoint.clone())
303                    .await;
304                Ok(())
305            }
306        }
307    }
308
309    /// Handles local device events and propagates authoritative updates to followers.
310    ///
311    /// Lock and unlock events are broadcast to active followers. Timer events prune stale
312    /// follower sessions that have not sent recent heartbeats.
313    pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
314        match event {
315            DeviceEvent::ManualLock { user_id } => {
316                let message = LeaderMessage::LockStateUpdate {
317                    user_id,
318                    lock_state: LockState::Locked,
319                };
320                self.broadcast_to_active_followers(message).await;
321            }
322            DeviceEvent::ManualUnlock {
323                user_id,
324                ref user_key,
325            } => {
326                let message = LeaderMessage::LockStateUpdate {
327                    user_id,
328                    lock_state: LockState::Unlocked {
329                        user_key: user_key.to_owned(),
330                    },
331                };
332                self.broadcast_to_active_followers(message).await;
333            }
334        }
335
336        Ok(())
337    }
338
339    async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
340        if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
341            tracing::error!(?error, "Failed to send shared unlock IPC message");
342        }
343    }
344}