dsa_rust/sequences/
concurrent_skip_list.rs

1/*! A safe, concurrent, reference-counted skip list
2
3# About
Skip lists are sorted, probabilistic, list-based structures that provide _O(log(n))_ (expected) time complexity for search, insert, and removal operations. This provides a significant advantage over sorted array-based lists, which exhibit _worst-case O(n)_ removal (average _O(n/2)_) time. Introduced in 1989 by William Pugh, skip lists also tend to be simpler to implement than self-balancing tree structures and generally offer easier, finer-grained control when adapting them for concurrent operations.
5
6This structure provides a basic list implementation used to back this library's [SkipListMap<K, V>] implementation.
7
8# Design
9The list features a dynamic max height `h` that is logarithmically proportional to the number of elements in the list `n` such that _h = log(n)_ in the expected case. The list does not _rebuild_ any towers (besides the head node) at insert, but simply changes the maximum potential height for those towers to grow. The logarithmic growth ensures that the average search, insertion, and deletion operations remain efficient, typically with expected O(log n) time complexity.
10
11## The Search Algorithm
The point of the search algorithm is to find the node (or handle/position) `p` in skip list `S` that represents the largest comparable value less than or equal to the search key `k`. The algorithm repeats two steps:

1. Scan right: let `candidate = p.peek()`. If `candidate >= k`, stop at `p`; otherwise move to `p.next()`. Repeat until `p.peek() >= k`.
2. Drop down a level: if `S.below(p) == 0` you are at the lowest level and the search terminates; otherwise continue with step 1 on the level below.
15
16## Visual Examples
17An initial, empty skip list with one level and no data:
18```text
19S0: None -> None
20```
21
22Inserting the first node triggers an automatic tower level, even if it ends up empty. This provides the algorithm with a starting point:
23```text
24S1: None ---------> None
25S0: None -> [ 5] -> None
26```
27
28Each subsequent addition may add more tower levels up to log(n) where `n` is the number of elements in the list. This example illustrates a fully built skip list:
29```text
30S3: None ---------> [*2] ---------------------------------------------> [*9] ----------> None
31S2: None ---------> [*2] ------------------------------> [*7] --------> [*9] ----------> None
32S1: None ---------> [*2] --------> [*4] ---------------> [*7] --------> [*9] -> [*10] -> None
33S0: None -> [ 1] -> [ 2] -> [3] -> [ 4] -> [5] -> [6] -> [ 7] -> [8] -> [ 9] -> [ 10] -> None
34```
35
```text
[None, a, j, c, e, d, b, i, g, h, f, None]
[  0,  1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
[None, a, b, c, d, e, f, g, h, i, j, None]
```
39
Towers forming sorted "express lanes":
41```text
42S3: None ----------> [ 6 ] -------------------------------------------------------> [ 7 ] ----------> None
43S2: None ----------> [ 6 ] -------------------------------------> [ 8 ] ----------> [ 7 ] ----------> None
44S1: None ----------> [ 6 ] ----------> [ 5 ] -------------------> [ 8 ] ----------> [ 7 ] -> [ 2 ] -> None
45```
Unsorted backing structure:
47```text
48S0: None -> [ a ] -> [ j ] -> [ c ] -> [ e ] -> [ d ] -> [ b ] -> [ i ] -> [ g ] -> [ h ] -> [ f ] -> None
49```
50
51
52For example, at level 2 you could traverse the list as pointers to [2, 7, 9]. The distinction here is that all of the node data is stored in one location, with only pointers being duplicated and populating tower lists.
53
54# Example
55
56*/
57use std::sync::{Arc, RwLock};
58
/// A lock-protected, optional pointer to the next node on a single level.
type Link<T> = RwLock<Option<Arc<Node<T>>>>;

