Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -1,8 +1,11 @@ Cargo.toml README.md www/index.md www/changelog.md +src/err.rs src/lib.rs src/tx.rs src/rx.rs tests/basic.rs +tests/closed.rs +tests/ctrl.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,16 +1,16 @@ [package] name = "limqch" -version = "0.1.0" -edition = "2021" +version = "0.2.0" +edition = "2024" license = "0BSD" # https://crates.io/category_slugs categories = [ "concurrency" ] keywords = [ "channel", "bounded" ] repository = "https://repos.qrnch.tech/pub/limqch" description = "A channel built on top of limq." -rust-version = "1.56" +rust-version = "1.85" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", @@ -21,11 +21,11 @@ # https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section [badges] maintenance = { status = "actively-developed" } [dependencies] -limq = { version = "0.1.4" } +limq = { version = "0.3.0" } parking_lot = { version = "0.12.3" } wakerizer = { version = "0.1.0" } [dev-dependencies] tokio = { version = "1.44.1", features = ["macros", "rt", "rt-multi-thread"] } ADDED rustfmt.toml Index: rustfmt.toml ================================================================== --- /dev/null +++ rustfmt.toml @@ -0,0 +1,14 @@ +blank_lines_upper_bound = 2 +comment_width = 79 +edition = "2024" +format_strings = true +max_width = 79 +match_block_trailing_comma = false +# merge_imports = true +newline_style = "Unix" +tab_spaces = 2 +trailing_comma = "Never" +unstable_features = true +wrap_comments = true +#reorder_imports = false +#reorder_modules = false ADDED src/err.rs Index: src/err.rs ================================================================== --- /dev/null +++ src/err.rs @@ -0,0 +1,49 @@ +use std::fmt; + +/// Errors that [`Sender`](super::Sender) and [`Receiver`](super::Receiver) +/// methods can return. +#[derive(PartialEq, Eq)] +pub enum Error { + /// No remote channel end-points remain. + Closed, + + /// The node will currently not fit in the queue. + WontFit(T), + + /// The node can't fit in the queue (unless reconfigured to allow larger + /// nodes). + CantFit(T) +} + +impl std::error::Error for Error {} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Closed => write!(f, "Error::Closed"), + Self::WontFit(_) => write!(f, "Error::WontFit"), + Self::CantFit(_) => write!(f, "Error::CantFit") + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Closed => write!(f, "Remote end-points closed"), + Self::WontFit(_) => write!(f, "Won't fit"), + Self::CantFit(_) => write!(f, "Can't fit") + } + } +} + +impl From> for Error { + fn from(err: limq::Error) -> Self { + match err { + limq::Error::WontFit(n) => Self::WontFit(n), + limq::Error::CantFit(n) => Self::CantFit(n) + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,7 +1,8 @@ //! Channel based on [`LimQ`]. 
+mod err; mod rx; mod tx; use std::sync::Arc; @@ -9,27 +10,42 @@ use limq::LimQ; use wakerizer::Wakers; -pub use limq::Overflow; -pub use rx::Receiver; +pub use err::Error; +pub use limq::{Controller, Overflow}; +pub use rx::{Receiver, RecvFuture}; pub use tx::Sender; - -struct Inner { - q: LimQ +struct Inner +where + C: Controller +{ + q: LimQ, + tx_count: usize, + rx_count: usize } -impl Inner { - fn new(qlim: Option) -> Self { - Self { q: LimQ::new(qlim) } +impl Inner +where + C: Controller +{ + fn new(lc: C) -> Self { + Self { + q: LimQ::new(lc), + tx_count: 0, + rx_count: 0 + } } } -struct Shared { - inner: Mutex>, +struct Shared +where + C: Controller +{ + inner: Mutex>, /// Used from receivers in order to wait for senders to add new elements to /// the queue. tx_wakers: Wakers, @@ -40,13 +56,16 @@ rx_wakers: Wakers, rx_signal: Condvar } -impl Shared { - fn new(qlim: Option) -> Self { - let inner = Inner::new(qlim); +impl Shared +where + C: Controller +{ + fn new(lc: C) -> Self { + let inner = Inner::new(lc); let inner = Mutex::new(inner); Self { inner, tx_wakers: Wakers::new(), rx_wakers: Wakers::new(), @@ -54,56 +73,132 @@ rx_signal: Condvar::new() } } /// Called by the receiver end-point when a node has been taken off the - /// queue, in case a sender is waiting for space to become available. - fn wake_sender(&self) { + /// queue, in case any senders are waiting for space to become available. + fn wake_senders(&self) { self.tx_wakers.wake_all(); self.tx_signal.notify_one(); } /// Called by the sender end-point when a node has been added to the queue in - /// case a receiver is waiting for nodes to become available. - fn wake_receiver(&self) { + /// case any receivers are waiting for nodes to become available. + fn wake_receivers(&self) { self.rx_wakers.wake_all(); self.rx_signal.notify_one(); } } - /// Create a channel pair, with an optional internal queue limit. +/// +/// ``` +/// use limqch::{channel, Error}; +/// use limq::{LimQ, LengthLimit}; +/// +/// // Construct a channel which uses an internal queue that is limited to +/// // 2 elements. +/// let lenlim = LengthLimit::new(2); +/// let (tx, rx) = channel(lenlim); +/// +/// tx.try_send(1).unwrap(); +/// tx.try_send(2).unwrap(); +/// let Err(Error::WontFit(n)) = tx.try_send(3) else { +/// panic!("Unexpectedly not Error::WontFit"); +/// }; +/// +/// let n = rx.try_recv().unwrap().unwrap(); +/// assert_eq!(n, 1); +/// ``` #[must_use] -pub fn channel(qlim: Option) -> (Sender, Receiver) { - let sh = Shared::new(qlim); +pub fn channel(lc: C) -> (Sender, Receiver) +where + C: Controller, + T: Send + Sync +{ + let sh = Shared::new(lc); let sh = Arc::new(sh); let tx = Sender::new(Arc::clone(&sh)); let rx = Receiver::new(sh); (tx, rx) } - +/* /// Object that can be used to spawn sender and receiver end-points. -pub struct Spawner(Arc>); +pub struct Spawner(Arc>) +where + C: Controller; -impl Spawner { +impl Spawner +where + C: Controller, + T: Send + Sync +{ #[must_use] - pub fn new(qlim: Option) -> Self { - let sh = Shared::new(qlim); + pub fn new(lc: C) -> Self { + let sh = Shared::new(lc); let sh = Arc::new(sh); Self(sh) } #[must_use] - pub fn sender(&self) -> Sender { + pub fn sender(&self) -> Sender { Sender::new(Arc::clone(&self.0)) } #[must_use] - pub fn receiver(&self) -> Receiver { + pub fn receiver(&self) -> Receiver { Receiver::new(Arc::clone(&self.0)) } } +*/ + + +/// Proxy object to allow [`Sender`] and [`Receiver`] instances to +/// access the internal queue's [`Controller`].
+#[derive(Clone)] +#[repr(transparent)] +pub struct Ctrl(Arc>) +where + C: Controller; + +impl Ctrl +where + C: Controller +{ + /// Access the underlying [`Controller`] in a closure. + /// + /// This can be used to inspect the `Controller`. + /// + /// # Caveat Utilitor + /// An internal lock is held while the closure is called. The application + /// must return quickly to avoid holding up the channel end-points (or, + /// worse, holding up an async runtime). + pub fn with_ctrl(&self, f: F) -> R + where + F: FnOnce(&C) -> R + { + let inner = self.0.inner.lock(); + f(inner.q.controller()) + } + + /// Access the underlying [`Controller`] mutably in a closure. + /// + /// This can be used to reconfigure the `Controller`, if it supports + /// reconfiguration. + /// + /// # Caveat Utilitor + /// An internal lock is held while the closure is called. The application + /// must return quickly to avoid holding up the channel end-points (or, + /// worse, holding up an async runtime). + pub fn with_ctrl_mut(&self, f: F) -> R + where + F: FnOnce(&mut C) -> R + { + let mut inner = self.0.inner.lock(); + f(inner.q.controller_mut()) + } +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rx.rs ================================================================== --- src/rx.rs +++ src/rx.rs @@ -5,89 +5,159 @@ task::{Context, Poll} }; use wakerizer::Waiter; -use super::Shared; +use super::{Controller, Ctrl, Error, Shared}; /// Channel receiver end-point. -#[derive(Clone)] -pub struct Receiver { - sh: Arc> +pub struct Receiver +where + C: Controller +{ + sh: Arc> +} + +impl Clone for Receiver +where + C: Controller +{ + fn clone(&self) -> Self { + let mut inner = self.sh.inner.lock(); + inner.rx_count += 1; + drop(inner); + + let sh = Arc::clone(&self.sh); + Self { sh } + } } -impl Receiver { +impl Receiver +where + C: Controller +{ #[must_use] - pub(super) const fn new(sh: Arc>) -> Self { + pub(super) fn new(sh: Arc>) -> Self { + let mut inner = sh.inner.lock(); + inner.rx_count += 1; + drop(inner); Self { sh } } + /// Return a [`Ctrl`], which can be used to access the internal + /// [`Controller`]. #[must_use] - pub fn recv_blocking(&self) -> T { + pub fn ctrl(&self) -> Ctrl { + Ctrl(Arc::clone(&self.sh)) + } + + /// Receive a node from the queue. If the queue is empty, block and wait for + /// a new node to arrive. + /// + /// # Errors + /// [`Error::Closed`] means that the queue is empty and all the + /// [`Sender`](super::Sender)s have been dropped. + pub fn recv_blocking(&self) -> Result> { let mut inner = self.sh.inner.lock(); let n = loop { + if inner.q.is_empty() && inner.tx_count == 0 { + return Err(Error::Closed); + } + if let Some(n) = inner.q.pop() { break n; } self.sh.rx_signal.wait(&mut inner); }; - // If there's a queue limit configured, then senders may be blocked waiting - // for space to become available. - if inner.q.max_len().is_some() { - self.sh.wake_sender(); - } + // Wake up any senders that are waiting for space to become + // available. + self.sh.wake_senders(); drop(inner); - n + Ok(n) } + /// Returns a `Future` that is the `async` equivalent of + /// [`Receiver::recv_blocking()`]. #[must_use] - pub fn recv_async(&self) -> RecvFuture { + pub fn recv_async(&self) -> RecvFuture { RecvFuture { sh: Arc::clone(&self.sh), waiter: self.sh.rx_wakers.waiter() } } - #[must_use] - pub fn try_recv(&self) -> Option { + /// Attempt to retrieve a node from the queue. + /// + /// Returns `Ok(Some(T))` if there's a node available for immediate pickup.
+ /// Returns `Ok(None)` if there are no nodes to pick up. + /// + /// # Errors + /// [`Error::Closed`] means that the queue is empty and all the + /// [`Sender`](super::Sender)s have been dropped. + pub fn try_recv(&self) -> Result, Error> { let mut inner = self.sh.inner.lock(); - inner.q.pop().map_or_else( + if inner.q.is_empty() && inner.tx_count == 0 { + return Err(Error::Closed); + } + Ok(inner.q.pop().map_or_else( || None, |n| { - // If there's a queue limit configured, then senders may be blocked - // waiting for space to become available. - if inner.q.max_len().is_some() { - self.sh.wake_sender(); - } + // Wake up any senders that are waiting for space to become + // available. + self.sh.wake_senders(); drop(inner); Some(n) } - ) + )) + } +} + +impl Drop for Receiver +where + C: Controller +{ + fn drop(&mut self) { + let mut inner = self.sh.inner.lock(); + inner.rx_count -= 1; + drop(inner); + + self.sh.wake_senders(); } } -pub struct RecvFuture { - sh: Arc>, +/// A `Future` that will resolve when there's data that can be returned +/// from the channel, or if the internal queue is empty but all the +/// [`Sender`](super::Sender) end-points have been dropped. +pub struct RecvFuture +where + C: Controller +{ + sh: Arc>, waiter: Waiter } -impl Future for RecvFuture { - type Output = T; +impl Future for RecvFuture +where + C: Controller +{ + type Output = Result>; fn poll( mut self: Pin<&mut Self>, ctx: &mut Context<'_> ) -> Poll { let mut inner = self.sh.inner.lock(); if let Some(n) = inner.q.pop() { - Poll::Ready(n) + Poll::Ready(Ok(n)) + } else if inner.tx_count == 0 { + Poll::Ready(Err(Error::Closed)) } else { drop(inner); // happy borrow-checker self.waiter.prime(ctx); Poll::Pending } Index: src/tx.rs ================================================================== --- src/tx.rs +++ src/tx.rs @@ -5,63 +5,121 @@ task::{Context, Poll} }; use wakerizer::Waiter; -use super::{Overflow, Shared}; +use super::{Controller, Ctrl, Error, Overflow, Shared}; /// Channel transmitter end-point. -#[derive(Clone)] -pub struct Sender { - sh: Arc> +pub struct Sender +where + C: Controller +{ + sh: Arc> +} + +impl Clone for Sender +where + C: Controller +{ + fn clone(&self) -> Self { + let mut inner = self.sh.inner.lock(); + inner.tx_count += 1; + drop(inner); + + let sh = Arc::clone(&self.sh); + Self { sh } + } } -impl Sender { - pub(super) const fn new(sh: Arc>) -> Self { +impl Sender +where + C: Controller, + T: Send + Sync +{ + pub(super) fn new(sh: Arc>) -> Self { + let mut inner = sh.inner.lock(); + inner.tx_count += 1; + drop(inner); Self { sh } } + + /// Return a [`Ctrl`], which can be used to access the internal + /// [`Controller`]. + #[must_use] + pub fn ctrl(&self) -> Ctrl { + Ctrl(Arc::clone(&self.sh)) + } /// Send an element over the channel. /// /// If the channel has a limit, and the limit has been reached, then block /// and wait until a [`Receiver`](super::Receiver) has made more room /// available on the queue. /// - /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been - /// dropped. - pub fn send_blocking(&self, n: T) { + /// # Errors + /// [`Error::CantFit`] means the node was rejected by the + /// [`Controller`]. [`Error::Closed`] means there are no more + /// [`Receiver`](super::Receiver)s available.
+ pub fn send_blocking(&self, mut n: T) -> Result<(), Error> { let mut inner = self.sh.inner.lock(); - // Wait until queue isn't full - if let Some(max_len) = inner.q.max_len() { - while inner.q.len() == max_len { - self.sh.tx_signal.wait(&mut inner); - } - } - - // Ignoring error is okay here. It was just determined that there's room - // for the new element in the queue, so this must succeed. - // - // This is true as long as the mutex isn't unlocked between the check above - // and this call - let _ = inner.q.try_push(n); + // Keep trying to `try_push()` until successful. + loop { + if inner.rx_count == 0 { + return Err(Error::Closed); + } + + n = match inner.q.try_push(n) { + Ok(()) => break, + Err(e) => match e { + limq::Error::WontFit(n) => n, + limq::Error::CantFit(n) => { + return Err(Error::CantFit(n)); + } + } + }; + self.sh.tx_signal.wait(&mut inner); + } drop(inner); // Have a new element in queue -- wake up a waiting receiver - self.sh.wake_receiver(); + self.sh.wake_receivers(); + + Ok(()) + } + + /// This exists only because the compiler thinks `send_async()` is not `Send` + /// (because it thinks `inner` is held past the `await`, even though it + /// isn't). + /// + /// # Errors + /// [`Error::CantFit`] means the node was rejected by the + /// [`Controller`]. [`Error::Closed`] means there are no more + /// [`Receiver`](super::Receiver)s available. + fn try_push(&self, n: T) -> Result<(), Error> { + let mut inner = self.sh.inner.lock(); + if inner.rx_count == 0 { + return Err(Error::Closed); + } + + inner.q.try_push(n)?; + drop(inner); + Ok(()) } /// Send an element over the channel. /// /// If the channel has a limit, and the limit has been reached, then block /// and wait until a [`Receiver`](super::Receiver) has made more room /// available on the queue. /// - /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been - /// dropped. - pub async fn send_async(&self, mut n: T) { + /// # Errors + /// [`Error::CantFit`] means the [`Controller`] rejected the node. + /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain. + pub async fn send_async(&self, mut n: T) -> Result<(), Error> { // In an ideal world, there'd be a SendFuture that takes in the new node. // However, if the queue is full the node needs to be stored somewhere // until the task is woken up, and this presents a few annoying challenges. // // Instead, we'll use a Future that simply waits for there to be space @@ -68,98 +126,153 @@ // available and, when that resolves, immediately add the node to the queue. // // For multithreaded runtimes a TOCTOU issue is possible here, so we // need to loop until the try_push() is successful. loop { - ReserveSpaceFuture { - sh: Arc::clone(&self.sh), - waiter: self.sh.tx_wakers.waiter() - } - .await; - - let mut inner = self.sh.inner.lock(); - - // ReserveSpaceFuture should have made sure that space is available. - n = match inner.q.try_push(n) { + // Attempt to push node onto queue.
+ n = match self.try_push(n) { Ok(()) => break, - Err(n) => n + Err(e) => match e { + Error::Closed => return Err(Error::Closed), + Error::WontFit(n) => n, + Error::CantFit(n) => return Err(Error::CantFit(n)) + } + }; + + let fut = ReserveSpaceFuture { + sh: Arc::clone(&self.sh), + waiter: self.sh.tx_wakers.waiter(), + n: &n }; + match fut.await { + Ok(()) => { + // fall through + } + Err(e) => match e { + limq::CheckErr::WontFit => { + // fall through + } + limq::CheckErr::CantFit => { + return Err(Error::CantFit(n)); + } + } + } } // Have a new element in queue -- wake up waiting receivers - self.sh.wake_receiver(); + self.sh.wake_receivers(); + + Ok(()) } /// Fallible sending. /// /// # Errors - /// If the queue is full the node is returned. - pub fn try_send(&self, n: T) -> Result<(), T> { - let mut inner = self.sh.inner.lock(); - let res = inner.q.try_push(n); - - // Have a new element in queue -- wake up a waiting receiver - if res.is_ok() { - self.sh.wake_receiver(); - } - + /// [`Error::CantFit`] means the [`Controller`] permanently rejected the + /// node. [`Error::WontFit`] means the [`Controller`] temporarily + /// rejected the node. [`Error::Closed`] means no + /// [`Receiver`](super::Receiver)s remain. + pub fn try_send(&self, n: T) -> Result<(), Error> { + let mut inner = self.sh.inner.lock(); + if inner.rx_count == 0 { + return Err(Error::Closed); + } + inner.q.try_push(n)?; drop(inner); - res + self.sh.wake_receivers(); + + Ok(()) } /// Forcibly add an element to the queue. /// /// If the queue has a limit and the queue is full, then the oldest node will /// be removed before the new element is added. - pub fn force_send(&self, n: T) { + /// + /// # Errors + /// [`Error::CantFit`] means the [`Controller`] rejected the node. + /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain. + pub fn force_send(&self, n: T) -> Result<(), Error> { let mut inner = self.sh.inner.lock(); - inner.q.force_push(n); - + inner.q.force_push(n)?; drop(inner); // Have a new element in queue -- wake up a waiting receiver - self.sh.wake_receiver(); + self.sh.wake_receivers(); + + Ok(()) } /// Forcibly add an element to the channel, allowing the caller to determine /// how overflow is handled. - pub fn force_send_oc(&self, n: T, overflow: Overflow) { + /// + /// # Errors + /// [`Error::CantFit`] means the [`Controller`] rejected the node. + /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain. + pub fn force_send_oc( + &self, + n: T, + overflow: Overflow + ) -> Result<(), Error> { let mut inner = self.sh.inner.lock(); - inner.q.force_push_oc(n, overflow); - + inner.q.force_push_oc(n, overflow)?; drop(inner); // Have a new element in queue -- wake up a waiting receiver - self.sh.wake_receiver(); + self.sh.wake_receivers(); + + Ok(()) + } +} + +impl Drop for Sender +where + C: Controller +{ + fn drop(&mut self) { + let mut inner = self.sh.inner.lock(); + inner.tx_count -= 1; + drop(inner); + + self.sh.wake_receivers(); } } /// A [`Future`] that will resolve when the queue is not full.
-struct ReserveSpaceFuture { - sh: Arc>, - waiter: Waiter +struct ReserveSpaceFuture<'n, C, T> +where + C: Controller, + T: Send +{ + sh: Arc>, + waiter: Waiter, + n: &'n T } -impl Future for ReserveSpaceFuture { - type Output = (); +impl Future for ReserveSpaceFuture<'_, C, T> +where + C: Controller, + T: Send +{ + type Output = Result<(), limq::CheckErr>; fn poll( mut self: Pin<&mut Self>, ctx: &mut Context<'_> ) -> Poll { let inner = self.sh.inner.lock(); - if let Some(max_len) = inner.q.max_len() { - if inner.q.len() < max_len { - Poll::Ready(()) - } else { - drop(inner); // happy borrow-checker - self.waiter.prime(ctx); - Poll::Pending - } - } else { - Poll::Ready(()) + match inner.q.would_fit(self.n) { + Ok(()) => Poll::Ready(Ok(())), + Err(e) => match e { + limq::CheckErr::WontFit => { + drop(inner); // happy borrow-checker + self.waiter.prime(ctx); + Poll::Pending + } + limq::CheckErr::CantFit => Poll::Ready(Err(limq::CheckErr::CantFit)) + } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/basic.rs ================================================================== --- tests/basic.rs +++ tests/basic.rs @@ -1,67 +1,75 @@ use std::thread; use tokio::task; -use limqch::channel; +use limq::OptLenLim; + +use limqch::{Error, channel}; + #[test] fn try_send_full() { - let (tx, rx) = channel(Some(2)); - tx.send_blocking(1); - tx.send_blocking(2); - assert_eq!(tx.try_send(3), Err(3)); - - assert_eq!(rx.recv_blocking(), 1); - assert_eq!(rx.try_recv(), Some(2)); - assert_eq!(rx.try_recv(), None); + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); + tx.send_blocking(1).unwrap(); + tx.send_blocking(2).unwrap(); + assert_eq!(tx.try_send(3), Err(Error::WontFit(3))); + + assert_eq!(rx.recv_blocking().unwrap(), 1); + assert_eq!(rx.try_recv().unwrap(), Some(2)); + assert_eq!(rx.try_recv().unwrap(), None); } #[tokio::test] async fn try_on_full() { - let (tx, rx) = channel(Some(2)); - tx.send_async(1).await; - tx.send_async(2).await; - assert_eq!(tx.try_send(3), Err(3)); - - assert_eq!(rx.recv_async().await, 1); - assert_eq!(rx.try_recv(), Some(2)); - assert_eq!(rx.try_recv(), None); + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); + tx.send_async(1).await.unwrap(); + tx.send_async(2).await.unwrap(); + assert_eq!(tx.try_send(3), Err(Error::WontFit(3))); + + assert_eq!(rx.recv_async().await.unwrap(), 1); + assert_eq!(rx.try_recv().unwrap(), Some(2)); + assert_eq!(rx.try_recv().unwrap(), None); } #[tokio::test] async fn send_from_spawned_task() { - let (tx, rx) = channel(Some(2)); + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); task::spawn(async move { - tx.send_async(1).await; + tx.send_async(1).await.unwrap(); }); - assert_eq!(rx.recv_async().await, 1); + assert_eq!(rx.recv_async().await.unwrap(), 1); } #[tokio::test] async fn blocking_send_from_thread() { - let (tx, rx) = channel(Some(2)); + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); thread::spawn(move || { - tx.send_blocking(1); + tx.send_blocking(1).unwrap(); }); - assert_eq!(rx.recv_blocking(), 1); + assert_eq!(rx.recv_blocking().unwrap(), 1); } #[tokio::test] async fn task_to_task() { - let (tx, rx) = channel(Some(2)); + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); task::spawn(async move { - tx.send_async(1).await; + tx.send_async(1).await.unwrap(); }); task::spawn_blocking(move || { - assert_eq!(rx.recv_blocking(), 1); + assert_eq!(rx.recv_blocking().unwrap(), 1); }) .await .unwrap(); } 
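The tests above exercise `try_send()` on a full queue, but not the backpressure path where `send_async()` has to wait for a receiver to make room. The following sketch is not part of the patch; it only uses APIs that appear in this diff (`channel`, `OptLenLim`, `send_async`, `recv_async`, `tokio::task`), while the test name and the one-element limit are illustrative assumptions.

```rust
use limq::OptLenLim;
use limqch::channel;
use tokio::task;

#[tokio::test]
async fn send_async_waits_for_room() {
  // Internal queue limited to a single node.
  let lenlim = OptLenLim::new(Some(1));
  let (tx, rx) = channel(lenlim);

  // Fill the queue; the next send must wait for space.
  tx.send_async(1).await.unwrap();

  let sender = task::spawn(async move {
    // Resolves once the receiver below has popped node `1`.
    tx.send_async(2).await.unwrap();
  });

  // Popping a node wakes the waiting sender.
  assert_eq!(rx.recv_async().await.unwrap(), 1);
  assert_eq!(rx.recv_async().await.unwrap(), 2);

  sender.await.unwrap();
}
```
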
ADDED tests/closed.rs Index: tests/closed.rs ================================================================== --- /dev/null +++ tests/closed.rs @@ -0,0 +1,125 @@ +use limq::OptLenLim; + +use limqch::{Error, channel}; + +// No receiver exists, so sending is pointless +#[test] +fn rx_closed_sync() { + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Drop the only receiver + drop(rx); + + // Transmitter should now return Error::Closed + let Err(Error::Closed) = tx.send_blocking(11) else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; + + let Err(Error::Closed) = tx.try_send(11) else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +// No receiver exists, so sending is pointless +#[tokio::test] +async fn rx_closed_async() { + let lenlim = OptLenLim::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Drop the only receiver + drop(rx); + + // Transmitter should now return Error::Closed + let Err(Error::Closed) = tx.send_async(11).await else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +#[test] +fn tx_closed_empty_sync() { + let lenlim = OptLenLim::::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Drop the only transmitter + drop(tx); + + // Receiver should now return Error::Closed, because the queue is empty + // and there are no senders. + let Err(Error::Closed) = rx.recv_blocking() else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; + + let Err(Error::Closed) = rx.try_recv() else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +#[tokio::test] +async fn tx_closed_empty_async() { + let lenlim = OptLenLim::::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Drop the only transmitter + drop(tx); + + // Receiver should now return Error::Closed, because the queue is empty + // and there are no senders.
+ let Err(Error::Closed) = rx.recv_async().await else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +#[test] +fn tx_closed_nonempty_sync() { + let lenlim = OptLenLim::::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Add two nodes + tx.try_send(1).unwrap(); + tx.try_send(2).unwrap(); + + // Drop the only transmitter + drop(tx); + + // Should succeed + let Ok(1) = rx.recv_blocking() else { + panic!("Unexpectedly not Ok(Some(1))"); + }; + let Ok(Some(2)) = rx.try_recv() else { + panic!("Unexpectedly not Ok(Some(2))"); + }; + + // queue is now empty, so it should be returning Error::Closed + let Err(Error::Closed) = rx.recv_blocking() else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; + + let Err(Error::Closed) = rx.try_recv() else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +#[tokio::test] +async fn tx_closed_nonempty_async() { + let lenlim = OptLenLim::::new(Some(2)); + let (tx, rx) = channel(lenlim); + + // Add a node + tx.try_send(1).unwrap(); + + // Drop the only transmitter + drop(tx); + + // Should succeed + let Ok(1) = rx.recv_async().await else { + panic!("Unexpectedly not Ok(Some(1))"); + }; + + // queue is now empty, so it should be returning Error::Closed + let Err(Error::Closed) = rx.recv_async().await else { + panic!("Unexpectedly not Err(Error::Closed)"); + }; +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/ctrl.rs Index: tests/ctrl.rs ================================================================== --- /dev/null +++ tests/ctrl.rs @@ -0,0 +1,26 @@ +use limq::BufLim; + +use limqch::channel; + +#[test] +fn access_controller() { + let lenlim = BufLim::new(None, None); + let (tx, _rx) = channel(lenlim); + + let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len); + assert_eq!(max_len, None); + + let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size); + assert_eq!(max_size, None); + + tx.ctrl().with_ctrl_mut(|c| c.set_max_len(Some(2))); + tx.ctrl().with_ctrl_mut(|c| c.set_max_size(Some(8))); + + let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len); + assert_eq!(max_len, Some(2)); + + let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size); + assert_eq!(max_size, Some(8)); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -2,18 +2,33 @@ ⚠️ indicates a breaking change. ## [Unreleased] -[Details](/vdiff?from=limqch-0.1.0&to=trunk) +[Details](/vdiff?from=limqch-0.2.0&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.2.0] - 2025-04-11 + +[Details](/vdiff?from=limqch-0.1.0&to=limqch-0.2.0) + +### Changed + +-⚠️ Update for `limq` `0.3.0`. Creating a limq channel now requires a + `Controller` implementation. + +### Removed + +- `Spawner` was removed. + --- ## [0.1.0] - 2025-04-01 Initial release.
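To make the 0.2.0 breaking change noted above concrete, here is a hypothetical migration sketch (not part of the patch). It is based on the removed and added lines in `tests/basic.rs`: 0.1.x passed an optional length limit directly to `channel()`, while 0.2.0 expects a `limq` `Controller` implementation such as `OptLenLim`; the `main` wrapper and the specific values are illustrative only.

```rust
use limq::OptLenLim;
use limqch::channel;

fn main() {
  // limqch 0.1.x: the queue limit was passed directly.
  // let (tx, rx) = channel(Some(2));

  // limqch 0.2.0: wrap the limit in a `Controller` implementation.
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  tx.try_send(1).unwrap();
  assert_eq!(rx.try_recv().unwrap(), Some(1));
}
```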