Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-ng-server-0.2.0 To ump-ng-server-0.2.1
2024-02-20
| ||
16:13 | (Re)release maintenance. check-in: 8ce88d28b5 user: jan tags: trunk, ump-ng-server-0.3.0 | |
11:56 | Happy clippy. check-in: 959380d872 user: jan tags: trunk, ump-ng-server-0.2.1 | |
11:54 | Release maintenance. check-in: 0ebc081337 user: jan tags: trunk | |
2024-01-28
| ||
17:35 | Make handler constructor closure fallible. check-in: 1e85d183b2 user: jan tags: trunk | |
15:19 | Release maintenance. check-in: d37659a48c user: jan tags: trunk, ump-ng-server-0.2.0 | |
15:09 | Use closure to construct Handler. Remove the spawn_preinit() functions, since using a closure to construct the Handler makes it possible to accomplish the same thing, but less ugly. check-in: d485928620 user: jan tags: trunk | |
Changes to .efiles.
1 2 3 4 5 6 7 8 9 10 | Cargo.toml README.md www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs tests/common/mod.rs tests/simple.rs tests/term.rs | > | 1 2 3 4 5 6 7 8 9 10 11 | Cargo.toml README.md www/index.md www/changelog.md src/lib.rs src/thread.rs src/task.rs src/wdog.rs tests/common/mod.rs tests/simple.rs tests/term.rs |
Changes to Cargo.toml.
1 2 | [package] name = "ump-ng-server" | | > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | [package] name = "ump-ng-server" version = "0.2.1" edition = "2021" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng-server" description = "Server message dispatch loop for ump-ng." rust-version = "1.56" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", "rustfmt.toml" ] [features] default = ["tokio"] tokio = ["dep:tokio", "dep:async-trait"] tracing = ["dep:tracing"] watchdog = ["dep:parking_lot"] [dependencies] async-trait = { version = "0.1.77", optional = true } parking_lot = { version = "0.12.1", optional = true } # "net" is added as a temporary workaround. Without it building the docs fail # in tokio. tokio = { version = "1.35.1", features = ["net", "rt"], optional = true } tracing = { version = "0.1.40", optional = true } ump-ng = { version = "0.1.0" } [dev-dependencies] tokio-test = { version = "0.4.3" } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] |
Changes to src/lib.rs.
︙ | ︙ | |||
45 46 47 48 49 50 51 52 53 54 55 56 57 58 | //! If the dispatch loop should terminate once all the application's client //! end-points have been dropped, then the handler can store a [`WeakClient`] //! instead (as storing a cloned [`Client`] object will prevent the dispatch //! loop from terminating due to all clients being lost). The examples in the //! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; | > > > | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | //! If the dispatch loop should terminate once all the application's client //! end-points have been dropped, then the handler can store a [`WeakClient`] //! instead (as storing a cloned [`Client`] object will prevent the dispatch //! loop from terminating due to all clients being lost). The examples in the //! [`task`] and [`thread`] modules illustrate how to do this. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "watchdog")] mod wdog; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod task; pub mod thread; |
︙ | ︙ |
Changes to src/task.rs.
︙ | ︙ | |||
48 49 50 51 52 53 54 | //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. | | | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! Ok(MyHandler { //! wclnt: clnt.weak() //! }) //! }).unwrap(); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); |
︙ | ︙ | |||
75 76 77 78 79 80 81 82 83 84 85 86 87 88 | use tokio::task::{self, JoinHandle}; use async_trait::async_trait; use super::{channel, Client, MsgType, ReplyContext}; #[async_trait] pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher task before the main message /// processing loop is entered. #[allow(unused_variables)] | > | 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | use tokio::task::{self, JoinHandle}; use async_trait::async_trait; use super::{channel, Client, MsgType, ReplyContext}; /// Message processing trait for an async handler. #[async_trait] pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher task before the main message /// processing loop is entered. #[allow(unused_variables)] |
︙ | ︙ | |||
116 117 118 119 120 121 122 123 | } /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<P, S, R, E, RV, F>( | > | | | > > > | > > > > | | < | | > > > > < > > > > > | | 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | } /// Launch a task that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). #[allow(clippy::type_complexity)] pub fn spawn<P, S, R, E, RV, F>( hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E> ) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E> where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client)?; #[cfg(feature = "watchdog")] let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = task::spawn(async move { handler.init(weak_client); let ret = loop { match server.async_wait().await { Ok(msg) => { #[cfg(feature = "watchdog")] wdog.begin_process(); let res = match msg { MsgType::Put(m) => handler.post(m).await, MsgType::Request(m, rctx) => handler.req(m, rctx).await }; #[cfg(feature = "watchdog")] wdog.end_process(); match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } } Err(_) => break None } }; #[cfg(feature = "watchdog")] let _ = wdog.kill(); handler.term(ret) }); Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/thread.rs.
︙ | ︙ | |||
42 43 44 45 46 47 48 | //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. | | | | | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | //! } //! } //! } //! //! let (clnt, jh) = spawn(|clnt| { //! // Store a weak client in the handler so it doesn't keep the dispatch //! // loop alive when the Client returned to the application is dropped. //! Ok(MyHandler { //! wclnt: clnt.weak() //! }) //! }).unwrap(); //! //! clnt.post(Post::ShoutIntoVoid).unwrap(); //! //! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else { //! panic!("Unexpected reply"); //! }; //! assert_eq!(sum, 10); |
︙ | ︙ | |||
67 68 69 70 71 72 73 74 75 76 77 78 79 80 | use std::{ ops::ControlFlow, thread::{self, JoinHandle} }; use super::{channel, Client, MsgType, ReplyContext}; pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher thread before the main message /// processing loop is entered. #[allow(unused_variables)] fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {} | > | 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | use std::{ ops::ControlFlow, thread::{self, JoinHandle} }; use super::{channel, Client, MsgType, ReplyContext}; /// Message processing trait for a threaded handler. pub trait Handler<P, S, R, E, RV> { /// Optional initialization callback. /// /// This is called on the dispatcher thread before the main message /// processing loop is entered. #[allow(unused_variables)] fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {} |
︙ | ︙ | |||
103 104 105 106 107 108 109 110 | } /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). pub fn spawn<P, S, R, E, RV, F>( | > | | | > > > | > > > > | | > | > > | | < > > > > > | | 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | } /// Launch a thread that will process incoming messages from an ump-ng server /// end-point. /// /// See top module's documentation for an overview of the [dispatch /// loop](crate#dispatch-loop). #[allow(clippy::type_complexity)] pub fn spawn<P, S, R, E, RV, F>( hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E> ) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E> where P: 'static + Send, S: 'static + Send, R: 'static + Send, E: 'static + Send, RV: 'static + Send, F: Handler<P, S, R, E, RV> + Send + 'static { let (server, client) = channel(); let mut handler = hbldr(&client)?; #[cfg(feature = "watchdog")] let wdog = crate::wdog::run(); let weak_client = client.weak(); let jh = thread::spawn(move || { handler.init(weak_client); let ret = loop { match server.wait() { Ok(msg) => { #[cfg(feature = "watchdog")] wdog.begin_process(); let res = match msg { MsgType::Put(m) => handler.post(m), MsgType::Request(m, rctx) => handler.req(m, rctx) }; #[cfg(feature = "watchdog")] wdog.end_process(); match res { ControlFlow::Continue(_) => {} ControlFlow::Break(rv) => break Some(rv) } } Err(_) => break None } }; #[cfg(feature = "watchdog")] let _ = wdog.kill(); handler.term(ret) }); Ok((client, jh)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added src/wdog.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | use std::{ sync::Arc, thread::JoinHandle, time::{Duration, Instant} }; use parking_lot::{Condvar, Mutex}; /// Maximum amount of milliseconds allowed const MAX_PROC_MILLIS: u64 = 200; enum State { /// Waiting for a message to arrive. Idle, /// A message has arrived and is being processed. Processing { start_time: Instant }, /// Message processing has timed out. Timeout, Term } struct Inner { state: State } struct Shared { inner: Mutex<Inner>, signal: Condvar } pub(crate) fn run() -> WatchDog { let inner = Inner { state: State::Idle }; let shared = Shared { inner: Mutex::new(inner), signal: Condvar::new() }; let sh = Arc::new(shared); let shared = Arc::clone(&sh); let jh = std::thread::spawn(|| monitor_thread(shared)); WatchDog { sh, jh } } pub(crate) struct WatchDog { sh: Arc<Shared>, jh: JoinHandle<()> } impl WatchDog { pub(crate) fn begin_process(&self) { let mut g = self.sh.inner.lock(); g.state = State::Processing { start_time: Instant::now() }; self.sh.signal.notify_one(); } pub(crate) fn end_process(&self) { let mut g = self.sh.inner.lock(); g.state = State::Idle; self.sh.signal.notify_one(); } pub(crate) fn kill(self) -> std::thread::Result<()> { let mut g = self.sh.inner.lock(); g.state = State::Term; self.sh.signal.notify_one(); drop(g); self.jh.join() } } fn monitor_thread(sh: Arc<Shared>) { let mut g = sh.inner.lock(); loop { match g.state { 
State::Idle => { // Wait to be notified about a state change sh.signal.wait(&mut g); } State::Processing { start_time } => { let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS); if sh.signal.wait_until(&mut g, timeout).timed_out() { g.state = State::Timeout; continue; } } State::Timeout => { #[cfg(feature = "tracing")] tracing::warn!( "Message processing held up the dispatcher more than {}ms", MAX_PROC_MILLIS ); #[cfg(not(feature = "tracing"))] eprintln!( "Warning: Message processing held up the dispatcher more than {}ms", MAX_PROC_MILLIS ); // Return to idle state g.state = State::Idle; continue; } State::Term => { break; } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/simple.rs.
1 2 3 4 5 6 7 8 9 10 | mod common; use common::{Put, Reply, Request, ThreadedServer}; // Check the signalled state, which should be false. // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { let (clnt, jh) = | | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | mod common; use common::{Put, Reply, Request, ThreadedServer}; // Check the signalled state, which should be false. // Then Signal. // And finally check signalled state again, which should be true. #[test] fn no_clients() { let (clnt, jh) = ump_ng_server::spawn_thread(|_clnt| Ok(ThreadedServer::default())) .unwrap(); let reply = clnt.req(Request::GetSignalState).unwrap(); assert_eq!(reply, Reply::SignalState(false)); clnt.post(Put::Signal).unwrap(); let reply = clnt.req(Request::GetSignalState).unwrap(); |
︙ | ︙ |
Changes to tests/term.rs.
1 2 3 4 5 6 7 8 9 | mod common; use std::time::Duration; use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { | | > | > | > | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | mod common; use std::time::Duration; use common::{Put, Reply, Request, ThreadedServer}; // Terminate the dispatcher loop by dropping the only client. #[test] fn no_clients() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); // Drop the (only) client, which should cause dispatch loop to terminate. drop(clnt); // Termination by clients disappearing should return None assert_eq!(jh.join().unwrap(), None); } // Terminate the dispatcher loop by explicitly requesting it to terminate from // its handler. 
#[test] fn handler_req_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6)); assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak); let rv = jh.join().unwrap(); assert_eq!(rv, Some(42)); } #[test] fn handler_put_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); clnt.post(Put::Signal).unwrap(); clnt.post(Put::Croak).unwrap(); let rv = jh.join().unwrap(); assert_eq!(rv, Some(42)); } // Populate the queue with a bunch of nodes, then drop the client while there // are still nodes in the queue #[test] fn handler_delay_term() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); // Push a bunch of sleep nodes on the queue for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } // Drop the client drop(clnt); let rv = jh.join().unwrap(); assert_eq!(rv, None); } // Make sure all the sleepers are processed. // The explicit termination request should take precedence over terminating // over all clients disappearing. #[test] fn handler_delay_term_count() { let (clnt, jh) = ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap(); for _ in 0..10 { clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap(); } clnt.post(Put::CroakSleepCount).unwrap(); drop(clnt); |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 | # Change Log ## [Unreleased] | | > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | # Change Log ## [Unreleased] [Details](/vdiff?from=ump-ng-server-0.2.1&to=trunk) ### Added ### Changed ### Removed --- ## [0.2.1] - 2024-02-20 [Details](/vdiff?from=ump-ng-server-0.2.0&to=ump-ng-server-0.2.1) ### Added - Add a watchdog that will warn if a msgproc takes more than 200ms to return. This is intended for dev builds only and must be enabled using the `watchdog` feature. Will output to stderr by default, but will output to `tracing::warn!()` if the `tracing` feature is enabled as well. ### Changed - Make `Handler` builder closure fallible. --- ## [0.2.0] - 2024-01-28 [Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0) ### Changed |
︙ | ︙ |