// bitwarden_logging/circular_buffer.rs

//! Thread-safe circular buffer implementation.

use std::{collections::VecDeque, num::NonZeroUsize, sync::Mutex};

5// Static assertion: CircularBuffer<T> must be Send + Sync for any Send type T.
6// This is required because the buffer is shared across threads via Arc.
7// T only needs Send (not Sync) because Mutex<VecDeque<T>> provides the Sync guarantee.
8const _: () = {
9    fn _assert_send_sync<T: Send + Sync>() {}
10    fn _assert_circular_buffer<T: Send>() {
11        _assert_send_sync::<CircularBuffer<T>>();
12    }
13};
14
/// A thread-safe circular buffer with FIFO eviction.
///
/// Items are stored in insertion order behind a [`Mutex`], so every operation
/// takes `&self`. Once the buffer holds `capacity` items, each further push
/// removes the oldest item to make room for the new one.
///
/// # Panics
///
/// Every method that locks the buffer panics with
/// `"CircularBuffer mutex poisoned"` if the mutex is poisoned, i.e. if a
/// thread panicked while holding the lock.
pub struct CircularBuffer<T> {
    /// Stored items, oldest at the front.
    buffer: Mutex<VecDeque<T>>,
    /// Maximum number of items retained; set from a `NonZeroUsize` in `new`.
    capacity: usize,
}

/// Formats the capacity only.
///
/// The contents are deliberately left out, so no `T: Debug` bound is needed
/// and the mutex is never locked while formatting. `finish_non_exhaustive`
/// marks the omission with `..` so the output does not look complete.
impl<T> std::fmt::Debug for CircularBuffer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CircularBuffer")
            .field("capacity", &self.capacity)
            .finish_non_exhaustive()
    }
}

impl<T> CircularBuffer<T> {
    /// Create a new, empty circular buffer with the given capacity.
    ///
    /// Storage for `capacity` items is reserved up front.
    pub fn new(capacity: NonZeroUsize) -> Self {
        let capacity = capacity.get();
        Self {
            buffer: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
        }
    }

    /// Push an item into the buffer.
    ///
    /// If the buffer is at capacity, the oldest item is evicted and returned;
    /// otherwise `None` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn push(&self, item: T) -> Option<T> {
        let mut buffer = self.buffer.lock().expect("CircularBuffer mutex poisoned");

        // Evict before inserting so the length never exceeds `capacity`.
        let evicted = if buffer.len() >= self.capacity {
            buffer.pop_front()
        } else {
            None
        };

        buffer.push_back(item);
        evicted
    }

    /// Get the current number of items in the buffer.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.buffer
            .lock()
            .expect("CircularBuffer mutex poisoned")
            .len()
    }

    /// Check if the buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.buffer
            .lock()
            .expect("CircularBuffer mutex poisoned")
            .is_empty()
    }
}

impl<T: Clone> CircularBuffer<T> {
    /// Read all items from the buffer.
    ///
    /// Returns a cloned snapshot of the current contents in order (oldest
    /// first). The buffer itself is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[must_use]
    pub fn read(&self) -> Vec<T> {
        // The output vector is allocated before the lock is taken, which keeps
        // the allocation out of the critical section.
        let mut result = Vec::with_capacity(self.capacity);
        let buffer = self.buffer.lock().expect("CircularBuffer mutex poisoned");
        result.extend(buffer.iter().cloned());
        result
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    // Items pushed below capacity are all retained and read back oldest first.
    #[test]
    fn test_push_and_read() {
        let buffer = CircularBuffer::new(NonZeroUsize::new(5).unwrap());
        buffer.push("first".to_string());
        buffer.push("second".to_string());
        buffer.push("third".to_string());

        let items = buffer.read();
        assert_eq!(items, vec!["first", "second", "third"]);
    }

    // Pushing into a full buffer hands back the oldest item and keeps the rest in order.
    #[test]
    fn test_fifo_eviction() {
        let buffer = CircularBuffer::new(NonZeroUsize::new(3).unwrap());
        buffer.push("event1".to_string());
        buffer.push("event2".to_string());
        buffer.push("event3".to_string());

        // Fourth push should evict the oldest
        let evicted = buffer.push("event4".to_string());
        assert_eq!(evicted, Some("event1".to_string()));

        let contents = buffer.read();
        assert_eq!(contents, vec!["event2", "event3", "event4"]);
    }

    // `len` and `is_empty` track pushes, and `read` agrees with `len`.
    #[test]
    fn test_len_and_is_empty() {
        let buffer = CircularBuffer::new(NonZeroUsize::new(10).unwrap());
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);

        buffer.push("item".to_string());
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        let items = buffer.read();
        assert_eq!(items.len(), 1);
    }

    // Reading is non-destructive: two consecutive reads see the same contents.
    #[test]
    fn test_read_preserves_contents() {
        let buffer = CircularBuffer::new(NonZeroUsize::new(5).unwrap());
        buffer.push("a".to_string());
        buffer.push("b".to_string());

        let first_read = buffer.read();
        let second_read = buffer.read();
        assert_eq!(first_read, second_read);
    }

    // Boundary case: with capacity 1 every push after the first evicts the previous item.
    #[test]
    fn test_capacity_one_keeps_latest() {
        let buffer = CircularBuffer::new(NonZeroUsize::new(1).unwrap());
        assert_eq!(buffer.push(1), None);
        assert_eq!(buffer.push(2), Some(1));
        assert_eq!(buffer.push(3), Some(2));
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.read(), vec![3]);
    }

    // 10 threads x 100 pushes = 1000 items, exactly the capacity, so nothing is
    // evicted and a lost push would show up as a shorter buffer.
    #[test]
    fn test_concurrent_push() {
        let buffer = Arc::new(CircularBuffer::new(NonZeroUsize::new(1000).unwrap()));
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let buf = Arc::clone(&buffer);
                std::thread::spawn(move || {
                    for j in 0..100 {
                        buf.push(format!("thread{i}-event{j}"));
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(buffer.len(), 1000);
    }
}