bitwarden_shared_unlock/
follower.rs1use std::{ops::Add, sync::Arc};
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
5use bitwarden_threading::cancellation_token;
6use thiserror::Error;
7
8use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
9
10#[bitwarden_error(basic)]
12#[derive(Debug, Error)]
13#[error("Could not start shared unlock follower: {0}")]
14pub struct FollowerStartError(#[from] SubscribeError);
15
16pub struct Follower<L: SharedUnlockDriver>(Arc<InnerFollower<L>>);
18
19impl<L: SharedUnlockDriver> Clone for Follower<L> {
20 fn clone(&self) -> Self {
21 Self(self.0.clone())
22 }
23}
24
25struct InnerFollower<D: SharedUnlockDriver> {
29 driver: D,
30 ipc_client: Arc<dyn IpcClient>,
31}
32
33impl<L: SharedUnlockDriver + Send + Sync + 'static> Follower<L> {
34 pub fn create(driver: L, ipc_client: Arc<dyn IpcClient>) -> Self {
39 Self(Arc::new(InnerFollower { driver, ipc_client }))
40 }
41
42 pub(crate) async fn start_sessions(&self) {
43 let users: Vec<bitwarden_core::UserId> = self.0.driver.list_users().await;
44 let leader = self
45 .0
46 .driver
47 .discover_leader()
48 .await
49 .expect("leader discovery should return a leader");
50
51 for user_id in users {
52 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
53 let message = FollowerMessage::StartSession {
54 user_id,
55 lock_state,
56 };
57 self.send_message(message, leader.clone()).await;
58 }
59 }
60
61 pub async fn start(
63 &self,
64 cancellation_token: Option<cancellation_token::CancellationToken>,
65 ) -> Result<(), FollowerStartError> {
66 let cancellation_token = cancellation_token.unwrap_or_default();
67 let mut subscription = self.0.ipc_client.subscribe_typed::<LeaderMessage>().await?;
68 let follower = self.clone();
69
70 let cancellation_token_clone = cancellation_token.clone();
71 let future = async move {
72 loop {
73 let result = subscription
74 .receive(Some(cancellation_token_clone.clone()))
75 .await;
76 match result {
77 Ok(message) => {
78 if let Err(error) = follower.receive_message(message).await {
79 tracing::error!(
80 ?error,
81 "Failed to handle shared unlock follower message"
82 );
83 }
84 }
85 Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
86 tracing::info!("Shared unlock follower stopped by cancellation");
87 break;
88 }
89 Err(error) => {
90 tracing::error!(?error, "Failed to receive shared unlock IPC message");
91 }
92 }
93 }
94 };
95
96 #[cfg(not(target_arch = "wasm32"))]
97 tokio::spawn(future);
98
99 #[cfg(target_arch = "wasm32")]
100 wasm_bindgen_futures::spawn_local(future);
101
102 let cancellation_token = cancellation_token.clone();
103 let follower = self.clone();
104 let timer_future = async move {
105 loop {
106 tokio::select! {
107 _ = cancellation_token.cancelled() => {
108 tracing::debug!("Shared unlock follower timer cancelled");
109 break;
110 }
111 _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
112 if let Some(leader) = follower.0.driver.discover_leader().await {
113 for user_id in follower.0.driver.list_users().await {
115 let message = FollowerMessage::HeartBeat { user_id };
116 follower.send_message(message, leader.clone()).await;
117 }
118 }
119 }
120 }
121 }
122 };
123
124 #[cfg(not(target_arch = "wasm32"))]
125 tokio::spawn(timer_future);
126
127 #[cfg(target_arch = "wasm32")]
128 wasm_bindgen_futures::spawn_local(timer_future);
129
130 self.start_sessions().await;
131 Ok(())
132 }
133
134 pub async fn receive_message(
139 &self,
140 incoming_message: TypedIncomingMessage<LeaderMessage>,
141 ) -> Result<(), ()> {
142 let message = incoming_message.payload;
143 match message {
144 LeaderMessage::LockStateUpdate {
145 user_id,
146 lock_state,
147 } => {
148 let current_state = self.0.driver.get_user_lock_state(user_id).await;
151
152 match (current_state, lock_state) {
153 (LockState::Unlocked { .. }, LockState::Locked) => {
154 self.0.driver.lock_user(user_id).await?;
158 }
159 (LockState::Locked, LockState::Unlocked { user_key }) => {
160 self.0.driver.unlock_user(user_id, user_key).await?;
164 }
165 (LockState::Locked, LockState::Locked)
166 | (LockState::Unlocked { .. }, LockState::Unlocked { .. }) => {
167 }
170 }
171 }
172 LeaderMessage::HeartBeat { user_id } => {
173 self.0
174 .driver
175 .suppress_vault_timeout(
176 user_id,
177 crate::HEARTBEAT_INTERVAL.add(crate::VAULT_TIMEOUT_GRACE_PERIOD),
178 )
179 .await;
180 }
181 }
182
183 Ok(())
184 }
185
186 pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
191 let leader = self.0.driver.discover_leader().await.ok_or(())?;
192
193 match event {
194 DeviceEvent::ManualLock { user_id } => {
195 let message = FollowerMessage::LockStateUpdate {
196 user_id,
197 lock_state: LockState::Locked,
198 };
199 self.send_message(message, leader).await;
200 }
201 DeviceEvent::ManualUnlock {
202 user_id,
203 ref user_key,
204 } => {
205 let message = FollowerMessage::LockStateUpdate {
206 user_id,
207 lock_state: LockState::Unlocked {
208 user_key: super::UserKey::from_bytes(user_key.to_owned()),
209 },
210 };
211 self.send_message(message, leader).await;
212 }
213 }
214
215 Ok(())
216 }
217
218 async fn send_message(&self, message: FollowerMessage, recipient: Endpoint) {
219 if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
220 tracing::error!(?error, "Failed to send shared unlock IPC message");
221 }
222 }
223}