Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-server-0.2.0 To ump-server-0.2.1
2024-02-20
| ||
11:58 | Changelog fixup. check-in: b99bf2d8a9 user: jan tags: trunk | |
11:50 | Release maintenance. check-in: 933f121ec8 user: jan tags: trunk, ump-server-0.2.1 | |
08:06 | Up version. check-in: 157c1acc22 user: jan tags: trunk | |
2024-01-28
| ||
14:28 | Update doc examples to store a weak client reference within the handler. check-in: 3ea6f918b0 user: jan tags: trunk | |
13:39 | Release maintenance. check-in: 88c41e1742 user: jan tags: trunk, ump-server-0.2.0 | |
13:26 | Make the spawm methods take in a closure for constructing the handler, to allow the handler to be created after the channel client. Add thread/task examples to module docs. check-in: 752b1bbc09 user: jan tags: trunk | |
Changes to .efiles.
1 2 3 4 5 6 7 8 9 | Cargo.toml README.md www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs tests/term.rs tests/common/mod.rs | > > | 1 2 3 4 5 6 7 8 9 10 11 | Cargo.toml README.md www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs src/wdog.rs tests/term.rs tests/common/mod.rs examples/wdog_timeout.rs |
Changes to Cargo.toml.
1 2 | [package] name = "ump-server" | | > > > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | [package] name = "ump-server" version = "0.2.1" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-server" description = "Server message dispatch loop for ump." rust-version = "1.56" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", "rustfmt.toml" ] [features] default = ["tokio"] tokio = ["dep:tokio", "dep:async-trait"] tracing = ["dep:tracing"] watchdog = ["dep:parking_lot"] [dependencies] async-trait = { version = "0.1.77", optional = true } parking_lot = { version = "0.12.1", optional = true } # ToDo: Shouldn't need "net", but without it the docs will not build. # Once this is fixed in tokio, remove "net". tokio = { version = "1.36.0", features = ["net", "rt"], optional = true } tracing = { version = "0.1.40", optional = true } ump = { version = "0.12.1" } [dev-dependencies] tokio-test = { version = "0.4.3" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] |
Added examples/wdog_timeout.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | use std::ops::ControlFlow; #[derive(Debug)] pub enum Request { Delay(u64) } #[derive(Debug, PartialEq)] pub enum Reply { DelayDone } pub struct ThreadedServer {} impl ump_server::ThreadedHandler<Request, Reply, (), u32> for ThreadedServer { fn proc_req( &mut self, msg: Request, rctx: ump_server::ReplyContext<Reply, ()> ) -> ControlFlow<u32, ()> { match msg { Request::Delay(ms) => { std::thread::sleep(std::time::Duration::from_millis(ms)); rctx.reply(Reply::DelayDone).unwrap(); ControlFlow::Continue(()) } } } } // Terminate the dispatcher loop by dropping the only client. fn main() { #[cfg(not(feature = "watchdog"))] eprintln!("Warning: Example not built with watchdog feature"); let (clnt, jh) = ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); println!("==> Issue request which should not timeout .."); clnt.req(Request::Delay(190)).unwrap(); println!("==> Issue request which should timeout .."); clnt.req(Request::Delay(200)).unwrap(); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None assert_eq!(jh.join().unwrap(), None); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/lib.rs.
︙ | ︙ | |||
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | //! - The message queue is empty and all the associated [`Client`]s have been //! released. This would cause the thread to return `None`. //! //! # Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; | > > > > > > > > > > > > > > > > | 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | //! - The message queue is empty and all the associated [`Client`]s have been //! released. This would cause the thread to return `None`. //! //! # Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). //! //! There are cases where the handler needs to store a clone of the client //! end-point of the message passing channel used to issue requests to the //! server (so that message handlers can issue new requests). In order to //! facilitate this, the application must pass a `Handler`-construction closure //! to `spawn()`. The closure will be called after the message passing channel //! has been created so it can be passed a reference to the client end-point. //! //! If the dispatch loop should terminate once all the application's client //! end-points have been dropped, then the handler can store a [`WeakClient`] //! instead (as storing a cloned [`Client`] object will preventing the dispatch //! loop from terminating due to all clients being lost). The examples in the //! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "watchdog")] mod wdog; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; |
︙ | ︙ |
Changes to src/task.rs.
1 2 3 4 5 6 7 8 | //! ump server running in an async task. //! //! ``` //! # tokio_test::block_on(async { //! use std::ops::ControlFlow; //! use ump_server::{ //! async_trait, //! task::{Handler, spawn}, | > | > > > | > > > > > > | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | //! ump server running in an async task. //! //! # Example //! ``` //! # tokio_test::block_on(async { //! use std::ops::ControlFlow; //! use ump_server::{ //! async_trait, //! task::{Handler, spawn}, //! ReplyContext, WeakClient //! }; //! //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } //! #[derive(Debug)] //! enum MyError { } //! //! struct MyHandler { //! wclnt: WeakClient<Request, Reply, MyError> //! }; //! #[async_trait] //! impl Handler<Request, Reply, MyError, ()> for MyHandler { //! async fn proc_req( //! &mut self, //! msg: Request, //! rctx: ReplyContext<Reply, MyError> //! ) -> ControlFlow<(), ()> { //! match msg { //! Request::Add(a, b) => { //! rctx.reply(Reply::Sum(a + b)); //! ControlFlow::Continue(()) //! } //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! Ok(MyHandler { //! wclnt: clnt.weak() //! }) //! }).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); //! //! // Dropping the only client will terminate the dispatch loop |
︙ | ︙ | |||
98 99 100 101 102 103 104 105 | } /// Run a task which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<S, R, E, RV, F>( | > | | | > > > > > > > | > > > > > > > > > | | 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | } /// Run a task which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). #[allow(clippy::type_complexity)] pub fn spawn<S, R, E, RV, F>( hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E> ) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E> where S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client)?; #[cfg(feature = "watchdog")] let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { let (msg, rctx) = match server.async_wait().await { Ok(d) => d, Err(_) => break None }; #[cfg(feature = "watchdog")] wdog.begin_process(); let res = handler.proc_req(msg, rctx).await; #[cfg(feature = "watchdog")] wdog.end_process(); match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } }; #[cfg(feature = "watchdog")] let _ = wdog.kill(); handler.term(ret) }); Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/thread.rs.
1 2 3 4 5 6 | //! ump server running on a thread. //! //! ``` //! use std::ops::ControlFlow; //! use ump_server::{ //! thread::{Handler, spawn}, | > | > > > | > > > > > > | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | //! ump server running on a thread. //! //! # Example //! ``` //! use std::ops::ControlFlow; //! use ump_server::{ //! thread::{Handler, spawn}, //! ReplyContext, WeakClient //! }; //! //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } //! #[derive(Debug)] //! enum MyError { } //! //! struct MyHandler { //! wclnt: WeakClient<Request, Reply, MyError> //! }; //! impl Handler<Request, Reply, MyError, ()> for MyHandler { //! fn proc_req( //! &mut self, //! msg: Request, //! rctx: ReplyContext<Reply, MyError> //! ) -> ControlFlow<(), ()> { //! match msg { //! Request::Add(a, b) => { //! rctx.reply(Reply::Sum(a + b)); //! ControlFlow::Continue(()) //! } //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! Ok(MyHandler { //! wclnt: clnt.weak() //! }) //! }).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); //! //! // Dropping the only client will terminate the dispatch loop |
︙ | ︙ | |||
89 90 91 92 93 94 95 96 | } /// Run a thread which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<S, R, E, RV, F>( | > | | | > > > > > > > | > > > > > > > > > | | 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | } /// Run a thread which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). #[allow(clippy::type_complexity)] pub fn spawn<S, R, E, RV, F>( hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E> ) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E> where S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client)?; #[cfg(feature = "watchdog")] let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { let (msg, rctx) = match server.wait() { Ok(d) => d, Err(_) => break None }; #[cfg(feature = "watchdog")] wdog.begin_process(); let res = handler.proc_req(msg, rctx); #[cfg(feature = "watchdog")] wdog.end_process(); match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } }; #[cfg(feature = "watchdog")] let _ = wdog.kill(); handler.term(ret) }); Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added src/wdog.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | use std::{ sync::Arc, thread::JoinHandle, time::{Duration, Instant} }; use parking_lot::{Condvar, Mutex}; /// Maximum amount of milliseconds allowed const MAX_PROC_MILLIS: u64 = 200; enum State { /// Waiting for a message to arrive. Idle, /// A message has arrived and is being processed. Processing { start_time: Instant }, /// Message processing has timed out. Timeout, Term } struct Inner { state: State } struct Shared { inner: Mutex<Inner>, signal: Condvar } pub(crate) fn run() -> WatchDog { let inner = Inner { state: State::Idle }; let shared = Shared { inner: Mutex::new(inner), signal: Condvar::new() }; let sh = Arc::new(shared); let shared = Arc::clone(&sh); let jh = std::thread::spawn(|| monitor_thread(shared)); WatchDog { sh, jh } } pub(crate) struct WatchDog { sh: Arc<Shared>, jh: JoinHandle<()> } impl WatchDog { pub(crate) fn begin_process(&self) { let mut g = self.sh.inner.lock(); g.state = State::Processing { start_time: Instant::now() }; self.sh.signal.notify_one(); } pub(crate) fn end_process(&self) { let mut g = self.sh.inner.lock(); g.state = State::Idle; self.sh.signal.notify_one(); } pub(crate) fn kill(self) -> std::thread::Result<()> { let mut g = self.sh.inner.lock(); g.state = State::Term; self.sh.signal.notify_one(); drop(g); self.jh.join() } } fn monitor_thread(sh: Arc<Shared>) { let mut g = sh.inner.lock(); loop { match g.state { State::Idle => { // Wait to be notified about a state change sh.signal.wait(&mut g); } State::Processing { start_time } => { let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS); if sh.signal.wait_until(&mut g, timeout).timed_out() { g.state = State::Timeout; continue; } } State::Timeout => { #[cfg(feature = "tracing")] tracing::warn!( "Message processing held up the dispatcher more than {}ms", MAX_PROC_MILLIS ); #[cfg(not(feature = "tracing"))] eprintln!( "Warning: Message processing held up the dispatcher more than {}ms", MAX_PROC_MILLIS ); // Retutn to idle state g.state = State::Idle; continue; } State::Term => { break; } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/term.rs.
1 2 3 4 5 6 7 | mod common; use common::{Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { | | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | mod common; use common::{Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { let (clnt, jh) = ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None assert_eq!(jh.join().unwrap(), None); } // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler. #[test] fn handler_req_term() { let (clnt, jh) = ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); assert_eq!(jh.join().unwrap(), Some(42)); } |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 | # Change Log ## [Unreleased] | | > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # Change Log ## [Unreleased] [Details](/vdiff?from=ump-server-0.2.1&to=trunk) ### Added ### Changed ### Removed --- ## [0.2.1] [Details](/vdiff?from=ump-server-0.2.0&to=ump-server-0.2.1) ### Added - Add a watchdog that will warn if a msgproc takes more than 200ms to return. This is intended for dev builds only and must be enabled using the `watchdog` feature. Will output to stderr by default, but will output to `tracing::warn!()` if the `tracing` feature is enabled as well. ### Changed - Make `Handler` builder closure fallible. --- ## [0.2.0] - 2024-01-28 [Details](/vdiff?from=ump-server-0.1.0&to=ump-server-0.2.0) ### Added |
︙ | ︙ |