Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "sigq" -version = "0.13.1" +version = "0.13.2" authors = ["Jan Danielsson "] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,10 +1,38 @@ -//! Queue which supports pushing and poping nodes from threads/tasks, crossing -//! sync/async boundaries. +//! _sigq_ is a FIFO queue that supports pushing and popping nodes from +//! threads/tasks, crossing sync/async boundaries. The interface to interact +//! with the queue is a pair of end-points. The [`Pusher`] is used to add data +//! to the queue, and the [`Puller`] is used to pull data off the queue. +//! +//! The `Pusher` has a [`push()`](Pusher::push) method that is used to push new +//! nodes onto the queue. +//! +//! The `Puller` has a blocking [`pop()`](Puller::pop) and a +//! [`apop()`](Puller::apop) that returns a `Future` for getting the next node +//! off the queue. These will return immediately with the next node if +//! available, or block and wait for a new node to be pushed onto the queue. +//! [`try_pop()`](Puller::try_pop) can be used as a non-blocking way to get the +//! next node, if available. +//! +//! ``` +//! let (pusher, puller) = sigq::new(); +//! pusher.push(42).unwrap(); +//! assert_eq!(puller.pop(), Ok(42)); +//! assert_eq!(puller.try_pop(), Ok(None)); +//! ``` +//! +//! # Semantics +//! - Dropping the last `Pusher` end-point will cause waiting `Puller`'s to +//! wake up and return `Err(StaleErr)` if there are no more nodes on the +//! queue. +//! - Dropping the last `Puller` end-point will: +//! - Immediately drop all the nodes in the queue. +//! - Cause the `Pusher`'s to return `Err(StaleErr)` if new nodes are +//! 
attempted to be added to the queue. -pub mod pull; -pub mod push; +pub(crate) mod pull; +pub(crate) mod push; use std::{ collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker }; @@ -13,10 +41,18 @@ use indexmap::IndexMap; pub use pull::Puller; pub use push::Pusher; +/// Error value used to indicate that there are no remote end-points available. +/// +/// If a `Puller` method returns this it means the queue has no more associated +/// `Pusher`'s, which implies that no new nodes can become available. +/// +/// If a `Pusher` method returns this it means that the queue has no more +/// associated `Puller`'s, which implies that there's nothing to take nodes off +/// the queue any longer. #[derive(Debug, PartialEq)] pub struct StaleErr; /// Inner shared data. /// Index: src/pull.rs ================================================================== --- src/pull.rs +++ src/pull.rs @@ -17,12 +17,13 @@ /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// - /// Returns `Err(())` if there are no more items in queue and there are no - /// more [`Pusher`](super::Pusher) objects associated with this `Puller`. + /// Returns `Err(StaleErr)` if there are no more items in queue and there are + /// no more [`Pusher`](super::Pusher) objects associated with this + /// `Puller`. #[cfg_attr(feature = "inline-more", inline)] pub fn pop(&self) -> Result { let mut inner = self.0.inner.lock(); loop { if inner.npushers == 0 { @@ -45,11 +46,11 @@ /// If a node is available on the queue then take it off and return it. /// /// If no nodes are available and there are no more [`Pusher`](super::Pusher) /// objects associated with this `Puller`, then return `Err(StaleErr)`. /// - /// If no nodes were available and there's at least one associated `Pusher` + /// If no nodes are available and there's at least one associated `Pusher` /// exists then return `Ok(None)`. 
#[cfg_attr(feature = "inline-more", inline)] pub fn try_pop(&self) -> Result, StaleErr> { let mut inner = self.0.inner.lock(); if let Some(n) = inner.q.pop_front() { @@ -93,10 +94,15 @@ inner.q.is_empty() } } impl Drop for Puller { + /// Drop a `Puller` instance. + /// + /// If this is the last `Puller` end-point of a sigq instance, then the inner + /// queue will be cleared (i.e. all its elements will be immediately + /// dropped). fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npullers -= 1; // If this is the last puller then remove all thr nodes. Index: src/push.rs ================================================================== --- src/push.rs +++ src/push.rs @@ -47,10 +47,14 @@ Self(Arc::clone(&self.0)) } } impl Drop for Pusher { + /// Drop a `Pusher` instance. + /// + /// When the final `Pusher` of a sigq instance is dropped, wake + /// up any `Puller`'s waiting for new nodes to arrive. fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npushers -= 1; // If this was the last pusher then wake any pullers that are waiting to Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -4,10 +4,21 @@ ### Added ### Changed +### Removed + + +## [0.13.2] - 2023-07-26 + +### Added + +### Changed + +- Documentation updates. + ### Removed ## [0.13.1] - 2023-07-25