Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -3,7 +3,9 @@ www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs +src/wdog.rs tests/term.rs tests/common/mod.rs +examples/wdog_timeout.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "ump-server" -version = "0.2.0" +version = "0.2.1" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-server" @@ -17,20 +17,24 @@ ] [features] default = ["tokio"] tokio = ["dep:tokio", "dep:async-trait"] +tracing = ["dep:tracing"] +watchdog = ["dep:parking_lot"] [dependencies] async-trait = { version = "0.1.77", optional = true } +parking_lot = { version = "0.12.1", optional = true } # ToDo: Shouldn't need "net", but without it the docs will not build. # Once this is fixed in tokio, remove "net". 
-tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } +tokio = { version = "1.36.0", features = ["net", "rt"], optional = true } +tracing = { version = "0.1.40", optional = true } ump = { version = "0.12.1" } [dev-dependencies] tokio-test = { version = "0.4.3" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] ADDED examples/wdog_timeout.rs Index: examples/wdog_timeout.rs ================================================================== --- /dev/null +++ examples/wdog_timeout.rs @@ -0,0 +1,52 @@ +use std::ops::ControlFlow; + +#[derive(Debug)] +pub enum Request { + Delay(u64) +} + +#[derive(Debug, PartialEq)] +pub enum Reply { + DelayDone +} + +pub struct ThreadedServer {} + +impl ump_server::ThreadedHandler for ThreadedServer { + fn proc_req( + &mut self, + msg: Request, + rctx: ump_server::ReplyContext + ) -> ControlFlow { + match msg { + Request::Delay(ms) => { + std::thread::sleep(std::time::Duration::from_millis(ms)); + rctx.reply(Reply::DelayDone).unwrap(); + ControlFlow::Continue(()) + } + } + } +} + +// Terminate the dispatcher loop by dropping the only client. +fn main() { + #[cfg(not(feature = "watchdog"))] + eprintln!("Warning: Example not built with watchdog feature"); + + let (clnt, jh) = + ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); + + println!("==> Issue request which should not timeout .."); + clnt.req(Request::Delay(190)).unwrap(); + + println!("==> Issue request which should timeout .."); + clnt.req(Request::Delay(200)).unwrap(); + + // Drop the (only) client, which should cause dispatch loop to terminate. + drop(clnt); + + // Termination by clients disappearing should return None + assert_eq!(jh.join().unwrap(), None); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -31,12 +31,28 @@ //! //! 
# Application message handlers //! Message handlers are implemented using the [`thread::Handler`] trait (for //! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch //! loop). +//! +//! There are cases where the handler needs to store a clone of the client +//! end-point of the message passing channel used to issue requests to the +//! server (so that message handlers can issue new requests). In order to +//! facilitate this, the application must pass a `Handler`-construction closure +//! to `spawn()`. The closure will be called after the message passing channel +//! has been created so it can be passed a reference to the client end-point. +//! +//! If the dispatch loop should terminate once all the application's client +//! end-points have been dropped, then the handler can store a [`WeakClient`] +//! instead (as storing a cloned [`Client`] object will prevent the dispatch +//! loop from terminating due to all clients being lost). The examples in the +//! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "watchdog")] +mod wdog; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; Index: src/task.rs ================================================================== --- src/task.rs +++ src/task.rs @@ -1,23 +1,29 @@ //! ump server running in an async task. //! +//! # Example //! ``` //! # tokio_test::block_on(async { //! use std::ops::ControlFlow; //! use ump_server::{ //! async_trait, //! task::{Handler, spawn}, -//! ump::ReplyContext +//! ReplyContext, WeakClient //! }; +//! //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } +//! #[derive(Debug)] //! enum MyError { } -//! struct MyHandler {}; +//! +//! struct MyHandler { +//! wclnt: WeakClient +//! }; //! #[async_trait] //! impl Handler for MyHandler { //! async fn proc_req( //! &mut self, //! msg: Request, @@ -31,12 +37,16 @@ //! } //! } //! } //! //! 
let (clnt, jh) = spawn(|clnt| { -//! MyHandler { } -//! }); +//! // Store a weak client in the handler so it doesn't keep the dispatch +//! // loop alive when the Client returned to the application is dropped. +//! Ok(MyHandler { +//! wclnt: clnt.weak() +//! }) +//! }).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); @@ -100,39 +110,56 @@ /// Run a task which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). +#[allow(clippy::type_complexity)] pub fn spawn( - hbldr: impl FnOnce(&Client) -> F -) -> (Client, JoinHandle>) + hbldr: impl FnOnce(&Client) -> Result +) -> Result<(Client, JoinHandle>), E> where S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler + Send + 'static { let (server, client) = channel(); - let mut handler = hbldr(&client); + let mut handler = hbldr(&client)?; + + #[cfg(feature = "watchdog")] + let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { let (msg, rctx) = match server.async_wait().await { Ok(d) => d, Err(_) => break None }; - match handler.proc_req(msg, rctx).await { + + #[cfg(feature = "watchdog")] + wdog.begin_process(); + + let res = handler.proc_req(msg, rctx).await; + + #[cfg(feature = "watchdog")] + wdog.end_process(); + + match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } }; + + #[cfg(feature = "watchdog")] + let _ = wdog.kill(); + handler.term(ret) }); - (client, jh) + Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/thread.rs ================================================================== --- src/thread.rs +++ src/thread.rs @@ -1,21 +1,27 @@ //! ump server running on a thread. //! +//! # Example //! ``` //! 
use std::ops::ControlFlow; //! use ump_server::{ //! thread::{Handler, spawn}, -//! ump::ReplyContext +//! ReplyContext, WeakClient //! }; +//! //! enum Request { //! Add(usize, usize) //! } //! enum Reply { //! Sum(usize) //! } +//! #[derive(Debug)] //! enum MyError { } -//! struct MyHandler {}; +//! +//! struct MyHandler { +//! wclnt: WeakClient +//! }; //! impl Handler for MyHandler { //! fn proc_req( //! &mut self, //! msg: Request, //! rctx: ReplyContext @@ -28,12 +34,16 @@ //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { -//! MyHandler { } -//! }); +//! // Store a weak client in the handler so it doesn't keep the dispatch +//! // loop alive when the Client returned to the application is dropped. +//! Ok(MyHandler { +//! wclnt: clnt.weak() +//! }) +//! }).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); @@ -91,39 +101,56 @@ /// Run a thread which will process incoming messages from an ump server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). 
+#[allow(clippy::type_complexity)] pub fn spawn( - hbldr: impl FnOnce(&Client) -> F -) -> (Client, thread::JoinHandle>) + hbldr: impl FnOnce(&Client) -> Result +) -> Result<(Client, thread::JoinHandle>), E> where S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler + Send + 'static { let (server, client) = channel(); - let mut handler = hbldr(&client); + let mut handler = hbldr(&client)?; + + #[cfg(feature = "watchdog")] + let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { let (msg, rctx) = match server.wait() { Ok(d) => d, Err(_) => break None }; - match handler.proc_req(msg, rctx) { + + #[cfg(feature = "watchdog")] + wdog.begin_process(); + + let res = handler.proc_req(msg, rctx); + + #[cfg(feature = "watchdog")] + wdog.end_process(); + + match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } }; + + #[cfg(feature = "watchdog")] + let _ = wdog.kill(); + handler.term(ret) }); - (client, jh) + Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/wdog.rs Index: src/wdog.rs ================================================================== --- /dev/null +++ src/wdog.rs @@ -0,0 +1,120 @@ +use std::{ + sync::Arc, + thread::JoinHandle, + time::{Duration, Instant} +}; + +use parking_lot::{Condvar, Mutex}; + +/// Maximum amount of milliseconds allowed +const MAX_PROC_MILLIS: u64 = 200; + +enum State { + /// Waiting for a message to arrive. + Idle, + + /// A message has arrived and is being processed. + Processing { + start_time: Instant + }, + + /// Message processing has timed out. 
+ Timeout, + + Term +} + +struct Inner { + state: State +} + +struct Shared { + inner: Mutex, + signal: Condvar +} + +pub(crate) fn run() -> WatchDog { + let inner = Inner { state: State::Idle }; + + let shared = Shared { + inner: Mutex::new(inner), + signal: Condvar::new() + }; + + let sh = Arc::new(shared); + let shared = Arc::clone(&sh); + let jh = std::thread::spawn(|| monitor_thread(shared)); + + WatchDog { sh, jh } +} + +pub(crate) struct WatchDog { + sh: Arc, + jh: JoinHandle<()> +} + +impl WatchDog { + pub(crate) fn begin_process(&self) { + let mut g = self.sh.inner.lock(); + g.state = State::Processing { + start_time: Instant::now() + }; + self.sh.signal.notify_one(); + } + + pub(crate) fn end_process(&self) { + let mut g = self.sh.inner.lock(); + g.state = State::Idle; + self.sh.signal.notify_one(); + } + + pub(crate) fn kill(self) -> std::thread::Result<()> { + let mut g = self.sh.inner.lock(); + g.state = State::Term; + self.sh.signal.notify_one(); + drop(g); + self.jh.join() + } +} + + +fn monitor_thread(sh: Arc) { + let mut g = sh.inner.lock(); + loop { + match g.state { + State::Idle => { + // Wait to be notified about a state change + sh.signal.wait(&mut g); + } + State::Processing { start_time } => { + let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS); + if sh.signal.wait_until(&mut g, timeout).timed_out() { + g.state = State::Timeout; + continue; + } + } + State::Timeout => { + #[cfg(feature = "tracing")] + tracing::warn!( + "Message processing held up the dispatcher more than {}ms", + MAX_PROC_MILLIS + ); + + #[cfg(not(feature = "tracing"))] + eprintln!( + "Warning: Message processing held up the dispatcher more than {}ms", + MAX_PROC_MILLIS + ); + + // Return to idle state + g.state = State::Idle; + continue; + } + State::Term => { + break; + } + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/term.rs ================================================================== --- tests/term.rs +++ 
tests/term.rs @@ -3,11 +3,12 @@ use common::{Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { - let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {}); + let (clnt, jh) = + ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None @@ -16,14 +17,15 @@ // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler. #[test] fn handler_req_term() { - let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {}); + let (clnt, jh) = + ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap(); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); assert_eq!(jh.join().unwrap(), Some(42)); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,34 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=ump-server-0.2.0&to=trunk) +[Details](/vdiff?from=ump-server-0.2.1&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.2.1] + +[Details](/vdiff?from=ump-server-0.2.0&to=ump-server-0.2.1) + +### Added + +- Add a watchdog that will warn if a msgproc takes more than 200ms to return. + This is intended for dev builds only and must be enabled using the + `watchdog` feature. Will output to stderr by default, but will output to + `tracing::warn!()` if the `tracing` feature is enabled as well. + +### Changed + +- Make `Handler` builder closure fallible. + --- ## [0.2.0] - 2024-01-28 [Details](/vdiff?from=ump-server-0.1.0&to=ump-server-0.2.0)