bitwarden_shared_unlock/
follower.rs1use std::{ops::Add, sync::Arc};
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
5use bitwarden_threading::{cancellation_token, time::sleep};
6use thiserror::Error;
7
8use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
9
10#[bitwarden_error(basic)]
12#[derive(Debug, Error)]
13#[error("Could not start shared unlock follower: {0}")]
14pub struct FollowerStartError(#[from] SubscribeError);
15
16pub struct Follower<L: SharedUnlockDriver>(Arc<InnerFollower<L>>);
18
19impl<L: SharedUnlockDriver> Clone for Follower<L> {
20 fn clone(&self) -> Self {
21 Self(self.0.clone())
22 }
23}
24
25struct InnerFollower<D: SharedUnlockDriver> {
29 driver: D,
30 ipc_client: Arc<dyn IpcClient>,
31}
32
33impl<L: SharedUnlockDriver + Send + Sync + 'static> Follower<L> {
34 pub fn create(driver: L, ipc_client: Arc<dyn IpcClient>) -> Self {
39 Self(Arc::new(InnerFollower { driver, ipc_client }))
40 }
41
42 pub(crate) async fn start_sessions(&self) {
43 let users: Vec<bitwarden_core::UserId> = self.0.driver.list_users().await;
44 let leader = self
45 .0
46 .driver
47 .discover_leader()
48 .await
49 .expect("leader discovery should return a leader");
50
51 if !users.is_empty() {
52 tracing::info!("Starting shared unlock sessions for users: {:?}", users);
53 }
54
55 for user_id in users {
56 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
57 let message = FollowerMessage::StartSession {
58 user_id,
59 lock_state,
60 };
61 self.send_message(message, leader.clone()).await;
62 }
63 }
64
65 pub async fn start(
67 &self,
68 cancellation_token: Option<cancellation_token::CancellationToken>,
69 ) -> Result<(), FollowerStartError> {
70 let cancellation_token = cancellation_token.unwrap_or_default();
71 let mut subscription = self.0.ipc_client.subscribe_typed::<LeaderMessage>().await?;
72 let follower = self.clone();
73
74 let cancellation_token_clone = cancellation_token.clone();
75 let future = async move {
76 loop {
77 let result = subscription
78 .receive(Some(cancellation_token_clone.clone()))
79 .await;
80 match result {
81 Ok(message) => {
82 if let Err(error) = follower.receive_message(message).await {
83 tracing::error!(
84 ?error,
85 "Failed to handle shared unlock follower message"
86 );
87 }
88 }
89 Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
90 tracing::info!("Shared unlock follower stopped by cancellation");
91 break;
92 }
93 Err(bitwarden_ipc::TypedReceiveError::Channel(
95 tokio::sync::broadcast::error::RecvError::Closed,
96 )) => {
97 tracing::info!("Transport channel closed. Waiting for it to open");
98 sleep(std::time::Duration::from_secs(1)).await;
99 break;
100 }
101 Err(error) => {
102 tracing::error!(?error, "Failed to receive shared unlock IPC message");
103 }
104 }
105 }
106 };
107
108 #[cfg(not(target_arch = "wasm32"))]
109 tokio::spawn(future);
110
111 #[cfg(target_arch = "wasm32")]
112 wasm_bindgen_futures::spawn_local(future);
113
114 let cancellation_token = cancellation_token.clone();
115 let follower = self.clone();
116 let timer_future = async move {
117 loop {
118 tokio::select! {
119 _ = cancellation_token.cancelled() => {
120 tracing::debug!("Shared unlock follower timer cancelled");
121 break;
122 }
123 _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
124 if let Some(leader) = follower.0.driver.discover_leader().await {
125 for user_id in follower.0.driver.list_users().await {
127 let message = FollowerMessage::HeartBeat { user_id };
128 follower.send_message(message, leader.clone()).await;
129 }
130 }
131 }
132 }
133 }
134 };
135
136 #[cfg(not(target_arch = "wasm32"))]
137 tokio::spawn(timer_future);
138
139 #[cfg(target_arch = "wasm32")]
140 wasm_bindgen_futures::spawn_local(timer_future);
141
142 self.start_sessions().await;
143 Ok(())
144 }
145
146 pub async fn receive_message(
151 &self,
152 incoming_message: TypedIncomingMessage<LeaderMessage>,
153 ) -> Result<(), ()> {
154 let message = incoming_message.payload;
155 match message {
156 LeaderMessage::LockStateUpdate {
157 user_id,
158 lock_state,
159 } => {
160 let current_state = self.0.driver.get_user_lock_state(user_id).await;
163
164 match (current_state, lock_state) {
165 (LockState::Unlocked { .. }, LockState::Locked) => {
166 self.0.driver.lock_user(user_id).await?;
170 }
171 (LockState::Locked, LockState::Unlocked { user_key }) => {
172 self.0.driver.unlock_user(user_id, user_key).await?;
176 }
177 (LockState::Locked, LockState::Locked)
178 | (LockState::Unlocked { .. }, LockState::Unlocked { .. }) => {
179 }
182 }
183 }
184 LeaderMessage::HeartBeat { user_id } => {
185 self.0
186 .driver
187 .suppress_vault_timeout(
188 user_id,
189 crate::HEARTBEAT_INTERVAL.add(crate::VAULT_TIMEOUT_GRACE_PERIOD),
190 )
191 .await;
192 }
193 LeaderMessage::RequestSessionStart { user_id } => {
194 let lock_state = self.0.driver.get_user_lock_state(user_id).await;
195 let message = FollowerMessage::StartSession {
196 user_id,
197 lock_state,
198 };
199 self.send_message(message, self.0.driver.discover_leader().await.ok_or(())?)
200 .await;
201 }
202 }
203
204 Ok(())
205 }
206
207 pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
212 let leader = self.0.driver.discover_leader().await.ok_or(())?;
213
214 match event {
215 DeviceEvent::ManualLock { user_id } => {
216 let message = FollowerMessage::LockStateUpdate {
217 user_id,
218 lock_state: LockState::Locked,
219 };
220 self.send_message(message, leader).await;
221 }
222 DeviceEvent::ManualUnlock {
223 user_id,
224 ref user_key,
225 } => {
226 let message = FollowerMessage::LockStateUpdate {
227 user_id,
228 lock_state: LockState::Unlocked {
229 user_key: user_key.to_owned(),
230 },
231 };
232 self.send_message(message, leader).await;
233 }
234 }
235
236 Ok(())
237 }
238
239 async fn send_message(&self, message: FollowerMessage, recipient: Endpoint) {
240 if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
241 tracing::error!(?error, "Failed to send shared unlock IPC message");
242 }
243 }
244}