1#[cfg(not(feature = "wasm"))]
2use std::time::Instant;
3use std::{
4 collections::HashMap,
5 ops::Sub,
6 sync::{Arc, Mutex},
7 time::Duration,
8};
9
10use bitwarden_error::bitwarden_error;
11use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
12use bitwarden_threading::cancellation_token;
13use thiserror::Error;
14use tracing::{info, warn};
15#[cfg(feature = "wasm")]
16use web_time::Instant;
17
18use crate::{
19 DeviceEvent, FollowerMessage, LeaderMessage, LockState, UserKey, drivers::SharedUnlockDriver,
20};
21
/// How long a follower may go without a recorded message before the periodic prune
/// started by `Leader::start` removes its session.
const FOLLOWER_STALE_AFTER: Duration = Duration::from_secs(30);
23
/// Error returned by `Leader::start` when subscribing to follower messages fails.
///
/// Wraps the underlying [`SubscribeError`]; `#[from]` lets `?` perform the conversion.
#[bitwarden_error(basic)]
#[derive(Debug, Error)]
#[error("Could not start shared unlock leader: {0}")]
pub struct LeaderStartError(#[from] SubscribeError);
29
/// Bookkeeping kept by the leader for a single follower.
struct FollowerSession {
    /// Moment the follower was last recorded through `FollowerSessions::upsert`.
    last_seen_at: Instant,
}
33
/// Registry of known followers, keyed by their IPC endpoint.
///
/// The map sits behind a [`Mutex`] so it can be updated through a shared reference.
struct FollowerSessions {
    /// Follower endpoint mapped to its session record.
    sessions: Mutex<HashMap<bitwarden_ipc::Endpoint, FollowerSession>>,
}
37
38impl FollowerSessions {
39 fn new() -> Self {
40 Self {
41 sessions: Mutex::new(HashMap::new()),
42 }
43 }
44
45 fn upsert(&self, endpoint: bitwarden_ipc::Endpoint) {
46 let mut sessions = self
47 .sessions
48 .lock()
49 .unwrap_or_else(|poisoned| poisoned.into_inner());
50
51 if !sessions.contains_key(&endpoint) {
52 info!("Shared-Unlock client connected {:?}", endpoint);
53 }
54
55 sessions.insert(
56 endpoint,
57 FollowerSession {
58 last_seen_at: Instant::now(),
59 },
60 );
61 }
62
63 fn active_endpoints(&self) -> Vec<bitwarden_ipc::Endpoint> {
64 let sessions = self
65 .sessions
66 .lock()
67 .unwrap_or_else(|poisoned| poisoned.into_inner());
68
69 sessions.keys().cloned().collect()
70 }
71
72 fn prune_stale(&self, stale_after: Duration) {
73 let mut sessions = self
74 .sessions
75 .lock()
76 .unwrap_or_else(|poisoned| poisoned.into_inner());
77
78 let now = Instant::now();
79 for (endpoint, session) in sessions.iter() {
80 if now.sub(session.last_seen_at) > stale_after {
81 info!("Shared-Unlock client {:?} disconnected", endpoint);
82 }
83 }
84 sessions.retain(|_, session| now.sub(session.last_seen_at) <= stale_after);
85 }
86}
87
/// Handle to the shared-unlock leader.
///
/// A thin wrapper around an [`Arc`], so every clone refers to the same driver,
/// follower registry and IPC client.
pub struct Leader<L: SharedUnlockDriver>(Arc<InnerLeader<L>>);
90
91impl<L: SharedUnlockDriver> Clone for Leader<L> {
92 fn clone(&self) -> Self {
93 Self(self.0.clone())
94 }
95}
96
/// State shared by every clone of a `Leader`.
struct InnerLeader<D: SharedUnlockDriver> {
    /// Driver used to read and change the local lock state and to look up vault URLs.
    driver: D,
    /// Followers that have sent an accepted message and have not yet been pruned as stale.
    follower_sessions: FollowerSessions,
    /// IPC client used to subscribe to follower messages and to send replies.
    ipc_client: Arc<dyn IpcClient>,
}
105
106impl<D: SharedUnlockDriver + Send + Sync + 'static> Leader<D> {
107 pub fn create(lock_system: D, ipc_client: Arc<dyn IpcClient>) -> Self {
109 Self(Arc::new(InnerLeader {
110 driver: lock_system,
111 follower_sessions: FollowerSessions::new(),
112 ipc_client,
113 }))
114 }
115
116 pub async fn start(
118 &self,
119 cancellation_token: Option<cancellation_token::CancellationToken>,
120 ) -> Result<(), LeaderStartError> {
121 let cancellation_token = cancellation_token.unwrap_or_default();
122 let mut subscription = self
123 .0
124 .ipc_client
125 .subscribe_typed::<FollowerMessage>()
126 .await?;
127 let leader = self.clone();
128
129 let cancellation_token_clone = cancellation_token.clone();
130 let future = async move {
131 loop {
132 let result = subscription
133 .receive(Some(cancellation_token_clone.clone()))
134 .await;
135 match result {
136 Ok(message) => {
137 if let Err(error) = leader.receive_message(message).await {
138 tracing::error!(
139 ?error,
140 "Failed to handle shared unlock leader message"
141 );
142 }
143 }
144 Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
145 tracing::info!("Shared unlock leader stopped by cancellation");
146 break;
147 }
148 Err(error) => {
149 tracing::error!(?error, "Failed to receive shared unlock IPC message");
150 }
151 }
152 }
153 };
154
155 #[cfg(not(target_arch = "wasm32"))]
156 tokio::spawn(future);
157
158 #[cfg(target_arch = "wasm32")]
159 wasm_bindgen_futures::spawn_local(future);
160
161 let cancellation_token = cancellation_token.clone();
162 let leader = self.clone();
163 let timer_future = async move {
164 loop {
165 tokio::select! {
166 _ = cancellation_token.cancelled() => {
167 tracing::debug!("Shared unlock leader timer cancelled");
168 break;
169 }
170 _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
171 leader.0
172 .follower_sessions
173 .prune_stale(FOLLOWER_STALE_AFTER);
174 }
175 }
176 }
177 };
178
179 #[cfg(not(target_arch = "wasm32"))]
180 tokio::spawn(timer_future);
181
182 #[cfg(target_arch = "wasm32")]
183 wasm_bindgen_futures::spawn_local(timer_future);
184
185 Ok(())
186 }
187
188 async fn broadcast_to_active_followers(&self, message: LeaderMessage) {
189 let endpoints = self.0.follower_sessions.active_endpoints();
190 for endpoint in endpoints {
191 self.send_message(message.clone(), endpoint).await;
192 }
193 }
194
195 pub async fn receive_message(
200 &self,
201 incoming_message: TypedIncomingMessage<FollowerMessage>,
202 ) -> Result<(), ()> {
203 let message = incoming_message.payload;
204 let sender = incoming_message.source;
205 let endpoint: bitwarden_ipc::Endpoint = sender.clone().into();
206
207 if let bitwarden_ipc::Source::Web { origin, .. } = &sender {
209 let user_id = message.user_id();
210 match self.0.driver.get_vault_url(user_id).await {
211 Some(user_vault_url) if origin == &user_vault_url => {}
212 Some(user_vault_url) => {
213 warn!(%origin, %user_vault_url, "IPC message origin does not match user's vault URL, ignoring message");
214 return Ok(());
215 }
216 None => {
217 warn!(%origin, "No vault URL found for user, ignoring message");
218 return Ok(());
219 }
220 }
221 }
222
223 match message {
224 FollowerMessage::LockStateUpdate {
225 user_id,
226 lock_state: LockState::Locked,
227 } => {
228 self.0.follower_sessions.upsert(endpoint.clone());
229
230 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
231 if self_lock_state == LockState::Locked {
232 return Ok(());
233 }
234
235 self.0
236 .driver
237 .lock_user(user_id)
238 .await
239 .inspect_err(|_| warn!(%user_id, "Failed to lock user"))?;
240 Ok(())
241 }
242 FollowerMessage::LockStateUpdate {
243 user_id,
244 lock_state: LockState::Unlocked { user_key },
245 } => {
246 self.0.follower_sessions.upsert(endpoint.clone());
247
248 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
249 if let LockState::Unlocked { .. } = self_lock_state {
250 return Ok(());
251 }
252
253 self.0
254 .driver
255 .unlock_user(user_id, user_key.clone())
256 .await
257 .inspect_err(|_| warn!(%user_id, "Failed to unlock user"))?;
258 Ok(())
259 }
260 FollowerMessage::StartSession {
261 user_id,
262 lock_state,
263 } => {
264 self.0.follower_sessions.upsert(endpoint.clone());
265 let self_lock_state = self.0.driver.get_user_lock_state(user_id).await;
266
267 match (lock_state, self_lock_state.clone()) {
268 (LockState::Unlocked { user_key }, LockState::Locked) => {
269 self.0
270 .driver
271 .unlock_user(user_id, user_key.clone())
272 .await
273 .inspect_err(
274 |_| warn!(%user_id, "Failed to unlock user during start session"),
275 )?;
276 }
277 (LockState::Locked, LockState::Unlocked { .. }) => {
278 let response = LeaderMessage::LockStateUpdate {
279 user_id,
280 lock_state: self_lock_state,
281 };
282 self.send_message(response, endpoint.clone()).await;
283 }
284 _ => {
285 }
287 };
288
289 Ok(())
290 }
291 FollowerMessage::HeartBeat { user_id } => {
292 self.0.follower_sessions.upsert(endpoint.clone());
293
294 let response = LeaderMessage::HeartBeat { user_id };
296 self.send_message(response, endpoint.clone()).await;
297
298 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
299 let authoritative_lockstate_update = LeaderMessage::LockStateUpdate {
301 user_id,
302 lock_state,
303 };
304 self.send_message(authoritative_lockstate_update, endpoint.clone())
305 .await;
306 Ok(())
307 }
308 }
309 }
310
311 pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
316 match event {
317 DeviceEvent::ManualLock { user_id } => {
318 let message = LeaderMessage::LockStateUpdate {
319 user_id,
320 lock_state: LockState::Locked,
321 };
322 self.broadcast_to_active_followers(message).await;
323 }
324 DeviceEvent::ManualUnlock {
325 user_id,
326 ref user_key,
327 } => {
328 let message = LeaderMessage::LockStateUpdate {
329 user_id,
330 lock_state: LockState::Unlocked {
331 user_key: UserKey::from_bytes(user_key.to_owned()),
332 },
333 };
334 self.broadcast_to_active_followers(message).await;
335 }
336 }
337
338 Ok(())
339 }
340
341 async fn send_message(&self, message: LeaderMessage, recipient: Endpoint) {
342 if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
343 tracing::error!(?error, "Failed to send shared unlock IPC message");
344 }
345 }
346}