Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -2,5 +2,6 @@ www/changelog.md src/lib.rs src/push.rs src/pull.rs tests/simple.rs +tests/managed.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,9 +1,8 @@ [package] name = "sigq" -version = "0.13.4" -authors = ["Jan Danielsson "] +version = "0.13.5" edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" @@ -17,15 +16,14 @@ ] rust-version = "1.64" [features] default = ["inline-more"] - inline-more = [] [dependencies] -indexmap = { version = "2.0.0" } -parking_lot = { version = "0.12.1" } +indexmap = { version = "2.5.0" } +parking_lot = { version = "0.12.3" } [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,130 @@ +# This is a configuration file for the bacon tool +# +# Bacon repository: https://github.com/Canop/bacon +# Complete help on configuration: https://dystroy.org/bacon/config/ +# You can also check bacon's own bacon.toml file +# as an example: https://github.com/Canop/bacon/blob/main/bacon.toml + +# For information about clippy lints, see: +# https://github.com/rust-lang/rust-clippy/blob/master/README.md + +#default_job = "check" +default_job = "clippy-all-pedantic" + +[jobs.check] +command = ["cargo", "check", "--color", "always"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets", "--color", "always"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = [ + "cargo", "clippy", + "--color", "always", +] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# 
[jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--color", "always", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", +] +need_stdout = false + +[jobs.clippy-pedantic] +command = [ + "cargo", "clippy", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +[jobs.clippy-all-pedantic] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = [ + "cargo", "test", "--color", "always", + "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 +] +need_stdout = true + +[jobs.doc] +command = ["cargo", "doc", "--color", "always", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# *if* it makes sense for this crate. +# Don't forget the `--color always` part or the errors won't be +# properly parsed. +# If your program never stops (eg a server), you may set `background` +# to false to have the cargo run output immediately displayed instead +# of waiting for program's end. 
+[jobs.run] +command = [ + "cargo", "run", + "--color", "always", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--color", "always", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -38,11 +38,11 @@ use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; -pub use pull::Puller; +pub use pull::{MustHandle, Puller}; pub use push::{Pusher, WeakPusher}; /// Error value used to indicate that there are no remote end-points available. /// /// If a `Puller` method returns this it means the queue has no more associated @@ -49,11 +49,11 @@ /// `Pusher`'s, which implies that no new nodes can become available. /// /// If a `Pusher` method returns this it means that the queue has no more /// associated `Puller`'s, which implies that there's nothing to take nodes off /// the queue any longer. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub struct StaleErr; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. @@ -70,10 +70,11 @@ inner: Mutex>, idgen: AtomicUsize } /// Create a new queue and return its paired push and pull objects. 
+#[must_use] pub fn new() -> (Pusher, Puller) { let inner = Inner { q: VecDeque::new(), npushers: 1, npullers: 1, Index: src/pull.rs ================================================================== --- src/pull.rs +++ src/pull.rs @@ -1,8 +1,10 @@ use std::{ future::Future, + mem::ManuallyDrop, num::NonZeroUsize, + ops::{Deref, DerefMut}, pin::Pin, sync::atomic::Ordering, sync::Arc, task::{Context, Poll} }; @@ -10,49 +12,126 @@ /// The receiving end-point of queue. #[repr(transparent)] pub struct Puller(pub(crate) Arc>); use super::StaleErr; + +#[derive(Default)] +enum DropAction { + #[default] + ReturnToQueue, + Drop, + Nothing +} + +/// Wrapper around elements that must be handled by the application. +pub struct MustHandle { + sh: Arc>, + inner: ManuallyDrop, + drop_action: DropAction +} + +impl MustHandle { + fn new(sh: Arc>, inner: T) -> Self { + Self { + sh, + inner: ManuallyDrop::new(inner), + drop_action: DropAction::default() + } + } + + /// Mark the inner object as handled and then drop it. + pub fn handled(mut self) { + self.drop_action = DropAction::Drop; + } + + /// Remove the inner object from the `MustHandle` and return it. + pub fn into_inner(mut self) -> T { + self.drop_action = DropAction::Nothing; + unsafe { ManuallyDrop::take(&mut self.inner) } + } +} + +impl Deref for MustHandle { + type Target = T; + + fn deref(&self) -> &T { + &self.inner + } +} + +impl DerefMut for MustHandle { + fn deref_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Drop for MustHandle { + fn drop(&mut self) { + match self.drop_action { + DropAction::ReturnToQueue => { + let t = unsafe { ManuallyDrop::take(&mut self.inner) }; + let mut inner = self.sh.inner.lock(); + inner.q.push_front(t); + } + DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) }, + DropAction::Nothing => {} + } + } +} + impl Puller { /// Pull the oldest node off the queue and return it. 
/// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// - /// Returns `Err(StaleErr)` if there are no more items in queue and there are - /// no more [`Pusher`](super::Pusher) objects associated with this - /// `Puller`. + /// # Errors + /// `StaleErr` means there are no more items in queue and there are no more + /// [`Pusher`](super::Pusher) objects associated with this `Puller`. #[cfg_attr(feature = "inline-more", inline)] pub fn pop(&self) -> Result { let mut inner = self.0.inner.lock(); loop { if inner.q.is_empty() && inner.npushers == 0 { break Err(StaleErr); - } else { - match inner.q.pop_front() { - Some(node) => { - break Ok(node); - } - None => { - self.0.signal.wait(&mut inner); - } + } + match inner.q.pop_front() { + Some(node) => { + break Ok(node); + } + None => { + self.0.signal.wait(&mut inner); } } } } + + /// Take an element off the queue that must be handled by the application, or + /// it will be returned to the queue. + /// + /// # Errors + /// `StaleErr` means there are no more items in queue and there are no more + /// [`Pusher`](super::Pusher) objects associated with this `Puller`. + pub fn pop_managed(&self) -> Result, StaleErr> { + let n = self.pop()?; + Ok(MustHandle::new(Arc::clone(&self.0), n)) + } /// Pull the oldest node off the queue and return it. /// /// If a node is available on the queue then take it off and return it. /// - /// If no nodes are available and there are no more [`Pusher`](super::Pusher) - /// objects associated with this `Puller`, then return `Err(StaleErr)`. - /// /// If no nodes are available and there's at least one associated `Pusher` /// exists then return `Ok(None)`. + /// + /// # Errors + /// `StaleErr` is returned if no nodes are available and there are no more + /// [`Pusher`](super::Pusher) objects associated with this `Puller`. 
#[cfg_attr(feature = "inline-more", inline)] + #[allow(clippy::option_if_let_else)] pub fn try_pop(&self) -> Result, StaleErr> { let mut inner = self.0.inner.lock(); if let Some(n) = inner.q.pop_front() { Ok(Some(n)) } else if inner.npushers == 0 { @@ -59,10 +138,29 @@ Err(StaleErr) } else { Ok(None) } } + + /// Take an element off the queue that must be handled by the application, or + /// it will be returned to the queue. + /// + /// If a node is available on the queue then take it off and return it. + /// + /// If no nodes are available and there's at least one associated `Pusher` + /// exists then return `Ok(None)`. + /// + /// # Errors + /// `StaleErr` is returned if no nodes are available and there are no more + /// [`Pusher`](super::Pusher) objects associated with this `Puller`. + pub fn try_pop_managed(&self) -> Result>, StaleErr> { + Ok( + self + .try_pop()? + .map(|n| MustHandle::new(Arc::clone(&self.0), n)) + ) + } /// This method serves the same purpose as the [`pop()`](#method.pop) method, /// but rather than block it returns a `Future` to be used to wait for a node /// to arrive in an `async` context. /// @@ -75,22 +173,36 @@ /// assert_eq!(node, "hello"); /// assert_eq!(rx.was_empty(), true); /// } /// ``` #[cfg_attr(feature = "inline-more", inline)] + #[must_use] pub fn apop(&self) -> PopFuture { PopFuture { ctx: Arc::clone(&self.0), id: None } } + + /// This method serves the same purpose as the [`pop()`](#method.pop) method, + /// but rather than block it returns a `Future` to be used to wait for a node + /// to arrive in an `async` context. + #[cfg_attr(feature = "inline-more", inline)] + #[must_use] + pub fn apop_managed(&self) -> PopManagedFuture { + PopManagedFuture { + ctx: Arc::clone(&self.0), + id: None + } + } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, /// then please don't use it. 
#[cfg_attr(feature = "inline-more", inline)] + #[must_use] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } @@ -105,11 +217,11 @@ let mut inner = self.0.inner.lock(); inner.npullers -= 1; // If this is the last puller then remove all thr nodes. // The nodes may contain some kind of context that must be notified that - // the node will never reach its destination. + // the node will never reach its intended destination. if inner.npullers == 0 { inner.q.clear(); } } } @@ -157,11 +269,63 @@ impl Drop for PopFuture { fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.ctx.inner.lock(); // Remove this future's waker - let _ = inner.wakers.remove(&id.get()); + let _ = inner.wakers.swap_remove(&id.get()); + } + } +} + + +#[doc(hidden)] +pub struct PopManagedFuture { + ctx: Arc>, + id: Option +} + +impl Future for PopManagedFuture { + type Output = Result, StaleErr>; + fn poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_> + ) -> Poll { + let mut inner = self.ctx.inner.lock(); + match inner.q.pop_front() { + Some(node) => { + Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node))) + } + None => { + if inner.q.is_empty() && inner.npushers == 0 { + // No more nodes and no more pushers, so return `Err(StaleErr)` + Poll::Ready(Err(StaleErr)) + } else { + // Generate a unique identifier for this waker + let id = loop { + let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst); + // Make sure it is non-zero and unique + if id == 0 || inner.wakers.contains_key(&id) { + continue; + } + break id; + }; + inner.wakers.insert(id, ctx.waker().clone()); + drop(inner); + self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) }); + Poll::Pending + } + } + } + } +} + +impl Drop for PopManagedFuture { + fn drop(&mut self) { + if let Some(id) = self.id { + let mut inner = self.ctx.inner.lock(); + // Remove this future's waker + let _ = inner.wakers.swap_remove(&id.get()); } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 
tw=79 : Index: src/push.rs ================================================================== --- src/push.rs +++ src/push.rs @@ -10,12 +10,13 @@ /// Push a node on to the queue and unlock one queue reader, if any. /// /// If there are any tasks or threads waiting for new nodes to arrive they /// will be notified. /// - /// Returns `Err(StaleErr)` is there are no [`Puller`](super::Puller)'s - /// available to receive any new nodes. + /// # Errors + /// `StaleErr` means there are no [`Puller`](super::Puller)'s available + /// to receive any new nodes. #[cfg_attr(feature = "inline-more", inline)] pub fn push(&self, item: I) -> Result<(), StaleErr> { let mut inner = self.0.inner.lock(); if inner.npullers == 0 { Err(StaleErr) @@ -23,34 +24,36 @@ inner.q.push_back(item); if let Some((_, n)) = inner.wakers.pop() { n.wake(); } self.0.signal.notify_one(); + drop(inner); Ok(()) } } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] + #[must_use] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } /// Create a weak reference to this `Pusher`. + #[must_use] pub fn weak(&self) -> WeakPusher { WeakPusher(Arc::downgrade(&self.0)) } } impl Clone for Pusher { fn clone(&self) -> Self { - let mut inner = self.0.inner.lock(); - inner.npushers += 1; + self.0.inner.lock().npushers += 1; Self(Arc::clone(&self.0)) } } impl Drop for Pusher { @@ -66,11 +69,11 @@ // receive new items. (When they discover that no pushers remain they will // return None). if inner.npushers == 0 { self.0.signal.notify_all(); for waker in inner.wakers.drain(..).map(|(_k, v)| v) { - waker.wake() + waker.wake(); } } } } @@ -86,17 +89,18 @@ impl WeakPusher { /// Attempt to upgrade `WeakPusher` to a [`Pusher`]. /// /// Returns `None` is all the strong references have been exhausted. 
+ #[must_use] pub fn upgrade(&self) -> Option> { - if let Some(strong) = self.0.upgrade() { - let mut inner = strong.inner.lock(); - inner.npushers += 1; - Some(Pusher(Arc::clone(&strong))) - } else { - None - } + self.0.upgrade().map_or_else( + || None, + |strong| { + strong.inner.lock().npushers += 1; + Some(Pusher(Arc::clone(&strong))) + } + ) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/managed.rs Index: tests/managed.rs ================================================================== --- /dev/null +++ tests/managed.rs @@ -0,0 +1,48 @@ +#[test] +fn return_to_queue() { + let (tx, rx) = sigq::new(); + + // Add "hello" + tx.push("hello").unwrap(); + + // Add "world" + tx.push("world").unwrap(); + + // Take out "hello" as a managed element + let s = rx.pop_managed().unwrap(); + assert_eq!(*s, "hello"); + + // Drop the managed element, which should put it back on the queue + drop(s); + + // hello, again + let s = rx.pop().unwrap(); + assert_eq!(s, "hello"); +} + +#[test] +fn finalize() { + let (tx, rx) = sigq::new(); + + // Add "hello" + tx.push("hello").unwrap(); + + // Add "world" + tx.push("world").unwrap(); + + // Take out "hello" as a managed element + let s = rx.pop_managed().unwrap(); + assert_eq!(*s, "hello"); + + // Mark element as handled + s.handled(); + + // next pull should yield "world" + let s = rx.pop().unwrap(); + assert_eq!(s, "world"); + + // .. 
and queue should be empty now + assert!(rx.was_empty()); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,56 +1,89 @@ # Change Log ## [Unreleased] +[Details](/vdiff?from=sigq-0.13.5&to=trunk) + ### Added ### Changed ### Removed +--- + +## [0.13.5] - 2024-09-10 + +[Details](/vdiff?from=sigq-0.13.4&to=sigq-0.13.5) + +### Added + +- Implemented `Puller::pop_managed()`, `Puller::try_pop_managed()`, + `Puller::apop_managed()` which return queue elements wrapped in the + `MustHandle` wrapper, which will automatically return elements to the queue + if unhandled when dropped. + +--- ## [0.13.4] - 2023-09-15 + +[Details](/vdiff?from=sigq-0.13.3&to=sigq-0.13.4) ### Added - `Pusher` objects can spawn downgraded to `WeakPusher` objects, that in turn can be upgraded to `Pusher` objects (as long as all the strong `Pusher` objects have not been dropped). +--- ## [0.13.3] - 2023-07-27 + +[Details](/vdiff?from=sigq-0.13.2&to=sigq-0.13.3) ### Changed - Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes still remain in the queue. +--- ## [0.13.2] - 2023-07-26 + +[Details](/vdiff?from=sigq-0.13.1&to=sigq-0.13.2) ### Changed - Documentation updates. +--- ## [0.13.1] - 2023-07-25 + +[Details](/vdiff?from=sigq-0.13.0&to=sigq-0.13.1) ### Changed - When the last `Puller` is dropped, clear the queue. +--- ## [0.13.0] - 2023-07-25 + +[Details](/vdiff?from=sigq-0.12.0&to=sigq-0.13.0) ### Changed - `Pusher::push()` will return `Err(StaleErr)` if there are no more associated `Puller`s. +--- ## [0.12.0] - 2023-07-24 + +[Details](/vdiff?from=sigq-0.11.0&to=sigq-0.12.0) ### Added - Add an `inline-more` feature (enabled by default). @@ -64,10 +97,11 @@ - Dependencies updated: - `indexmap` updated to `2.0.0` - Use Rust edition 2021. - Bump MSRV to 1.64 (based on MSRV in `indexmap`) +--- ## [0.11.0] - 2022-09-09 ### Added