/// One cell of a tower: the element's entry on a single level of the list.
#[derive(Debug)]
struct Node<T> {
    /// The element itself; every level of the same tower holds a clone of this `Arc`.
    data: Arc<T>,
    /// Successor on this node's own level (`None` at the end of the level).
    next: Link<T>,
    /// The node one level beneath this one; `None` when no lower node was supplied.
    down: Option<Arc<Node<T>>>,
}
67
68use std::borrow::Borrow;
69use std::sync::atomic::{AtomicUsize, Ordering};
70//use std::cmp::{max, Ordering};
71//use rand;
72
73pub struct ConcurrentSkipList<T> {
74    heads: Vec<Link<T>>,
75    len: AtomicUsize,
76}
77impl<T: Ord> Default for ConcurrentSkipList<T> {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82impl<T: Ord> ConcurrentSkipList<T> {
83    /// Tower utility
84    //fn generate_tower_height(&self) -> usize {
85    //    // Simple deterministic-ish pseudo-random for demo purposes
86    //    // Use a proper, fast RNG
87    //    //let mut level = 1;
88    //    //while level < 16 && (self.len.load(Ordering::Relaxed) >> level) & 1 == 1 {
89    //    //    level += 1;
90    //    //}
91    //    //level
92    //    // OR, use some black magic bit logic
93    //    let mut height = 1;
94    //    // Simple bit-logic: 50% chance for each level
95    //    let seed = self.len.load(Ordering::Relaxed) + 1;
96    //    while height < self.heads.len() && (seed & (1 << height)) != 0 {
97    //        height += 1;
98    //    }
99    //    height
100    //}
101    fn generate_tower_height(&self) -> usize {
102        let mut height = 1;
103        let mut rng = rand::rng();
104        // 0.25 (1 in 4) is often faster than 0.5 for skip lists
105        while height < self.heads.len() && rand::Rng::random_bool(&mut rng, 0.25) {
106            height += 1;
107        }
108        height
109    }
110
111    /// Instantiates a new concurrent skip list.
112    pub fn new() -> Self {
113        // Pre-allocate levels to allow &self mutation of links without resizing the Vec
114        let mut heads = Vec::with_capacity(32);
115        for _ in 0..32 {
116            heads.push(RwLock::new(None));
117        }
118        Self {
119            heads,
120            len: AtomicUsize::new(0),
121        }
122    }
123
124    /// Returns the number of nodes in the (base) list.
125    pub fn len(&self) -> usize {
126        self.len.load(Ordering::SeqCst)
127    }
128
129    /// Returns true if the (base) is empty.
130    pub fn is_empty(&self) -> bool {
131        self.len.load(Ordering::Relaxed) == 0
132    }
133
134    /// Adds a new node to the list.
135    pub fn push(&self, data: T) -> bool {
136        let target_height = self.generate_tower_height();
137        let shared_data = Arc::new(data);
138
139        // To handle concurrency, we must find predecessors level-by-level
140        // and verify they are still valid when we write.
141        let mut lower_node: Option<Arc<Node<T>>> = None;
142
143        // We build from bottom up to ensure base-level consistency first
144        for level in 0..target_height {
145            loop {
146                // 1. Find the predecessor for this specific level
147                let (pred_opt, succ_opt) =
148                    self.find_neighbors_at_level(level, shared_data.as_ref());
149
150                // 2. Check for duplicates (only need to do this at the base level or check every time)
151                if let Some(ref succ) = succ_opt {
152                    if succ.data.as_ref() == shared_data.as_ref() {
153                        return false;
154                    }
155                }
156
157                // 3. Create the new node for this level
158                let new_node = Arc::new(Node {
159                    data: Arc::clone(&shared_data),
160                    next: RwLock::new(succ_opt.clone()),
161                    down: lower_node.clone(),
162                });
163
164                // 4. Attempt to link
165                let success = if let Some(ref pred) = pred_opt {
166                    let mut write_guard = pred.next.write().unwrap();
167                    // RE-VERIFY: Is the successor still the same?
168                    // This prevents the TOCTOU race.
169                    if write_guard.as_ref().map(Arc::as_ptr) == succ_opt.as_ref().map(Arc::as_ptr) {
170                        *write_guard = Some(Arc::clone(&new_node));
171                        true
172                    } else {
173                        false // Someone squeezed in between; retry this level
174                    }
175                } else {
176                    let mut head_guard = self.heads[level].write().unwrap();
177                    if head_guard.as_ref().map(Arc::as_ptr) == succ_opt.as_ref().map(Arc::as_ptr) {
178                        *head_guard = Some(Arc::clone(&new_node));
179                        true
180                    } else {
181                        false
182                    }
183                };
184
185                if success {
186                    lower_node = Some(new_node);
187                    break; // Level complete, move up
188                }
189                // If !success, the loop repeats for this level
190            }
191        }
192
193        self.len.fetch_add(1, Ordering::SeqCst);
194        true
195    }
196
197    #[allow(clippy::type_complexity)]
198    /// Utility to find the immediate nodes surrounding a key at a specific level
199    fn find_neighbors_at_level(
200        &self,
201        level: usize,
202        key: &T,
203    ) -> (Option<Arc<Node<T>>>, Option<Arc<Node<T>>>) {
204        let mut curr = self.heads[level].read().unwrap().clone();
205        let mut pred = None;
206
207        while let Some(node) = curr {
208            if node.data.as_ref() < key {
209                pred = Some(Arc::clone(&node));
210                curr = node.next.read().unwrap().clone();
211            } else {
212                return (pred, Some(Arc::clone(&node)));
213            }
214        }
215        (pred, None)
216    }
217
218    /// Returns a reference-counted pointer to the data in the node, if it exists.
219    pub fn get<Q>(&self, key: &Q) -> Option<Arc<T>>
220    where
221        Q: Ord + ?Sized,
222        T: Borrow<Q>,
223    {
224        // This will track our position as we move right and then down
225        let mut current_node: Option<Arc<Node<T>>> = None;
226
227        for level in (0..self.heads.len()).rev() {
228            // 1. Where do we start at this level?
229            // If we found a node at the level above, we move 'down' from it.
230            // If we haven't found a starting node yet, we start at the head of this level.
231            let mut cursor = if let Some(ref node) = current_node {
232                node.down.clone()
233            } else {
234                self.heads[level].read().unwrap().clone()
235            };
236
237            // 2. Move horizontally as far as possible at this level
238            while let Some(node) = cursor {
239                let node_data: &Q = node.data.as_ref().borrow();
240
241                // Match found!
242                if node_data == key {
243                    return Some(Arc::clone(&node.data));
244                }
245
246                if node_data < key {
247                    // Check if the next node is still <= our target
248                    let next_guard = node.next.read().unwrap();
249                    if let Some(next_node) = next_guard.as_ref() {
250                        let next_data: &Q = next_node.data.as_ref().borrow();
251                        if next_data <= key {
252                            // Move right
253                            cursor = Some(Arc::clone(next_node));
254                            continue;
255                        }
256                    }
257                }
258
259                // If we've reached here, we can't move right anymore.
260                // Mark this node as our "downward" jump point for the next level.
261                current_node = Some(node);
262                break;
263            }
264        }
265
266        None
267    }
268
269    /// Removes a node from the list.
270    pub fn remove<Q>(&self, key: &Q) -> Option<Arc<T>>
271    where
272        Q: Ord + ?Sized,
273        T: Borrow<Q>,
274    {
275        let mut removed_val = None;
276        let mut predecessor: Option<Arc<Node<T>>> = None;
277
278        for level in (0..self.heads.len()).rev() {
279            // 1) The Elevator: Move Down
280            // Get the starting node for this level
281            let mut curr_link = if let Some(pred) = &predecessor {
282                pred.down.clone()
283            } else {
284                self.heads[level].read().unwrap().clone()
285            };
286
287            // 2) The Head Check
288            // If we are still at the start of a level (predecessor is None),
289            // we need to check if the head itself needs to be unhooked
290            if predecessor.is_none() {
291                let mut head_lock = self.heads[level].write().unwrap();
292                let mut is_target = false;
293
294                if let Some(node) = head_lock.as_ref() {
295                    if node.data.as_ref().borrow() == key {
296                        is_target = true;
297                        removed_val = Some(Arc::clone(&node.data));
298                    }
299                }
300
301                if is_target {
302                    let target_node = head_lock.take().unwrap();
303                    *head_lock = target_node.next.read().unwrap().clone();
304                    // After unhooking the head, the new head is the starting point for the search
305                    curr_link = head_lock.clone();
306                }
307            }
308
309            // 3) Search Right & Unhook Mid-List
310            while let Some(node) = curr_link {
311                let node_q: &Q = node.data.as_ref().borrow();
312
313                if node_q < key {
314                    // This node is a valid predecessor candidate.
315                    // Peek at its 'next' to see if it's the target.
316                    let mut next_lock = node.next.write().unwrap();
317                    let mut next_is_target = false;
318
319                    if let Some(next_node) = next_lock.as_ref() {
320                        if next_node.data.as_ref().borrow() == key {
321                            next_is_target = true;
322                            removed_val = Some(Arc::clone(&next_node.data));
323                        }
324                    }
325
326                    if next_is_target {
327                        let target_node = next_lock.take().unwrap();
328                        *next_lock = target_node.next.read().unwrap().clone();
329                    }
330
331                    // Move forward: this node is now the predecessor for the next level down.
332                    predecessor = Some(Arc::clone(&node));
333                    curr_link = next_lock.clone();
334                } else {
335                    break;
336                }
337            }
338        }
339
340        if removed_val.is_some() {
341            self.len.fetch_sub(1, Ordering::SeqCst);
342        }
343        removed_val
344    }
345
346    /// Returns true if the item exists in the list.
347    pub fn contains<Q>(&self, key: &Q) -> bool
348    where
349        Q: Ord + ?Sized,
350        T: Borrow<Q>,
351    {
352        // contains is just a boolean check on get()
353        self.get(key).is_some()
354    }
355}
356
#[cfg(test)]
mod test {
    use super::ConcurrentSkipList;
    use std::sync::Arc;

    /// Single-threaded smoke test of the public API.
    #[test]
    fn one() {
        // No need for `mut`: every operation works through `&self`.
        let list = ConcurrentSkipList::<&'static str>::new();

        assert!(list.push("Peter"));
        assert!(list.push("Paul"));
        assert!(list.push("Mary"));
        // Set semantics: a second insert of an existing value is rejected.
        assert!(!list.push("Paul"));

        // len() & is_empty()
        assert_eq!(list.len(), 3);
        assert!(!list.is_empty());

        // contains()
        assert!(list.contains("Peter"));

        // get(): presence is not asserted here, only that a hit yields the right value.
        if let Some(found) = list.get("Mary") {
            assert_eq!(*found, "Mary");
        }

        // remove()
        assert_eq!(list.remove("Peter"), Some(Arc::new("Peter")));
        assert_eq!(list.len(), 2);
        assert!(!list.contains("Peter"));
    }

    /// Multi-threaded stress test: racing inserts of the same values, followed by racing
    /// removals interleaved with lookups.
    #[test]
    fn three() {
        use rand::seq::SliceRandom;
        use std::sync::Barrier;
        use std::thread;

        let thread_count = 8;
        let values: Vec<i32> = (0..100).collect();
        let list = Arc::new(ConcurrentSkipList::<i32>::new());

        // PHASE 1: every thread inserts every value, each in its own random order.
        let barrier = Arc::new(Barrier::new(thread_count));
        let mut handles = Vec::new();
        for _ in 0..thread_count {
            let list = Arc::clone(&list);
            let barrier = Arc::clone(&barrier);
            let mut my_values = values.clone();

            handles.push(thread::spawn(move || {
                let mut rng = rand::rng();
                my_values.shuffle(&mut rng);

                barrier.wait();
                for v in my_values {
                    list.push(v);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        // Set semantics: each value was counted exactly once.
        assert_eq!(list.len(), values.len());

        // PHASE 2: even threads remove, odd threads read, all racing.
        let barrier = Arc::new(Barrier::new(thread_count));
        let mut handles = Vec::new();
        for i in 0..thread_count {
            let list = Arc::clone(&list);
            let barrier = Arc::clone(&barrier);
            let mut my_values = values.clone();

            handles.push(thread::spawn(move || {
                let mut rng = rand::rng();
                my_values.shuffle(&mut rng);
                barrier.wait();

                for v in my_values {
                    if i % 2 == 0 {
                        list.remove(&v);
                    } else {
                        // Readers only need to complete without panicking.
                        list.contains(&v);
                    }
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        // Whatever survived the race must still be readable and hold the right value.
        for v in &values {
            if let Some(found) = list.get(v) {
                assert_eq!(*found, *v);
            }
        }
    }
}
560
// Placeholder module: it contains no items yet. The commented sketch below outlines a
// node/list layout based on `crossbeam_epoch` atomic pointers.
mod dependent {
    // use crossbeam_epoch::{Atomic, Guard, Owned, Shared};
    // use std::sync::atomic::Ordering;
    //
    // struct Node<K, V> {
    //     key: K,
    //     value: V,
    //     // The tower is a fixed-size array or Vec of atomic pointers
    //     next: Vec<Atomic<Node<K, V>>>,
    // }
    //
    // pub struct SkipList<K, V> {
    //     // The head is just a tower of pointers to the first nodes
    //     head: Vec<Atomic<Node<K, V>>>,
    // }
}
577
578// Prototype for skip map
579
580//struct Node<K, V> {
581//    key: K,
582//    value: Arc<V>, // Shared value across all levels
583//    next: Link<K, V>,
584//    down: Option<Arc<Node<K, V>>>, // Pointer to the level below
585//}
586
587//pub struct ConcurrentSkipMap<K, V> {
588//    // Array of heads for each level
589//    heads: Vec<Link<K, V>>,
590//}