Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,18 +1,18 @@ [package] name = "swctx" -version = "0.1.1" +version = "0.2.0" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/swctx" description = "One-shot channel with some special semantics." rust-version = "1.56" exclude = [ - ".efiles", ".fossil-settings", + ".efiles", ".fslckout", "www", "rustfmt.toml" ] Index: README.md ================================================================== --- README.md +++ README.md @@ -1,5 +1,5 @@ # swctx -Set/Wait Context (_swctx_) is a one-shot channel like construct with some +Set/Wait Context (_swctx_) is a one-shot channel-like construct with some special semantics. Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -2,24 +2,19 @@ use std::fmt; /// Errors that can be returned by the `swctx` library. #[derive(Debug, PartialEq)] -pub enum Error { - /// The [`SetCtx`](super::SetCtx) was dropped while still in "inactive" - /// state. - Aborted, +pub enum Error { + /// The [`SetCtx`](super::SetCtx) was dropped while in state 'S'. + Aborted(S), /// The [`SetCtx`](super::SetCtx) triggered an application-defined error. - App(E), - - /// The [`SetCtx`](super::SetCtx) was dropped after having been made - /// "active". - NoReply + App(E) } -impl Error { +impl Error { /// Return application-specific error, if set. /// /// If the error is not `Error::App(E)` then return `None`, otherwise return /// Some(E). pub fn into_apperr(self) -> Option { @@ -39,22 +34,19 @@ _ => panic!("Not an Error::App") } } } -impl std::error::Error for Error {} +impl std::error::Error for Error {} -impl fmt::Display for Error { +impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Error::Aborted => { + Error::Aborted(_) => { write!(f, "The set context was dropped prematurely") } - Error::App(err) => write!(f, "Application error; {:?}", err), - Error::NoReply => { - write!(f, "The set context never set an object being dropped") - } + Error::App(err) => write!(f, "Application error; {:?}", err) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -3,41 +3,36 @@ //! //! ``` //! use std::thread; //! use swctx::mkpair; //! -//! let (sctx, wctx) = mkpair::<&str, &str>(); +//! let (sctx, wctx) = mkpair::<&str, &str, &str>(); //! let jh = thread::spawn(move || { -//! sctx.activate(); +//! sctx.set_state("in thread"); //! sctx.set("hello"); //! }); //! jh.join().unwrap(); //! //! assert_eq!(wctx.wait().unwrap(), "hello"); //! ``` //! //! In a typical use-case an application or library calls [`mkpair()`] to //! create a pair of linked [`SetCtx`] and [`WaitCtx`] object. The `SetCtx` -//! object is transferred to a remote thread/task, and then the `WaitCtx` is -//! used wait for an object to arrive [from the thread/task the `SetCtx` is -//! sent to]. -//! -//! The `SetCtx` begins its life in an _inactive_ state, the assumption being -//! that it has been placed in a queue waiting to be handled by an application. -//! The `SetCtx` is made _active_ by calling [`SetCtx::activate()`]. +//! object is transferred to a remote thread/task, and the `WaitCtx` is used +//! wait for an object to arrive [from the thread/task the `SetCtx` is sent +//! to]. //! //! Once the thread/task has data to send back to the `WaitCtx` it calls //! [`SetCtx::set()`] to send the data. //! -//! If the `SetCtx` object is dropped before calling `SetCtx::set()` the -//! `WaitCtx` will return an error. If the `SetCtx` is in _inactive_ state the -//! error will be `Error::Aborted`. If it is _active_ the error will be -//! `Error::NoReply`. +//! The `SetCtx` has an internal state, settable using [`SetCtx::set_state()`] +//! that will be reported back to the `WaitCtx`, which will return +//! [`Error::Aborted`], if the `SetCtx` is dropped prematurely. //! -//! The `SetCtx` can also signal a failure by calling [`SetCtx::fail(E)`] and +//! The `SetCtx` can also signal a failure by calling [`SetCtx::fail()`] and //! pass along an application-specific error code. This will cause the -//! `WaitCtx` to unblock and return `Error::App(E)`. +//! `WaitCtx` to unblock and return [`Error::App`]. mod err; mod sctx; mod wctx; @@ -48,14 +43,11 @@ pub use sctx::SetCtx; pub use wctx::WaitCtx; pub use err::Error; -enum State { - /// The set context has not yet been activated. - Inactive, - +enum State { /// Waiting for a delivery. Waiting, /// Data was delivered. Data(T), @@ -62,22 +54,23 @@ /// Reply is being returned to caller. Finalized, /// An error occurred. - Err(Error) + Err(Error) } -struct Inner { - state: State, +struct Inner { + state: State, + sctx_state: S, waker: Option } -impl Inner { - fn try_get(&mut self) -> Result, Error> { +impl Inner { + fn try_get(&mut self) -> Result, Error> { match self.state { - State::Inactive | State::Waiting => Ok(None), + State::Waiting => Ok(None), State::Data(_) => { let old = std::mem::replace(&mut self.state, State::Finalized); let State::Data(data) = old else { panic!("Unable to extract data"); }; @@ -96,17 +89,17 @@ } } } -struct Shared { - inner: Mutex>, +struct Shared { + inner: Mutex>, signal: Condvar } -impl Shared { - fn notify_waiter(&self, inner: &mut Inner) { +impl Shared { + fn notify_waiter(&self, inner: &mut Inner) { self.signal.notify_one(); if let Some(waker) = inner.waker.take() { waker.wake() } } @@ -114,13 +107,17 @@ /// Create a linked [`SetCtx`] and [`WaitCtx`] pair. /// /// The `WaitCtx` is used to wait for a value to arrive from the `SetCtx`. -pub fn mkpair() -> (SetCtx, WaitCtx) { +pub fn mkpair() -> (SetCtx, WaitCtx) +where + S: Clone + Default +{ let inner = Inner { - state: State::Inactive, + state: State::Waiting, + sctx_state: S::default(), waker: None }; let sh = Shared { inner: Mutex::new(inner), signal: Condvar::new() Index: src/sctx.rs ================================================================== --- src/sctx.rs +++ src/sctx.rs @@ -5,38 +5,60 @@ use super::{Shared, State}; /// End-point used to send value to the paired [`WaitCtx`](super::WaitCtx). #[repr(transparent)] -pub struct SetCtx(pub(crate) Arc>); +pub struct SetCtx(pub(crate) Arc>); -impl SetCtx { - /// Mark state as "active". +impl SetCtx +where + S: Clone +{ + /// Set the internal [`SetCtx`] state. + /// + /// If an `SetCtx` is dropped prematurely (i.e. without setting a value or + /// repering a failure) an `Error::Aborted(S)` will automatically be sent to + /// the linked [`WaitCtx`](super::WaitCtx) object, where `S` will be set to + /// the value last set using `set_state()`. + /// + /// ``` + /// use std::thread; + /// use swctx::{mkpair, Error}; + /// + /// #[derive(Clone, Debug, Default, PartialEq)] + /// enum State { + /// #[default] + /// Init, + /// InThread + /// } + /// + /// let (sctx, wctx) = mkpair::<&str, State, &str>(); + /// let jh = thread::spawn(move || { + /// sctx.set_state(State::InThread); + /// // sctx is prematurely dropped here + /// }); + /// jh.join().unwrap(); /// - /// A prematurely dropped `SetCtx` in the "inactive" state will cause the - /// wait context to return `Error::Aborted`. This method will change the - /// state to "waiting", which would trigger a `Error::NoReply` to be returned - /// from the wait context. - pub fn activate(&self) { + /// assert_eq!(wctx.wait(), Err(Error::Aborted(State::InThread))); + /// ``` + pub fn set_state(&self, s: S) { let mut inner = self.0.inner.lock(); - if let State::Inactive = inner.state { - inner.state = State::Waiting; - } + inner.sctx_state = s; } /// Consume the `SetCtx` and store a value for the wait context to return. #[cfg_attr( feature = "dev-docs", doc = r#" # Internals - This and `fail()` consume `self`. - The wait context does not modify the state. -- Apart from `activate()` there are no other methods that change the state, - and it does nothing that affects this (and `fail()`). +- The only other method is `set_state()` and it only touches `sctx_state` and + not `state`. These facts allow us to safely assume that the state does not need to be -checked here -- it will either be `Inactive` or `Waiting`. +checked here -- it can only be `Waiting`. The only "weird" state that can happen is that the wait context has been dropped, but we silently ignore this case. "# )] @@ -60,20 +82,19 @@ inner.state = State::Err(Error::App(error)); self.0.notify_waiter(&mut inner); } } -impl Drop for SetCtx { +impl Drop for SetCtx +where + S: Clone +{ fn drop(&mut self) { let mut inner = self.0.inner.lock(); match inner.state { - State::Inactive => { - inner.state = State::Err(Error::Aborted); - self.0.notify_waiter(&mut inner); - } State::Waiting => { - inner.state = State::Err(Error::NoReply); + inner.state = State::Err(Error::Aborted(inner.sctx_state.clone())); self.0.notify_waiter(&mut inner); } State::Data(_) => { // Do nothing. Assume the waiter will handle the data. } Index: src/wctx.rs ================================================================== --- src/wctx.rs +++ src/wctx.rs @@ -10,19 +10,20 @@ use super::{Shared, State}; /// End-point used to wait for a value to be sent from the paired /// [`SetCtx`](super::SetCtx). -pub struct WaitCtx(pub(crate) Arc>); +#[repr(transparent)] +pub struct WaitCtx(pub(crate) Arc>); -impl WaitCtx { +impl WaitCtx { /// Wait for the paired [`SetCtx`](super::SetCtx) to set a value or fail. - pub fn wait(self) -> Result> { + pub fn wait(self) -> Result> { let mut inner = self.0.inner.lock(); loop { match inner.state { - State::Inactive | State::Waiting => { + State::Waiting => { self.0.signal.wait(&mut inner); } State::Data(_) => { let old = std::mem::replace(&mut inner.state, State::Finalized); let State::Data(data) = old else { @@ -50,11 +51,11 @@ /// no value has been stored. /// /// # Panic /// This function will panic if called again after it has resolved to either /// data or error. - pub fn try_get(&self) -> Result, Error> { + pub fn try_get(&self) -> Result, Error> { let mut inner = self.0.inner.lock(); inner.try_get() } /// Return a `Future` that will wait for either data to be set or an error to @@ -64,21 +65,21 @@ /// This method is cancel safe. /// /// # Panic /// This function will panic if called again after it has resolved to either /// data or error. - pub fn wait_async(&self) -> WaitFuture { + pub fn wait_async(&self) -> WaitFuture { WaitFuture(Arc::clone(&self.0)) } } #[repr(transparent)] -pub struct WaitFuture(Arc>); +pub struct WaitFuture(Arc>); -impl Future for WaitFuture { - type Output = Result>; +impl Future for WaitFuture { + type Output = Result>; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let mut inner = self.0.inner.lock(); match inner.try_get() { Ok(Some(v)) => Poll::Ready(Ok(v)), Ok(None) => { Index: tests/errors.rs ================================================================== --- tests/errors.rs +++ tests/errors.rs @@ -1,69 +1,78 @@ use std::thread; use swctx::{mkpair, Error}; + +#[derive(Clone, Debug, Default, PartialEq)] +enum State { + #[default] + Abort, + NoReply +} // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { let _sctx2 = sctx; }); + // join() ensures that SetCtx has been prematurely dropped jh.join().unwrap(); - assert_eq!(wctx.wait(), Err(Error::Aborted)); + assert_eq!(wctx.wait(), Err(Error::Aborted(State::Abort))); } // Trigger an abortion error after wait is called (hopefully). #[test] fn abort_after_wait() { - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { + // yolo assume 500ms is sufficient std::thread::sleep(std::time::Duration::from_millis(500)); let _sctx2 = sctx; }); - assert_eq!(wctx.wait(), Err(Error::Aborted)); + assert_eq!(wctx.wait(), Err(Error::Aborted(State::Abort))); jh.join().unwrap(); } // Trigger a no-reply error before wait is called (hopefully). #[test] fn noreply_before_wait() { - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.activate(); + sctx.set_state(State::NoReply); }); jh.join().unwrap(); - assert_eq!(wctx.wait(), Err(Error::NoReply)); + assert_eq!(wctx.wait(), Err(Error::Aborted(State::NoReply))); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn noreply_after_wait() { - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.activate(); + sctx.set_state(State::NoReply); std::thread::sleep(std::time::Duration::from_millis(500)); }); - assert_eq!(wctx.wait(), Err(Error::NoReply)); + assert_eq!(wctx.wait(), Err(Error::Aborted(State::NoReply))); jh.join().unwrap(); } // Trigger an no-reply error before wait is called (hopefully). #[test] fn apperr_before_wait() { - let (sctx, wctx) = mkpair::<(), &str>(); + let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { sctx.fail("yikes"); }); jh.join().unwrap(); @@ -72,11 +81,11 @@ } // Trigger an no-reply error after wait is called (hopefully). #[test] fn apperr_after_wait() { - let (sctx, wctx) = mkpair::<(), &str>(); + let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); sctx.fail("yikes"); }); Index: tests/errors_async.rs ================================================================== --- tests/errors_async.rs +++ tests/errors_async.rs @@ -1,40 +1,47 @@ use std::thread; use swctx::{mkpair, Error}; + +#[derive(Clone, Debug, Default, PartialEq)] +enum State { + #[default] + Abort, + NoReply +} // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { let _sctx2 = sctx; }); jh.join().unwrap(); tokrt.block_on(async { - assert_eq!(wctx.wait_async().await, Err(Error::Aborted)); + assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::Abort))); }); } // Trigger an abortion error after wait is called (hopefully). #[test] fn abort_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); let _sctx2 = sctx; }); tokrt.block_on(async { - assert_eq!(wctx.wait_async().await, Err(Error::Aborted)); + assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::Abort))); }); jh.join().unwrap(); } @@ -41,37 +48,37 @@ // Trigger a no-reply error before wait is called (hopefully). #[test] fn noreply_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.activate(); + sctx.set_state(State::NoReply); }); jh.join().unwrap(); tokrt.block_on(async { - assert_eq!(wctx.wait_async().await, Err(Error::NoReply)); + assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); }); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn noreply_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), ()>(); + let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { - sctx.activate(); + sctx.set_state(State::NoReply); std::thread::sleep(std::time::Duration::from_millis(500)); }); tokrt.block_on(async { - assert_eq!(wctx.wait_async().await, Err(Error::NoReply)); + assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); }); jh.join().unwrap(); } @@ -78,11 +85,11 @@ // Trigger an no-reply error before wait is called (hopefully). #[test] fn apperr_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), &str>(); + let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { sctx.fail("yikes"); }); jh.join().unwrap(); @@ -95,11 +102,11 @@ // Trigger an no-reply error after wait is called (hopefully). #[test] fn apperr_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<(), &str>(); + let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); sctx.fail("yikes"); }); Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -2,25 +2,27 @@ use swctx::mkpair; #[test] fn say_hello_before_wait() { - let (sctx, wctx) = mkpair::<&str, &str>(); + let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { sctx.set("hello"); }); + // join ensures that SetCtx::set() has been called jh.join().unwrap(); assert_eq!(wctx.wait().unwrap(), "hello"); } #[test] fn say_hello_after_wait() { - let (sctx, wctx) = mkpair::<&str, &str>(); + let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { + // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); sctx.set("hello"); }); assert_eq!(wctx.wait().unwrap(), "hello"); @@ -30,15 +32,16 @@ #[test] fn async_say_hello_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<&str, &str>(); + let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { sctx.set("hello"); }); + // join ensures that SetCtx::set() has been called jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await.unwrap(), "hello"); }); @@ -46,13 +49,14 @@ #[test] fn async_say_hello_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); - let (sctx, wctx) = mkpair::<&str, &str>(); + let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { + // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); sctx.set("hello"); }); tokrt.block_on(async { Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -6,10 +6,17 @@ ### Changed ### Removed + +## [0.2.0] - 2023-08-10 + +### Changed + +- Generalize `SetCtx` state. + ## [0.1.1] - 2023-08-08 ### Added