Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-ng-server-0.1.2 To ump-ng-server-0.2.0
2024-01-28
| ||
17:35 | Make handler constructor closure fallible. check-in: 1e85d183b2 user: jan tags: trunk | |
15:19 | Release maintenance. check-in: d37659a48c user: jan tags: trunk, ump-ng-server-0.2.0 | |
15:09 | Use closure to construct Handler. Remove the spawn_preinit() functions, since using a closure to construct the Handler makes it possible to accomplish the same thing, but less ugly. check-in: d485928620 user: jan tags: trunk | |
2024-01-21
| ||
18:37 | Release maintenance. check-in: 5f420344bf user: jan tags: trunk, ump-ng-server-0.1.2 | |
18:31 | Add preinit variants of both thread and task spawners. check-in: e3a0f27d4a user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "ump-ng-server" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "ump-ng-server" version = "0.2.0" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng-server" description = "Server message dispatch loop for ump-ng." rust-version = "1.56" |
︙ | ︙ | |||
23 24 25 26 27 28 29 30 31 32 33 | [dependencies] async-trait = { version = "0.1.77", optional = true } # "net" is added as a temporary workaround. Without it building the docs fail # in tokio. tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } ump-ng = { version = "0.1.0" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] | > > > | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | [dependencies] async-trait = { version = "0.1.77", optional = true } # "net" is added as a temporary workaround. Without it building the docs fail # in tokio. tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } ump-ng = { version = "0.1.0" } [dev-dependencies] tokio-test = { version = "0.4.3" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] |
Changes to src/lib.rs.
|
| | | > | 1 2 3 4 5 6 7 8 9 10 | //! _ump-ng-server_ is an abstraction on top of [`ump-ng`](ump_ng) that is used //! to hide boilerplate code used to implement intra-process message passing //! servers. //! //! # Dispatch loop //! The core functionality of _ump-ng-server_ is a dispatch loop, whose role it //! is to pull messages off the message queue and pass them to the //! application-supplied message handler. //! //! There are two different ways to run the dispatcher loop: On a non-async |
︙ | ︙ | |||
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | //! - The message queue is empty and all the associated [`Client`]s have been //! released. This would cause the thread to return `None`. //! //! # Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; pub use ump_ng::{ self, channel, Client, MsgType, ReplyContext, Server, WeakClient }; | > > > > > > > > > > > > > | < < < | < < < | 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | //! - The message queue is empty and all the associated [`Client`]s have been //! released. This would cause the thread to return `None`. //! //! # Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). //! //! There are cases where the handler needs to store a clone of the client //! end-point of the message passing channel used to issue requests to the //! server (so that message handlers can issue new requests). In order to //! facilitate this, the application must pass a `Handler`-construction closure //! to `spawn()`. The closure will be called after the message passing channel //! has been created so it can be passed a reference to the client end-point. //! //! If the dispatch loop should terminate once all the application's client //! end-points have been dropped, then the handler can store a [`WeakClient`] //! instead (as storing a cloned [`Client`] object will preventing the dispatch //! loop from terminating due to all clients being lost). The examples in the //! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; pub use ump_ng::{ self, channel, Client, MsgType, ReplyContext, Server, WeakClient }; pub use thread::{spawn as spawn_thread, Handler as ThreadedHandler}; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub use task::{spawn as spawn_task, Handler as AsyncHandler}; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub use async_trait::async_trait; // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/task.rs.
1 2 3 4 5 6 | use std::ops::ControlFlow; use tokio::task::{self, JoinHandle}; use async_trait::async_trait; | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | //! ump-ng dispatch server running on a thread. //! //! # Example //! ``` //! # tokio_test::block_on(async { //! use std::ops::ControlFlow; //! use ump_ng_server::{ //! async_trait, //! task::{Handler, spawn}, //! ump_ng::{ReplyContext, WeakClient} //! }; //! //! enum Post { //! ShoutIntoVoid //! } //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } //! #[derive(Debug)] //! enum MyError { } //! //! struct MyHandler { //! wclnt: WeakClient<Post, Request, Reply, MyError> //! } //! #[async_trait] //! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler { //! async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> { //! match msg { //! Post::ShoutIntoVoid => { //! // No reply .. but keep on trudging on //! ControlFlow::Continue(()) //! } //! } //! } //! async fn req( //! &mut self, //! msg: Request, //! rctx: ReplyContext<Reply, MyError> //! ) -> ControlFlow<(), ()> { //! match msg { //! Request::Add(a, b) => { //! rctx.reply(Reply::Sum(a+b)).unwrap(); //! ControlFlow::Continue(()) //! } //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! MyHandler { //! wclnt: clnt.weak() //! } //! }); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); //! //! // drop client to force dispatch loop to terminate //! drop(clnt); //! //! jh.await; //! # }); //! ``` use std::ops::ControlFlow; use tokio::task::{self, JoinHandle}; use async_trait::async_trait; use super::{channel, Client, MsgType, ReplyContext}; #[async_trait] pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher task before the main message /// processing loop is entered. |
︙ | ︙ | |||
43 44 45 46 47 48 49 | } /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). | | | | > > > > | 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | } /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<P, S, R, E, RV, F>( hbldr: impl FnOnce(&Client<P, S, R, E>) -> F ) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>) where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client); let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { match server.async_wait().await { Ok(msg) => match msg { MsgType::Put(m) => match handler.post(m).await { |
︙ | ︙ | |||
78 79 80 81 82 83 84 | }; handler.term(ret) }); (client, jh) } | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 155 156 157 158 159 160 161 162 | }; handler.term(ret) }); (client, jh) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/thread.rs.
1 2 3 4 5 | use std::{ ops::ControlFlow, thread::{self, JoinHandle} }; | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | //! ump-ng dispatch server running on a thread. //! //! # Example //! ``` //! use std::ops::ControlFlow; //! use ump_ng_server::{ //! thread::{Handler, spawn}, //! ump_ng::{ReplyContext, WeakClient} //! }; //! //! enum Post { //! ShoutIntoVoid //! } //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } //! #[derive(Debug)] //! enum MyError { } //! //! struct MyHandler { //! wclnt: WeakClient<Post, Request, Reply, MyError> //! } //! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler { //! fn post(&mut self, msg: Post) -> ControlFlow<(), ()> { //! match msg { //! Post::ShoutIntoVoid => { //! // No reply .. but keep on trudging on //! ControlFlow::Continue(()) //! } //! } //! } //! fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>) //! -> ControlFlow<(), ()> { //! match msg { //! Request::Add(a, b) => { //! rctx.reply(Reply::Sum(a+b)).unwrap(); //! ControlFlow::Continue(()) //! } //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! MyHandler { //! wclnt: clnt.weak() //! } //! }); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); //! //! // drop client to force dispatch loop to terminate //! drop(clnt); //! //! jh.join(); //! ``` use std::{ ops::ControlFlow, thread::{self, JoinHandle} }; use super::{channel, Client, MsgType, ReplyContext}; pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher thread before the main message /// processing loop is entered. #[allow(unused_variables)] |
︙ | ︙ | |||
37 38 39 40 41 42 43 | } /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). | | | | > > > > | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 | } /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<P, S, R, E, RV, F>( hbldr: impl FnOnce(&Client<P, S, R, E>) -> F ) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>) where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client); let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { match server.wait() { Ok(msg) => match msg { MsgType::Put(m) => match handler.post(m) { |
︙ | ︙ | |||
72 73 74 75 76 77 78 | }; handler.term(ret) }); (client, jh) } | < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 142 143 144 145 146 147 148 149 | }; handler.term(ret) }); (client, jh) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/simple.rs.
1 2 3 4 5 6 7 8 9 | mod common; use common::{Put, Reply, Request, ThreadedServer}; // Check the signalled state, which should be false. // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { | < | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | mod common; use common::{Put, Reply, Request, ThreadedServer}; // Check the signalled state, which should be false. // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { let (clnt, jh) = ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default()); let reply = clnt.req(Request::GetSignalState).unwrap(); assert_eq!(reply, Reply::SignalState(false)); clnt.post(Put::Signal).unwrap(); let reply = clnt.req(Request::GetSignalState).unwrap(); |
︙ | ︙ |
Changes to tests/term.rs.
1 2 3 4 5 6 7 8 9 | mod common; use std::time::Duration; use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { | < < | < < | < < | < < | < < | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | mod common; use std::time::Duration; use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None assert_eq!(jh.join().unwrap(), None); } // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler. #[test] fn handler_req_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); let rv = jh.join().unwrap(); assert_eq!(rv, Some(42)); } #[test] fn handler_put_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); clnt.post(Put::Signal).unwrap(); clnt.post(Put::Croak).unwrap(); let rv = jh.join().unwrap(); assert_eq!(rv, Some(42)); } // Populate the queue with a bunch of nodes, then drop the client while there // are still nodes in the queue #[test] fn handler_delay_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); // Push a bunch of sleep nodes on the queue for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } // Drop the client drop(clnt); let rv = jh.join().unwrap(); assert_eq!(rv, None); } // Make sure all the sleepers are processed. // The explicit termination request should take precedence over terminating // over all clients disappearing. #[test] fn handler_delay_term_count() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } clnt.post(Put::CroakSleepCount).unwrap(); drop(clnt); |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 | # Change Log ## [Unreleased] | | > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | # Change Log ## [Unreleased] [Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk) ### Added ### Changed ### Removed --- ## [0.2.0] - 2024-01-28 [Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0) ### Changed - Instead of taking in an `impl Handler` into the `{thread,task}::spawn()` function, take in a closure that returns the handler. A reference to the handler channel's client endpoint is passed to the closure, which makes it possible to store `Client`/`WeakClient` in the handler, without involving an `Option` (or similar). ### Removed - Removed the `{thread,task}::spawn_preinit()` functions. These no longer really serve a purpose since the deferred handler construction was introduced. --- ## [0.1.2] - 2024-01-21 [Details](/vdiff?from=ump-ng-server-0.1.1&to=ump-ng-server-0.1.2) ### Added |
︙ | ︙ |