Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,11 @@ [package] name = "swctx" -version = "0.2.2" +version = "0.3.0" edition = "2021" license = "0BSD" +# https://crates.io/category_slugs categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/swctx" description = "One-shot channel with some special semantics." rust-version = "1.56" @@ -14,14 +15,18 @@ ".fslckout", "www", "rustfmt.toml" ] +# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section +[badges] +maintenance = { status = "passively-maintained" } + [dependencies] -parking_lot = { version = "0.12.1" } +parking_lot = { version = "0.12.3" } [dev-dependencies] -tokio = { version = "1.35.1", features = ["rt-multi-thread"] } +tokio = { version = "1.40.0", features = ["rt-multi-thread"] } [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,129 @@ +# This is a configuration file for the bacon tool +# +# Bacon repository: https://github.com/Canop/bacon +# Complete help on configuration: https://dystroy.org/bacon/config/ +# You can also check bacon's own bacon.toml file +# as an example: https://github.com/Canop/bacon/blob/main/bacon.toml + +# For information about clippy lints, see: +# https://github.com/rust-lang/rust-clippy/blob/master/README.md + +default_job = "check" + +[jobs.check] +command = ["cargo", "check", "--color", "always"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets", "--color", "always"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = [ + "cargo", "clippy", + "--color", "always", +] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may 
change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--color", "always", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", +] +need_stdout = false + +[jobs.clippy-pedantic] +command = [ + "cargo", "clippy", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +[jobs.clippy-all-pedantic] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = [ + "cargo", "test", "--color", "always", + "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 +] +need_stdout = true + +[jobs.doc] +command = ["cargo", "doc", "--color", "always", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# *if* it makes sense for this crate. +# Don't forget the `--color always` part or the errors won't be +# properly parsed. +# If your program never stops (eg a server), you may set `background` +# to false to have the cargo run output immediately displayed instead +# of waiting for program's end. 
+[jobs.run] +command = [ + "cargo", "run", + "--color", "always", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--color", "always", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -1,14 +1,21 @@ //! Error management module. use std::fmt; /// Errors that can be returned by the `swctx` library. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Eq)] pub enum Error { /// The [`SetCtx`](super::SetCtx) was dropped while in state 'S'. Aborted(S), + + /// The [`WaitCtx`](super::WaitCtx) was dropped. + /// + /// This will be returned by [`SetCtx`](super::SetCtx) if it attempts to + /// update the context if there's no longer a `WaitCtx` to receive the + /// changes. + LostWaiter, /// The [`SetCtx`](super::SetCtx) triggered an application-defined error. App(E) } @@ -17,36 +24,43 @@ /// /// If the error is not `Error::App(E)` then return `None`, otherwise return /// Some(E). pub fn into_apperr(self) -> Option { match self { - Error::App(e) => Some(e), + Self::App(e) => Some(e), _ => None } } /// Return application-specific error. /// - /// # Panic + /// # Panics /// This method will panic if the error is not `Error::App(E)`. 
pub fn unwrap_apperr(self) -> E { match self { - Error::App(e) => e, + Self::App(e) => e, _ => panic!("Not an Error::App") } } } -impl std::error::Error for Error {} +impl std::error::Error for Error where + E: std::error::Error +{ +} -impl fmt::Display for Error { +impl fmt::Display for Error +where + E: std::error::Error +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Error::Aborted(_) => { + Self::Aborted(_) => { write!(f, "The set context was dropped prematurely") } - Error::App(err) => write!(f, "Application error; {:?}", err) + Self::LostWaiter => write!(f, "WaitCtx disappeared"), + Self::App(err) => write!(f, "Application error; {:?}", err) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,7 +1,8 @@ -//! _swctx_ is very similar to a one-shot channel, but with some added -//! semantics. +//! _swctx_ is similar to a cross-thread/task one-shot channel, with the added +//! ability to store a generic "current state" of the channel prior to passing +//! a value over the channel. //! //! ``` //! use std::thread; //! use swctx::mkpair; //! @@ -60,11 +61,12 @@ } struct Inner { state: State, sctx_state: S, - waker: Option + waker: Option, + wctx_dropped: bool } impl Inner { fn try_get(&mut self) -> Result, Error> { match self.state { @@ -81,12 +83,13 @@ let State::Err(err) = old else { panic!("Unable to extract error"); }; Err(err) } - _ => { - panic!("Unexpected state"); + State::Finalized => { + // Shouldn't be possible + unimplemented!("Unexpected state"); } } } } @@ -98,27 +101,29 @@ impl Shared { fn notify_waiter(&self, inner: &mut Inner) { self.signal.notify_one(); if let Some(waker) = inner.waker.take() { - waker.wake() + waker.wake(); } } } /// Create a linked [`SetCtx`] and [`WaitCtx`] pair. /// /// The `WaitCtx` is used to wait for a value to arrive from the `SetCtx`. 
+#[must_use] pub fn mkpair() -> (SetCtx, WaitCtx) where S: Clone + Default { let inner = Inner { state: State::Waiting, sctx_state: S::default(), - waker: None + waker: None, + wctx_dropped: false }; let sh = Shared { inner: Mutex::new(inner), signal: Condvar::new() }; Index: src/sctx.rs ================================================================== --- src/sctx.rs +++ src/sctx.rs @@ -38,39 +38,67 @@ /// }); /// jh.join().unwrap(); /// /// assert_eq!(wctx.wait(), Err(Error::Aborted(State::InThread))); /// ``` - pub fn set_state(&self, s: S) { + /// + /// # Errors + /// If the corresponding [`WaitCtx`](super::WaitCtx) has been dropped, + /// [`Error::LostWaiter`] will be returned. + pub fn set_state(&self, s: S) -> Result<(), Error> { let mut inner = self.0.inner.lock(); + if inner.wctx_dropped { + return Err(Error::LostWaiter); + } inner.sctx_state = s; + drop(inner); + Ok(()) } /// Consume the `SetCtx` and store a value for the wait context to return. - pub fn set(self, data: T) { + /// + /// # Errors + /// If the corresponding [`WaitCtx`](super::WaitCtx) has been dropped, + /// [`Error::LostWaiter`] will be returned. + pub fn set(self, data: T) -> Result<(), Error> { // - This and `fail()` consume `self`. // - The wait context does not modify the state. // - The only other method is `set_state()` and it only touches // `sctx_state` and not `state`. // // These facts allow us to safely assume that the state does not need to be // checked here -- it can only be `Waiting`. - // - // The only "weird" state that can happen is that the wait context has been - // dropped, but we silently ignore this case. let mut inner = self.0.inner.lock(); + if inner.wctx_dropped { + return Err(Error::LostWaiter); + } + inner.state = State::Data(data); self.0.notify_waiter(&mut inner); + drop(inner); + + Ok(()) } /// Consume the `SetCtx` and store an error value for the wait context to /// return. 
- pub fn fail(self, error: E) { + /// + /// # Errors + /// If the corresponding [`WaitCtx`](super::WaitCtx) has been dropped, + /// [`Error::LostWaiter`] will be returned. + pub fn fail(self, error: E) -> Result<(), Error> { // See comments in SetCtx::set() for implementation details. let mut inner = self.0.inner.lock(); + if inner.wctx_dropped { + return Err(Error::LostWaiter); + } + inner.state = State::Err(Error::App(error)); self.0.notify_waiter(&mut inner); + drop(inner); + + Ok(()) } } impl Drop for SetCtx where @@ -80,17 +108,16 @@ let mut inner = self.0.inner.lock(); match inner.state { State::Waiting => { inner.state = State::Err(Error::Aborted(inner.sctx_state.clone())); self.0.notify_waiter(&mut inner); - } - State::Data(_) => { - // Do nothing. Assume the waiter will handle the data. + drop(inner); } - State::Err(_) | State::Finalized => { - // Do nothing + State::Data(_) | State::Err(_) | State::Finalized => { + // Do nothing. + // For the Data case, assume the waiter will handle the data. } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/wctx.rs ================================================================== --- src/wctx.rs +++ src/wctx.rs @@ -15,33 +15,40 @@ #[repr(transparent)] pub struct WaitCtx(pub(crate) Arc>); impl WaitCtx { /// Wait for the paired [`SetCtx`](super::SetCtx) to set a value or fail. + /// + /// # Errors + /// Returns application-specific error wrapped in an [`Error::App`] if the + /// `SetCtx` reported failure. 
pub fn wait(self) -> Result> { let mut inner = self.0.inner.lock(); loop { match inner.state { State::Waiting => { self.0.signal.wait(&mut inner); } State::Data(_) => { let old = std::mem::replace(&mut inner.state, State::Finalized); + drop(inner); let State::Data(data) = old else { - panic!("Unable to extract data"); + unimplemented!("Unable to extract data"); }; break Ok(data); } State::Err(_) => { let old = std::mem::replace(&mut inner.state, State::Finalized); + drop(inner); let State::Err(err) = old else { - panic!("Unable to extract error"); + unimplemented!("Unable to extract error"); }; break Err(err); } - _ => { - panic!("Unexpected state"); + State::Finalized => { + // Shouldn't be possible + unimplemented!("Unexpected state") } } } } @@ -48,11 +55,15 @@ /// Non-blocking attempt to get the get the stored value. /// /// Returns `Ok(Some(T))` if a value has been stored. Returns `Ok(None)` if /// no value has been stored. /// - /// # Panic + /// # Errors + /// Returns application-specific error wrapped in an [`Error::App`] if the + /// [`SetCtx`](super::SetCtx) reported failure. + /// + /// # Panics /// This function will panic if called again after it has resolved to either /// data or error. pub fn try_get(&self) -> Result, Error> { let mut inner = self.0.inner.lock(); inner.try_get() @@ -60,17 +71,25 @@ /// Return a `Future` that will wait for either data to be set or an error to /// occur. /// /// # Cancel safety - /// This method is cancel safe. + /// The returned `Future` is cancel safe. /// - /// # Panic + /// # Panics /// This function will panic if called again after it has resolved to either /// data or error. 
- pub fn wait_async(&self) -> WaitFuture { - WaitFuture(Arc::clone(&self.0)) + #[must_use] + pub const fn wait_async(&self) -> WaitFuture { + WaitFuture(self) + } +} + +impl Drop for WaitCtx { + fn drop(&mut self) { + let mut inner = self.0.inner.lock(); + inner.wctx_dropped = true; } } impl Future for WaitCtx { type Output = Result>; @@ -88,17 +107,22 @@ } /// Used to wait for the paired [`SetCtx`](super::SetCtx) to set a value in an /// `async` context. +// A reference to the `WaitCtx` is used (rather than a clone of its +// Arc) to tie the lifetime of the `WaitCtx` and the `WaitFuture` +// together, to avoid `WaitFuture` outliving the `WaitCtx` (which wouldn't make +// sense, because dropping `WaitCtx` sets `wctx_dropped` in the shared +// context). #[repr(transparent)] -pub struct WaitFuture(Arc>); +pub struct WaitFuture<'wctx, T, S, E>(&'wctx WaitCtx); -impl Future for WaitFuture { +impl<'wctx, T, S, E> Future for WaitFuture<'wctx, T, S, E> { type Output = Result>; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - let mut inner = self.0.inner.lock(); + let mut inner = self.0 .0.inner.lock(); match inner.try_get() { Ok(Some(v)) => Poll::Ready(Ok(v)), Ok(None) => { inner.waker = Some(ctx.waker().clone()); Poll::Pending Index: tests/errors.rs ================================================================== --- tests/errors.rs +++ tests/errors.rs @@ -43,11 +43,11 @@ #[test] fn noreply_before_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.set_state(State::NoReply); + sctx.set_state(State::NoReply).unwrap(); }); jh.join().unwrap(); assert_eq!(wctx.wait(), Err(Error::Aborted(State::NoReply))); } @@ -56,11 +56,11 @@ #[test] fn noreply_after_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.set_state(State::NoReply); + sctx.set_state(State::NoReply).unwrap(); std::thread::sleep(std::time::Duration::from_millis(500)); }); assert_eq!(wctx.wait(), 
Err(Error::Aborted(State::NoReply))); @@ -71,11 +71,11 @@ #[test] fn apperr_before_wait() { let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { - sctx.fail("yikes"); + sctx.fail("yikes").unwrap(); }); jh.join().unwrap(); assert_eq!(wctx.wait(), Err(Error::App("yikes"))); } @@ -85,14 +85,22 @@ fn apperr_after_wait() { let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); - sctx.fail("yikes"); + sctx.fail("yikes").unwrap(); }); assert_eq!(wctx.wait(), Err(Error::App("yikes"))); jh.join().unwrap(); } + +#[test] +fn lost_waiter() { + let (sctx, wctx) = mkpair::<(), State, &str>(); + drop(wctx); + assert_eq!(sctx.set_state(State::Abort), Err(Error::LostWaiter)); + assert_eq!(sctx.set(()), Err(Error::LostWaiter)); +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/errors_async.rs ================================================================== --- tests/errors_async.rs +++ tests/errors_async.rs @@ -51,11 +51,11 @@ let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.set_state(State::NoReply); + sctx.set_state(State::NoReply).unwrap(); }); jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); @@ -69,11 +69,11 @@ let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.set_state(State::NoReply); + sctx.set_state(State::NoReply).unwrap(); std::thread::sleep(std::time::Duration::from_millis(500)); }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); @@ -88,11 +88,11 @@ let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { - sctx.fail("yikes"); + sctx.fail("yikes").unwrap(); }); 
jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::App("yikes"))); @@ -106,11 +106,11 @@ let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); - sctx.fail("yikes"); + sctx.fail("yikes").unwrap(); }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::App("yikes"))); }); Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -5,11 +5,11 @@ #[test] fn say_hello_before_wait() { let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { - sctx.set("hello"); + sctx.set("hello").unwrap(); }); // join ensures that SetCtx::set() has been called jh.join().unwrap(); assert_eq!(wctx.wait().unwrap(), "hello"); @@ -20,11 +20,11 @@ let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); - sctx.set("hello"); + sctx.set("hello").unwrap(); }); assert_eq!(wctx.wait().unwrap(), "hello"); jh.join().unwrap(); @@ -35,11 +35,11 @@ let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { - sctx.set("hello"); + sctx.set("hello").unwrap(); }); // join ensures that SetCtx::set() has been called jh.join().unwrap(); tokrt.block_on(async { @@ -54,11 +54,11 @@ let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); - sctx.set("hello"); + sctx.set("hello").unwrap(); }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await.unwrap(), "hello"); }); Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,12 +1,45 @@ # Change Log +⚠️ indicates a breaking change. 
+ ## [Unreleased] -[Details](/vdiff?from=swctx-0.2.1&to=trunk) +[Details](/vdiff?from=swctx-0.3.0&to=trunk) + +### Added + +### Changed + +### Removed + +--- + +## [0.3.0] + +[Details](/vdiff?from=swctx-0.2.2&to=swctx-0.3.0) ### Added + +- ⚠️ `Error::LostWaiter` is returned by `SetCtx` functions that are intended + to make changes that would have affected the resolution of the `WaitCtx`, + when that `WaitCtx` has already been dropped. + +### Changed + +- `SetCtx`'s functions for passing a value to the `WaitCtx`, setting state or + failing now return a `Result`. +- ⚠️ The `E` bound used for `Error::App(E)` has been changed to + `std::error::Error` from `fmt::Debug`. +- ⚠️ Tie the lifetime of `WaitFuture` to the lifetime of `WaitCtx` to avoid + `WaitFuture` outliving its `WaitCtx`. + +--- + +## [0.2.2] - 2024-01-14 + +[Details](/vdiff?from=swctx-0.2.1&to=swctx-0.2.2) ### Changed - Export `WaitFuture`. - Implement `Future` on `WaitCtx`.