Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -1,3 +1,6 @@ Cargo.toml www/changelog.md src/lib.rs +src/push.rs +src/pull.rs +tests/simple.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,10 @@ [package] name = "sigq" -version = "0.11.0" +version = "0.12.0" authors = ["Jan Danielsson "] -edition = "2018" +edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" @@ -11,13 +11,19 @@ exclude = [ ".efiles", ".fossil-settings", ".fslckout", "rustfmt.toml", + "tests", "www" ] -rust-version = "1.36" +rust-version = "1.64" + +[features] +default = ["inline-more"] + +inline-more = [] [dependencies] -indexmap = { version = "1.9.1" } +indexmap = { version = "2.0.0" } parking_lot = { version = "0.12.1" } Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,175 +1,54 @@ //! Queue which supports pushing and poping nodes from threads/tasks, crossing //! sync/async boundaries. -use std::collections::VecDeque; -use std::future::Future; -use std::num::NonZeroUsize; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll, Waker}; +pub mod pull; +pub mod push; + +use std::{ + collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker +}; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; + +pub use pull::{Puller, StaleErr}; +pub use push::Pusher; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. struct Inner { q: VecDeque, + npushers: usize, wakers: IndexMap } +/// Inner shared data. struct Shared { signal: Condvar, inner: Mutex>, idgen: AtomicUsize } -#[repr(transparent)] -pub struct Queue(Arc>); - - -impl Queue { - /// Create, and return, a new queue. - pub fn new() -> Self { - Queue::default() - } - - /// Returns a boolean indicating whether the queue was empty or not. - /// - /// This function is not particularly useful. If you don't understand why, - /// then please don't use it. - #[inline(always)] - pub fn was_empty(&self) -> bool { - let inner = self.0.inner.lock(); - inner.q.is_empty() - } - - /// Push a node on to the queue and unlock one queue reader, if any. - /// - /// If there are any tasks or threads waiting for new nodes to arrive they - /// will be notified. - #[inline(always)] - pub fn push(&self, item: I) { - let mut inner = self.0.inner.lock(); - inner.q.push_back(item); - if let Some((_, n)) = inner.wakers.pop() { - n.wake(); - } - self.0.signal.notify_one(); - drop(inner); - } - - /// Pull the oldest node off the queue and return it. If no nodes are - /// available on the queue, then block and wait for one to become available. - #[inline(always)] - pub fn pop(&self) -> I { - let mut inner = self.0.inner.lock(); - let node = loop { - match inner.q.pop_front() { - Some(node) => { - break node; - } - None => { - self.0.signal.wait(&mut inner); - } - } - }; - drop(inner); - - node - } - - /// Pull the oldest node off the queue and return it. If no nodes are - /// available on the queue, then return `None`. - #[inline(always)] - pub fn try_pop(&self) -> Option { - let mut inner = self.0.inner.lock(); - inner.q.pop_front() - } - - /// This method serves the same purpose as the [`pop()`](#method.pop) method, - /// but rather than block it returns a `Future` to be used to wait for a node - /// to arrive in an `async` context. - /// - /// ``` - /// use sigq::Queue; - /// async fn test() { - /// let q = Queue::new(); - /// q.push("hello".to_string()); - /// assert_eq!(q.was_empty(), false); - /// let node = q.apop().await; - /// assert_eq!(node, "hello"); - /// assert_eq!(q.was_empty(), true); - /// } - /// ``` - #[inline(always)] - pub fn apop(&self) -> PopFuture { - PopFuture { - ctx: Arc::clone(&self.0), - id: None - } - } -} - -impl Default for Queue { - fn default() -> Self { - Queue(Arc::new(Shared { - signal: Condvar::new(), - inner: Mutex::new(Inner { - q: VecDeque::new(), - wakers: IndexMap::new() - }), - idgen: AtomicUsize::new(1) - })) - } -} - - -#[doc(hidden)] -pub struct PopFuture { - ctx: Arc>, - id: Option -} - -impl Future for PopFuture { - type Output = I; - fn poll( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_> - ) -> Poll { - let mut inner = self.ctx.inner.lock(); - match inner.q.pop_front() { - Some(node) => Poll::Ready(node), - None => { - // Generate a unique identifier for this waker - let id = loop { - let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst); - // Make sure if is non-zero and unique - if id == 0 || inner.wakers.contains_key(&id) { - continue; - } - break id; - }; - inner.wakers.insert(id, ctx.waker().clone()); - drop(inner); - self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) }); - Poll::Pending - } - } - } -} - -impl Drop for PopFuture { - fn drop(&mut self) { - if let Some(id) = self.id { - let mut inner = self.ctx.inner.lock(); - // Remove this future's waker - let _ = inner.wakers.remove(&id.get()); - } - } +/// Create a new queue and return its paired push and pull objects. +pub fn new() -> (Pusher, Puller) { + let inner = Inner { + q: VecDeque::new(), + npushers: 1, + wakers: IndexMap::new() + }; + let shared = Shared { + signal: Condvar::new(), + inner: Mutex::new(inner), + idgen: AtomicUsize::new(1) + }; + let shared = Arc::new(shared); + + let pusher = Pusher(Arc::clone(&shared)); + let puller = Puller(shared); + + (pusher, puller) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/pull.rs Index: src/pull.rs ================================================================== --- /dev/null +++ src/pull.rs @@ -0,0 +1,148 @@ +use std::{ + future::Future, + num::NonZeroUsize, + pin::Pin, + sync::atomic::Ordering, + sync::Arc, + task::{Context, Poll} +}; + +#[derive(Debug, PartialEq)] +pub struct StaleErr; + +/// The receiving end-point of queue. +#[repr(transparent)] +pub struct Puller(pub(crate) Arc>); + +impl Puller { + /// Pull the oldest node off the queue and return it. + /// + /// If no nodes are available on the queue, then block and wait for one to + /// become available. + /// + /// Returns `Err(())` if there are no more items in queue and there are no + /// more [`Pusher`](super::Pusher) objects associated with this `Puller`. + #[cfg_attr(feature = "inline-more", inline)] + pub fn pop(&self) -> Result { + let mut inner = self.0.inner.lock(); + loop { + if inner.npushers == 0 { + break Err(StaleErr); + } else { + match inner.q.pop_front() { + Some(node) => { + break Ok(node); + } + None => { + self.0.signal.wait(&mut inner); + } + } + } + } + } + + /// Pull the oldest node off the queue and return it. + /// + /// If a node is available on the queue then take it off and return it. + /// + /// If no nodes are available and there are no more [`Pusher`](super::Pusher) + /// objects associated with this `Puller`, then return `Err(StaleErr)`. + /// + /// If no nodes were available and there's at least one associated `Pusher` + /// exists then return `Ok(None)`. + #[cfg_attr(feature = "inline-more", inline)] + pub fn try_pop(&self) -> Result, StaleErr> { + let mut inner = self.0.inner.lock(); + if let Some(n) = inner.q.pop_front() { + Ok(Some(n)) + } else if inner.npushers == 0 { + Err(StaleErr) + } else { + Ok(None) + } + } + + /// This method serves the same purpose as the [`pop()`](#method.pop) method, + /// but rather than block it returns a `Future` to be used to wait for a node + /// to arrive in an `async` context. + /// + /// ``` + /// async fn test() { + /// let (tx, rx) = sigq::new(); + /// tx.push("hello"); + /// assert_eq!(rx.was_empty(), false); + /// let node = rx.apop().await.unwrap(); + /// assert_eq!(node, "hello"); + /// assert_eq!(rx.was_empty(), true); + /// } + /// ``` + #[cfg_attr(feature = "inline-more", inline)] + pub fn apop(&self) -> PopFuture { + PopFuture { + ctx: Arc::clone(&self.0), + id: None + } + } + + /// Returns a boolean indicating whether the queue was empty or not. + /// + /// This function is not particularly useful. If you don't understand why, + /// then please don't use it. + #[cfg_attr(feature = "inline-more", inline)] + pub fn was_empty(&self) -> bool { + let inner = self.0.inner.lock(); + inner.q.is_empty() + } +} + + +#[doc(hidden)] +pub struct PopFuture { + ctx: Arc>, + id: Option +} + +impl Future for PopFuture { + type Output = Result; + fn poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_> + ) -> Poll { + let mut inner = self.ctx.inner.lock(); + match inner.q.pop_front() { + Some(node) => Poll::Ready(Ok(node)), + None => { + if inner.npushers == 0 { + // No more nodes and no more pushers, so return None + Poll::Ready(Err(StaleErr)) + } else { + // Generate a unique identifier for this waker + let id = loop { + let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst); + // Make sure it is non-zero and unique + if id == 0 || inner.wakers.contains_key(&id) { + continue; + } + break id; + }; + inner.wakers.insert(id, ctx.waker().clone()); + drop(inner); + self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) }); + Poll::Pending + } + } + } + } +} + +impl Drop for PopFuture { + fn drop(&mut self) { + if let Some(id) = self.id { + let mut inner = self.ctx.inner.lock(); + // Remove this future's waker + let _ = inner.wakers.remove(&id.get()); + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/push.rs Index: src/push.rs ================================================================== --- /dev/null +++ src/push.rs @@ -0,0 +1,59 @@ +use std::sync::Arc; + +/// The transmitting end-point of queue. +#[repr(transparent)] +pub struct Pusher(pub(crate) Arc>); + +impl Pusher { + /// Push a node on to the queue and unlock one queue reader, if any. + /// + /// If there are any tasks or threads waiting for new nodes to arrive they + /// will be notified. + #[cfg_attr(feature = "inline-more", inline)] + pub fn push(&self, item: I) { + let mut inner = self.0.inner.lock(); + inner.q.push_back(item); + if let Some((_, n)) = inner.wakers.pop() { + n.wake(); + } + self.0.signal.notify_one(); + drop(inner); + } + + /// Returns a boolean indicating whether the queue was empty or not. + /// + /// This function is not particularly useful. If you don't understand why, + /// then please don't use it. + #[cfg_attr(feature = "inline-more", inline)] + pub fn was_empty(&self) -> bool { + let inner = self.0.inner.lock(); + inner.q.is_empty() + } +} + +impl Clone for Pusher { + fn clone(&self) -> Self { + let mut inner = self.0.inner.lock(); + inner.npushers += 1; + Self(Arc::clone(&self.0)) + } +} + +impl Drop for Pusher { + fn drop(&mut self) { + let mut inner = self.0.inner.lock(); + inner.npushers -= 1; + + // If this was the last pusher then wake any pullers that are waiting to + // receive new items. (When they discover that no pushers remain they will + // return None). + if inner.npushers == 0 { + self.0.signal.notify_all(); + for waker in inner.wakers.drain(..).map(|(_k, v)| v) { + waker.wake() + } + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/simple.rs Index: tests/simple.rs ================================================================== --- /dev/null +++ tests/simple.rs @@ -0,0 +1,63 @@ +use sigq::StaleErr; + +#[test] +fn try_pop_retvals() { + let (tx, rx) = sigq::new(); + + // There's a client, no data, so try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); + + tx.push("hello"); + + // There's a client, data, so try_pop() should return Ok(Some) + assert_eq!(rx.try_pop(), Ok(Some("hello"))); + + // There's a client, so try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); + + // Drop the only pusher. + drop(tx); + + // There are no more clients, so try_pop() should return Err(()) + assert_eq!(rx.try_pop(), Err(StaleErr)); +} + +#[test] +fn drop_nonempty() { + let (tx, rx) = sigq::new(); + + // There's a client, no data, so try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); + + tx.push("hello"); + + // Drop the only pusher. It should still be possible to pull existing nodes + // off the stack. + drop(tx); + + // There's a client, data, so try_pop() should return Ok(Some) + assert_eq!(rx.try_pop(), Ok(Some("hello"))); + + // There are no more clients, so try_pop() should return Err(()) + assert_eq!(rx.try_pop(), Err(StaleErr)); +} + +#[test] +fn unblock_on_stale() { + let (tx, rx) = sigq::new::<&str>(); + + // Kick off a thread with a Puller; blocking and waiting for a node or + // termination. When the Pusher is dropped this should return Err(StaleErr). + let jh = std::thread::spawn(move || match rx.pop() { + Ok(_) => false, + Err(StaleErr) => true + }); + + std::thread::sleep(std::time::Duration::from_millis(500)); + + drop(tx); + + assert!(jh.join().unwrap()); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -4,10 +4,31 @@ ### Added ### Changed +### Removed + + +## [0.12.0] - 2023-07-24 + +### Added + +- Add an `inline-more` feature (enabled by default). + +### Changed + +- Split `Queue` into `Pusher` and `Puller` end-points. + - When taking nodes off the queue using `Puller`, return a `Result` which can + indicate an error if no nodes remain and there are no more `Pushers` + associated with the queue (implied: If no pushers remain, no new nodes will + be added, so the queue is effectively dead). +- Dependencies updated: + - `indexmap` updated to `2.0.0` +- Use Rust edition 2021. +- Bump MSRV to 1.64 (based on MSRV in `indexmap`) + ### Removed ## [0.11.0] - 2022-09-09