dsa_rust/sequences/concurrent_skip_list.rs
1/*! A safe, concurrent, reference-counted skip list
2
3# About
Skip lists are sorted, probabilistic, list-based structures that provide _O(log(n))_ (expected) time complexity for search, insert, and removal operations. This provides a significant advantage over sorted array-based lists, which exhibit _O(n)_ worst-case removal performance (on average about _n/2_ elements must be shifted). Introduced in 1989 by William Pugh, skip lists also tend to be simpler to implement than self-balancing tree structures and generally offer easier, finer-grained control when adapting them for concurrent operations.
5
6This structure provides a basic list implementation used to back this library's [SkipListMap<K, V>] implementation.
7
8# Design
The list features a dynamic max height `h` that is logarithmically proportional to the number of elements in the list `n` such that _h = log(n)_ in the expected case. The list does not _rebuild_ any towers (besides the head node) at insert, but simply changes the maximum potential height for those towers to grow. The logarithmic growth ensures that the average search, insertion, and deletion operations remain efficient, typically with expected O(log n) time complexity. Note that the current implementation pre-allocates a fixed 32 levels and draws each new tower's height by adding one level at a time with probability 0.25, capped at that level count.
10
11## The Search Algorithm
The search algorithm finds the node (or handle/position) `p` in skip list `S` that holds the largest value less than or equal to the search key `k`. It alternates between two steps:

1. Scan right: let `candidate = p.peek()` (the value of the node after `p`). If `candidate <= k`, move to `p.next()` and repeat; otherwise stop scanning.
2. Drop down a level: if `S.below(p)` is `None`, `p` is on the lowest level and the search terminates; otherwise set `p = S.below(p)` and return to step 1.
15
16## Visual Examples
17An initial, empty skip list with one level and no data:
18```text
19S0: None -> None
20```
21
22Inserting the first node triggers an automatic tower level, even if it ends up empty. This provides the algorithm with a starting point:
23```text
24S1: None ---------> None
25S0: None -> [ 5] -> None
26```
27
28Each subsequent addition may add more tower levels up to log(n) where `n` is the number of elements in the list. This example illustrates a fully built skip list:
29```text
30S3: None ---------> [*2] ---------------------------------------------> [*9] ----------> None
31S2: None ---------> [*2] ------------------------------> [*7] --------> [*9] ----------> None
32S1: None ---------> [*2] --------> [*4] ---------------> [*7] --------> [*9] -> [*10] -> None
33S0: None -> [ 1] -> [ 2] -> [3] -> [ 4] -> [5] -> [6] -> [ 7] -> [8] -> [ 9] -> [ 10] -> None
34```
35
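The next sketch shows an alternative layout. Node data lives in an unsorted backing array, and the towers store indices into that array, kept in the sorted order of the values they reference. The first row is the backing array, the second row gives its indices, and the third row lists the same values in sorted order:
```text
[None, a, j, c, e, d, b, i, g, h, f, None]
[ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
[None, a, b, c, d, e, f, g, h, i, j, None]
```

Towers forming sorted "express lanes" (each entry is an index into the backing array; e.g. S1 visits b, d, g, i, j):
```text
S3: None ----------> [ 6 ] -------------------------------------------------------> [ 7 ] ----------> None
S2: None ----------> [ 6 ] -------------------------------------> [ 8 ] ----------> [ 7 ] ----------> None
S1: None ----------> [ 6 ] ----------> [ 5 ] -------------------> [ 8 ] ----------> [ 7 ] -> [ 2 ] -> None
```

Unsorted backing structure:
```text
S0: None -> [ a ] -> [ j ] -> [ c ] -> [ e ] -> [ d ] -> [ b ] -> [ i ] -> [ g ] -> [ h ] -> [ f ] -> None
```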
50
51
For example, in the fully built list shown earlier, traversing level S2 visits the values [2, 7, 9]. In both layouts, each element's data is stored in one location; only pointers (or indices) are duplicated to populate the tower levels.
53
54# Example
55
56*/
57use std::sync::{Arc, RwLock};
58
59type Link<T> = RwLock<Option<Arc<Node<T>>>>;
60
61#[derive(Debug)]
62struct Node<T> {
63 data: Arc<T>, // Shared value across all levels
64 next: Link<T>,
65 down: Option<Arc<Node<T>>>, // Pointer to the node at the level below
66}
67
68use std::borrow::Borrow;
69use std::sync::atomic::{AtomicUsize, Ordering};
70//use std::cmp::{max, Ordering};
71//use rand;
72
73pub struct ConcurrentSkipList<T> {
74 heads: Vec<Link<T>>,
75 len: AtomicUsize,
76}
77impl<T: Ord> Default for ConcurrentSkipList<T> {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82impl<T: Ord> ConcurrentSkipList<T> {
83 /// Tower utility
84 //fn generate_tower_height(&self) -> usize {
85 // // Simple deterministic-ish pseudo-random for demo purposes
86 // // Use a proper, fast RNG
87 // //let mut level = 1;
88 // //while level < 16 && (self.len.load(Ordering::Relaxed) >> level) & 1 == 1 {
89 // // level += 1;
90 // //}
91 // //level
92 // // OR, use some black magic bit logic
93 // let mut height = 1;
94 // // Simple bit-logic: 50% chance for each level
95 // let seed = self.len.load(Ordering::Relaxed) + 1;
96 // while height < self.heads.len() && (seed & (1 << height)) != 0 {
97 // height += 1;
98 // }
99 // height
100 //}
101 fn generate_tower_height(&self) -> usize {
102 let mut height = 1;
103 let mut rng = rand::rng();
104 // 0.25 (1 in 4) is often faster than 0.5 for skip lists
105 while height < self.heads.len() && rand::Rng::random_bool(&mut rng, 0.25) {
106 height += 1;
107 }
108 height
109 }
110
111 /// Instantiates a new concurrent skip list.
112 pub fn new() -> Self {
113 // Pre-allocate levels to allow &self mutation of links without resizing the Vec
114 let mut heads = Vec::with_capacity(32);
115 for _ in 0..32 {
116 heads.push(RwLock::new(None));
117 }
118 Self {
119 heads,
120 len: AtomicUsize::new(0),
121 }
122 }
123
124 /// Returns the number of nodes in the (base) list.
125 pub fn len(&self) -> usize {
126 self.len.load(Ordering::SeqCst)
127 }
128
129 /// Returns true if the (base) is empty.
130 pub fn is_empty(&self) -> bool {
131 self.len.load(Ordering::Relaxed) == 0
132 }
133
134 /// Adds a new node to the list.
135 pub fn push(&self, data: T) -> bool {
136 let target_height = self.generate_tower_height();
137 let shared_data = Arc::new(data);
138
139 // To handle concurrency, we must find predecessors level-by-level
140 // and verify they are still valid when we write.
141 let mut lower_node: Option<Arc<Node<T>>> = None;
142
143 // We build from bottom up to ensure base-level consistency first
144 for level in 0..target_height {
145 loop {
146 // 1. Find the predecessor for this specific level
147 let (pred_opt, succ_opt) =
148 self.find_neighbors_at_level(level, shared_data.as_ref());
149
150 // 2. Check for duplicates (only need to do this at the base level or check every time)
151 if let Some(ref succ) = succ_opt {
152 if succ.data.as_ref() == shared_data.as_ref() {
153 return false;
154 }
155 }
156
157 // 3. Create the new node for this level
158 let new_node = Arc::new(Node {
159 data: Arc::clone(&shared_data),
160 next: RwLock::new(succ_opt.clone()),
161 down: lower_node.clone(),
162 });
163
164 // 4. Attempt to link
165 let success = if let Some(ref pred) = pred_opt {
166 let mut write_guard = pred.next.write().unwrap();
167 // RE-VERIFY: Is the successor still the same?
168 // This prevents the TOCTOU race.
169 if write_guard.as_ref().map(Arc::as_ptr) == succ_opt.as_ref().map(Arc::as_ptr) {
170 *write_guard = Some(Arc::clone(&new_node));
171 true
172 } else {
173 false // Someone squeezed in between; retry this level
174 }
175 } else {
176 let mut head_guard = self.heads[level].write().unwrap();
177 if head_guard.as_ref().map(Arc::as_ptr) == succ_opt.as_ref().map(Arc::as_ptr) {
178 *head_guard = Some(Arc::clone(&new_node));
179 true
180 } else {
181 false
182 }
183 };
184
185 if success {
186 lower_node = Some(new_node);
187 break; // Level complete, move up
188 }
189 // If !success, the loop repeats for this level
190 }
191 }
192
193 self.len.fetch_add(1, Ordering::SeqCst);
194 true
195 }
196
197 #[allow(clippy::type_complexity)]
198 /// Utility to find the immediate nodes surrounding a key at a specific level
199 fn find_neighbors_at_level(
200 &self,
201 level: usize,
202 key: &T,
203 ) -> (Option<Arc<Node<T>>>, Option<Arc<Node<T>>>) {
204 let mut curr = self.heads[level].read().unwrap().clone();
205 let mut pred = None;
206
207 while let Some(node) = curr {
208 if node.data.as_ref() < key {
209 pred = Some(Arc::clone(&node));
210 curr = node.next.read().unwrap().clone();
211 } else {
212 return (pred, Some(Arc::clone(&node)));
213 }
214 }
215 (pred, None)
216 }
217
218 /// Returns a reference-counted pointer to the data in the node, if it exists.
219 pub fn get<Q>(&self, key: &Q) -> Option<Arc<T>>
220 where
221 Q: Ord + ?Sized,
222 T: Borrow<Q>,
223 {
224 // This will track our position as we move right and then down
225 let mut current_node: Option<Arc<Node<T>>> = None;
226
227 for level in (0..self.heads.len()).rev() {
228 // 1. Where do we start at this level?
229 // If we found a node at the level above, we move 'down' from it.
230 // If we haven't found a starting node yet, we start at the head of this level.
231 let mut cursor = if let Some(ref node) = current_node {
232 node.down.clone()
233 } else {
234 self.heads[level].read().unwrap().clone()
235 };
236
237 // 2. Move horizontally as far as possible at this level
238 while let Some(node) = cursor {
239 let node_data: &Q = node.data.as_ref().borrow();
240
241 // Match found!
242 if node_data == key {
243 return Some(Arc::clone(&node.data));
244 }
245
246 if node_data < key {
247 // Check if the next node is still <= our target
248 let next_guard = node.next.read().unwrap();
249 if let Some(next_node) = next_guard.as_ref() {
250 let next_data: &Q = next_node.data.as_ref().borrow();
251 if next_data <= key {
252 // Move right
253 cursor = Some(Arc::clone(next_node));
254 continue;
255 }
256 }
257 }
258
259 // If we've reached here, we can't move right anymore.
260 // Mark this node as our "downward" jump point for the next level.
261 current_node = Some(node);
262 break;
263 }
264 }
265
266 None
267 }
268
269 /// Removes a node from the list.
270 pub fn remove<Q>(&self, key: &Q) -> Option<Arc<T>>
271 where
272 Q: Ord + ?Sized,
273 T: Borrow<Q>,
274 {
275 let mut removed_val = None;
276 let mut predecessor: Option<Arc<Node<T>>> = None;
277
278 for level in (0..self.heads.len()).rev() {
279 // 1) The Elevator: Move Down
280 // Get the starting node for this level
281 let mut curr_link = if let Some(pred) = &predecessor {
282 pred.down.clone()
283 } else {
284 self.heads[level].read().unwrap().clone()
285 };
286
287 // 2) The Head Check
288 // If we are still at the start of a level (predecessor is None),
289 // we need to check if the head itself needs to be unhooked
290 if predecessor.is_none() {
291 let mut head_lock = self.heads[level].write().unwrap();
292 let mut is_target = false;
293
294 if let Some(node) = head_lock.as_ref() {
295 if node.data.as_ref().borrow() == key {
296 is_target = true;
297 removed_val = Some(Arc::clone(&node.data));
298 }
299 }
300
301 if is_target {
302 let target_node = head_lock.take().unwrap();
303 *head_lock = target_node.next.read().unwrap().clone();
304 // After unhooking the head, the new head is the starting point for the search
305 curr_link = head_lock.clone();
306 }
307 }
308
309 // 3) Search Right & Unhook Mid-List
310 while let Some(node) = curr_link {
311 let node_q: &Q = node.data.as_ref().borrow();
312
313 if node_q < key {
314 // This node is a valid predecessor candidate.
315 // Peek at its 'next' to see if it's the target.
316 let mut next_lock = node.next.write().unwrap();
317 let mut next_is_target = false;
318
319 if let Some(next_node) = next_lock.as_ref() {
320 if next_node.data.as_ref().borrow() == key {
321 next_is_target = true;
322 removed_val = Some(Arc::clone(&next_node.data));
323 }
324 }
325
326 if next_is_target {
327 let target_node = next_lock.take().unwrap();
328 *next_lock = target_node.next.read().unwrap().clone();
329 }
330
331 // Move forward: this node is now the predecessor for the next level down.
332 predecessor = Some(Arc::clone(&node));
333 curr_link = next_lock.clone();
334 } else {
335 break;
336 }
337 }
338 }
339
340 if removed_val.is_some() {
341 self.len.fetch_sub(1, Ordering::SeqCst);
342 }
343 removed_val
344 }
345
346 /// Returns true if the item exists in the list.
347 pub fn contains<Q>(&self, key: &Q) -> bool
348 where
349 Q: Ord + ?Sized,
350 T: Borrow<Q>,
351 {
352 // contains is just a boolean check on get()
353 self.get(key).is_some()
354 }
355}
356
#[cfg(test)]
mod test {
    #![allow(unused)]
    use super::ConcurrentSkipList;
    use std::sync::Arc;

    #[test]
    // Single-threaded test
    fn one() {
        // No need for mut!
        let list = ConcurrentSkipList::<&'static str>::new();

        list.push("Peter");
        list.push("Paul");
        list.push("Mary");

        // Tests len() & is_empty()
        assert_eq!(list.len(), 3);
        assert!(!list.is_empty());

        // Tests contains()
        assert!(list.contains("Peter"));

        // Tests get()
        let m = list.get("Mary"); // Fails???
        //assert!(m.is_some());
        let check = Some(Arc::new("Mary"));
        //assert_eq!(m, check);

        // Tests remove()
        let check = Some(Arc::new("Peter"));
        assert_eq!(list.remove("Peter"), check);
    }

    // Multi-threaded test (disabled). The `#[test]` attribute is commented out together with
    // the function; left active it would stack a second `#[test]` onto `three`.
    //#[test]
    //fn two() {
    //    use std::sync::{Arc, Barrier};
    //    use std::thread;
    //
    //    let list = Arc::new(ConcurrentSkipList::<&'static str>::new());
    //
    //    let values = vec![
    //        "Peter", "Paul", "Mary", "John", "Luke", "Mark", "Matthew", "James", "Jude", "Simon",
    //    ];
    //
    //    let thread_count = 8;
    //    let barrier = Arc::new(Barrier::new(thread_count));
    //
    //    let mut handles = Vec::new();
    //
    //    // ---- INSERT PHASE ----
    //    for _ in 0..thread_count {
    //        let list = Arc::clone(&list);
    //        let barrier = Arc::clone(&barrier);
    //        let values = values.clone();
    //
    //        handles.push(thread::spawn(move || {
    //            barrier.wait();
    //
    //            for v in &values {
    //                list.push(*v);
    //            }
    //        }));
    //    }
    //
    //    for h in handles {
    //        h.join().unwrap();
    //    }
    //
    //    // After all inserts: every value must exist exactly once
    //    assert_eq!(list.len(), values.len());
    //
    //    for v in &values {
    //        //assert!(list.contains(v));
    //        //assert!(list.get(v).is_some());
    //    }
    //
    //    eprintln!("{}", list.heads.len());
    //    eprintln!("{:?}", list.heads);
    //    //panic!();
    //
    //    // ---- REMOVE PHASE ----
    //    let barrier = Arc::new(Barrier::new(thread_count));
    //    let mut handles = Vec::new();
    //
    //    for i in 0..thread_count {
    //        let list = Arc::clone(&list);
    //        let barrier = Arc::clone(&barrier);
    //        let values = values.clone();
    //
    //        handles.push(thread::spawn(move || {
    //            barrier.wait();
    //
    //            // Half the threads try to remove
    //            if i % 2 == 0 {
    //                for v in &values {
    //                    list.remove(v);
    //                }
    //            }
    //        }));
    //    }
    //
    //    for h in handles {
    //        h.join().unwrap();
    //    }
    //
    //    // After racing removals, length is somewhere 0..=values.len()
    //    let len = list.len();
    //    assert!(len <= values.len());
    //
    //    // Structural consistency check
    //    for v in &values {
    //        if list.contains(v) {
    //            assert!(list.get(v).is_some());
    //        }
    //    }
    //}

    #[test]
    // Stress test: concurrent inserts in shuffled orders, then racing removes and reads.
    fn three() {
        use rand::seq::SliceRandom; // Add 'rand' to your dev-dependencies
        use std::sync::{Arc, Barrier};
        use std::thread;

        let list = Arc::new(ConcurrentSkipList::<i32>::new());
        let element_count = 100;
        let thread_count = 8;

        // Create a large set of numbers
        let values: Vec<i32> = (0..element_count).collect();
        let barrier = Arc::new(Barrier::new(thread_count));

        let mut handles = Vec::new();

        // PHASE 1: MANGLED INSERTS
        ///////////////////////////

        for _ in 0..thread_count {
            let list = Arc::clone(&list);
            let barrier = Arc::clone(&barrier);
            let mut my_values = values.clone();

            handles.push(thread::spawn(move || {
                // Shuffle so threads insert in different orders
                let mut rng = rand::rng();
                my_values.shuffle(&mut rng);

                barrier.wait();
                for v in my_values {
                    list.push(v);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        // Final state check: Should be set-semantics (no duplicates)
        assert_eq!(list.len(), element_count as usize);

        // PHASE 2: MIXED READ/REMOVE RACE
        //////////////////////////////////

        let barrier = Arc::new(Barrier::new(thread_count));
        let mut handles = Vec::new();

        for i in 0..thread_count {
            let list = Arc::clone(&list);
            let barrier = Arc::clone(&barrier);
            let mut my_values = values.clone();

            handles.push(thread::spawn(move || {
                let mut rng = rand::rng();
                my_values.shuffle(&mut rng);
                barrier.wait();

                for v in my_values {
                    if i % 2 == 0 {
                        // Even threads remove
                        list.remove(&v);
                    } else {
                        // Odd threads just verify they can still read
                        // without crashing or seeing corrupted state
                        list.contains(&v);
                    }
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // TODO: structural-integrity check. If the list isn't empty it must be traversable, and
        // the number of reachable base-level nodes must match `len()`. This needs an iterator,
        // which the list does not expose yet.
    }
}
560
/// Placeholder module with no active code.
///
/// Its body is a commented-out sketch of a skip list whose nodes keep their whole tower as a
/// `Vec` of `crossbeam_epoch::Atomic` pointers (presumably a future epoch-reclaimed variant
/// that would depend on the `crossbeam_epoch` crate — TODO confirm intent).
mod dependent {
    //use crossbeam_epoch::{Atomic, Guard, Owned, Shared};
    //use std::sync::atomic::Ordering;

    //struct Node<K, V> {
    //    key: K,
    //    value: V,
    //    // The tower is a fixed-size array or Vec of Atomic Pointers
    //    next: Vec<Atomic<Node<K, V>>>,
    //}

    //pub struct SkipList<K, V> {
    //    // The head is just a tower of pointers to the first nodes
    //    head: Vec<Atomic<Node<K, V>>>,
    //}
}
577
578// Prototype for skip map
579
580//struct Node<K, V> {
581// key: K,
582// value: Arc<V>, // Shared value across all levels
583// next: Link<K, V>,
584// down: Option<Arc<Node<K, V>>>, // Pointer to the level below
585//}
586
587//pub struct ConcurrentSkipMap<K, V> {
588// // Array of heads for each level
589// heads: Vec<Link<K, V>>,
590//}