Skip to main content

bitwarden_shared_unlock/
leader.rs

1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4    collections::HashMap,
5    ops::Sub,
6    sync::{Arc, Mutex},
7    time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::cancellation_token;
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{
19    DeviceEvent, FollowerMessage, LeaderMessage, LockState, UserKey, drivers::SharedUnlockDriver,
20};
21
22const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
23
24/// Error type for failure to start the shared unlock leader.
25#[bitwarden_error(basic)]
26#[derive(Debug, Error)]
27#[error("Could not start shared unlock leader: {0}")]
28pub struct LeaderStartError(#[from] SubscribeError);
29
30struct FollowerSession {
31    last_seen_at: Instant,
32}
33
34struct FollowerSessions {
35    sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
36}
37
38impl FollowerSessions {
39    fn new() -> Self {
40        Self {
41            sessions: Mutex::new(HashMap::new()),
42        }
43    }
44
45    fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
46        let mut sessions = self
47            .sessions
48            .lock()
49            .unwrap_or_else(|poisoned| poisoned.into_inner());
50
51        if !sessions.contains_key(&endpoint) {
52            info!("Shared-Unlock client connected {:?}", endpoint);
53        }
54
55        sessions.insert(
56            endpoint,
57            FollowerSession {
58                last_seen_at: Instant::now(),
59            },
60        );
61    }
62
63    fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
64        let sessions = self
65            .sessions
66            .lock()
67            .unwrap_or_else(|poisoned| poisoned.into_inner());
68
69        sessions.keys().cloned().collect()
70    }
71
72    fn prune_stale(&self, stale_after: Duration) {
73        let mut sessions = self
74            .sessions
75            .lock()
76            .unwrap_or_else(|poisoned| poisoned.into_inner());
77
78        let now = Instant::now();
79        for (endpoint, session) in sessions.iter() {
80            if now.sub(session.last_seen_at) > stale_after {
81                info!("Shared-Unlock client {:?} disconnected", endpoint);
82            }
83        }
84        sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
85    }
86}
87
88/// Coordinates shared unlock state as the authoritative endpoint for followers.
89pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
90
91impl<L: SharedUnlockDriver> Clone for Leader<L> {
92    fn clone(&self) -> Self {
93        Self(self.0.clone())
94    }
95}
96
97/// Inner implementation of the shared unlock leader, containing the actual state and logic. The
98/// outer `Leader` struct is a thin wrapper around an `Arc` to allow for shared ownership across
99/// async tasks.
100struct InnerLeader<D: SharedUnlockDriver> {
101    driver: D,
102    follower_sessions: FollowerSessions,
103    ipc_client: Arc<dyn IpcClient>,
104}
105
106impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
107    /// Creates a leader instance for the shared unlock protocol.
108    pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
109        Self(Arc::new(InnerLeader {
110            driver: lock_system,
111            follower_sessions: FollowerSessions::new(),
112            ipc_client,
113        }))
114    }
115
116    /// Starts background processing for incoming follower messages.
117    pub async fn start(
118        &self,
119        cancellation_token: Option<cancellation_token::CancellationToken>,
120    ) -> Result<(), LeaderStartError> {
121        let cancellation_token = cancellation_token.unwrap_or_default();
122        let mut subscription = self
123            .0
124            .ipc_client
125            .subscribe_typed::<FollowerMessage>()
126            .await?;
127        let leader = self.clone();
128
129        let cancellation_token_clone = cancellation_token.clone();
130        let future = async move {
131            loop {
132                let result = subscription
133                    .receive(Some(cancellation_token_clone.clone()))
134                    .await;
135                match result {
136                    Ok(message) => {
137                        if let Err(error) = leader.receive_message(message).await {
138                            tracing::error!(
139                                ?error,
140                                "Failed to handle shared unlock leader message"
141                            );
142                        }
143                    }
144                    Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
145                        tracing::info!("Shared unlock leader stopped by cancellation");
146                        break;
147                    }
148                    Err(error) => {
149                        tracing::error!(?error, "Failed to receive shared unlock IPC message");
150                    }
151                }
152            }
153        };
154
155        #[cfg(not(target_arch = "wasm32"))]
156        tokio::spawn(future);
157
158        #[cfg(target_arch = "wasm32")]
159        wasm_bindgen_futures::spawn_local(future);
160
161        let cancellation_token = cancellation_token.clone();
162        let leader = self.clone();
163        let timer_future = async move {
164            loop {
165                tokio::select! {
166                    _ = cancellation_token.cancelled() => {
167                        tracing::debug!("Shared unlock leader timer cancelled");
168                        break;
169                    }
170                    _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
171                        leader.0
172                            .follower_sessions
173                            .prune_stale(FOLLOWER_STALE_AFTER);
174                    }
175                }
176            }
177        };
178
179        #[cfg(not(target_arch = "wasm32"))]
180        tokio::spawn(timer_future);
181
182        #[cfg(target_arch = "wasm32")]
183        wasm_bindgen_futures::spawn_local(timer_future);
184
185        Ok(())
186    }
187
188    async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
189        let endpoints = self.0.follower_sessions.active_endpoints();
190        for endpoint in endpoints {
191            self.send_message(message.clone(), endpoint).await;
192        }
193    }
194
195    /// Handles a message sent by a follower.
196    ///
197    /// This updates follower session liveness, validates web message origins against the
198    /// follower user's vault URL, and applies lock state changes when needed.
199    pub async fn receive_message(
200        &self,
201        incoming_message: TypedIncomingMessage<FollowerMessage>,
202    ) -> Result<(), ()> {
203        let message = incoming_message.payload;
204        let sender = incoming_message.source;
205        let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
206
207        // Validate the origin of web sources against the user's vault URL
208        if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
209            let user_id = message.user_id();
210            match self.0.driver.get_vault_url(user_id).await {
211                Some(user_vault_url) if origin == &user_vault_url => {}
212                Some(user_vault_url) => {
213                    warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
214                    return Ok(());
215                }
216                None => {
217                    warn!(%origin, "No vault URL found for user, ignoring message");
218                    return Ok(());
219                }
220            }
221        }
222
223        match message {
224            FollowerMessage::LockStateUpdate {
225                user_id,
226                lock_state: LockState::Locked,
227            } => {
228                self.0.follower_sessions.upsert(endpoint.clone());
229
230                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
231                if self_lock_state == LockState::Locked {
232                    return Ok(());
233                }
234
235                self.0
236                    .driver
237                    .lock_user(user_id)
238                    .await
239                    .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
240                Ok(())
241            }
242            FollowerMessage::LockStateUpdate {
243                user_id,
244                lock_state: LockState::Unlocked { user_key },
245            } => {
246                self.0.follower_sessions.upsert(endpoint.clone());
247
248                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
249                if let LockState::Unlocked { .. } = self_lock_state {
250                    return Ok(());
251                }
252
253                self.0
254                    .driver
255                    .unlock_user(user_id, user_key.clone())
256                    .await
257                    .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
258                Ok(())
259            }
260            FollowerMessage::StartSession {
261                user_id,
262                lock_state,
263            } => {
264                self.0.follower_sessions.upsert(endpoint.clone());
265                let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
266
267                match (lock_state, self_lock_state.clone()) {
268                    (LockState::Unlocked { user_key }, LockState::Locked) => {
269                        self.0
270                            .driver
271                            .unlock_user(user_id, user_key.clone())
272                            .await
273                            .inspect_err(
274                                |_| warn!(%user_id, "Failed to unlock user during start session"),
275                            )?;
276                    }
277                    (LockState::Locked, LockState::Unlocked { .. }) => {
278                        let response = LeaderMessage::LockStateUpdate {
279                            user_id,
280                            lock_state: self_lock_state,
281                        };
282                        self.send_message(response, endpoint.clone()).await;
283                    }
284                    _ => {
285                        // States are already in sync, no action needed
286                    }
287                };
288
289                Ok(())
290            }
291            FollowerMessage::HeartBeat { user_id } => {
292                self.0.follower_sessions.upsert(endpoint.clone());
293
294                // Echo back the heartbeat to confirm liveness
295                let response = LeaderMessage::HeartBeat { user_id };
296                self.send_message(response, endpoint.clone()).await;
297
298                let lock_state = self.0.driver.get_user_lock_state(user_id).await;
299                // Ensure that if somehow the lockstate is desynced, it syncs again
300                let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
301                    user_id,
302                    lock_state,
303                };
304                self.send_message(authoritative_lockstate_update, endpoint.clone())
305                    .await;
306                Ok(())
307            }
308        }
309    }
310
311    /// Handles local device events and propagates authoritative updates to followers.
312    ///
313    /// Lock and unlock events are broadcast to active followers. Timer events prune stale
314    /// follower sessions that have not sent recent heartbeats.
315    pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
316        match event {
317            DeviceEvent::ManualLock { user_id } => {
318                let message = LeaderMessage::LockStateUpdate {
319                    user_id,
320                    lock_state: LockState::Locked,
321                };
322                self.broadcast_to_active_followers(message).await;
323            }
324            DeviceEvent::ManualUnlock {
325                user_id,
326                ref user_key,
327            } => {
328                let message = LeaderMessage::LockStateUpdate {
329                    user_id,
330                    lock_state: LockState::Unlocked {
331                        user_key: UserKey::from_bytes(user_key.to_owned()),
332                    },
333                };
334                self.broadcast_to_active_followers(message).await;
335            }
336        }
337
338        Ok(())
339    }
340
341    async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
342        if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
343            tracing::error!(?error, "Failed to send shared unlock IPC message");
344        }
345    }
346}