Skip to main content

bitwarden_shared_unlock/
follower.rs

1use std::{ops::Add, sync::Arc};
2
3use bitwarden_error::bitwarden_error;
4use bitwarden_ipc::{Endpoint, IpcClient, IpcClientExt, SubscribeError, TypedIncomingMessage};
5use bitwarden_threading::cancellation_token;
6use thiserror::Error;
7
8use crate::{DeviceEvent, FollowerMessage, LeaderMessage, LockState, drivers::SharedUnlockDriver};
9
/// Error type for failure to start the shared unlock follower.
///
/// Wraps the [`SubscribeError`] produced when subscribing to typed leader messages fails. The
/// `#[from]` attribute generates the `From<SubscribeError>` conversion, so the `?` operator can
/// turn a subscription failure into this error directly.
#[bitwarden_error(basic)]
#[derive(Debug, Error)]
#[error("Could not start shared unlock follower: {0}")]
pub struct FollowerStartError(#[from] SubscribeError);
15
/// Tracks local state and follows authoritative lock updates from a leader.
///
/// This is a handle around an `Arc<InnerFollower<L>>`: cloning it is cheap and every clone
/// refers to the same driver and IPC client.
pub struct Follower<L: SharedUnlockDriver>(Arc<InnerFollower<L>>);
18
19impl<L: SharedUnlockDriver> Clone for Follower<L> {
20    fn clone(&self) -> Self {
21        Self(self.0.clone())
22    }
23}
24
/// Inner implementation of the shared unlock follower, containing the actual state and logic. The
/// outer `Follower` struct is a thin wrapper around an `Arc` to allow for shared ownership across
/// async tasks.
struct InnerFollower<D: SharedUnlockDriver> {
    /// Driver used to list users, read and change per-user lock state, discover the leader and
    /// suppress vault timeouts.
    driver: D,
    /// IPC client used to subscribe to leader messages and to send follower messages.
    ipc_client: Arc<dyn IpcClient>,
}
32
33impl<L: SharedUnlockDriver + Send + Sync + 'static> Follower<L> {
34    /// Creates a follower instance and starts sessions for all currently known users.
35    ///
36    /// During startup, a `StartSession` message is sent per user so the leader can reconcile
37    /// initial lock state.
38    pub fn create(driver: L, ipc_client: Arc<dyn IpcClient>) -> Self {
39        Self(Arc::new(InnerFollower { driver, ipc_client }))
40    }
41
42    pub(crate) async fn start_sessions(&self) {
43        let users: Vec<bitwarden_core::UserId> = self.0.driver.list_users().await;
44        let leader = self
45            .0
46            .driver
47            .discover_leader()
48            .await
49            .expect("leader discovery should return a leader");
50
51        for user_id in users {
52            let lock_state = self.0.driver.get_user_lock_state(user_id).await;
53            let message = FollowerMessage::StartSession {
54                user_id,
55                lock_state,
56            };
57            self.send_message(message, leader.clone()).await;
58        }
59    }
60
61    /// Starts background tasks for IPC message handling and heartbeat timers.
62    pub async fn start(
63        &self,
64        cancellation_token: Option<cancellation_token::CancellationToken>,
65    ) -> Result<(), FollowerStartError> {
66        let cancellation_token = cancellation_token.unwrap_or_default();
67        let mut subscription = self.0.ipc_client.subscribe_typed::<LeaderMessage>().await?;
68        let follower = self.clone();
69
70        let cancellation_token_clone = cancellation_token.clone();
71        let future = async move {
72            loop {
73                let result = subscription
74                    .receive(Some(cancellation_token_clone.clone()))
75                    .await;
76                match result {
77                    Ok(message) => {
78                        if let Err(error) = follower.receive_message(message).await {
79                            tracing::error!(
80                                ?error,
81                                "Failed to handle shared unlock follower message"
82                            );
83                        }
84                    }
85                    Err(bitwarden_ipc::TypedReceiveError::Cancelled) => {
86                        tracing::info!("Shared unlock follower stopped by cancellation");
87                        break;
88                    }
89                    Err(error) => {
90                        tracing::error!(?error, "Failed to receive shared unlock IPC message");
91                    }
92                }
93            }
94        };
95
96        #[cfg(not(target_arch = "wasm32"))]
97        tokio::spawn(future);
98
99        #[cfg(target_arch = "wasm32")]
100        wasm_bindgen_futures::spawn_local(future);
101
102        let cancellation_token = cancellation_token.clone();
103        let follower = self.clone();
104        let timer_future = async move {
105            loop {
106                tokio::select! {
107                    _ = cancellation_token.cancelled() => {
108                        tracing::debug!("Shared unlock follower timer cancelled");
109                        break;
110                    }
111                    _ = bitwarden_threading::time::sleep(crate::HEARTBEAT_INTERVAL) => {
112                        if let Some(leader) = follower.0.driver.discover_leader().await {
113                            // For all users that are logged in, send a heartbeat message to the leader.
114                            for user_id in follower.0.driver.list_users().await {
115                                let message = FollowerMessage::HeartBeat { user_id };
116                                follower.send_message(message, leader.clone()).await;
117                            }
118                        }
119                    }
120                }
121            }
122        };
123
124        #[cfg(not(target_arch = "wasm32"))]
125        tokio::spawn(timer_future);
126
127        #[cfg(target_arch = "wasm32")]
128        wasm_bindgen_futures::spawn_local(timer_future);
129
130        self.start_sessions().await;
131        Ok(())
132    }
133
134    /// Handles an authoritative message from the leader.
135    ///
136    /// Lock state updates overwrite local state to keep follower and leader in sync. Heartbeat
137    /// responses are forwarded to the heartbeat response handler.
138    pub async fn receive_message(
139        &self,
140        incoming_message: TypedIncomingMessage<LeaderMessage>,
141    ) -> Result<(), ()> {
142        let message = incoming_message.payload;
143        match message {
144            LeaderMessage::LockStateUpdate {
145                user_id,
146                lock_state,
147            } => {
148                // The leader is the authoritative state source for the follow, and it should
149                // always overwrite the local state of the follower.
150                let current_state = self.0.driver.get_user_lock_state(user_id).await;
151
152                match (current_state, lock_state) {
153                    (LockState::Unlocked { .. }, LockState::Locked) => {
154                        // If the user is currently unlocked and it receives an authoritative lock
155                        // state update from the leader that is Locked, then
156                        // it should follow, and lock the local state.
157                        self.0.driver.lock_user(user_id).await?;
158                    }
159                    (LockState::Locked, LockState::Unlocked { user_key }) => {
160                        // If the user is currently locked and it receives an authoritative lock
161                        // state update from the leader that is Unlocked,
162                        // then it should follow, and unlock the local state.
163                        self.0.driver.unlock_user(user_id, user_key).await?;
164                    }
165                    (LockState::Locked, LockState::Locked)
166                    | (LockState::Unlocked { .. }, LockState::Unlocked { .. }) => {
167                        // If both the current state and the received lock state are the same, then
168                        // do nothing, as they are already in sync.
169                    }
170                }
171            }
172            LeaderMessage::HeartBeat { user_id } => {
173                self.0
174                    .driver
175                    .suppress_vault_timeout(
176                        user_id,
177                        crate::HEARTBEAT_INTERVAL.add(crate::VAULT_TIMEOUT_GRACE_PERIOD),
178                    )
179                    .await;
180            }
181        }
182
183        Ok(())
184    }
185
186    /// Handles local device events and forwards them to the discovered leader.
187    ///
188    /// Manual lock/unlock events are sent as lock state updates. Timer events send per-user
189    /// heartbeats to keep the shared session active.
190    pub async fn handle_device_event(&self, event: DeviceEvent) -> Result<(), ()> {
191        let leader = self.0.driver.discover_leader().await.ok_or(())?;
192
193        match event {
194            DeviceEvent::ManualLock { user_id } => {
195                let message = FollowerMessage::LockStateUpdate {
196                    user_id,
197                    lock_state: LockState::Locked,
198                };
199                self.send_message(message, leader).await;
200            }
201            DeviceEvent::ManualUnlock {
202                user_id,
203                ref user_key,
204            } => {
205                let message = FollowerMessage::LockStateUpdate {
206                    user_id,
207                    lock_state: LockState::Unlocked {
208                        user_key: super::UserKey::from_bytes(user_key.to_owned()),
209                    },
210                };
211                self.send_message(message, leader).await;
212            }
213        }
214
215        Ok(())
216    }
217
218    async fn send_message(&self, message: FollowerMessage, recipient: Endpoint) {
219        if let Err(error) = self.0.ipc_client.send_typed(message, recipient).await {
220            tracing::error!(?error, "Failed to send shared unlock IPC message");
221        }
222    }
223}