Index: .efiles
==================================================================
--- .efiles
+++ .efiles
@@ -3,8 +3,9 @@
www/index.md
www/changelog.md
src/lib.rs
src/thread.rs
src/task.rs
+src/wdog.rs
tests/common/mod.rs
tests/simple.rs
tests/term.rs
Index: Cargo.toml
==================================================================
--- Cargo.toml
+++ Cargo.toml
@@ -1,8 +1,8 @@
[package]
name = "ump-ng-server"
-version = "0.2.0"
+version = "0.2.1"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
@@ -17,20 +17,24 @@
]
[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]
+tracing = ["dep:tracing"]
+watchdog = ["dep:parking_lot"]
[dependencies]
async-trait = { version = "0.1.77", optional = true }
+parking_lot = { version = "0.12.1", optional = true }
# "net" is added as a temporary workaround. Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
+tracing = { version = "0.1.40", optional = true }
ump-ng = { version = "0.1.0" }
[dev-dependencies]
tokio-test = { version = "0.4.3" }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]
Index: src/lib.rs
==================================================================
--- src/lib.rs
+++ src/lib.rs
@@ -47,10 +47,13 @@
//! instead (as storing a cloned [`Client`] object will preventing the dispatch
//! loop from terminating due to all clients being lost). The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.
#![cfg_attr(docsrs, feature(doc_cfg))]
+
+#[cfg(feature = "watchdog")]
+mod wdog;
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;
Index: src/task.rs
==================================================================
--- src/task.rs
+++ src/task.rs
@@ -50,14 +50,14 @@
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//! // Store a weak client in the handler so it doesn't keep the dispatch
//! // loop alive when the Client returned to the application is dropped.
-//! MyHandler {
+//! Ok(MyHandler {
//! wclnt: clnt.weak()
-//! }
-//! });
+//! })
+//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//! panic!("Unexpected reply");
@@ -77,10 +77,11 @@
use async_trait::async_trait;
use super::{channel, Client, MsgType, ReplyContext};
+/// Message processing trait for an async handler.
#[async_trait]
pub trait Handler
{
/// Optional initialization callback.
///
/// This is called on the dispatcher task before the main message
@@ -118,13 +119,14 @@
/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
+#[allow(clippy::type_complexity)]
pub fn spawn
(
- hbldr: impl FnOnce(&Client
) -> F
-) -> (Client
, JoinHandle>)
+ hbldr: impl FnOnce(&Client) -> Result
+) -> Result<(Client, JoinHandle>), E>
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
@@ -131,32 +133,46 @@
RV: 'static + Send,
F: Handler + Send + 'static
{
let (server, client) = channel();
- let mut handler = hbldr(&client);
+ let mut handler = hbldr(&client)?;
+
+ #[cfg(feature = "watchdog")]
+ let wdog = crate::wdog::run();
let weak_client = client.weak();
let jh = task::spawn(async move {
handler.init(weak_client);
let ret = loop {
match server.async_wait().await {
- Ok(msg) => match msg {
- MsgType::Put(m) => match handler.post(m).await {
- ControlFlow::Continue(_) => {}
- ControlFlow::Break(rv) => break Some(rv)
- },
- MsgType::Request(m, rctx) => match handler.req(m, rctx).await {
+ Ok(msg) => {
+ #[cfg(feature = "watchdog")]
+ wdog.begin_process();
+
+ let res = match msg {
+ MsgType::Put(m) => handler.post(m).await,
+ MsgType::Request(m, rctx) => handler.req(m, rctx).await
+ };
+
+ #[cfg(feature = "watchdog")]
+ wdog.end_process();
+
+ match res {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
}
- },
+ }
Err(_) => break None
}
};
+
+ #[cfg(feature = "watchdog")]
+ let _ = wdog.kill();
+
handler.term(ret)
});
- (client, jh)
+ Ok((client, jh))
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Index: src/thread.rs
==================================================================
--- src/thread.rs
+++ src/thread.rs
@@ -44,14 +44,14 @@
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//! // Store a weak client in the handler so it doesn't keep the dispatch
//! // loop alive when the Client returned to the application is dropped.
-//! MyHandler {
+//! Ok(MyHandler {
//! wclnt: clnt.weak()
-//! }
-//! });
+//! })
+//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//! panic!("Unexpected reply");
@@ -69,10 +69,11 @@
thread::{self, JoinHandle}
};
use super::{channel, Client, MsgType, ReplyContext};
+/// Message processing trait for a threaded handler.
pub trait Handler
{
/// Optional initialization callback.
///
/// This is called on the dispatcher thread before the main message
/// processing loop is entered.
@@ -105,13 +106,14 @@
/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
+#[allow(clippy::type_complexity)]
pub fn spawn
(
- hbldr: impl FnOnce(&Client
) -> F
-) -> (Client
, JoinHandle>)
+ hbldr: impl FnOnce(&Client) -> Result
+) -> Result<(Client, JoinHandle>), E>
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
@@ -118,32 +120,46 @@
RV: 'static + Send,
F: Handler + Send + 'static
{
let (server, client) = channel();
- let mut handler = hbldr(&client);
+ let mut handler = hbldr(&client)?;
+
+ #[cfg(feature = "watchdog")]
+ let wdog = crate::wdog::run();
let weak_client = client.weak();
let jh = thread::spawn(move || {
handler.init(weak_client);
let ret = loop {
match server.wait() {
- Ok(msg) => match msg {
- MsgType::Put(m) => match handler.post(m) {
- ControlFlow::Continue(_) => {}
- ControlFlow::Break(rv) => break Some(rv)
- },
- MsgType::Request(m, rctx) => match handler.req(m, rctx) {
+ Ok(msg) => {
+ #[cfg(feature = "watchdog")]
+ wdog.begin_process();
+
+ let res = match msg {
+ MsgType::Put(m) => handler.post(m),
+ MsgType::Request(m, rctx) => handler.req(m, rctx)
+ };
+
+ #[cfg(feature = "watchdog")]
+ wdog.end_process();
+
+ match res {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
}
- },
+ }
Err(_) => break None
}
};
+
+ #[cfg(feature = "watchdog")]
+ let _ = wdog.kill();
+
handler.term(ret)
});
- (client, jh)
+ Ok((client, jh))
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
ADDED src/wdog.rs
Index: src/wdog.rs
==================================================================
--- /dev/null
+++ src/wdog.rs
@@ -0,0 +1,120 @@
+use std::{
+ sync::Arc,
+ thread::JoinHandle,
+ time::{Duration, Instant}
+};
+
+use parking_lot::{Condvar, Mutex};
+
+/// Maximum amount of milliseconds allowed
+const MAX_PROC_MILLIS: u64 = 200;
+
+enum State {
+ /// Waiting for a message to arrive.
+ Idle,
+
+ /// A message has arrived and is being processed.
+ Processing {
+ start_time: Instant
+ },
+
+ /// Message processing has timed out.
+ Timeout,
+
+ Term
+}
+
+struct Inner {
+ state: State
+}
+
+struct Shared {
+ inner: Mutex,
+ signal: Condvar
+}
+
+pub(crate) fn run() -> WatchDog {
+ let inner = Inner { state: State::Idle };
+
+ let shared = Shared {
+ inner: Mutex::new(inner),
+ signal: Condvar::new()
+ };
+
+ let sh = Arc::new(shared);
+ let shared = Arc::clone(&sh);
+ let jh = std::thread::spawn(|| monitor_thread(shared));
+
+ WatchDog { sh, jh }
+}
+
+pub(crate) struct WatchDog {
+ sh: Arc,
+ jh: JoinHandle<()>
+}
+
+impl WatchDog {
+ pub(crate) fn begin_process(&self) {
+ let mut g = self.sh.inner.lock();
+ g.state = State::Processing {
+ start_time: Instant::now()
+ };
+ self.sh.signal.notify_one();
+ }
+
+ pub(crate) fn end_process(&self) {
+ let mut g = self.sh.inner.lock();
+ g.state = State::Idle;
+ self.sh.signal.notify_one();
+ }
+
+ pub(crate) fn kill(self) -> std::thread::Result<()> {
+ let mut g = self.sh.inner.lock();
+ g.state = State::Term;
+ self.sh.signal.notify_one();
+ drop(g);
+ self.jh.join()
+ }
+}
+
+
+fn monitor_thread(sh: Arc) {
+ let mut g = sh.inner.lock();
+ loop {
+ match g.state {
+ State::Idle => {
+ // Wait to be notified about a state change
+ sh.signal.wait(&mut g);
+ }
+ State::Processing { start_time } => {
+ let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
+ if sh.signal.wait_until(&mut g, timeout).timed_out() {
+ g.state = State::Timeout;
+ continue;
+ }
+ }
+ State::Timeout => {
+ #[cfg(feature = "tracing")]
+ tracing::warn!(
+ "Message processing held up the dispatcher more than {}ms",
+ MAX_PROC_MILLIS
+ );
+
+ #[cfg(not(feature = "tracing"))]
+ eprintln!(
+ "Warning: Message processing held up the dispatcher more than {}ms",
+ MAX_PROC_MILLIS
+ );
+
+ // Retutn to idle state
+ g.state = State::Idle;
+ continue;
+ }
+ State::Term => {
+ break;
+ }
+ }
+ }
+}
+
+// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Index: tests/simple.rs
==================================================================
--- tests/simple.rs
+++ tests/simple.rs
@@ -6,11 +6,12 @@
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
let (clnt, jh) =
- ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default());
+ ump_ng_server::spawn_thread(|_clnt| Ok(ThreadedServer::default()))
+ .unwrap();
let reply = clnt.req(Request::GetSignalState).unwrap();
assert_eq!(reply, Reply::SignalState(false));
clnt.post(Put::Signal).unwrap();
Index: tests/term.rs
==================================================================
--- tests/term.rs
+++ tests/term.rs
@@ -5,11 +5,12 @@
use common::{Put, Reply, Request, ThreadedServer};
// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
- let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
+ let (clnt, jh) =
+ ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();
// Drop the (only) client, which should cause dispatch loop to terminate.
drop(clnt);
// Termination by clients disappearing should return None
@@ -19,11 +20,12 @@
// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
- let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
+ let (clnt, jh) =
+ ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();
assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);
let rv = jh.join().unwrap();
@@ -32,11 +34,12 @@
}
#[test]
fn handler_put_term() {
- let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
+ let (clnt, jh) =
+ ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();
clnt.post(Put::Signal).unwrap();
clnt.post(Put::Croak).unwrap();
let rv = jh.join().unwrap();
@@ -46,11 +49,12 @@
// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {
- let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
+ let (clnt, jh) =
+ ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();
// Push a bunch of sleep nodes on the queue
for _ in 0..10 {
clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
}
@@ -66,11 +70,12 @@
// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over terminating
// over all clients disappearing.
#[test]
fn handler_delay_term_count() {
- let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
+ let (clnt, jh) =
+ ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();
for _ in 0..10 {
clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
}
clnt.post(Put::CroakSleepCount).unwrap();
Index: www/changelog.md
==================================================================
--- www/changelog.md
+++ www/changelog.md
@@ -1,17 +1,34 @@
# Change Log
## [Unreleased]
-[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk)
+[Details](/vdiff?from=ump-ng-server-0.2.1&to=trunk)
### Added
### Changed
### Removed
+---
+
+## [0.2.1] - 2024-02-20
+
+[Details](/vdiff?from=ump-ng-server-0.2.0&to=ump-ng-server-0.2.1)
+
+### Added
+
+- Add a watchdog that will warn if a msgproc takes more than 200ms to return.
+ This is intended for dev builds only and must be enabled using the
+ `watchdog` feature. Will output to stderr by default, but will output to
+ `tracing::warn!()` if the `tracing` feature is enabled as well.
+
+### Changed
+
+- Make `Handler` builder closure fallible.
+
---
## [0.2.0] - 2024-01-28
[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)