Skip to main content

bitwarden_shared_unlock/
follower.rs

1use std::{ops::Add, sync::Arc};
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
5use bitwarden_threading::{cancellation_token, time::sleep};
6use thiserror::Error;
7
8use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
9
10/// Error type for failure to start the shared unlock follower.
11#[bitwarden_error(basic)]
12#[derive(Debug, Error)]
13#[error("Could not start shared unlock follower: {0}")]
14pub struct FollowerStartError(#[from] SubscribeError);
15
16/// Tracks local state and follows authoritative lock updates from a leader.
17pub struct Follower<L: SharedUnlockDriver>(Arc<InnerFollower<L>>);
18
19impl<L: SharedUnlockDriver> Clone for Follower<L> {
20    fn clone(&self) -> Self {
21        Self(self.0.clone())
22    }
23}
24
25/// Inner implementation of the shared unlock follower, containing the actual state and logic. The
26/// outer `Follower` struct is a thin wrapper around an `Arc` to allow for shared ownership across
27/// async tasks.
28struct InnerFollower<D: SharedUnlockDriver> {
29    driver: D,
30    ipc_client: Arc<dyn IpcClient>,
31}
32
33impl<L: SharedUnlockDriver + Send + Sync + 'static> Follower<L> {
34    /// Creates a follower instance and starts sessions for all currently known users.
35    ///
36    /// During startup, a `StartSession` message is sent per user so the leader can reconcile
37    /// initial lock state.
38    pub fn create(driver: L, ipc_client: Arc<dyn IpcClient>) -> Self {
39        Self(Arc::new(InnerFollower { driver, ipc_client }))
40    }
41
42    pub(crate) async fn start_sessions(&self) {
43        let users: Vec<bitwarden_core::UserId> = self.0.driver.list_users().await;
44        let leader = self
45            .0
46            .driver
47            .discover_leader()
48            .await
49            .expect("leader discovery should return a leader");
50
51        if !users.is_empty() {
52            tracing::info!("Starting shared unlock sessions for users: {:?}", users);
53        }
54
55        for user_id in users {
56            let lock_state = self.0.driver.get_user_lock_state(user_id).await;
57            let message = FollowerMessage::StartSession {
58                user_id,
59                lock_state,
60            };
61            self.send_message(message, leader.clone()).await;
62        }
63    }
64
65    /// Starts background tasks for IPC message handling and heartbeat timers.
66    pub async fn start(
67        &self,
68        cancellation_token: Option<cancellation_token::CancellationToken>,
69    ) -> Result<(), FollowerStartError> {
70        let cancellation_token = cancellation_token.unwrap_or_default();
71        let mut subscription = self.0.ipc_client.subscribe_typed::<LeaderMessage>().await?;
72        let follower = self.clone();
73
74        let cancellation_token_clone = cancellation_token.clone();
75        let future = async move {
76            loop {
77                let result = subscription
78                    .receive(Some(cancellation_token_clone.clone()))
79                    .await;
80                match result {
81                    Ok(message) => {
82                        if let Err(error) = follower.receive_message(message).await {
83                            tracing::error!(
84                                ?error,
85                                "Failed to handle shared unlock follower message"
86                            );
87                        }
88                    }
89                    Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
90                        tracing::info!("Shared unlock follower stopped by cancellation");
91                        break;
92                    }
93                    // This is required because otherwise the browser may freeze in this loop
94                    Err(bitwarden_ipc::TypedReceiveError::Channel(
95                        tokio::sync::broadcast::error::RecvError::Closed,
96                    )) => {
97                        tracing::info!("Transport channel closed. Waiting for it to open");
98                        sleep(std::time::Duration::from_secs(1)).await;
99                        break;
100                    }
101                    Err(error) => {
102                        tracing::error!(?error, "Failed to receive shared unlock IPC message");
103                    }
104                }
105            }
106        };
107
108        #[cfg(not(target_arch = "wasm32"))]
109        tokio::spawn(future);
110
111        #[cfg(target_arch = "wasm32")]
112        wasm_bindgen_futures::spawn_local(future);
113
114        let cancellation_token = cancellation_token.clone();
115        let follower = self.clone();
116        let timer_future = async move {
117            loop {
118                tokio::select! {
119                    _ = cancellation_token.cancelled() => {
120                        tracing::debug!("Shared unlock follower timer cancelled");
121                        break;
122                    }
123                    _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
124                        if let Some(leader) = follower.0.driver.discover_leader().await {
125                            // For all users that are logged in, send a heartbeat message to the leader.
126                            for user_id in follower.0.driver.list_users().await {
127                                let message = FollowerMessage::HeartBeat { user_id };
128                                follower.send_message(message, leader.clone()).await;
129                            }
130                        }
131                    }
132                }
133            }
134        };
135
136        #[cfg(not(target_arch = "wasm32"))]
137        tokio::spawn(timer_future);
138
139        #[cfg(target_arch = "wasm32")]
140        wasm_bindgen_futures::spawn_local(timer_future);
141
142        self.start_sessions().await;
143        Ok(())
144    }
145
146    /// Handles an authoritative message from the leader.
147    ///
148    /// Lock state updates overwrite local state to keep follower and leader in sync. Heartbeat
149    /// responses are forwarded to the heartbeat response handler.
150    pub async fn receive_message(
151        &self,
152        incoming_message: TypedIncomingMessage<LeaderMessage>,
153    ) -> Result<(), ()> {
154        let message = incoming_message.payload;
155        match message {
156            LeaderMessage::LockStateUpdate {
157                user_id,
158                lock_state,
159            } => {
160                // The leader is the authoritative state source for the follow, and it should
161                // always overwrite the local state of the follower.
162                let current_state = self.0.driver.get_user_lock_state(user_id).await;
163
164                match (current_state, lock_state) {
165                    (LockState::Unlocked { .. }, LockState::Locked) => {
166                        // If the user is currently unlocked and it receives an authoritative lock
167                        // state update from the leader that is Locked, then
168                        // it should follow, and lock the local state.
169                        self.0.driver.lock_user(user_id).await?;
170                    }
171                    (LockState::Locked, LockState::Unlocked { user_key }) => {
172                        // If the user is currently locked and it receives an authoritative lock
173                        // state update from the leader that is Unlocked,
174                        // then it should follow, and unlock the local state.
175                        self.0.driver.unlock_user(user_id, user_key).await?;
176                    }
177                    (LockState::Locked, LockState::Locked)
178                    | (LockState::Unlocked { .. }, LockState::Unlocked { .. }) => {
179                        // If both the current state and the received lock state are the same, then
180                        // do nothing, as they are already in sync.
181                    }
182                }
183            }
184            LeaderMessage::HeartBeat { user_id } => {
185                self.0
186                    .driver
187                    .suppress_vault_timeout(
188                        user_id,
189                        crate::HEARTBEAT_INTERVAL.add(crate::VAULT_TIMEOUT_GRACE_PERIOD),
190                    )
191                    .await;
192            }
193            LeaderMessage::RequestSessionStart { user_id } => {
194                let lock_state = self.0.driver.get_user_lock_state(user_id).await;
195                let message = FollowerMessage::StartSession {
196                    user_id,
197                    lock_state,
198                };
199                self.send_message(message, self.0.driver.discover_leader().await.ok_or(())?)
200                    .await;
201            }
202        }
203
204        Ok(())
205    }
206
207    /// Handles local device events and forwards them to the discovered leader.
208    ///
209    /// Manual lock/unlock events are sent as lock state updates. Timer events send per-user
210    /// heartbeats to keep the shared session active.
211    pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
212        let leader = self.0.driver.discover_leader().await.ok_or(())?;
213
214        match event {
215            DeviceEvent::ManualLock { user_id } => {
216                let message = FollowerMessage::LockStateUpdate {
217                    user_id,
218                    lock_state: LockState::Locked,
219                };
220                self.send_message(message, leader).await;
221            }
222            DeviceEvent::ManualUnlock {
223                user_id,
224                ref user_key,
225            } => {
226                let message = FollowerMessage::LockStateUpdate {
227                    user_id,
228                    lock_state: LockState::Unlocked {
229                        user_key: user_key.to_owned(),
230                    },
231                };
232                self.send_message(message, leader).await;
233            }
234        }
235
236        Ok(())
237    }
238
239    async fn send_message(&self, message: FollowerMessage, recipient: Endpoint) {
240        if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
241            tracing::error!(?error, "Failed to send shared unlock IPC message");
242        }
243    }
244}