1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4 collections::HashMap,
5 ops::Sub,
6 sync::{Arc, Mutex},
7 time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::cancellation_token;
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
19
/// Maximum time a follower may go without sending a message before its session is treated as
/// stale.
///
/// The leader's timer task passes this value to `FollowerSessions::prune_stale`.
const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
21
22#[bitwarden_error(basic)]
24#[derive(Debug, Error)]
25#[error("Could not start shared unlock leader: {0}")]
26pub struct LeaderStartError(#[from] SubscribeError);
27
/// Bookkeeping for a single follower known to the leader.
struct FollowerSession {
    /// When the follower's session was last inserted or refreshed.
    last_seen_at: Instant,
}
31
32struct FollowerSessions {
33 sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
34}
35
36impl FollowerSessions {
37 fn new() -> Self {
38 Self {
39 sessions: Mutex::new(HashMap::new()),
40 }
41 }
42
43 fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
44 let mut sessions = self
45 .sessions
46 .lock()
47 .unwrap_or_else(|poisoned| poisoned.into_inner());
48
49 if !sessions.contains_key(&endpoint) {
50 info!("Shared-Unlock client connected {:?}", endpoint);
51 }
52
53 sessions.insert(
54 endpoint,
55 FollowerSession {
56 last_seen_at: Instant::now(),
57 },
58 );
59 }
60
61 fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
62 let sessions = self
63 .sessions
64 .lock()
65 .unwrap_or_else(|poisoned| poisoned.into_inner());
66
67 sessions.keys().cloned().collect()
68 }
69
70 fn prune_stale(&self, stale_after: Duration) {
71 let mut sessions = self
72 .sessions
73 .lock()
74 .unwrap_or_else(|poisoned| poisoned.into_inner());
75
76 let now = Instant::now();
77 for (endpoint, session) in sessions.iter() {
78 if now.sub(session.last_seen_at) > stale_after {
79 info!("Shared-Unlock client {:?} disconnected", endpoint);
80 }
81 }
82 sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
83 }
84}
85
86pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
88
89impl<L: SharedUnlockDriver> Clone for Leader<L> {
90 fn clone(&self) -> Self {
91 Self(self.0.clone())
92 }
93}
94
95struct InnerLeader<D: SharedUnlockDriver> {
99 driver: D,
100 follower_sessions: FollowerSessions,
101 ipc_client: Arc<dyn IpcClient>,
102}
103
104impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
105 pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
107 Self(Arc::new(InnerLeader {
108 driver: lock_system,
109 follower_sessions: FollowerSessions::new(),
110 ipc_client,
111 }))
112 }
113
114 pub async fn start(
116 &self,
117 cancellation_token: Option<cancellation_token::CancellationToken>,
118 ) -> Result<(), LeaderStartError> {
119 let cancellation_token = cancellation_token.unwrap_or_default();
120 let mut subscription = self
121 .0
122 .ipc_client
123 .subscribe_typed::<FollowerMessage>()
124 .await?;
125 let leader = self.clone();
126
127 let cancellation_token_clone = cancellation_token.clone();
128 let future = async move {
129 loop {
130 let result = subscription
131 .receive(Some(cancellation_token_clone.clone()))
132 .await;
133 match result {
134 Ok(message) => {
135 if let Err(error) = leader.receive_message(message).await {
136 tracing::error!(
137 ?error,
138 "Failed to handle shared unlock leader message"
139 );
140 }
141 }
142 Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
143 tracing::info!("Shared unlock leader stopped by cancellation");
144 break;
145 }
146 Err(error) => {
147 tracing::error!(?error, "Failed to receive shared unlock IPC message");
148 }
149 }
150 }
151 };
152
153 #[cfg(not(target_arch = "wasm32"))]
154 tokio::spawn(future);
155
156 #[cfg(target_arch = "wasm32")]
157 wasm_bindgen_futures::spawn_local(future);
158
159 let cancellation_token = cancellation_token.clone();
160 let leader = self.clone();
161 let timer_future = async move {
162 loop {
163 tokio::select! {
164 _ = cancellation_token.cancelled() => {
165 tracing::debug!("Shared unlock leader timer cancelled");
166 break;
167 }
168 _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
169 leader.0
170 .follower_sessions
171 .prune_stale(FOLLOWER_STALE_AFTER);
172 }
173 }
174 }
175 };
176
177 #[cfg(not(target_arch = "wasm32"))]
178 tokio::spawn(timer_future);
179
180 #[cfg(target_arch = "wasm32")]
181 wasm_bindgen_futures::spawn_local(timer_future);
182
183 Ok(())
184 }
185
186 async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
187 let endpoints = self.0.follower_sessions.active_endpoints();
188 for endpoint in endpoints {
189 self.send_message(message.clone(), endpoint).await;
190 }
191 }
192
193 pub async fn receive_message(
198 &self,
199 incoming_message: TypedIncomingMessage<FollowerMessage>,
200 ) -> Result<(), ()> {
201 let message = incoming_message.payload;
202 let sender = incoming_message.source;
203 let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
204
205 if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
207 let user_id = message.user_id();
208 match self.0.driver.get_vault_url(user_id).await {
209 Some(user_vault_url) if origin == &user_vault_url => {}
210 Some(user_vault_url) => {
211 warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
212 return Ok(());
213 }
214 None => {
215 warn!(%origin, "No vault URL found for user, ignoring message");
216 return Ok(());
217 }
218 }
219 }
220
221 match message {
222 FollowerMessage::LockStateUpdate {
223 user_id,
224 lock_state: LockState::Locked,
225 } => {
226 self.0.follower_sessions.upsert(endpoint.clone());
227
228 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
229 if self_lock_state == LockState::Locked {
230 return Ok(());
231 }
232
233 self.0
234 .driver
235 .lock_user(user_id)
236 .await
237 .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
238 Ok(())
239 }
240 FollowerMessage::LockStateUpdate {
241 user_id,
242 lock_state: LockState::Unlocked { user_key },
243 } => {
244 self.0.follower_sessions.upsert(endpoint.clone());
245
246 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
247 if let LockState::Unlocked { .. } = self_lock_state {
248 return Ok(());
249 }
250
251 self.0
252 .driver
253 .unlock_user(user_id, user_key.clone())
254 .await
255 .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
256 Ok(())
257 }
258 FollowerMessage::StartSession {
259 user_id,
260 lock_state,
261 } => {
262 self.0.follower_sessions.upsert(endpoint.clone());
263 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
264
265 match (lock_state, self_lock_state.clone()) {
266 (LockState::Unlocked { user_key }, LockState::Locked) => {
267 self.0
268 .driver
269 .unlock_user(user_id, user_key.clone())
270 .await
271 .inspect_err(
272 |_| warn!(%user_id, "Failed to unlock user during start session"),
273 )?;
274 }
275 (LockState::Locked, LockState::Unlocked { .. }) => {
276 let response = LeaderMessage::LockStateUpdate {
277 user_id,
278 lock_state: self_lock_state,
279 };
280 self.send_message(response, endpoint.clone()).await;
281 }
282 _ => {
283 }
285 };
286
287 Ok(())
288 }
289 FollowerMessage::HeartBeat { user_id } => {
290 self.0.follower_sessions.upsert(endpoint.clone());
291
292 let response = LeaderMessage::HeartBeat { user_id };
294 self.send_message(response, endpoint.clone()).await;
295
296 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
297 let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
299 user_id,
300 lock_state,
301 };
302 self.send_message(authoritative_lockstate_update, endpoint.clone())
303 .await;
304 Ok(())
305 }
306 }
307 }
308
309 pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
314 match event {
315 DeviceEvent::ManualLock { user_id } => {
316 let message = LeaderMessage::LockStateUpdate {
317 user_id,
318 lock_state: LockState::Locked,
319 };
320 self.broadcast_to_active_followers(message).await;
321 }
322 DeviceEvent::ManualUnlock {
323 user_id,
324 ref user_key,
325 } => {
326 let message = LeaderMessage::LockStateUpdate {
327 user_id,
328 lock_state: LockState::Unlocked {
329 user_key: user_key.to_owned(),
330 },
331 };
332 self.broadcast_to_active_followers(message).await;
333 }
334 }
335
336 Ok(())
337 }
338
339 async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
340 if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
341 tracing::error!(?error, "Failed to send shared unlock IPC message");
342 }
343 }
344}