Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From swctx-0.1.1 To swctx-0.2.0
2023-08-14
| ||
21:30 | Add --generate-link-to-definition to rustdoc. check-in: fb0d7a6b05 user: jan tags: trunk | |
2023-08-10
| ||
15:40 | Add some test comments. check-in: cab5919dd1 user: jan tags: swctx-0.2.0, trunk | |
15:28 | Release maintenance. check-in: 369d9fded8 user: jan tags: trunk | |
15:15 | Merge. check-in: b2f56db268 user: jan tags: trunk | |
2023-08-09
| ||
13:30 | Experimenting with generalized SetCtx state. check-in: e69956c141 user: jan tags: generalized-setctx-state | |
2023-08-08
| ||
07:22 | Release maintenance. check-in: 580e75c17c user: jan tags: swctx-0.1.1, trunk | |
07:19 | Add try_get() as a non-blocking alternative to wait(). check-in: c6619994a8 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "swctx" | | < > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | [package] name = "swctx" version = "0.2.0" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/swctx" description = "One-shot channel with some special semantics." rust-version = "1.56" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", "rustfmt.toml" ] [features] dev-docs = [] |
︙ | ︙ |
Changes to README.md.
1 2 | # swctx | | | 1 2 3 4 5 | # swctx Set/Wait Context (_swctx_) is a one-shot channel-like construct with some special semantics. |
Changes to src/err.rs.
1 2 3 4 5 6 | //! Error management module. use std::fmt; /// Errors that can be returned by the `swctx` library. #[derive(Debug, PartialEq)] | | | < | | | < < < | < | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | //! Error management module. use std::fmt; /// Errors that can be returned by the `swctx` library. #[derive(Debug, PartialEq)] pub enum Error<S, E> { /// The [`SetCtx`](super::SetCtx) was dropped while in state 'S'. Aborted(S), /// The [`SetCtx`](super::SetCtx) triggered an application-defined error. App(E) } impl<S, E> Error<S, E> { /// Return application-specific error, if set. /// /// If the error is not `Error::App(E)` then return `None`, otherwise return /// Some(E). pub fn into_apperr(self) -> Option<E> { match self { Error::App(e) => Some(e), |
︙ | ︙ | |||
37 38 39 40 41 42 43 | match self { Error::App(e) => e, _ => panic!("Not an Error::App") } } } | | | | | < < < | 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | match self { Error::App(e) => e, _ => panic!("Not an Error::App") } } } impl<S: fmt::Debug, E: fmt::Debug> std::error::Error for Error<S, E> {} impl<S, E: fmt::Debug> fmt::Display for Error<S, E> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::Aborted(_) => { write!(f, "The set context was dropped prematurely") } Error::App(err) => write!(f, "Application error; {:?}", err) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/lib.rs.
1 2 3 4 5 6 7 | //! _swctx_ is very similar to a one-shot channel, but with some added //! semantics. //! //! ``` //! use std::thread; //! use swctx::mkpair; //! | | | | | < | < < < | | | < | | | < < < | | | > | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | //! _swctx_ is very similar to a one-shot channel, but with some added //! semantics. //! //! ``` //! use std::thread; //! use swctx::mkpair; //! //! let (sctx, wctx) = mkpair::<&str, &str, &str>(); //! let jh = thread::spawn(move || { //! sctx.set_state("in thread"); //! sctx.set("hello"); //! }); //! jh.join().unwrap(); //! //! assert_eq!(wctx.wait().unwrap(), "hello"); //! ``` //! //! In a typical use-case an application or library calls [`mkpair()`] to //! create a pair of linked [`SetCtx`] and [`WaitCtx`] object. The `SetCtx` //! object is transferred to a remote thread/task, and the `WaitCtx` is used //! wait for an object to arrive [from the thread/task the `SetCtx` is sent //! to]. //! //! Once the thread/task has data to send back to the `WaitCtx` it calls //! [`SetCtx::set()`] to send the data. //! //! The `SetCtx` has an internal state, settable using [`SetCtx::set_state()`] //! that will be reported back to the `WaitCtx`, which will return //! [`Error::Aborted`], if the `SetCtx` is dropped prematurely. //! //! The `SetCtx` can also signal a failure by calling [`SetCtx::fail()`] and //! pass along an application-specific error code. This will cause the //! `WaitCtx` to unblock and return [`Error::App`]. mod err; mod sctx; mod wctx; use std::{sync::Arc, task::Waker}; use parking_lot::{Condvar, Mutex}; pub use sctx::SetCtx; pub use wctx::WaitCtx; pub use err::Error; enum State<T, S, E> { /// Waiting for a delivery. Waiting, /// Data was delivered. Data(T), /// Reply is being returned to caller. Finalized, /// An error occurred. Err(Error<S, E>) } struct Inner<T, S, E> { state: State<T, S, E>, sctx_state: S, waker: Option<Waker> } impl<T, S, E> Inner<T, S, E> { fn try_get(&mut self) -> Result<Option<T>, Error<S, E>> { match self.state { State::Waiting => Ok(None), State::Data(_) => { let old = std::mem::replace(&mut self.state, State::Finalized); let State::Data(data) = old else { panic!("Unable to extract data"); }; Ok(Some(data)) } |
︙ | ︙ | |||
94 95 96 97 98 99 100 | panic!("Unexpected state"); } } } } | | | | | | > > > | > | 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 | panic!("Unexpected state"); } } } } struct Shared<T, S, E> { inner: Mutex<Inner<T, S, E>>, signal: Condvar } impl<T, S, E> Shared<T, S, E> { fn notify_waiter(&self, inner: &mut Inner<T, S, E>) { self.signal.notify_one(); if let Some(waker) = inner.waker.take() { waker.wake() } } } /// Create a linked [`SetCtx`] and [`WaitCtx`] pair. /// /// The `WaitCtx` is used to wait for a value to arrive from the `SetCtx`. pub fn mkpair<T, S, E>() -> (SetCtx<T, S, E>, WaitCtx<T, S, E>) where S: Clone + Default { let inner = Inner { state: State::Waiting, sctx_state: S::default(), waker: None }; let sh = Shared { inner: Mutex::new(inner), signal: Condvar::new() }; let sh = Arc::new(sh); |
︙ | ︙ |
Changes to src/sctx.rs.
1 2 3 4 5 6 7 8 9 | use std::sync::Arc; use crate::err::Error; use super::{Shared, State}; /// End-point used to send value to the paired [`WaitCtx`](super::WaitCtx). #[repr(transparent)] | | | > > > > > > > > > > > > > > > > > > > | > > > | > > > | < | | < | < | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | use std::sync::Arc; use crate::err::Error; use super::{Shared, State}; /// End-point used to send value to the paired [`WaitCtx`](super::WaitCtx). #[repr(transparent)] pub struct SetCtx<T, S: Clone, E>(pub(crate) Arc<Shared<T, S, E>>); impl<T, S, E> SetCtx<T, S, E> where S: Clone { /// Set the internal [`SetCtx`] state. /// /// If an `SetCtx` is dropped prematurely (i.e. without setting a value or /// repering a failure) an `Error::Aborted(S)` will automatically be sent to /// the linked [`WaitCtx`](super::WaitCtx) object, where `S` will be set to /// the value last set using `set_state()`. /// /// ``` /// use std::thread; /// use swctx::{mkpair, Error}; /// /// #[derive(Clone, Debug, Default, PartialEq)] /// enum State { /// #[default] /// Init, /// InThread /// } /// /// let (sctx, wctx) = mkpair::<&str, State, &str>(); /// let jh = thread::spawn(move || { /// sctx.set_state(State::InThread); /// // sctx is prematurely dropped here /// }); /// jh.join().unwrap(); /// /// assert_eq!(wctx.wait(), Err(Error::Aborted(State::InThread))); /// ``` pub fn set_state(&self, s: S) { let mut inner = self.0.inner.lock(); inner.sctx_state = s; } /// Consume the `SetCtx` and store a value for the wait context to return. #[cfg_attr( feature = "dev-docs", doc = r#" # Internals - This and `fail()` consume `self`. - The wait context does not modify the state. - The only other method is `set_state()` and it only touches `sctx_state` and not `state`. These facts allow us to safely assume that the state does not need to be checked here -- it can only be `Waiting`. The only "weird" state that can happen is that the wait context has been dropped, but we silently ignore this case. "# )] pub fn set(self, data: T) { let mut inner = self.0.inner.lock(); |
︙ | ︙ | |||
58 59 60 61 62 63 64 | pub fn fail(self, error: E) { let mut inner = self.0.inner.lock(); inner.state = State::Err(Error::App(error)); self.0.notify_waiter(&mut inner); } } | | > > > < < < < | | 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | pub fn fail(self, error: E) { let mut inner = self.0.inner.lock(); inner.state = State::Err(Error::App(error)); self.0.notify_waiter(&mut inner); } } impl<T, S, E> Drop for SetCtx<T, S, E> where S: Clone { fn drop(&mut self) { let mut inner = self.0.inner.lock(); match inner.state { State::Waiting => { inner.state = State::Err(Error::Aborted(inner.sctx_state.clone())); self.0.notify_waiter(&mut inner); } State::Data(_) => { // Do nothing. Assume the waiter will handle the data. } State::Err(_) | State::Finalized => { // Do nothing } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/wctx.rs.
︙ | ︙ | |||
8 9 10 11 12 13 14 | use crate::err::Error; use super::{Shared, State}; /// End-point used to wait for a value to be sent from the paired /// [`SetCtx`](super::SetCtx). | > | | | | | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | use crate::err::Error; use super::{Shared, State}; /// End-point used to wait for a value to be sent from the paired /// [`SetCtx`](super::SetCtx). #[repr(transparent)] pub struct WaitCtx<T, S, E>(pub(crate) Arc<Shared<T, S, E>>); impl<T, S, E> WaitCtx<T, S, E> { /// Wait for the paired [`SetCtx`](super::SetCtx) to set a value or fail. pub fn wait(self) -> Result<T, Error<S, E>> { let mut inner = self.0.inner.lock(); loop { match inner.state { State::Waiting => { self.0.signal.wait(&mut inner); } State::Data(_) => { let old = std::mem::replace(&mut inner.state, State::Finalized); let State::Data(data) = old else { panic!("Unable to extract data"); }; |
︙ | ︙ | |||
48 49 50 51 52 53 54 | /// /// Returns `Ok(Some(T))` if a value has been stored. Returns `Ok(None)` if /// no value has been stored. /// /// # Panic /// This function will panic if called again after it has resolved to either /// data or error. | | | | | | | 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | /// /// Returns `Ok(Some(T))` if a value has been stored. Returns `Ok(None)` if /// no value has been stored. /// /// # Panic /// This function will panic if called again after it has resolved to either /// data or error. pub fn try_get(&self) -> Result<Option<T>, Error<S, E>> { let mut inner = self.0.inner.lock(); inner.try_get() } /// Return a `Future` that will wait for either data to be set or an error to /// occur. /// /// # Cancel safety /// This method is cancel safe. /// /// # Panic /// This function will panic if called again after it has resolved to either /// data or error. pub fn wait_async(&self) -> WaitFuture<T, S, E> { WaitFuture(Arc::clone(&self.0)) } } #[repr(transparent)] pub struct WaitFuture<T, S, E>(Arc<Shared<T, S, E>>); impl<T, S, E> Future for WaitFuture<T, S, E> { type Output = Result<T, Error<S, E>>; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { let mut inner = self.0.inner.lock(); match inner.try_get() { Ok(Some(v)) => Poll::Ready(Ok(v)), Ok(None) => { inner.waker = Some(ctx.waker().clone()); Poll::Pending |
︙ | ︙ |
Changes to tests/errors.rs.
1 2 3 4 5 6 7 | use std::thread; use swctx::{mkpair, Error}; // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { | > > > > > > > | > | | > | | | | | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 | use std::thread; use swctx::{mkpair, Error}; #[derive(Clone, Debug, Default, PartialEq)] enum State { #[default] Abort, NoReply } // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { let _sctx2 = sctx; }); // join() ensures that SetCtx has been prematurely dropped jh.join().unwrap(); assert_eq!(wctx.wait(), Err(Error::Aborted(State::Abort))); } // Trigger an abortion error after wait is called (hopefully). #[test] fn abort_after_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { // yolo assume 500ms is sufficient std::thread::sleep(std::time::Duration::from_millis(500)); let _sctx2 = sctx; }); assert_eq!(wctx.wait(), Err(Error::Aborted(State::Abort))); jh.join().unwrap(); } // Trigger a no-reply error before wait is called (hopefully). #[test] fn noreply_before_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { sctx.set_state(State::NoReply); }); jh.join().unwrap(); assert_eq!(wctx.wait(), Err(Error::Aborted(State::NoReply))); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn noreply_after_wait() { let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { sctx.set_state(State::NoReply); std::thread::sleep(std::time::Duration::from_millis(500)); }); assert_eq!(wctx.wait(), Err(Error::Aborted(State::NoReply))); jh.join().unwrap(); } // Trigger an no-reply error before wait is called (hopefully). #[test] fn apperr_before_wait() { let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { sctx.fail("yikes"); }); jh.join().unwrap(); assert_eq!(wctx.wait(), Err(Error::App("yikes"))); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn apperr_after_wait() { let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); sctx.fail("yikes"); }); assert_eq!(wctx.wait(), Err(Error::App("yikes"))); |
︙ | ︙ |
Changes to tests/errors_async.rs.
1 2 3 4 5 6 7 8 9 | use std::thread; use swctx::{mkpair, Error}; // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); | > > > > > > > | | | | | | | | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | use std::thread; use swctx::{mkpair, Error}; #[derive(Clone, Debug, Default, PartialEq)] enum State { #[default] Abort, NoReply } // Trigger an abortion error before wait is called (hopefully). #[test] fn abort_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { let _sctx2 = sctx; }); jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::Abort))); }); } // Trigger an abortion error after wait is called (hopefully). #[test] fn abort_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); let _sctx2 = sctx; }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::Abort))); }); jh.join().unwrap(); } // Trigger a no-reply error before wait is called (hopefully). #[test] fn noreply_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { sctx.set_state(State::NoReply); }); jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); }); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn noreply_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, ()>(); let jh = thread::spawn(move || { sctx.set_state(State::NoReply); std::thread::sleep(std::time::Duration::from_millis(500)); }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::Aborted(State::NoReply))); }); jh.join().unwrap(); } // Trigger an no-reply error before wait is called (hopefully). #[test] fn apperr_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { sctx.fail("yikes"); }); jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await, Err(Error::App("yikes"))); }); } // Trigger an no-reply error after wait is called (hopefully). #[test] fn apperr_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<(), State, &str>(); let jh = thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(500)); sctx.fail("yikes"); }); tokrt.block_on(async { |
︙ | ︙ |
Changes to tests/simple.rs.
1 2 3 4 5 6 | use std::{thread, time}; use swctx::mkpair; #[test] fn say_hello_before_wait() { | | > | > | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | use std::{thread, time}; use swctx::mkpair; #[test] fn say_hello_before_wait() { let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { sctx.set("hello"); }); // join ensures that SetCtx::set() has been called jh.join().unwrap(); assert_eq!(wctx.wait().unwrap(), "hello"); } #[test] fn say_hello_after_wait() { let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); sctx.set("hello"); }); assert_eq!(wctx.wait().unwrap(), "hello"); jh.join().unwrap(); } #[test] fn async_say_hello_before_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { sctx.set("hello"); }); // join ensures that SetCtx::set() has been called jh.join().unwrap(); tokrt.block_on(async { assert_eq!(wctx.wait_async().await.unwrap(), "hello"); }); } #[test] fn async_say_hello_after_wait() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (sctx, wctx) = mkpair::<&str, (), &str>(); let jh = thread::spawn(move || { // yolo assume 500ms is sufficient thread::sleep(time::Duration::from_millis(500)); sctx.set("hello"); }); tokrt.block_on(async { assert_eq!(wctx.wait_async().await.unwrap(), "hello"); }); |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.1.1] - 2023-08-08 ### Added - Add a `try_get()` as a non-blocking alternative to `wait()`. | > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.2.0] - 2023-08-10 ### Changed - Generalize `SetCtx` state. ## [0.1.1] - 2023-08-08 ### Added - Add a `try_get()` as a non-blocking alternative to `wait()`. |
︙ | ︙ |