Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,20 +1,18 @@ [package] name = "ump" -version = "0.12.0" +version = "0.12.1" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." rust-version = "1.56" - -# Can't exclude "benches", because the [[bench]] section will fail. exclude = [ - ".efiles", ".fossil-settings", + ".efiles", ".fslckout", "examples", "www", "rustfmt.toml" ] @@ -22,19 +20,19 @@ [features] dev-docs = [] [dependencies] parking_lot = { version = "0.12.1" } -sigq = { version = "0.13.3" } +sigq = { version = "0.13.4" } swctx = { version = "0.2.1" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } -tokio = { version = "1.31.0", features = ["rt-multi-thread"] } +tokio = { version = "1.32.0", features = ["rt-multi-thread"] } [[bench]] name = "add_server" harness = false [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] Index: src/client.rs ================================================================== --- src/client.rs +++ src/client.rs @@ -1,21 +1,19 @@ use crate::{err::Error, server::ServerQueueNode}; -/// Representation of a clonable client object. +use super::rctx::RCtxState; + +/// Representation of a clonable client object that can issue requests to +/// [`Server`](super::Server) objects. /// /// Each instantiation of a `Client` object is itself an isolated client with /// regards to the server context. By cloning a client a new independent /// client is created. ("Independent" here meaning that it is still tied to /// the same server object, but the new client can be passed to a separate /// thread and can independently make calls to the server). -pub struct Client { - /// Weak reference to server queue. - /// - /// The server context holds the only strong reference to the queue. This - /// allows the clients to detect when the server has terminated. - pub(crate) qpusher: sigq::Pusher> -} +#[repr(transparent)] +pub struct Client(pub(crate) sigq::Pusher>); impl Client where R: 'static + Send, E: 'static + Send @@ -54,20 +52,81 @@ // The central repo has such an implementation checked in, but it seems to // have some more corner cases that aren't properly handled. let (sctx, wctx) = swctx::mkpair(); self - .qpusher + .0 .push(ServerQueueNode { msg: out, reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; // Wait for a reply to arrive Ok(wctx.wait()?) } + + /// Issue a request, immediately returning a context that is used to wait for + /// the server's reply. + /// + /// The `_async` naming is slightly misleading -- this method isn't an + /// `async` in a language/`Future` sense, but rather it doesn't block and + /// wait for a reply before returning. Instead it returns a [`WaitReply`] + /// object that is used to wait for the reply. + /// + /// This can be useful (in place of [`req()`](Client::req) or + /// [`areq()`](Client::areq()) methods) if the caller knows that the server + /// will take some time to respond to the request and the caller has other + /// tasks it can perform in the meantime. + /// + /// ``` + /// use std::thread; + /// + /// use ump::channel; + /// + /// let (server, client) = channel::(); + /// + /// let server_thread = thread::spawn(move || { + /// // Wait for data to arrive from a client + /// println!("Server waiting for message .."); + /// let (data, mut rctx) = server.wait().unwrap(); + /// + /// println!("Server received: '{}'", data); + /// + /// // Long processing of data from client + /// + /// // Reply to client + /// let reply = format!("Hello, {}!", data); + /// println!("Server replying '{}'", reply); + /// rctx.reply(reply); + /// + /// println!("Server done"); + /// }); + /// + /// let msg = String::from("Client"); + /// println!("Client sending '{}'", msg); + /// let wrctx = client.req_async(msg).unwrap(); + /// + /// // .. perform some operation while server is processing the request .. + /// + /// let reply = wrctx.wait().unwrap(); + /// println!("Client received reply '{}'", reply); + /// println!("Client done"); + /// + /// server_thread.join().unwrap(); + /// ``` + pub fn req_async(&self, out: S) -> Result, Error> { + let (sctx, wctx) = swctx::mkpair(); + self + .0 + .push(ServerQueueNode { + msg: out, + reply: sctx + }) + .map_err(|_| Error::ServerDisappeared)?; + Ok(WaitReply(wctx)) + } #[deprecated(since = "0.10.2", note = "Use req() instead.")] pub fn send(&self, out: S) -> Result> { self.req(out) } @@ -75,11 +134,11 @@ /// Same as [`Client::req()`] but for use in `async` contexts. pub async fn areq(&self, out: S) -> Result> { let (sctx, wctx) = swctx::mkpair(); self - .qpusher + .0 .push(ServerQueueNode { msg: out, reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; @@ -89,10 +148,15 @@ #[deprecated(since = "0.10.2", note = "Use areq() instead.")] pub async fn asend(&self, out: S) -> Result> { self.areq(out).await } + + /// Create a weak `Client` reference. + pub fn weak(&self) -> WeakClient { + WeakClient(self.0.weak()) + } } impl Clone for Client { /// Clone a client. @@ -102,12 +166,77 @@ /// /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { - Client { - qpusher: self.qpusher.clone() - } + Client(self.0.clone()) + } +} + +/// Context used to wait for a server to reply to a request. +pub struct WaitReply(swctx::WaitCtx); + +impl WaitReply { + /// Block and wait for a reply. + /// + /// For use in non-`async` threads. + pub fn wait(self) -> Result> { + Ok(self.0.wait()?) + } + + /// Block and wait for a reply. + /// + /// For use in `async` tasks. + pub async fn wait_async(self) -> Result> { + Ok(self.0.wait_async().await?) + } +} + + +/// A weak client reference that can be upgraded to a [`Client`] as long as +/// other `Client` objects till exist. +#[repr(transparent)] +pub struct WeakClient( + pub(crate) sigq::WeakPusher> +); + +impl Clone for WeakClient { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl WeakClient { + /// Upgrade a `WeakClient` to a [`Client`]. + /// + /// If no strong `Client` objects still exist then `None` is returned. + /// + /// # Examples + /// + /// Upgrading a weak client while stong clients exists works: + /// ``` + /// use ump::{channel, Error}; + /// + /// let (server, client) = channel::(); + /// let weak_client = client.weak(); + /// let Some(client2) = weak_client.upgrade() else { + /// panic!("Unable to upgrade weak client"); + /// }; + /// ``` + /// + /// Upgrading a weak client when no stong clients exists fails: + /// ``` + /// use ump::{channel, Error}; + /// + /// let (server, client) = channel::(); + /// let weak_client = client.weak(); + /// drop(client); + /// let Some(_) = weak_client.upgrade() else { + /// panic!("Unexpectedly able to upgrade weak client"); + /// }; + /// ``` + pub fn upgrade(&self) -> Option> { + self.0.upgrade().map(|x| Client(x)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -6,30 +6,29 @@ //! The primary purpose of ump is to create simple RPC like designs, but //! between threads/tasks within a process rather than between processes over //! networks. //! //! # High-level usage overview -//! An application calls [`channel`] to create a linked pair of a [`Server`] -//! and a [`Client`]. -//! -//! The server calls [`Server::wait()`]/[`Server::async_wait()`], which -//! blocks and waits for an incoming message from a client. -//! -//! A client, in a separate thread or task, calls -//! [`Client::req()`]/[`Client::areq()`] to send a message to the server. -//! -//! The server's wait call returns two objects: The message sent by the -//! client, and a [`ReplyContext`]. -//! -//! After processing its application-defined message, the server *must* call -//! the [`ReplyContext::reply()`] on the returned reply context object to -//! return a reply message to the client. -//! -//! Typically the server calls wait again to wait for next message from a -//! client. -//! -//! The client receives the reply from the server and processes it. +//! 1. An application calls [`channel`] to create a linked pair of a [`Server`] +//! and a [`Client`]. +//! 2. The server calls [`Server::wait()`]/[`Server::async_wait()`], which +//! blocks and waits for an incoming message from a client. +//! 3. A client, in a separate thread or task, sends a message to the server +//! and wait for a reply using: +//! - [`Client::req()`] for non-`async` contexts. +//! - [`Client::areq()`] to `async` contexts. +//! - [`Client::req_async()`] (and wait for a reply using the returned +//! [`WaitReply`]) +//! 4. The server's wait call returns two objects: The message sent by the +//! client, and a [`ReplyContext`]. +//! 5. After processing its application-defined message, the server *must* call +//! the [`ReplyContext::reply()`] on the returned reply context object to +//! return a reply message to the client. +//! +//! Typically the server calls wait again to wait for next message from a +//! client. +//! 6. The client receives the reply from the server and processes it. //! //! # Example //! ``` //! use std::thread; //! @@ -73,11 +72,11 @@ //! describe some semantics that you can rely on, and others that you should be //! careful about relying on. //! //! ## Stable invariants //! -//! These are behaviors which should not change in future versions. +//! Semantics that should not change in future versions. //! //! - The reply contexts are independent of the `Server` context. This has //! some useful implications for server threads that spawn separate threads //! to process messages and return replies: *The server can safely terminate //! while there are clients waiting for replies* (implied: the server can @@ -88,13 +87,13 @@ //! - Any permutation of sync/async server/clients can be combined. `async` //! code must use the async method variants when available. //! //! ## Unstable invariants //! -//! These are invariants you can trust will work in the current version, but -//! they exist merely as a side-effect of the current implementation. Avoid -//! relying on these if possible. +//! Semantics you can trust will work in the current version, but they exist +//! merely as a side-effect of the current implementation. Avoid relying on +//! these if possible. //! //! - A single client can be used from two different threads. If a `Client` //! object in placed in an Arc, is cloned and passed to another thread/task //! then both the clone and the original can be used simultaneously. In the //! future this may not be allowed. It is recommended that a new clone of the @@ -105,11 +104,15 @@ mod rctx; mod server; pub use err::Error; -pub use crate::{client::Client, rctx::ReplyContext, server::Server}; +pub use crate::{ + client::{Client, WaitReply, WeakClient}, + rctx::ReplyContext, + server::Server +}; /// Create a pair of linked [`Server`] and [`Client`] objects. /// /// The [`Server`] object is used to wait for incoming messages from connected /// clients. Once a message arrives it must reply to it using a @@ -128,13 +131,12 @@ /// that clients will receive from the server. The `E` type parameter can be /// used to return application specific errors from the server to the client. pub fn channel() -> (Server, Client) { let (qpusher, qpuller) = sigq::new(); - let server = Server { qpuller }; - - let client = Client { qpusher }; + let server = Server(qpuller); + let client = Client(qpusher); (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rctx.rs ================================================================== --- src/rctx.rs +++ src/rctx.rs @@ -12,10 +12,12 @@ #[default] Queued, Active } +/// Object used to respond to requests that have been received by a +/// [`Server`](super::Server). pub struct ReplyContext(swctx::SetCtx); impl ReplyContext { /// Send a reply back to originating client. /// Index: src/server.rs ================================================================== --- src/server.rs +++ src/server.rs @@ -14,13 +14,12 @@ /// Representation of a server object. /// /// Each instantiation of a [`Server`] object represents an end-point which /// will be used to receive messages from connected [`Client`](crate::Client) /// objects. -pub struct Server { - pub(crate) qpuller: sigq::Puller> -} +#[repr(transparent)] +pub struct Server(pub(crate) sigq::Puller>); impl Server where S: 'static + Send, R: 'static + Send, @@ -31,11 +30,11 @@ /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. pub fn wait(&self) -> Result<(S, ReplyContext), Error> { - let node = self.qpuller.pop().map_err(|_| Error::ClientsDisappeared)?; + let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue @@ -42,18 +41,34 @@ // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::from(node.reply); Ok((msg, rctx)) } + + /// Take next next message off queue or return `None` is queue is empty. + #[allow(clippy::type_complexity)] + pub fn try_pop(&self) -> Result)>, Error> { + let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?; + + if let Some(node) = node { + // Extract the data from the node + let msg = node.msg; + + // Create an application reply context from the reply context in the + // queue Implicitly changes state of the reply context from Queued + // to Waiting + let rctx = ReplyContext::from(node.reply); + + Ok(Some((msg, rctx))) + } else { + Ok(None) + } + } /// Same as [`Server::wait()`], but for use in an `async` context. pub async fn async_wait(&self) -> Result<(S, ReplyContext), Error> { - let node = self - .qpuller - .apop() - .await - .map_err(|_| Error::ClientsDisappeared)?; + let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue @@ -65,10 +80,10 @@ /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. pub fn was_empty(&self) -> bool { - self.qpuller.was_empty() + self.0.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/clnt_disappear.rs Index: tests/clnt_disappear.rs ================================================================== --- /dev/null +++ tests/clnt_disappear.rs @@ -0,0 +1,31 @@ +use ump::{channel, Error}; + +#[test] +fn sync_expect_err_if_server_dropped() { + let (server, client) = channel::(); + + // nuke the only client + drop(client); + + let Err(Error::ClientsDisappeared) = server.wait() else { + panic!("Unexpected error"); + }; +} + +#[test] +fn async_expect_err_if_server_dropped() { + let tokrt = tokio::runtime::Runtime::new().unwrap(); + + let (server, client) = channel::(); + + // nuke the only client + drop(client); + + tokrt.block_on(async { + let Err(Error::ClientsDisappeared) = server.async_wait().await else { + panic!("Unexpected error"); + }; + }); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED tests/queue_cleanup.rs Index: tests/queue_cleanup.rs ================================================================== --- tests/queue_cleanup.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Make sure that the InnerReplyContext aborts on Drop if object is still -// queued. -use std::{thread, time}; - -use ump::{channel, Error}; - -#[test] -fn sync_expect_server_death() { - let (server, client) = channel::(); - - let server_thread = thread::spawn(move || { - // Should be doing something more robust .. - let one_second = time::Duration::from_secs(1); - thread::sleep(one_second); - drop(server); - }); - - let msg = String::from("Client"); - let reply = client.req(msg); - match reply { - Err(Error::ServerDisappeared) => { - // This is the expected error - } - _ => { - panic!("Unexpected return value"); - } - } - - server_thread.join().unwrap(); -} - - -#[test] -fn async_expect_server_death() { - let tokrt = tokio::runtime::Runtime::new().unwrap(); - - let (server, client) = channel::(); - - let server_thread = thread::spawn(move || { - // Should be doing something more robust .. - let one_second = time::Duration::from_secs(1); - thread::sleep(one_second); - drop(server); - }); - - tokrt.block_on(async { - let msg = String::from("Client"); - let reply = client.areq(msg).await; - //let reply = client.req(msg); - match reply { - Err(Error::ServerDisappeared) => { - // This is the expected error - } - _ => { - panic!("Unexpected return value"); - } - } - }); - - server_thread.join().unwrap(); -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/svc_disappear.rs Index: tests/svc_disappear.rs ================================================================== --- /dev/null +++ tests/svc_disappear.rs @@ -0,0 +1,101 @@ +//! Tests for cases that would case the client to return +//! `Error::ServerDisappeared`: +//! - When a client issues a request after the server end-point has already +//! been dropped. +//! - If a server end-point is dropped while there are requests in the queue, +//! all those requests should abort with a `ServerDisappeared` error. + +use std::{thread, time}; + +use ump::{channel, Error}; + +/// Issuing a request should fail immediately if the server end-point has been +/// dropped. +#[test] +fn sync_expect_err_if_server_dropped() { + let (server, client) = channel::(); + + // nuke the server end-point + drop(server); + + let msg = String::from("Client"); + let reply = client.req(msg); + let Err(Error::ServerDisappeared) = reply else { + panic!("Unexpected return value"); + }; +} + +/// Issuing a request should fail immediately if the server end-point has been +/// dropped. +#[test] +fn async_expect_err_if_server_dropped() { + let tokrt = tokio::runtime::Runtime::new().unwrap(); + + let (server, client) = channel::(); + + // nuke the server end-point + drop(server); + + tokrt.block_on(async { + let msg = String::from("Client"); + let reply = client.areq(msg).await; + let Err(Error::ServerDisappeared) = reply else { + panic!("Unexpected return value"); + }; + }); +} + +/// If a request is still in the queue when the server end-point is dropped, +/// the client shoull return `Error::ServerDisappeared` +#[test] +fn sync_expect_err_if_queue_dropped() { + let (server, client) = channel::(); + + // Don't actually take any requests off queue -- just terminate the server + // end-point. + let server_thread = thread::spawn(move || { + // Should be doing something more robust .. + let one_second = time::Duration::from_secs(1); + thread::sleep(one_second); + drop(server); + }); + + let msg = String::from("Client"); + let reply = client.req(msg); + let Err(Error::ServerDisappeared) = reply else { + panic!("Unexpected return value"); + }; + + server_thread.join().unwrap(); +} + +/// If a request is still in the queue when the server end-point is dropped, +/// the client shoull return `Error::ServerDisappeared` +#[test] +fn async_expect_err_if_queue_dropped() { + let tokrt = tokio::runtime::Runtime::new().unwrap(); + + let (server, client) = channel::(); + + // Don't actually take any requests off queue -- just terminate the server + // end-point. + let server_thread = thread::spawn(move || { + // Should be doing something more robust .. + let one_second = time::Duration::from_secs(1); + thread::sleep(one_second); + drop(server); + }); + + tokrt.block_on(async { + let msg = String::from("Client"); + let reply = client.areq(msg).await; + + let Err(Error::ServerDisappeared) = reply else { + panic!("Unexpected return value"); + }; + }); + + server_thread.join().unwrap(); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/weak.rs Index: tests/weak.rs ================================================================== --- /dev/null +++ tests/weak.rs @@ -0,0 +1,53 @@ +use ump::{channel, Error}; + +#[test] +fn sync_weak_should_not_count() { + let (server, client) = channel::(); + + let _weak_client = client.weak(); + + // nuke the only (strong) client + drop(client); + + let Err(Error::ClientsDisappeared) = server.wait() else { + panic!("Unexpected error"); + }; +} + +#[test] +fn async_weak_should_not_count() { + let tokrt = tokio::runtime::Runtime::new().unwrap(); + + let (server, client) = channel::(); + + let _weak_client = client.weak(); + + // nuke the only client + drop(client); + + tokrt.block_on(async { + let Err(Error::ClientsDisappeared) = server.async_wait().await else { + panic!("Unexpected error"); + }; + }); +} + +#[test] +fn upgraded_should_count() { + let (server, client) = channel::(); + + let weak_client = client.weak(); + + let Some(_client2) = weak_client.upgrade() else { + panic!("Unable to upgrade weak_client"); + }; + + // nuke original client + drop(client); + + let Ok(None) = server.try_pop() else { + panic!("Unexpected error"); + }; +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -6,12 +6,27 @@ ### Changed ### Removed +--- + +## [0.12.1] - 2023-10-02 + +### Added + +- Add `Client::req_async()`. +- Add `Server::try_pop()`. +- `Client` objects can spawn downgraded to `WeakClient` objects, that in turn + can be upgraded to `Client` objects (as long as all the strong `Client` + objects have not been dropped). + +--- ## [0.12.0] - 2023-08-15 + +[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.11.0&to=ump-0.12.0) ### Changed - Include tests when publishing crate. - Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public, @@ -20,22 +35,28 @@ - Use the `swctx` crate for sending back the reply rather than use a custom in-tree implementation. - Update `edition` to `2021` and `rust-version` to `1.56`. - Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml` +--- ## [0.11.0] - 2023-07-29 + +[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.2&to=ump-0.11.0) ### Changed - Include tests when publishing crate. - Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public, giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return types. +--- ## [0.10.2] - 2023-07-28 + +[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.1&to=ump-0.10.2) ### Added - Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with a deprecation notice. @@ -44,20 +65,26 @@ ### Changed - Rename `send()`/`asend()` to `req()`/`areq()`. +--- ## [0.10.1] - 2023-07-27 + +[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.0&to=ump-0.10.1) ### Changed - Runtime dependencies: - Updated `sigq` to `0.13.3`. +--- ## [0.10.0] - 2023-07-26 + +[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.9.0&to=ump-0.10.0) ### Added - Server's receive methods will fail with `Error::ClientsDisappeared` if all the associated Client objects have been dropped. @@ -67,10 +94,11 @@ - Runtime dependencies: - Updated `sigq` to `0.13.2`. - Development dependencies: - Updated `criterion` to `0.5.1` +--- ## [0.9.0] - 2022-09-09 ### Added