Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,22 +1,21 @@ [package] name = "sigq" -version = "0.13.3" +version = "0.13.4" authors = ["Jan Danielsson "] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" exclude = [ - ".efiles", ".fossil-settings", + ".efiles", ".fslckout", - "rustfmt.toml", - "tests", - "www" + "www", + "rustfmt.toml" ] rust-version = "1.64" [features] default = ["inline-more"] @@ -25,5 +24,8 @@ [dependencies] indexmap = { version = "2.0.0" } parking_lot = { version = "0.12.1" } +[package.metadata.docs.rs] +rustdoc-args = ["--generate-link-to-definition"] + Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -39,11 +39,11 @@ use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::Puller; -pub use push::Pusher; +pub use push::{Pusher, WeakPusher}; /// Error value used to indicate that there are no remote end-points available. /// /// If a `Puller` method returns this it means the queue has no more associated /// `Pusher`'s, which implies that no new nodes can become available. Index: src/push.rs ================================================================== --- src/push.rs +++ src/push.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use std::sync::{Arc, Weak}; /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher(pub(crate) Arc>); @@ -36,10 +36,15 @@ #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } + + /// Create a weak reference to this `Pusher`. 
+ pub fn weak(&self) -> WeakPusher { + WeakPusher(Arc::downgrade(&self.0)) + } } impl Clone for Pusher { fn clone(&self) -> Self { let mut inner = self.0.inner.lock(); @@ -66,7 +71,32 @@ waker.wake() } } } } + +/// A weak reference to a [`Pusher`]. +#[repr(transparent)] +pub struct WeakPusher(pub(crate) Weak>); + +impl Clone for WeakPusher { + fn clone(&self) -> Self { + Self(Weak::clone(&self.0)) + } +} + +impl WeakPusher { + /// Attempt to upgrade `WeakPusher` to a [`Pusher`]. + /// + /// Returns `None` if all the strong references have been exhausted. + pub fn upgrade(&self) -> Option> { + if let Some(strong) = self.0.upgrade() { + let mut inner = strong.inner.lock(); + inner.npushers += 1; + Some(Pusher(Arc::clone(&strong))) + } else { + None + } + } +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -1,28 +1,49 @@ -use sigq::StaleErr; +use sigq::{Puller, Pusher, StaleErr}; + + +/// A reusable test +fn std_test(tx: &Pusher<&str>, rx: &Puller<&str>) { + // There's a client, no data; try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); + + tx.push("hello").unwrap(); + + // There's a client, data; try_pop() should return Ok(Some) + assert_eq!(rx.try_pop(), Ok(Some("hello"))); + + // There's a client; try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); +} #[test] fn try_pop_retvals() { let (tx, rx) = sigq::new(); - // There's a client, no data, so try_pop() should return Ok(None) - assert_eq!(rx.try_pop(), Ok(None)); - - tx.push("hello").unwrap(); - - // There's a client, data, so try_pop() should return Ok(Some) - assert_eq!(rx.try_pop(), Ok(Some("hello"))); - - // There's a client, so try_pop() should return Ok(None) - assert_eq!(rx.try_pop(), Ok(None)); + // Run the common tests + std_test(&tx, &rx); // Drop the only pusher.
drop(tx); // There are no more clients, so try_pop() should return Err(()) assert_eq!(rx.try_pop(), Err(StaleErr)); } + +#[test] +fn try_pop_with_only_weak_tx() { + let (tx, rx) = sigq::new::<&str>(); + + // Create a weak reference to the pusher + let _wtx = tx.weak(); + + // Drop the only (strong) pusher. + drop(tx); + + // There are no more clients; try_pop() should return Err(StaleErr) + assert_eq!(rx.try_pop(), Err(StaleErr)); +} #[test] fn drop_nonempty() { let (tx, rx) = sigq::new(); @@ -66,7 +87,53 @@ drop(rx); assert_eq!(tx.push("hello"), Err(StaleErr)); } + +/// Make sure that a weakened pusher can be upgraded and used +#[test] +fn weak_to_strong() { + let (tx, rx) = sigq::new::<&str>(); + + // Create a weak reference to the pusher + let wtx = tx.weak(); + + // Create a strong Pusher from the weak reference + let Some(tx2) = wtx.upgrade() else { + panic!("Unable to upgrade weak Pusher"); + }; + + // Run the common tests + std_test(&tx2, &rx); + + // Drop the only (strong) pusher. + drop(tx); + + // There's still a Pusher; try_pop() should return Ok(None) + assert_eq!(rx.try_pop(), Ok(None)); + + drop(tx2); + + // There are no more Pushers; try_pop() should return Err(StaleErr) + assert_eq!(rx.try_pop(), Err(StaleErr)); +} + +/// If the last strong `Pusher` is dropped a `WeakPusher` should fail to +/// upgrade to a `Pusher`. +#[test] +fn no_strong_weak_upgrade_fail() { + let (tx, _rx) = sigq::new::<&str>(); + + // Create a weak reference to the pusher + let wtx = tx.weak(); + + // Drop the only (strong) pusher. 
+ drop(tx); + + // The only strong pusher was released; upgrading the weak one should fail + let Some(_) = wtx.upgrade() else { + panic!("Upgrade unexpectedly successful"); + }; +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -6,10 +6,19 @@ ### Changed ### Removed + +## [0.13.4] - 2023-09-15 + +### Added + +- `Pusher` objects can be downgraded to `WeakPusher` objects, which in turn + can be upgraded to `Pusher` objects (as long as at least one strong + `Pusher` object has not been dropped). + ## [0.13.3] - 2023-07-27 ### Changed