Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From sigq-0.11.0 To sigq-0.12.0
2023-07-25
| ||
00:28 | Keep track of whether the Puller has disappeared. check-in: 58a487a4a1 user: jan tags: trunk | |
2023-07-24
| ||
21:34 | Clippy. check-in: 8c44c91ce4 user: jan tags: sigq-0.12.0, trunk | |
21:29 | Release maintenance. check-in: 470cdcb5d3 user: jan tags: trunk | |
2022-09-11
| ||
15:10 | Implement inline-more feature. check-in: ee68191032 user: jan tags: trunk | |
2022-09-09
| ||
16:56 | Prepare for release of 0.11.0. check-in: c16d005501 user: jan tags: sigq-0.11.0, trunk | |
16:51 | Cargo.toml maintenance. check-in: 2a9cd0b2bb user: jan tags: trunk | |
Changes to .efiles.
1 2 3 | Cargo.toml www/changelog.md src/lib.rs | > > > | 1 2 3 4 5 6 | Cargo.toml www/changelog.md src/lib.rs src/push.rs src/pull.rs tests/simple.rs |
Changes to Cargo.toml.
1 2 | [package] name = "sigq" | | | > | > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | [package] name = "sigq" version = "0.12.0" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" exclude = [ ".efiles", ".fossil-settings", ".fslckout", "rustfmt.toml", "tests", "www" ] rust-version = "1.64" [features] default = ["inline-more"] inline-more = [] [dependencies] indexmap = { version = "2.0.0" } parking_lot = { version = "0.12.1" } |
Changes to src/lib.rs.
1 2 3 | //! Queue which supports pushing and poping nodes from threads/tasks, crossing //! sync/async boundaries. | | | | | | < | > > > > > < < | | < < < < < < < < < < < < | < < | < < < < < < < | < | | < < < | < < < < < < < < < < < | < < < | | < | | < < < < < < < < < < < < < < < < < < < < < < < < < < < < | < < | < < < < < < < < < < < < | < < < < < | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | //! Queue which supports pushing and poping nodes from threads/tasks, crossing //! sync/async boundaries. pub mod pull; pub mod push; use std::{ collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::{Puller, StaleErr}; pub use push::Pusher; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. struct Inner<I> { q: VecDeque<I>, npushers: usize, wakers: IndexMap<usize, Waker> } /// Inner shared data. struct Shared<I> { signal: Condvar, inner: Mutex<Inner<I>>, idgen: AtomicUsize } /// Create a new queue and return its paired push and pull objects. pub fn new<T>() -> (Pusher<T>, Puller<T>) { let inner = Inner { q: VecDeque::new(), npushers: 1, wakers: IndexMap::new() }; let shared = Shared { signal: Condvar::new(), inner: Mutex::new(inner), idgen: AtomicUsize::new(1) }; let shared = Arc::new(shared); let pusher = Pusher(Arc::clone(&shared)); let puller = Puller(shared); (pusher, puller) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added src/pull.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | use std::{ future::Future, num::NonZeroUsize, pin::Pin, sync::atomic::Ordering, sync::Arc, task::{Context, Poll} }; #[derive(Debug, PartialEq)] pub struct StaleErr; /// The receiving end-point of queue. #[repr(transparent)] pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>); impl<I> Puller<I> { /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// /// Returns `Err(())` if there are no more items in queue and there are no /// more [`Pusher`](super::Pusher) objects associated with this `Puller`. #[cfg_attr(feature = "inline-more", inline)] pub fn pop(&self) -> Result<I, StaleErr> { let mut inner = self.0.inner.lock(); loop { if inner.npushers == 0 { break Err(StaleErr); } else { match inner.q.pop_front() { Some(node) => { break Ok(node); } None => { self.0.signal.wait(&mut inner); } } } } } /// Pull the oldest node off the queue and return it. /// /// If a node is available on the queue then take it off and return it. /// /// If no nodes are available and there are no more [`Pusher`](super::Pusher) /// objects associated with this `Puller`, then return `Err(StaleErr)`. /// /// If no nodes were available and there's at least one associated `Pusher` /// exists then return `Ok(None)`. #[cfg_attr(feature = "inline-more", inline)] pub fn try_pop(&self) -> Result<Option<I>, StaleErr> { let mut inner = self.0.inner.lock(); if let Some(n) = inner.q.pop_front() { Ok(Some(n)) } else if inner.npushers == 0 { Err(StaleErr) } else { Ok(None) } } /// This method serves the same purpose as the [`pop()`](#method.pop) method, /// but rather than block it returns a `Future` to be used to wait for a node /// to arrive in an `async` context. /// /// ``` /// async fn test() { /// let (tx, rx) = sigq::new(); /// tx.push("hello"); /// assert_eq!(rx.was_empty(), false); /// let node = rx.apop().await.unwrap(); /// assert_eq!(node, "hello"); /// assert_eq!(rx.was_empty(), true); /// } /// ``` #[cfg_attr(feature = "inline-more", inline)] pub fn apop(&self) -> PopFuture<I> { PopFuture { ctx: Arc::clone(&self.0), id: None } } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } #[doc(hidden)] pub struct PopFuture<I> { ctx: Arc<super::Shared<I>>, id: Option<NonZeroUsize> } impl<I: 'static + Send> Future for PopFuture<I> { type Output = Result<I, StaleErr>; fn poll( mut self: Pin<&mut Self>, ctx: &mut Context<'_> ) -> Poll<Self::Output> { let mut inner = self.ctx.inner.lock(); match inner.q.pop_front() { Some(node) => Poll::Ready(Ok(node)), None => { if inner.npushers == 0 { // No more nodes and no more pushers, so return None Poll::Ready(Err(StaleErr)) } else { // Generate a unique identifier for this waker let id = loop { let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst); // Make sure it is non-zero and unique if id == 0 || inner.wakers.contains_key(&id) { continue; } break id; }; inner.wakers.insert(id, ctx.waker().clone()); drop(inner); self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) }); Poll::Pending } } } } } impl<I> Drop for PopFuture<I> { fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.ctx.inner.lock(); // Remove this future's waker let _ = inner.wakers.remove(&id.get()); } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added src/push.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | use std::sync::Arc; /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>); impl<I> Pusher<I> { /// Push a node on to the queue and unlock one queue reader, if any. /// /// If there are any tasks or threads waiting for new nodes to arrive they /// will be notified. #[cfg_attr(feature = "inline-more", inline)] pub fn push(&self, item: I) { let mut inner = self.0.inner.lock(); inner.q.push_back(item); if let Some((_, n)) = inner.wakers.pop() { n.wake(); } self.0.signal.notify_one(); drop(inner); } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } impl<I> Clone for Pusher<I> { fn clone(&self) -> Self { let mut inner = self.0.inner.lock(); inner.npushers += 1; Self(Arc::clone(&self.0)) } } impl<I> Drop for Pusher<I> { fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npushers -= 1; // If this was the last pusher then wake any pullers that are waiting to // receive new items. (When they discover that no pushers remain they will // return None). if inner.npushers == 0 { self.0.signal.notify_all(); for waker in inner.wakers.drain(..).map(|(_k, v)| v) { waker.wake() } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added tests/simple.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 | use sigq::StaleErr; #[test] fn try_pop_retvals() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); tx.push("hello"); // There's a client, data, so try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); // There's a client, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); // Drop the only pusher. drop(tx); // There are no more clients, so try_pop() should return Err(()) assert_eq!(rx.try_pop(), Err(StaleErr)); } #[test] fn drop_nonempty() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); tx.push("hello"); // Drop the only pusher. It should still be possible to pull existing nodes // off the stack. drop(tx); // There's a client, data, so try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); // There are no more clients, so try_pop() should return Err(()) assert_eq!(rx.try_pop(), Err(StaleErr)); } #[test] fn unblock_on_stale() { let (tx, rx) = sigq::new::<&str>(); // Kick off a thread with a Puller; blocking and waiting for a node or // termination. When the Pusher is dropped this should return Err(StaleErr). let jh = std::thread::spawn(move || match rx.pop() { Ok(_) => false, Err(StaleErr) => true }); std::thread::sleep(std::time::Duration::from_millis(500)); drop(tx); assert!(jh.join().unwrap()); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.11.0] - 2022-09-09 ### Added | > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.12.0] - 2023-07-24 ### Added - Add an `inline-more` feature (enabled by default). ### Changed - Split `Queue` into `Pusher` and `Puller` end-points. - When taking nodes off the queue using `Puller`, return a `Result` which can indicate an error if no nodes remain and there are no more `Pushers` associated with the queue (implied: If no pushers remain, no new nodes will be added, so the queue is effectively dead). - Dependencies updated: - `indexmap` updated to `2.0.0` - Use Rust edition 2021. - Bump MSRV to 1.64 (based on MSRV in `indexmap`) ### Removed ## [0.11.0] - 2022-09-09 ### Added |
︙ | ︙ |