Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "sigq" -version = "0.12.0" +version = "0.13.0" authors = ["Jan Danielsson "] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -10,19 +10,23 @@ use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; -pub use pull::{Puller, StaleErr}; +pub use pull::Puller; pub use push::Pusher; + +#[derive(Debug, PartialEq)] +pub struct StaleErr; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. struct Inner { q: VecDeque, npushers: usize, + npullers: usize, wakers: IndexMap } /// Inner shared data. struct Shared { @@ -34,10 +38,11 @@ /// Create a new queue and return its paired push and pull objects. pub fn new() -> (Pusher, Puller) { let inner = Inner { q: VecDeque::new(), npushers: 1, + npullers: 1, wakers: IndexMap::new() }; let shared = Shared { signal: Condvar::new(), inner: Mutex::new(inner), Index: src/pull.rs ================================================================== --- src/pull.rs +++ src/pull.rs @@ -5,16 +5,15 @@ sync::atomic::Ordering, sync::Arc, task::{Context, Poll} }; -#[derive(Debug, PartialEq)] -pub struct StaleErr; - /// The receiving end-point of queue. #[repr(transparent)] pub struct Puller(pub(crate) Arc>); + +use super::StaleErr; impl Puller { /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to @@ -92,10 +91,17 @@ pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } + +impl Drop for Puller { + fn drop(&mut self) { + let mut inner = self.0.inner.lock(); + inner.npullers -= 1; + } +} #[doc(hidden)] pub struct PopFuture { ctx: Arc>, Index: src/push.rs ================================================================== --- src/push.rs +++ src/push.rs @@ -2,24 +2,33 @@ /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher(pub(crate) Arc>); +use super::StaleErr; + impl Pusher { /// Push a node on to the queue and unlock one queue reader, if any. /// /// If there are any tasks or threads waiting for new nodes to arrive they /// will be notified. + /// + /// Returns `Err(StaleErr)` is there are no [`Puller`](super::Puller)'s + /// available to receive any new nodes. #[cfg_attr(feature = "inline-more", inline)] - pub fn push(&self, item: I) { + pub fn push(&self, item: I) -> Result<(), StaleErr> { let mut inner = self.0.inner.lock(); - inner.q.push_back(item); - if let Some((_, n)) = inner.wakers.pop() { - n.wake(); + if inner.npullers == 0 { + Err(StaleErr) + } else { + inner.q.push_back(item); + if let Some((_, n)) = inner.wakers.pop() { + n.wake(); + } + self.0.signal.notify_one(); + Ok(()) } - self.0.signal.notify_one(); - drop(inner); } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -5,11 +5,11 @@ let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); - tx.push("hello"); + tx.push("hello").unwrap(); // There's a client, data, so try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); // There's a client, so try_pop() should return Ok(None) @@ -27,11 +27,11 @@ let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); - tx.push("hello"); + tx.push("hello").unwrap(); // Drop the only pusher. It should still be possible to pull existing nodes // off the stack. drop(tx); @@ -57,7 +57,16 @@ drop(tx); assert!(jh.join().unwrap()); } + +#[test] +fn stale_puller() { + let (tx, rx) = sigq::new::<&str>(); + + drop(rx); + + assert_eq!(tx.push("hello"), Err(StaleErr)); +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -4,10 +4,22 @@ ### Added ### Changed +### Removed + + +## [0.13.0] + +### Added + +### Changed + +- `Pusher::push()` will return `Err(StaleErr)` if there are no more associated + `Puller`s. + ### Removed ## [0.12.0] - 2023-07-24