1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4 collections::HashMap,
5 ops::Sub,
6 sync::{Arc, Mutex},
7 time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::{cancellation_token, time::sleep};
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
19
/// Maximum time a follower may go without being seen before the leader's periodic
/// pruning drops its session (passed to `FollowerSessions::prune_stale`).
const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
21
#[bitwarden_error(basic)]
/// Error returned by `Leader::start` when subscribing to follower messages fails.
///
/// Wraps the underlying [`SubscribeError`], which is also included in the display message.
#[derive(Debug, Error)]
#[error("Could not start shared unlock leader: {0}")]
pub struct LeaderStartError(#[from] SubscribeError);
27
/// Bookkeeping kept by the leader for a single follower endpoint.
struct FollowerSession {
    /// Moment this session was last inserted or refreshed; compared against the
    /// staleness threshold when pruning.
    last_seen_at: Instant,
}
31
/// Table of follower sessions, keyed by the follower's IPC endpoint.
struct FollowerSessions {
    /// Guarded by a `std::sync::Mutex`; every access in this file recovers the inner
    /// map if the mutex is poisoned instead of panicking.
    sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
}
35
36impl FollowerSessions {
37 fn new() -> Self {
38 Self {
39 sessions: Mutex::new(HashMap::new()),
40 }
41 }
42
43 fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
44 let mut sessions = self
45 .sessions
46 .lock()
47 .unwrap_or_else(|poisoned| poisoned.into_inner());
48
49 if !sessions.contains_key(&endpoint) {
50 info!("Shared-Unlock client connected {:?}", endpoint);
51 }
52
53 sessions.insert(
54 endpoint,
55 FollowerSession {
56 last_seen_at: Instant::now(),
57 },
58 );
59 }
60
61 fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
62 let sessions = self
63 .sessions
64 .lock()
65 .unwrap_or_else(|poisoned| poisoned.into_inner());
66
67 sessions.keys().cloned().collect()
68 }
69
70 fn prune_stale(&self, stale_after: Duration) {
71 let mut sessions = self
72 .sessions
73 .lock()
74 .unwrap_or_else(|poisoned| poisoned.into_inner());
75
76 let now = Instant::now();
77 for (endpoint, session) in sessions.iter() {
78 if now.sub(session.last_seen_at) > stale_after {
79 info!("Shared-Unlock client {:?} disconnected", endpoint);
80 }
81 }
82 sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
83 }
84}
85
/// Handle to the leader side of shared unlock.
///
/// The handle is a thin wrapper around an `Arc`, so clones share the same driver,
/// follower-session table and IPC client.
pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
88
89impl<L: SharedUnlockDriver> Clone for Leader<L> {
90 fn clone(&self) -> Self {
91 Self(self.0.clone())
92 }
93}
94
/// State shared by every clone of a [`Leader`] handle.
struct InnerLeader<D: SharedUnlockDriver> {
    /// Driver used to read and change user lock state and to look up vault URLs.
    driver: D,
    /// Followers that have contacted this leader and have not yet been pruned.
    follower_sessions: FollowerSessions,
    /// IPC client used to subscribe to follower messages and to send replies.
    ipc_client: Arc<dyn IpcClient>,
}
103
104impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
105 pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
107 Self(Arc::new(InnerLeader {
108 driver: lock_system,
109 follower_sessions: FollowerSessions::new(),
110 ipc_client,
111 }))
112 }
113
114 pub async fn start(
116 &self,
117 cancellation_token: Option<cancellation_token::CancellationToken>,
118 ) -> Result<(), LeaderStartError> {
119 let cancellation_token = cancellation_token.unwrap_or_default();
120 let mut subscription = self
121 .0
122 .ipc_client
123 .subscribe_typed::<FollowerMessage>()
124 .await?;
125 let leader = self.clone();
126
127 let cancellation_token_clone = cancellation_token.clone();
128 let future = async move {
129 loop {
130 let result = subscription
131 .receive(Some(cancellation_token_clone.clone()))
132 .await;
133 match result {
134 Ok(message) => {
135 if let Err(error) = leader.receive_message(message).await {
136 tracing::error!(
137 ?error,
138 "Failed to handle shared unlock leader message"
139 );
140 }
141 }
142 Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
143 tracing::info!("Shared unlock leader stopped by cancellation");
144 break;
145 }
146 Err(bitwarden_ipc::TypedReceiveError::Channel(
148 tokio::sync::broadcast::error::RecvError::Closed,
149 )) => {
150 tracing::info!("Transport channel closed. Waiting for it to open");
151 sleep(std::time::Duration::from_secs(1)).await;
152 }
153 Err(error) => {
154 tracing::error!(?error, "Failed to receive shared unlock IPC message");
155 }
156 }
157 }
158 };
159
160 #[cfg(not(target_arch = "wasm32"))]
161 tokio::spawn(future);
162
163 #[cfg(target_arch = "wasm32")]
164 wasm_bindgen_futures::spawn_local(future);
165
166 let cancellation_token = cancellation_token.clone();
167 let leader = self.clone();
168 let timer_future = async move {
169 loop {
170 tokio::select! {
171 _ = cancellation_token.cancelled() => {
172 tracing::debug!("Shared unlock leader timer cancelled");
173 break;
174 }
175 _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
176 leader.0
177 .follower_sessions
178 .prune_stale(FOLLOWER_STALE_AFTER);
179 }
180 }
181 }
182 };
183
184 #[cfg(not(target_arch = "wasm32"))]
185 tokio::spawn(timer_future);
186
187 #[cfg(target_arch = "wasm32")]
188 wasm_bindgen_futures::spawn_local(timer_future);
189
190 Ok(())
191 }
192
193 async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
194 let endpoints = self.0.follower_sessions.active_endpoints();
195 for endpoint in endpoints {
196 self.send_message(message.clone(), endpoint).await;
197 }
198 }
199
200 pub async fn receive_message(
205 &self,
206 incoming_message: TypedIncomingMessage<FollowerMessage>,
207 ) -> Result<(), ()> {
208 let message = incoming_message.payload;
209 let sender = incoming_message.source;
210 let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
211
212 if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
214 let user_id = message.user_id();
215 match self.0.driver.get_vault_url(user_id).await {
216 Some(user_vault_url) if origin == &user_vault_url => {}
217 Some(user_vault_url) => {
218 warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
219 return Ok(());
220 }
221 None => {
222 warn!(%origin, "No vault URL found for user, ignoring message");
223 return Ok(());
224 }
225 }
226 }
227
228 match message {
229 FollowerMessage::LockStateUpdate {
230 user_id,
231 lock_state: LockState::Locked,
232 } => {
233 self.0.follower_sessions.upsert(endpoint.clone());
234
235 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
236 if self_lock_state == LockState::Locked {
237 return Ok(());
238 }
239
240 self.0
241 .driver
242 .lock_user(user_id)
243 .await
244 .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
245 Ok(())
246 }
247 FollowerMessage::LockStateUpdate {
248 user_id,
249 lock_state: LockState::Unlocked { user_key },
250 } => {
251 self.0.follower_sessions.upsert(endpoint.clone());
252
253 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
254 if let LockState::Unlocked { .. } = self_lock_state {
255 return Ok(());
256 }
257
258 self.0
259 .driver
260 .unlock_user(user_id, user_key.clone())
261 .await
262 .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
263 Ok(())
264 }
265 FollowerMessage::StartSession {
266 user_id,
267 lock_state,
268 } => {
269 self.0.follower_sessions.upsert(endpoint.clone());
270 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
271
272 match (lock_state, self_lock_state.clone()) {
273 (LockState::Unlocked { user_key }, LockState::Locked) => {
274 self.0
275 .driver
276 .unlock_user(user_id, user_key.clone())
277 .await
278 .inspect_err(
279 |_| warn!(%user_id, "Failed to unlock user during start session"),
280 )?;
281 }
282 (LockState::Locked, LockState::Unlocked { .. }) => {
283 let response = LeaderMessage::LockStateUpdate {
284 user_id,
285 lock_state: self_lock_state,
286 };
287 self.send_message(response, endpoint.clone()).await;
288 }
289 _ => {
290 }
292 };
293
294 Ok(())
295 }
296 FollowerMessage::HeartBeat { user_id } => {
297 if self
298 .0
299 .follower_sessions
300 .sessions
301 .lock()
302 .unwrap_or_else(|poisoned| poisoned.into_inner())
303 .get(&endpoint)
304 .is_none()
305 {
306 tracing::info!(
307 "Received heartbeat from unknown shared-unlock client {:?}, asking for session start",
308 endpoint
309 );
310 let response = LeaderMessage::RequestSessionStart { user_id };
311 self.send_message(response, endpoint.clone()).await;
312 return Ok(());
313 }
314
315 self.0.follower_sessions.upsert(endpoint.clone());
316
317 let response = LeaderMessage::HeartBeat { user_id };
319 self.send_message(response, endpoint.clone()).await;
320
321 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
322 let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
324 user_id,
325 lock_state,
326 };
327 self.send_message(authoritative_lockstate_update, endpoint.clone())
328 .await;
329 Ok(())
330 }
331 }
332 }
333
334 pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
339 match event {
340 DeviceEvent::ManualLock { user_id } => {
341 let message = LeaderMessage::LockStateUpdate {
342 user_id,
343 lock_state: LockState::Locked,
344 };
345 self.broadcast_to_active_followers(message).await;
346 }
347 DeviceEvent::ManualUnlock {
348 user_id,
349 ref user_key,
350 } => {
351 let message = LeaderMessage::LockStateUpdate {
352 user_id,
353 lock_state: LockState::Unlocked {
354 user_key: user_key.to_owned(),
355 },
356 };
357 self.broadcast_to_active_followers(message).await;
358 }
359 }
360
361 Ok(())
362 }
363
364 async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
365 if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
366 tracing::error!(?error, "Failed to send shared unlock IPC message");
367 }
368 }
369}