Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -3,8 +3,9 @@ www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs +src/wdog.rs tests/common/mod.rs tests/simple.rs tests/term.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "ump-ng-server" -version = "0.2.0" +version = "0.2.1" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng-server" @@ -17,20 +17,24 @@ ] [features] default = ["tokio"] tokio = ["dep:tokio", "dep:async-trait"] +tracing = ["dep:tracing"] +watchdog = ["dep:parking_lot"] [dependencies] async-trait = { version = "0.1.77", optional = true } +parking_lot = { version = "0.12.1", optional = true } # "net" is added as a temporary workaround. Without it building the docs fail # in tokio. tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } +tracing = { version = "0.1.40", optional = true } ump-ng = { version = "0.1.0" } [dev-dependencies] tokio-test = { version = "0.4.3" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -47,10 +47,13 @@ //! instead (as storing a cloned [`Client`] object will preventing the dispatch //! loop from terminating due to all clients being lost). The examples in the //! [`task`] and [`thread`] modules illustrate how to do this. 
#![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "watchdog")] +mod wdog; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; Index: src/task.rs ================================================================== --- src/task.rs +++ src/task.rs @@ -50,14 +50,14 @@ //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. -//! MyHandler { +//! Ok(MyHandler { //! wclnt: clnt.weak() -//! } -//! }); +//! }) +//! }).unwrap(); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { //! panic!("Unexpected reply"); @@ -77,10 +77,11 @@ use async_trait::async_trait; use super::{channel, Client, MsgType, ReplyContext}; +/// Message processing trait for an async handler. #[async_trait] pub trait Handler { /// Optional initialization callback. /// /// This is called on the dispatcher task before the main message @@ -118,13 +119,14 @@ /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). 
+#[allow(clippy::type_complexity)] pub fn spawn( - hbldr: impl FnOnce(&Client) -> F -) -> (Client, JoinHandle>) + hbldr: impl FnOnce(&Client) -> Result +) -> Result<(Client, JoinHandle>), E> where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, @@ -131,32 +133,46 @@ RV: 'static + Send, F: Handler + Send + 'static { let (server, client) = channel(); - let mut handler = hbldr(&client); + let mut handler = hbldr(&client)?; + + #[cfg(feature = "watchdog")] + let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { match server.async_wait().await { - Ok(msg) => match msg { - MsgType::Put(m) => match handler.post(m).await { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - }, - MsgType::Request(m, rctx) => match handler.req(m, rctx).await { + Ok(msg) => { + #[cfg(feature = "watchdog")] + wdog.begin_process(); + + let res = match msg { + MsgType::Put(m) => handler.post(m).await, + MsgType::Request(m, rctx) => handler.req(m, rctx).await + }; + + #[cfg(feature = "watchdog")] + wdog.end_process(); + + match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } - }, + } Err(_) => break None } }; + + #[cfg(feature = "watchdog")] + let _ = wdog.kill(); + handler.term(ret) }); - (client, jh) + Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/thread.rs ================================================================== --- src/thread.rs +++ src/thread.rs @@ -44,14 +44,14 @@ //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. -//! MyHandler { +//! Ok(MyHandler { //! wclnt: clnt.weak() -//! } -//! }); +//! }) +//! }).unwrap(); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! 
let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { //! panic!("Unexpected reply"); @@ -69,10 +69,11 @@ thread::{self, JoinHandle} }; use super::{channel, Client, MsgType, ReplyContext}; +/// Message processing trait for a threaded handler. pub trait Handler { /// Optional initialization callback. /// /// This is called on the dispatcher thread before the main message /// processing loop is entered. @@ -105,13 +106,14 @@ /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). +#[allow(clippy::type_complexity)] pub fn spawn( - hbldr: impl FnOnce(&Client) -> F -) -> (Client, JoinHandle>) + hbldr: impl FnOnce(&Client) -> Result +) -> Result<(Client, JoinHandle>), E> where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, @@ -118,32 +120,46 @@ RV: 'static + Send, F: Handler + Send + 'static { let (server, client) = channel(); - let mut handler = hbldr(&client); + let mut handler = hbldr(&client)?; + + #[cfg(feature = "watchdog")] + let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { match server.wait() { - Ok(msg) => match msg { - MsgType::Put(m) => match handler.post(m) { - ControlFlow::Continue(_) => {} - ControlFlow::Break(rv) => break Some(rv) - }, - MsgType::Request(m, rctx) => match handler.req(m, rctx) { + Ok(msg) => { + #[cfg(feature = "watchdog")] + wdog.begin_process(); + + let res = match msg { + MsgType::Put(m) => handler.post(m), + MsgType::Request(m, rctx) => handler.req(m, rctx) + }; + + #[cfg(feature = "watchdog")] + wdog.end_process(); + + match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } - }, + } Err(_) => break None } }; + + #[cfg(feature = "watchdog")] + let _ = wdog.kill(); + handler.term(ret) }); - (client, jh) + Ok((client, jh)) } // vim: set ft=rust 
et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/wdog.rs Index: src/wdog.rs ================================================================== --- /dev/null +++ src/wdog.rs @@ -0,0 +1,120 @@ +use std::{ + sync::Arc, + thread::JoinHandle, + time::{Duration, Instant} +}; + +use parking_lot::{Condvar, Mutex}; + +/// Maximum amount of milliseconds allowed +const MAX_PROC_MILLIS: u64 = 200; + +enum State { + /// Waiting for a message to arrive. + Idle, + + /// A message has arrived and is being processed. + Processing { + start_time: Instant + }, + + /// Message processing has timed out. + Timeout, + + Term +} + +struct Inner { + state: State +} + +struct Shared { + inner: Mutex, + signal: Condvar +} + +pub(crate) fn run() -> WatchDog { + let inner = Inner { state: State::Idle }; + + let shared = Shared { + inner: Mutex::new(inner), + signal: Condvar::new() + }; + + let sh = Arc::new(shared); + let shared = Arc::clone(&sh); + let jh = std::thread::spawn(|| monitor_thread(shared)); + + WatchDog { sh, jh } +} + +pub(crate) struct WatchDog { + sh: Arc, + jh: JoinHandle<()> +} + +impl WatchDog { + pub(crate) fn begin_process(&self) { + let mut g = self.sh.inner.lock(); + g.state = State::Processing { + start_time: Instant::now() + }; + self.sh.signal.notify_one(); + } + + pub(crate) fn end_process(&self) { + let mut g = self.sh.inner.lock(); + g.state = State::Idle; + self.sh.signal.notify_one(); + } + + pub(crate) fn kill(self) -> std::thread::Result<()> { + let mut g = self.sh.inner.lock(); + g.state = State::Term; + self.sh.signal.notify_one(); + drop(g); + self.jh.join() + } +} + + +fn monitor_thread(sh: Arc) { + let mut g = sh.inner.lock(); + loop { + match g.state { + State::Idle => { + // Wait to be notified about a state change + sh.signal.wait(&mut g); + } + State::Processing { start_time } => { + let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS); + if sh.signal.wait_until(&mut g, timeout).timed_out() { + g.state = State::Timeout; + continue; + } + 
} + State::Timeout => { + #[cfg(feature = "tracing")] + tracing::warn!( + "Message processing held up the dispatcher more than {}ms", + MAX_PROC_MILLIS + ); + + #[cfg(not(feature = "tracing"))] + eprintln!( + "Warning: Message processing held up the dispatcher more than {}ms", + MAX_PROC_MILLIS + ); + + // Return to idle state so the warning is emitted only once per message + g.state = State::Idle; + continue; + } + State::Term => { + break; + } + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/simple.rs ================================================================== --- tests/simple.rs +++ tests/simple.rs @@ -6,11 +6,12 @@ // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { let (clnt, jh) = - ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default()); + ump_ng_server::spawn_thread(|_clnt| Ok(ThreadedServer::default())) + .unwrap(); let reply = clnt.req(Request::GetSignalState).unwrap(); assert_eq!(reply, Reply::SignalState(false)); clnt.post(Put::Signal).unwrap(); Index: tests/term.rs ================================================================== --- tests/term.rs +++ tests/term.rs @@ -5,11 +5,12 @@ use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { - let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None @@ -19,11 +20,12 @@ // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler.
#[test] fn handler_req_term() { - let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); let rv = jh.join().unwrap(); @@ -32,11 +34,12 @@ } #[test] fn handler_put_term() { - let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); clnt.post(Put::Signal).unwrap(); clnt.post(Put::Croak).unwrap(); let rv = jh.join().unwrap(); @@ -46,11 +49,12 @@ // Populate the queue with a bunch of nodes, then drop the client while there // are still nodes in the queue #[test] fn handler_delay_term() { - let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); // Push a bunch of sleep nodes on the queue for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } @@ -66,11 +70,12 @@ // Make sure all the sleepers are processed. // The explicit termination request should take precedence over terminating // over all clients disappearing. 
#[test] fn handler_delay_term_count() { - let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default()); + let (clnt, jh) = + ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } clnt.post(Put::CroakSleepCount).unwrap(); Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,34 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk) +[Details](/vdiff?from=ump-ng-server-0.2.1&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.2.1] - 2024-02-20 + +[Details](/vdiff?from=ump-ng-server-0.2.0&to=ump-ng-server-0.2.1) + +### Added + +- Add a watchdog that warns if a message-processing callback takes more than + 200ms to return. This is intended for dev builds only and must be enabled + using the `watchdog` feature. Warnings are written to stderr by default, or + emitted through `tracing::warn!()` if the `tracing` feature is also enabled. + +### Changed + +- Make `Handler` builder closure fallible; `spawn()` now returns a `Result`. + --- ## [0.2.0] - 2024-01-28 [Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)