Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -5,11 +5,8 @@ src/err.rs src/lib.rs src/server.rs src/client.rs src/rctx.rs -src/rctx/err.rs -src/rctx/inner.rs -src/rctx/public.rs tests/*.rs examples/*.rs benches/*.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,37 +1,40 @@ [package] name = "ump" -version = "0.11.0" -authors = ["Jan Danielsson "] -edition = "2018" +version = "0.12.0" +edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." -rust-version = "1.39" +rust-version = "1.56" # Can't exclude "benches", because the [[bench]] section will fail. exclude = [ ".efiles", ".fossil-settings", ".fslckout", "examples", - "rustfmt.toml", - "www" + "www", + "rustfmt.toml" ] [features] dev-docs = [] [dependencies] parking_lot = { version = "0.12.1" } sigq = { version = "0.13.3" } +swctx = { version = "0.2.1" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } -tokio = { version = "1.29.1", features = ["full"] } +tokio = { version = "1.31.0", features = ["rt-multi-thread"] } [[bench]] name = "add_server" harness = false +[package.metadata.docs.rs] +rustdoc-args = ["--generate-link-to-definition"] + Index: src/client.rs ================================================================== --- src/client.rs +++ src/client.rs @@ -1,6 +1,6 @@ -use crate::{err::Error, rctx::InnerReplyContext, server::ServerQueueNode}; +use crate::{err::Error, server::ServerQueueNode}; /// Representation of a clonable client object. /// /// Each instantiation of a `Client` object is itself an isolated client with /// regards to the server context. By cloning a client a new independent @@ -51,45 +51,42 @@ // and stored in the context, and thus be reused for reach client call. // One side-effect is that some of the state semantics becomes more // complicated. // The central repo has such an implementation checked in, but it seems to // have some more corner cases that aren't properly handled. - let rctx = InnerReplyContext::new(); + let (sctx, wctx) = swctx::mkpair(); self .qpusher .push(ServerQueueNode { msg: out, - reply: rctx.clone() + reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; - let reply = rctx.get()?; - - Ok(reply) + // Wait for a reply to arrive + Ok(wctx.wait()?) } #[deprecated(since = "0.10.2", note = "Use req() instead.")] pub fn send(&self, out: S) -> Result> { self.req(out) } /// Same as [`Client::req()`] but for use in `async` contexts. pub async fn areq(&self, out: S) -> Result> { - let rctx = InnerReplyContext::new(); + let (sctx, wctx) = swctx::mkpair(); self .qpusher .push(ServerQueueNode { msg: out, - reply: rctx.clone() + reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; - let result = rctx.aget().await?; - - Ok(result) + Ok(wctx.wait_async().await?) } #[deprecated(since = "0.10.2", note = "Use areq() instead.")] pub async fn asend(&self, out: S) -> Result> { self.areq(out).await Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -1,6 +1,8 @@ use std::fmt; + +use crate::rctx::RCtxState; /// Module-specific error codes. #[derive(Debug)] pub enum Error { /// The server object has shut down. @@ -50,20 +52,10 @@ } } impl std::error::Error for Error {} -impl From> for Error { - fn from(err: crate::rctx::Error) -> Self { - match err { - crate::rctx::Error::Aborted => Error::ServerDisappeared, - crate::rctx::Error::NoReply => Error::NoReply, - crate::rctx::Error::App(e) => Error::App(e) - } - } -} - impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::ServerDisappeared => write!(f, "Server disappeared"), Error::ClientsDisappeared => write!(f, "Clients disappeared"), @@ -70,7 +62,19 @@ Error::NoReply => write!(f, "Server didn't reply"), Error::App(err) => write!(f, "Application error; {:?}", err) } } } + +impl From> for Error { + fn from(err: swctx::Error) -> Self { + match err { + swctx::Error::Aborted(state) => match state { + RCtxState::Queued => Error::ServerDisappeared, + RCtxState::Active => Error::NoReply + }, + swctx::Error::App(e) => Error::App(e) + } + } +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rctx.rs ================================================================== --- src/rctx.rs +++ src/rctx.rs @@ -3,16 +3,92 @@ //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. -mod err; -mod inner; - -pub mod public; - -pub(crate) use err::Error; -pub(crate) use inner::InnerReplyContext; - -pub use public::ReplyContext; +use crate::err::Error; + +#[derive(Clone, Default)] +pub(crate) enum RCtxState { + #[default] + Queued, + Active +} + +pub struct ReplyContext(swctx::SetCtx); + +impl ReplyContext { + /// Send a reply back to originating client. + /// + /// # Example + /// ``` + /// use std::thread; + /// use ump::channel; + /// + /// let (server, client) = channel::(); + /// let server_thread = thread::spawn(move || { + /// let (data, rctx) = server.wait().unwrap(); + /// let reply = format!("Hello, {}!", data); + /// rctx.reply(reply).unwrap(); + /// }); + /// let msg = String::from("Client"); + /// let reply = client.req(msg).unwrap(); + /// assert_eq!(reply, "Hello, Client!"); + /// server_thread.join().unwrap(); + /// ``` + /// + /// # Semantics + /// This call is safe to make after the server context has been released. + pub fn reply(self, data: T) -> Result<(), Error> { + self.0.set(data); + Ok(()) + } + + /// Return an error to originating client. + /// This will cause the calling client to return an error. The error passed + /// in the `err` parameter will be wrapped in a `Error::App(err)`. + /// + /// # Example + /// + /// ``` + /// use std::thread; + /// use ump::{channel, Error}; + /// + /// #[derive(Debug, PartialEq)] + /// enum MyError { + /// SomeError(String) + /// } + /// + /// let (server, client) = channel::(); + /// let server_thread = thread::spawn(move || { + /// let (_, rctx) = server.wait().unwrap(); + /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); + /// }); + /// let msg = String::from("Client"); + /// let reply = client.req(msg); + /// match reply { + /// Err(Error::App(MyError::SomeError(s))) => { + /// assert_eq!(s, "failed"); + /// } + /// _ => { + /// panic!("Unexpected return value"); + /// } + /// } + /// server_thread.join().unwrap(); + /// ``` + /// + /// # Semantics + /// This call is safe to make after the server context has been released. + pub fn fail(self, err: E) -> Result<(), Error> { + self.0.fail(err); + Ok(()) + } +} + +impl From> for ReplyContext { + fn from(sctx: swctx::SetCtx) -> Self { + sctx.set_state(RCtxState::Active); + ReplyContext(sctx) + } +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rctx/err.rs Index: src/rctx/err.rs ================================================================== --- src/rctx/err.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::fmt; - -/// Module-specific error codes. -#[derive(Debug)] -pub enum Error { - /// The reply was aborted. - Aborted, - - /// The public [`ReplyContext`] object is required to reply with a value. - /// If it does not the endpoint waiting to receive a value will abort and - /// return this error. - NoReply, - - /// An application-specific error occurred. - App(E) -} - -impl std::error::Error for Error {} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Aborted => write!(f, "Aborted call"), - Error::NoReply => write!(f, "Application failed to reply"), - Error::App(err) => write!(f, "Application error; {:?}", err) - } - } -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rctx/inner.rs Index: src/rctx/inner.rs ================================================================== --- src/rctx/inner.rs +++ /dev/null @@ -1,229 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll, Waker} -}; - -use parking_lot::{Condvar, Mutex}; - -use crate::rctx::err::Error; - -pub(crate) enum State { - /// (Still) in queue, waiting to be picked up by the server. - Queued, - - /// Was picked up, but (still) waiting for a reply to arrive. - Waiting, - - /// Have a reply, but it wasn't delivered yet. - Item(I), - - /// The application returned an error. - AppErr(E), - - /// Reply is being returned to caller. - Finalized, - - /// The server never received the message; it was dropped while in the - /// queue. Most likely this means that the message was still in the queue - /// when the server was dropped. - Aborted, - - /// The message was received by the server, but its reply context was - /// released before sending back a reply. - NoReply -} - -pub struct InnerReplyContext { - pub(crate) signal: Arc, - pub(crate) data: Arc>>, - pub(crate) taskwaker: Arc>> -} - -impl InnerReplyContext { - /// Create a new reply context in "Queued" state. - pub(crate) fn new() -> Self { - InnerReplyContext { - signal: Arc::new(Condvar::new()), - data: Arc::new(Mutex::new(State::Queued)), - taskwaker: Arc::new(Mutex::new(None)) - } - } - - /// Store a reply and signal the originator that a reply has arrived. - pub fn put(&self, item: I) { - let mut mg = self.data.lock(); - *mg = State::Item(item); - drop(mg); - - self.signal_waiters(); - } - - /// Store an error and signal the originator that a result has arrived. - pub fn fail(&self, err: E) { - let mut mg = self.data.lock(); - *mg = State::AppErr(err); - drop(mg); - - self.signal_waiters(); - } - - pub(crate) fn signal_waiters(&self) { - let mut g = self.taskwaker.lock(); - if let Some(waker) = (*g).take() { - waker.wake(); - } - - self.signal.notify_one(); - } - - /// Retreive reply. If a reply has not arrived yet then enter a loop that - /// waits for a reply to arrive. - pub fn get(&self) -> Result> { - let mut mg = self.data.lock(); - - let ret = loop { - match &*mg { - State::Queued | State::Waiting => { - // Still waiting for server to report back with data - self.signal.wait(&mut mg); - continue; - } - State::Item(_msg) => { - // Set Finalized state and return item - if let State::Item(msg) = - std::mem::replace(&mut *mg, State::Finalized) - { - break Ok(msg); - } else { - // We're *really* in trouble if this happens .. - panic!("Unexpected state; not State::Item()"); - } - } - State::AppErr(_err) => { - // Set Finalized state and return error - if let State::AppErr(err) = - std::mem::replace(&mut *mg, State::Finalized) - { - break Err(Error::App(err)); - } else { - // We're *really* in trouble if this happens .. - panic!("Unexpected state; not State::AppErr()"); - } - } - State::Finalized => { - // We're *really* in trouble if this happens at this point .. - panic!("Unexpected state State::Finalized"); - } - State::Aborted => { - // Dropped while in queue - return Err(Error::Aborted); - } - State::NoReply => { - // Dropped after reply context was picked up, but before replying - return Err(Error::NoReply); - } - } - }; - drop(mg); - - ret - } - - pub fn aget(&self) -> WaitReplyFuture { - WaitReplyFuture::new(self) - } -} - -impl Clone for InnerReplyContext { - fn clone(&self) -> Self { - InnerReplyContext { - signal: Arc::clone(&self.signal), - data: Arc::clone(&self.data), - taskwaker: Arc::clone(&self.taskwaker) - } - } -} - -impl Drop for InnerReplyContext { - /// If the reply context never left the server queue before being destroyed - /// it means that the server has died. Signal this to the original caller - /// waiting for a reply. - fn drop(&mut self) { - let mut do_signal: bool = false; - let mut mg = self.data.lock(); - - if let State::Queued = *mg { - *mg = State::Aborted; - do_signal = true; - } - drop(mg); - if do_signal { - let mut g = self.taskwaker.lock(); - if let Some(waker) = (*g).take() { - waker.wake(); - } - self.signal.notify_one(); - } - } -} - - -pub struct WaitReplyFuture { - data: Arc>>, - waker: Arc>> -} - -impl WaitReplyFuture { - fn new(irctx: &InnerReplyContext) -> Self { - WaitReplyFuture { - data: Arc::clone(&irctx.data), - waker: Arc::clone(&irctx.taskwaker) - } - } -} - -impl Future for WaitReplyFuture { - type Output = Result>; - fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { - let mut state = self.data.lock(); - match &*state { - State::Queued | State::Waiting => { - let waker = ctx.waker().clone(); - let mut g = self.waker.lock(); - *g = Some(waker); - drop(g); - drop(state); - Poll::Pending - } - State::Item(_msg) => { - if let State::Item(msg) = - std::mem::replace(&mut *state, State::Finalized) - { - Poll::Ready(Ok(msg)) - } else { - // We're *really* in trouble if this happens .. - panic!("Unexpected state; not State::Item()"); - } - } - State::AppErr(_err) => { - if let State::AppErr(err) = - std::mem::replace(&mut *state, State::Finalized) - { - Poll::Ready(Err(Error::App(err))) - } else { - // We're *really* in trouble if this happens .. - panic!("Unexpected state; not State::App()"); - } - } - State::Finalized => { - panic!("Unexpected state"); - } - State::Aborted => Poll::Ready(Err(Error::Aborted)), - State::NoReply => Poll::Ready(Err(Error::NoReply)) - } - } -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rctx/public.rs Index: src/rctx/public.rs ================================================================== --- src/rctx/public.rs +++ /dev/null @@ -1,149 +0,0 @@ -use crate::{ - rctx::{inner::State, InnerReplyContext}, - Error -}; - -/// Context used to transmit a reply back to the originating requester. -#[cfg_attr( - feature = "dev-docs", - doc = r#" -# Internals -Public-facing sender part of the `ReplyContext` object. - -This, as opposed to `InnerReplyContext`, is safe to pass to applications that -are meant to only be able to put a value through the `ReplyContext` channel, -but not extract the value from it. -"# -)] -pub struct ReplyContext { - inner: InnerReplyContext, - did_handover: bool -} - -impl ReplyContext { - /// Send a reply back to originating client. - /// - /// # Example - /// ``` - /// use std::thread; - /// use ump::channel; - /// - /// let (server, client) = channel::(); - /// let server_thread = thread::spawn(move || { - /// let (data, rctx) = server.wait().unwrap(); - /// let reply = format!("Hello, {}!", data); - /// rctx.reply(reply).unwrap(); - /// }); - /// let msg = String::from("Client"); - /// let reply = client.req(msg).unwrap(); - /// assert_eq!(reply, "Hello, Client!"); - /// server_thread.join().unwrap(); - /// ``` - /// - /// # Semantics - /// This call is safe to make after the server context has been released. - pub fn reply(mut self, data: I) -> Result<(), Error> { - self.inner.put(data); - - self.did_handover = true; - - Ok(()) - } - - /// Return an error to originating client. - /// This will cause the calling client to return an error. The error passed - /// in the `err` parameter will be wrapped in a `Error::App(err)`. - /// - /// # Example - /// - /// ``` - /// use std::thread; - /// use ump::{channel, Error}; - /// - /// #[derive(Debug, PartialEq)] - /// enum MyError { - /// SomeError(String) - /// } - /// - /// let (server, client) = channel::(); - /// let server_thread = thread::spawn(move || { - /// let (_, rctx) = server.wait().unwrap(); - /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); - /// }); - /// let msg = String::from("Client"); - /// let reply = client.req(msg); - /// match reply { - /// Err(Error::App(MyError::SomeError(s))) => { - /// assert_eq!(s, "failed"); - /// } - /// _ => { - /// panic!("Unexpected return value"); - /// } - /// } - /// server_thread.join().unwrap(); - /// ``` - /// - /// # Semantics - /// This call is safe to make after the server context has been released. - pub fn fail(mut self, err: E) -> Result<(), Error> { - self.inner.fail(err); - - self.did_handover = true; - - Ok(()) - } -} - -impl Drop for ReplyContext { - /// If the reply context is dropped while still waiting for a reply then - /// report back to the caller that it should expect no reply. - fn drop(&mut self) { - if !self.did_handover { - let mut do_signal: bool = false; - let mut mg = self.inner.data.lock(); - - if let State::Waiting = *mg { - *mg = State::NoReply; - do_signal = true; - } - drop(mg); - if do_signal { - let mut g = self.inner.taskwaker.lock(); - if let Some(waker) = (*g).take() { - waker.wake(); - } - - self.inner.signal.notify_one(); - } - } - } -} - -impl From> for ReplyContext { - /// Transform an internal reply context into a public one and change the - /// state from Queued to Waiting to signal that the node has left the - /// queue. - fn from(inner: InnerReplyContext) -> Self { - // Switch state from "Queued" to "Waiting", to mark that the reply context - // has been "picked up". - let mut mg = inner.data.lock(); - match *mg { - State::Queued => { - *mg = State::Waiting; - drop(mg); - } - _ => { - // Should never happen - drop(mg); - panic!("Unexpected node state."); - } - } - - ReplyContext { - inner: inner.clone(), - did_handover: false - } - } -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/server.rs ================================================================== --- src/server.rs +++ src/server.rs @@ -1,15 +1,16 @@ -use crate::rctx::{InnerReplyContext, ReplyContext}; - -use crate::err::Error; +use crate::{ + err::Error, + rctx::{RCtxState, ReplyContext} +}; pub(crate) struct ServerQueueNode { /// Raw message being sent from the client to the server. pub(crate) msg: S, /// Keep track of data needed to share reply data. - pub(crate) reply: InnerReplyContext + pub(crate) reply: swctx::SetCtx } /// Representation of a server object. /// /// Each instantiation of a [`Server`] object represents an end-point which Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -3,17 +3,26 @@ ## [Unreleased] ### Added ### Changed + +### Removed + + +## [0.12.0] - 2023-08-15 + +### Changed - Include tests when publishing crate. - Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public, giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return types. - -### Removed +- Use the `swctx` crate for sending back the reply rather than use a custom + in-tree implementation. +- Update `edition` to `2021` and `rust-version` to `1.56`. +- Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml` ## [0.11.0] - 2023-07-29 ### Changed @@ -33,11 +42,11 @@ - Add a `dev-docs` feature to allow internal documentation notes to be included in generated documentation. ### Changed -- Rename `send()`/`asend()` to `req()/`areq()`. +- Rename `send()`/`asend()` to `req()`/`areq()`. ## [0.10.1] - 2023-07-27 ### Changed Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -9,5 +9,11 @@ The details of changes can always be found in the timeline, but for a high-level view of changes between released versions there's a manually maintained [Change Log](./changelog.md). + +## Project Status + +_ump_ is in _maintenance mode_; it is feature-complete, but will receive +bugfixes and improvements to implementation/documentation. +