Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "ump-ng-server" -version = "0.1.2" +version = "0.2.0" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng-server" @@ -25,9 +25,12 @@ # "net" is added as a temporary workaround. Without it building the docs fail # in tokio. tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } ump-ng = { version = "0.1.0" } +[dev-dependencies] +tokio-test = { version = "0.4.3" } + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,7 +1,8 @@ -//! _ump-ng-server_ is an abstraction on top of [`ump-ng`] that is used to hide -//! boilerplate code used to implement intra-process message passing servers. +//! _ump-ng-server_ is an abstraction on top of [`ump-ng`](ump_ng) that is used +//! to hide boilerplate code used to implement intra-process message passing +//! servers. //! //! # Dispatch loop //! The core functionality of _ump-ng-server_ is a dispatch loop, whose role it //! is to pull messages off the message queue and pass them to the //! application-supplied message handler. @@ -31,10 +32,23 @@ //! //! # Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). +//! +//! There are cases where the handler needs to store a clone of the client +//! end-point of the message passing channel used to issue requests to the +//! server (so that message handlers can issue new requests). In order to +//! 
facilitate this, the application must pass a `Handler`-construction closure +//! to `spawn()`. The closure will be called after the message passing channel +//! has been created so it can be passed a reference to the client end-point. +//! +//! If the dispatch loop should terminate once all the application's client +//! end-points have been dropped, then the handler can store a [`WeakClient`] +//! instead (as storing a cloned [`Client`] object will prevent the dispatch +//! loop from terminating due to all clients being lost). The examples in the +//! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] @@ -44,22 +58,16 @@ pub use ump_ng::{ self, channel, Client, MsgType, ReplyContext, Server, WeakClient }; -pub use thread::{ - spawn as spawn_thread, spawn_preinit as spawn_thread_preinit, - Handler as ThreadedHandler -}; +pub use thread::{spawn as spawn_thread, Handler as ThreadedHandler}; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] -pub use task::{ - spawn as spawn_task, spawn_preinit as spawn_task_preinit, - Handler as AsyncHandler -}; +pub use task::{spawn as spawn_task, Handler as AsyncHandler}; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub use async_trait::async_trait; // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/task.rs ================================================================== --- src/task.rs +++ src/task.rs @@ -1,12 +1,85 @@ +//! ump-ng dispatch server running on a task. +//! +//! # Example +//! ``` +//! # tokio_test::block_on(async { +//! use std::ops::ControlFlow; +//! use ump_ng_server::{ +//! async_trait, +//! task::{Handler, spawn}, +//! ump_ng::{ReplyContext, WeakClient} +//! }; +//! +//! enum Post { +//! ShoutIntoVoid +//! } +//! enum Request { +//! Add(usize, usize) +//! } +//! enum Reply { +//! Sum(usize) +//! } +//!
#[derive(Debug)] +//! enum MyError { } +//! +//! struct MyHandler { +//! wclnt: WeakClient<Post, Request, Reply, MyError> +//! } +//! #[async_trait] +//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler { +//! async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> { +//! match msg { +//! Post::ShoutIntoVoid => { +//! // No reply .. but keep on trudging on +//! ControlFlow::Continue(()) +//! } +//! } +//! } +//! async fn req( +//! &mut self, +//! msg: Request, +//! rctx: ReplyContext<Reply, MyError> +//! ) -> ControlFlow<(), ()> { +//! match msg { +//! Request::Add(a, b) => { +//! rctx.reply(Reply::Sum(a+b)).unwrap(); +//! ControlFlow::Continue(()) +//! } +//! } +//! } +//! } +//! +//! let (clnt, jh) = spawn(|clnt| { +//! // Store a weak client in the handler so it doesn't keep the dispatch +//! // loop alive when the Client returned to the application is dropped. +//! MyHandler { +//! wclnt: clnt.weak() +//! } +//! }); +//! +//! clnt.post(Post::ShoutIntoVoid).unwrap(); +//! +//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { +//! panic!("Unexpected reply"); +//! }; +//! assert_eq!(sum, 10); +//! +//! // drop client to force dispatch loop to terminate +//! drop(clnt); +//! +//! jh.await; +//! # }); +//! ``` + use std::ops::ControlFlow; use tokio::task::{self, JoinHandle}; use async_trait::async_trait; -use super::{channel, Client, MsgType, ReplyContext, Server}; +use super::{channel, Client, MsgType, ReplyContext}; #[async_trait] pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// @@ -45,21 +118,25 @@ /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop).
-pub fn spawn<P, S, R, E, RV>( - mut handler: impl Handler<P, S, R, E, RV> + Send + 'static +pub fn spawn<P, S, R, E, RV, F>( + hbldr: impl FnOnce(&Client<P, S, R, E>) -> F ) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>) where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, - RV: 'static + Send + RV: 'static + Send, + F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); + + let mut handler = hbldr(&client); + let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { match server.async_wait().await { @@ -80,42 +157,6 @@ }); (client, jh) } - -/// Spawn a task to run a pre-initialized handler. -/// -/// It is assumed that the caller has initialized the handler, thus its -/// `init()` method will not be called. -pub fn spawn_preinit<P, S, R, E, RV>( - server: Server<P, S, R, E>, - mut handler: impl Handler<P, S, R, E, RV> + Send + 'static -) -> JoinHandle<Option<RV>> -where - P: 'static + Send, - S: 'static + Send, - R: 'static + Send, - E: 'static + Send, - RV: 'static + Send -{ - task::spawn(async move { - let ret = loop { - match server.async_wait().await { - Ok(msg) => match msg { - MsgType::Put(m) => match handler.post(m).await { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - }, - MsgType::Request(m, rctx) => match handler.req(m, rctx).await { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - } - }, - Err(_) => break None - } - }; - handler.term(ret) - }) -} - // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/thread.rs ================================================================== --- src/thread.rs +++ src/thread.rs @@ -1,11 +1,77 @@ +//! ump-ng dispatch server running on a thread. +//! +//! # Example +//! ``` +//! use std::ops::ControlFlow; +//! use ump_ng_server::{ +//! thread::{Handler, spawn}, +//! ump_ng::{ReplyContext, WeakClient} +//! }; +//! +//! enum Post { +//! ShoutIntoVoid +//! } +//! enum Request { +//! Add(usize, usize) +//! } +//! enum Reply { +//! Sum(usize) +//! } +//! #[derive(Debug)] +//!
enum MyError { } +//! +//! struct MyHandler { +//! wclnt: WeakClient<Post, Request, Reply, MyError> +//! } +//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler { +//! fn post(&mut self, msg: Post) -> ControlFlow<(), ()> { +//! match msg { +//! Post::ShoutIntoVoid => { +//! // No reply .. but keep on trudging on +//! ControlFlow::Continue(()) +//! } +//! } +//! } +//! fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>) +//! -> ControlFlow<(), ()> { +//! match msg { +//! Request::Add(a, b) => { +//! rctx.reply(Reply::Sum(a+b)).unwrap(); +//! ControlFlow::Continue(()) +//! } +//! } +//! } +//! } +//! +//! let (clnt, jh) = spawn(|clnt| { +//! // Store a weak client in the handler so it doesn't keep the dispatch +//! // loop alive when the Client returned to the application is dropped. +//! MyHandler { +//! wclnt: clnt.weak() +//! } +//! }); +//! +//! clnt.post(Post::ShoutIntoVoid).unwrap(); +//! +//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { +//! panic!("Unexpected reply"); +//! }; +//! assert_eq!(sum, 10); +//! +//! // drop client to force dispatch loop to terminate +//! drop(clnt); +//! +//! jh.join(); +//! ``` + use std::{ ops::ControlFlow, thread::{self, JoinHandle} }; -use super::{channel, Client, MsgType, ReplyContext, Server}; +use super::{channel, Client, MsgType, ReplyContext}; pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher thread before the main message @@ -39,21 +105,25 @@ /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop).
-pub fn spawn<P, S, R, E, RV>( - mut handler: impl Handler<P, S, R, E, RV> + Send + 'static +pub fn spawn<P, S, R, E, RV, F>( + hbldr: impl FnOnce(&Client<P, S, R, E>) -> F ) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>) where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, - RV: 'static + Send + RV: 'static + Send, + F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); + + let mut handler = hbldr(&client); + let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { match server.wait() { @@ -74,71 +144,6 @@ }); (client, jh) } - -/// Spawn a thread to run a pre-initialized handler. -/// -/// It is assumed that the caller has initialized the handler, thus its -/// `init()` method will not be called. -/// -/// ``` -/// use std::ops::ControlFlow; -/// use ump_ng_server::{ump_ng, spawn_thread_preinit, ThreadedHandler, -/// ReplyContext}; -/// let (server, client) = ump_ng::channel::<(), (), (), ()>(); -/// -/// struct MyHandler { -/// wclnt: ump_ng::WeakClient<(), (), (), ()> -/// } -/// impl ThreadedHandler<(), (), (), (), ()> for MyHandler { -/// fn post(&mut self, _: ()) -> ControlFlow<(), ()> { -/// ControlFlow::Continue(()) -/// } -/// fn req(&mut self, _: (), rctx: ReplyContext<(), ()>) -/// -> ControlFlow<(), ()> { -/// ControlFlow::Continue(()) -/// } -/// } -/// let handler = MyHandler { -/// wclnt: client.weak() -/// }; -/// let jh = spawn_thread_preinit(server, handler); -/// -/// // drop client to force dispatch loop to terminate -/// drop(client); -/// -/// jh.join(); -/// ``` -pub fn spawn_preinit<P, S, R, E, RV>( - server: Server<P, S, R, E>, - mut handler: impl Handler<P, S, R, E, RV> + Send + 'static -) -> JoinHandle<Option<RV>> -where - P: 'static + Send, - S: 'static + Send, - R: 'static + Send, - E: 'static + Send, - RV: 'static + Send -{ - thread::spawn(move || { - let ret = loop { - match server.wait() { - Ok(msg) => match msg { - MsgType::Put(m) => match handler.post(m) { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - }, - MsgType::Request(m, rctx) => match
handler.req(m, rctx) { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - } - }, - Err(_) => break None - } - }; - handler.term(ret) - }) -} - // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -5,13 +5,12 @@ // Check the signalled state, which should be false. // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default()); let reply = clnt.req(Request::GetSignalState).unwrap(); assert_eq!(reply, Reply::SignalState(false)); clnt.post(Put::Signal).unwrap(); Index: tests/term.rs ================================================================== --- tests/term.rs +++ tests/term.rs @@ -5,13 +5,11 @@ use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None @@ -21,13 +19,11 @@ // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler. 
#[test] fn handler_req_term() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); let rv = jh.join().unwrap(); @@ -36,13 +32,11 @@ } #[test] fn handler_put_term() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); clnt.post(Put::Signal).unwrap(); clnt.post(Put::Croak).unwrap(); let rv = jh.join().unwrap(); @@ -52,13 +46,11 @@ // Populate the queue with a bunch of nodes, then drop the client while there // are still nodes in the queue #[test] fn handler_delay_term() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); // Push a bunch of sleep nodes on the queue for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } @@ -74,13 +66,11 @@ // Make sure all the sleepers are processed. // The explicit termination request should take precedence over terminating // over all clients disappearing. 
#[test] fn handler_delay_term_count() { - let handler = ThreadedServer::default(); - - let (clnt, jh) = ump_ng_server::spawn_thread(handler); + let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } clnt.post(Put::CroakSleepCount).unwrap(); Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,37 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=ump-ng-server-0.1.2&to=trunk) +[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.2.0] - 2024-01-28 + +[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0) + +### Changed + +- Instead of taking in an `impl Handler` into the `{thread,task}::spawn()` + function, take in a closure that returns the handler. A reference to the + handler channel's client endpoint is passed to the closure, which makes it + possible to store `Client`/`WeakClient` in the handler, without involving an + `Option` (or similar). + +### Removed + +- Removed the `{thread,task}::spawn_preinit()` functions. These no longer + really serve a purpose since the deferred handler construction was + introduced. + --- ## [0.1.2] - 2024-01-21 [Details](/vdiff?from=ump-ng-server-0.1.1&to=ump-ng-server-0.1.2)