Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -7,5 +7,6 @@ src/tx.rs src/rx.rs tests/basics.rs tests/force.rs tests/closed.rs +tests/managed.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "bndpresbufch" -version = "0.1.0" +version = "0.1.1" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "concurrency", "data-structures" ] keywords = [ "channel" ] @@ -18,11 +18,11 @@ "rustfmt.toml" ] [dependencies] parking_lot = { version = "0.12.3" } -bndpresbufq = { version = "0.1.2" } +bndpresbufq = { version = "0.1.3" } rustc-hash = { version = "2.0.0" } [dev-dependencies] tokio = { version = "1.40.0", features = [ "macros", "rt-multi-thread" Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -40,11 +40,15 @@ use rustc_hash::FxHashMap; use bndpresbufq::BndPresLimBufQ; -pub use {err::Error, rx::Receiver, tx::Sender}; +pub use { + err::Error, + rx::{MustHandle, Receiver}, + tx::Sender +}; /// Builder for a bounds-preserving buffer channel. #[derive(Default)] pub struct Builder { max_len: Option, Index: src/rx.rs ================================================================== --- src/rx.rs +++ src/rx.rs @@ -1,14 +1,82 @@ use std::{ future::Future, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, pin::Pin, sync::Arc, task::{Context, Poll} }; use crate::err::Error; + +#[derive(Default)] +enum DropAction { + #[default] + ReturnToQueue, + Drop, + Nothing +} + +/// Wrapper around elements that must be handled by the application. 
+pub struct MustHandle { + sh: Arc, + inner: ManuallyDrop>, + drop_action: DropAction +} + +impl MustHandle { + fn new(sh: Arc, inner: Vec) -> Self { + Self { + sh, + inner: ManuallyDrop::new(inner), + drop_action: DropAction::default() + } + } + + /// Mark the inner object as handled and then drop it. + pub fn handled(mut self) { + self.drop_action = DropAction::Drop; + } + + /// Remove the inner object from the `MustHandle` and return it. + #[must_use] + pub fn into_inner(mut self) -> Vec { + self.drop_action = DropAction::Nothing; + unsafe { ManuallyDrop::take(&mut self.inner) } + } +} + +impl Deref for MustHandle { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.inner + } +} + +impl DerefMut for MustHandle { + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.inner + } +} + +impl Drop for MustHandle { + fn drop(&mut self) { + match self.drop_action { + DropAction::ReturnToQueue => { + let t = unsafe { ManuallyDrop::take(&mut self.inner) }; + let mut inner = self.sh.inner.lock(); + let _ = inner.q.try_return(t); + } + DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) }, + DropAction::Nothing => {} + } + } +} + /// Receiving end-point used to receive bounds-preserved buffers from a /// [`Sender`](super::Sender) end-point. #[repr(transparent)] pub struct Receiver(pub(super) Arc); @@ -33,10 +101,22 @@ break None; } self.0.signal.wait(&mut inner); } } + + /// Take an element off the queue that must be handled by the application, or + /// it will be returned to the queue. + /// + /// If no elements are available on the queue, then block and wait for one to + /// be added. If the queue is empty and the sender has been dropped then + /// `None` will be returned. + #[must_use] + pub fn pop_managed(&self) -> Option { + let n = self.pop()?; + Some(MustHandle::new(Arc::clone(&self.0), n)) + } /// Attempt to get next buffer in queue. 
/// /// # Errors /// [`Error::Closed`] will be returned if the queue is empty and all @@ -44,10 +124,26 @@ pub fn try_pop(&self) -> Result>, Error> { let mut inner = self.0.inner.lock(); self.0.pop(&mut inner) } + + /// Attempt to get next buffer in queue, wrapped in a [`MustHandle`] wrapper. + /// + /// # Errors + /// [`Error::Closed`] will be returned if the queue is empty and all + /// sender end-points have been dropped. + pub fn try_pop_managed(&self) -> Result, Error> { + let mut inner = self.0.inner.lock(); + Ok( + self + .0 + .pop(&mut inner)? + .map(|n| MustHandle::new(Arc::clone(&self.0), n)) + ) + } + /// Return a [`Future`] that will return a buffer from the queue or wait for /// a buffer to become available. /// /// The `Future` will resolve to `Ok(Some(Vec))` on success. /// @@ -59,10 +155,27 @@ RecvFuture { sh: Arc::clone(&self.0), waker_id: None } } + + + /// Return a [`Future`] that will return a buffer from the queue or wait for + /// a buffer to become available. + /// + /// The `Future` will resolve to `Ok(MustHandle)` on success. + /// + /// # Errors + /// [`Error::Closed`] will be returned if the queue is empty and all + /// sender end-points have been dropped. + #[must_use] + pub fn apop_managed(&self) -> RecvManagedFuture { + RecvManagedFuture { + sh: Arc::clone(&self.0), + waker_id: None + } + } } impl Drop for Receiver { fn drop(&mut self) { let mut inner = self.0.inner.lock(); @@ -115,14 +228,66 @@ } impl Drop for RecvFuture { /// When a [`RecvFuture`] is dropped, make sure to deregister its waker [if /// registered]. 
+ fn drop(&mut self) { + if let Some(id) = self.waker_id.take() { + let mut inner = self.sh.lock_inner(); + inner.rx_wakers.remove(&id); + } + } +} + + +pub struct RecvManagedFuture { + sh: Arc, + waker_id: Option +} + +impl Future for RecvManagedFuture { + type Output = Result; + fn poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_> + ) -> Poll { + let mut inner = self.sh.lock_inner(); + match self.sh.pop(&mut inner) { + Ok(Some(buf)) => { + let ret = MustHandle::new(Arc::clone(&self.sh), buf); + Poll::Ready(Ok(ret)) + } + Ok(None) => { + // Queue is empty -- add this future to the collection of wakers and + // return pending + // ToDo: exhaust-deadlock + let id = loop { + inner.idgen = inner.idgen.wrapping_add(1); + if !inner.rx_wakers.contains_key(&inner.idgen) { + break inner.idgen; + } + }; + inner.rx_wakers.insert(id, ctx.waker().clone()); + + drop(inner); + + self.waker_id = Some(id); + + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)) + } + } +} + +impl Drop for RecvManagedFuture { + /// When a [`RecvManagedFuture`] is dropped, make sure to deregister its waker [if + /// registered]. 
fn drop(&mut self) { if let Some(id) = self.waker_id.take() { let mut inner = self.sh.lock_inner(); inner.rx_wakers.remove(&id); } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/managed.rs Index: tests/managed.rs ================================================================== --- /dev/null +++ tests/managed.rs @@ -0,0 +1,45 @@ +use bndpresbufch::Builder; + +#[test] +fn wait() { + // Create a queue that can hold at most 4 bytes of data + let (tx, rx) = Builder::new().max_size(4).build(); + + // Fill queue up with 4 1-byte nodes + for idx in 0..4 { + tx.try_push([idx].into()).unwrap(); + } + + // Pull managed node off queue + let n = rx.pop_managed().unwrap(); + assert_eq!(&*n, vec![0]); + + // Drop node, which should put it back in the channel + drop(n); + + // Should get [0] again + assert_eq!(rx.pop(), Some(vec![0])); +} + +#[test] +fn try_pop() { + // Create a queue that can hold at most 4 bytes of data + let (tx, rx) = Builder::new().max_size(4).build(); + + // Fill queue up with 4 1-byte nodes + for idx in 0..4 { + tx.try_push([idx].into()).unwrap(); + } + + // Pull managed node off queue + let n = rx.try_pop_managed().unwrap().unwrap(); + assert_eq!(&*n, vec![0]); + + // Drop node, which should put it back in the channel + drop(n); + + // Should get [0] again + assert_eq!(rx.pop(), Some(vec![0])); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,28 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=bndpresbufch-0.1.0&to=trunk) +[Details](/vdiff?from=bndpresbufch-0.1.1&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.1.1] - 2024-10-05 + +[Details](/vdiff?from=bndpresbufch-0.1.0&to=bndpresbufch-0.1.1) + +### Added + +- Added ability to take buffers off channel that need to be explicitly handled + or they will be returned to the channel. 
+ --- ## [0.1.0] - 2024-09-13 First release.