Difference From ump-ng-server-0.1.2 To ump-ng-server-0.2.0
2024-01-28
| 17:35 | Make handler constructor closure fallible. check-in: 1e85d183b2 user: jan tags: trunk |
| 15:19 | Release maintenance. check-in: d37659a48c user: jan tags: trunk, ump-ng-server-0.2.0 |
| 15:09 | Use closure to construct Handler. Remove the spawn_preinit() functions, since using a closure to construct the Handler makes it possible to accomplish the same thing, but less ugly. check-in: d485928620 user: jan tags: trunk |
2024-01-21
| 18:37 | Release maintenance. check-in: 5f420344bf user: jan tags: trunk, ump-ng-server-0.1.2 |
| 18:31 | Add preinit variants of both thread and task spawners. check-in: e3a0f27d4a user: jan tags: trunk |
Changes to Cargo.toml.
1 2 |
[package]
name = "ump-ng-server"
| | | 1 2 3 4 5 6 7 8 9 10 |
[package]
name = "ump-ng-server"
version = "0.2.0"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
description = "Server message dispatch loop for ump-ng."
rust-version = "1.56"
| ︙ | ︙ | |||
23 24 25 26 27 28 29 30 31 32 33 |
[dependencies]
async-trait = { version = "0.1.77", optional = true }
# "net" is added as a temporary workaround. Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
ump-ng = { version = "0.1.0" }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]
| > > > | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
[dependencies]
async-trait = { version = "0.1.77", optional = true }
# "net" is added as a temporary workaround. Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
ump-ng = { version = "0.1.0" }
[dev-dependencies]
tokio-test = { version = "0.4.3" }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]
|
Changes to src/lib.rs.
|
| | | > | 1 2 3 4 5 6 7 8 9 10 |
//! _ump-ng-server_ is an abstraction on top of [`ump-ng`](ump_ng) that is used
//! to hide boilerplate code used to implement intra-process message passing
//! servers.
//!
//! # Dispatch loop
//! The core functionality of _ump-ng-server_ is a dispatch loop, whose role it
//! is to pull messages off the message queue and pass them to the
//! application-supplied message handler.
//!
//! There are two different ways to run the dispatcher loop: On a non-async
| ︙ | ︙ | |||
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
//! - The message queue is empty and all the associated [`Client`]s have been
//! released. This would cause the thread to return `None`.
//!
//! # Application message handlers
//! Message handlers are implemented using the [`thread::Handler`] trait (for
//! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch
//! loop).
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;
pub mod thread;
pub use ump_ng::{
self, channel, Client, MsgType, ReplyContext, Server, WeakClient
};
| > > > > > > > > > > > > > | < < < | < < < | 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
//! - The message queue is empty and all the associated [`Client`]s have been
//! released. This would cause the thread to return `None`.
//!
//! # Application message handlers
//! Message handlers are implemented using the [`thread::Handler`] trait (for
//! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch
//! loop).
//!
//! There are cases where the handler needs to store a clone of the client
//! end-point of the message passing channel used to issue requests to the
//! server (so that message handlers can issue new requests). In order to
//! facilitate this, the application must pass a `Handler`-construction closure
//! to `spawn()`. The closure will be called after the message passing channel
//! has been created so it can be passed a reference to the client end-point.
//!
//! If the dispatch loop should terminate once all the application's client
//! end-points have been dropped, then the handler can store a [`WeakClient`]
//! instead (as storing a cloned [`Client`] object will prevent the dispatch
//! loop from terminating due to all clients being lost). The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;
pub mod thread;
pub use ump_ng::{
self, channel, Client, MsgType, ReplyContext, Server, WeakClient
};
pub use thread::{spawn as spawn_thread, Handler as ThreadedHandler};
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use task::{spawn as spawn_task, Handler as AsyncHandler};
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use async_trait::async_trait;
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
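
The closure-based handler construction described in the crate documentation above can be modelled with the standard library alone. The following is a minimal sketch, not the ump-ng-server implementation: `std::sync::mpsc` stands in for the ump-ng channel, and `Handler`, `spawn`, `Summer` and `closure_demo` are hypothetical names invented for this illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical handler trait for this sketch (not the ump-ng-server trait).
trait Handler<M> {
  fn handle(&mut self, msg: M);
  fn finish(self) -> u32;
}

/// Create the channel first, then let the caller's closure build the handler
/// from a reference to the sending end-point.
fn spawn<M, H>(
  hbldr: impl FnOnce(&Sender<M>) -> H
) -> (Sender<M>, JoinHandle<u32>)
where
  M: Send + 'static,
  H: Handler<M> + Send + 'static
{
  let (tx, rx) = channel();
  let mut handler = hbldr(&tx);
  let jh = thread::spawn(move || {
    // recv() fails once every Sender has been dropped, ending the loop.
    while let Ok(msg) = rx.recv() {
      handler.handle(msg);
    }
    handler.finish()
  });
  (tx, jh)
}

/// Hypothetical handler that sums every message it receives.
struct Summer {
  total: u32
}

impl Handler<u32> for Summer {
  fn handle(&mut self, msg: u32) {
    self.total += msg;
  }
  fn finish(self) -> u32 {
    self.total
  }
}

fn closure_demo() -> u32 {
  let (tx, jh) = spawn(|tx: &Sender<u32>| {
    // The end-point already exists here, so the closure can use it while
    // constructing the handler.
    tx.send(10).unwrap();
    Summer { total: 0 }
  });
  tx.send(5).unwrap();
  // Drop the only sender so the loop terminates.
  drop(tx);
  jh.join().unwrap()
}
```

In this model, if `Summer` stored a clone of `tx`, `recv()` would never fail and the loop would not end on its own. That is the situation the `WeakClient` advice in the documentation addresses.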
|
Changes to src/task.rs.
1 2 3 4 5 6 |
use std::ops::ControlFlow;
use tokio::task::{self, JoinHandle};
use async_trait::async_trait;
| > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
//! ump-ng dispatch server running on a task.
//!
//! # Example
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//!   async_trait,
//!   task::{Handler, spawn},
//!   ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//!   ShoutIntoVoid
//! }
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! #[async_trait]
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//!   async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//!     match msg {
//!       Post::ShoutIntoVoid => {
//!         // No reply .. but keep on trudging on
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//!   async fn req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a+b)).unwrap();
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.await;
//! # });
//! ```
use std::ops::ControlFlow;
use tokio::task::{self, JoinHandle};
use async_trait::async_trait;
use super::{channel, Client, MsgType, ReplyContext};
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
/// Optional initialization callback.
///
/// This is called on the dispatcher task before the main message
/// processing loop is entered.
|
| ︙ | ︙ | |||
43 44 45 46 47 48 49 |
}
/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
| | | | > > > > | 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
}
/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV, F>(
hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
RV: 'static + Send,
F: Handler<P, S, R, E, RV> + Send + 'static
{
let (server, client) = channel();
let mut handler = hbldr(&client);
let weak_client = client.weak();
let jh = task::spawn(async move {
handler.init(weak_client);
let ret = loop {
match server.async_wait().await {
Ok(msg) => match msg {
MsgType::Put(m) => match handler.post(m).await {
|
| ︙ | ︙ | |||
78 79 80 81 82 83 84 |
};
handler.term(ret)
});
(client, jh)
}
| < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 155 156 157 158 159 160 161 162 |
};
handler.term(ret)
});
(client, jh)
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
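
The example in this module stores a `WeakClient` in the handler so that the handler does not keep the dispatch loop alive. The same idea can be illustrated with `std::sync::Arc` and `std::sync::Weak`. This is an analogy only: the diff does not show how `WeakClient` is implemented, and `Endpoint`, `ToyHandler` and `weak_demo` are hypothetical names used for this sketch.

```rust
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for a client end-point.
struct Endpoint;

/// Hypothetical handler that only holds a weak reference to the end-point.
struct ToyHandler {
  wclnt: Weak<Endpoint>
}

/// Returns whether the end-point was reachable from the handler before and
/// after the application dropped its (only) strong reference.
fn weak_demo() -> (bool, bool) {
  let clnt = Arc::new(Endpoint);
  let handler = ToyHandler {
    wclnt: Arc::downgrade(&clnt)
  };

  // While the application holds its strong reference, upgrading succeeds.
  let alive_before = handler.wclnt.upgrade().is_some();

  // The weak reference does not keep the end-point alive.
  drop(clnt);
  let alive_after = handler.wclnt.upgrade().is_some();

  (alive_before, alive_after)
}
```

Had `ToyHandler` stored an `Arc<Endpoint>` instead, the end-point would have outlived the application's `drop(clnt)`, which mirrors why a stored `Client` clone prevents termination by client loss.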
|
Changes to src/thread.rs.
1 2 3 4 5 |
use std::{
ops::ControlFlow,
thread::{self, JoinHandle}
};
| > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
//! ump-ng dispatch server running on a thread.
//!
//! # Example
//! ```
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//!   thread::{Handler, spawn},
//!   ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//!   ShoutIntoVoid
//! }
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//!   fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//!     match msg {
//!       Post::ShoutIntoVoid => {
//!         // No reply .. but keep on trudging on
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//!   fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
//!     -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a+b)).unwrap();
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.join();
//! ```
use std::{
ops::ControlFlow,
thread::{self, JoinHandle}
};
use super::{channel, Client, MsgType, ReplyContext};
pub trait Handler<P, S, R, E, RV> {
/// Optional initialization callback.
///
/// This is called on the dispatcher thread before the main message
/// processing loop is entered.
#[allow(unused_variables)]
|
| ︙ | ︙ | |||
37 38 39 40 41 42 43 |
}
/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
| | | | > > > > | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
}
/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV, F>(
hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
RV: 'static + Send,
F: Handler<P, S, R, E, RV> + Send + 'static
{
let (server, client) = channel();
let mut handler = hbldr(&client);
let weak_client = client.weak();
let jh = thread::spawn(move || {
handler.init(weak_client);
let ret = loop {
match server.wait() {
Ok(msg) => match msg {
MsgType::Put(m) => match handler.post(m) {
|
| ︙ | ︙ | |||
72 73 74 75 76 77 78 |
};
handler.term(ret)
});
(client, jh)
}
| < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < | 142 143 144 145 146 147 148 149 |
};
handler.term(ret)
});
(client, jh)
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
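
The termination behaviour of the dispatch loop (a handler-requested stop yields `Some(value)`, while losing all clients yields `None`) can be sketched with the standard library. This is a simplified model under stated assumptions, not the crate's code: `std::sync::mpsc` replaces the ump-ng channel, and `Msg`, `spawn_loop`, `terminated_by_handler` and `terminated_by_drop` are hypothetical names.

```rust
use std::ops::ControlFlow;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message type for this sketch.
enum Msg {
  Add(u32),
  Croak
}

/// Run a toy dispatch loop on a new thread.
fn spawn_loop() -> (Sender<Msg>, JoinHandle<Option<u32>>) {
  let (tx, rx) = channel();
  let jh = thread::spawn(move || {
    let mut total = 0;
    loop {
      // recv() returns Err once the queue is empty and all senders are gone.
      let Ok(msg) = rx.recv() else { break None };

      // The "handler" decides whether the loop should keep running.
      let flow = match msg {
        Msg::Add(n) => {
          total += n;
          ControlFlow::Continue(())
        }
        Msg::Croak => ControlFlow::Break(total)
      };
      if let ControlFlow::Break(rv) = flow {
        break Some(rv);
      }
    }
  });
  (tx, jh)
}

/// The handler asks the loop to stop and supplies a return value.
fn terminated_by_handler() -> Option<u32> {
  let (tx, jh) = spawn_loop();
  tx.send(Msg::Add(2)).unwrap();
  tx.send(Msg::Add(4)).unwrap();
  tx.send(Msg::Croak).unwrap();
  jh.join().unwrap()
}

/// The only sender is dropped; queued messages are still processed first.
fn terminated_by_drop() -> Option<u32> {
  let (tx, jh) = spawn_loop();
  tx.send(Msg::Add(2)).unwrap();
  drop(tx);
  jh.join().unwrap()
}
```

The two helper functions correspond loosely to the `handler_put_term` and `no_clients` cases in `tests/term.rs`, with the caveat that the real handlers and message types live in the crate's test support code.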
|
Changes to tests/simple.rs.
1 2 3 4 5 6 7 8 9 |
mod common;
use common::{Put, Reply, Request, ThreadedServer};
// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
| < | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
mod common;
use common::{Put, Reply, Request, ThreadedServer};
// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
let (clnt, jh) =
ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default());
let reply = clnt.req(Request::GetSignalState).unwrap();
assert_eq!(reply, Reply::SignalState(false));
clnt.post(Put::Signal).unwrap();
let reply = clnt.req(Request::GetSignalState).unwrap();
|
| ︙ | ︙ |
Changes to tests/term.rs.
1 2 3 4 5 6 7 8 9 |
mod common;
use std::time::Duration;
use common::{Put, Reply, Request, ThreadedServer};
// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
| < < | < < | < < | < < | < < | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
mod common;
use std::time::Duration;
use common::{Put, Reply, Request, ThreadedServer};
// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
// Drop the (only) client, which should cause dispatch loop to terminate.
drop(clnt);
// Termination by clients disappearing should return None
assert_eq!(jh.join().unwrap(), None);
}
// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);
let rv = jh.join().unwrap();
assert_eq!(rv, Some(42));
}
#[test]
fn handler_put_term() {
let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
clnt.post(Put::Signal).unwrap();
clnt.post(Put::Croak).unwrap();
let rv = jh.join().unwrap();
assert_eq!(rv, Some(42));
}
// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {
let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
// Push a bunch of sleep nodes on the queue
for _ in 0..10 {
clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
}
// Drop the client
drop(clnt);
let rv = jh.join().unwrap();
assert_eq!(rv, None);
}
// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over termination
// caused by all clients disappearing.
#[test]
fn handler_delay_term_count() {
let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());
for _ in 0..10 {
clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
}
clnt.post(Put::CroakSleepCount).unwrap();
drop(clnt);
|
| ︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 |
# Change Log
## [Unreleased]
| | > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# Change Log
## [Unreleased]
[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk)
### Added
### Changed
### Removed
---
## [0.2.0] - 2024-01-28
[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)
### Changed
- The `{thread,task}::spawn()` functions now take a closure that returns the
handler, instead of taking an `impl Handler` directly. A reference to the
handler channel's client endpoint is passed to the closure, which makes it
possible to store a `Client`/`WeakClient` in the handler without involving an
`Option` (or similar).
### Removed
- Removed the `{thread,task}::spawn_preinit()` functions. These no longer
serve a purpose now that deferred handler construction has been
introduced.
---
## [0.1.2] - 2024-01-21
[Details](/vdiff?from=ump-ng-server-0.1.1&to=ump-ng-server-0.1.2)
### Added
|
| ︙ | ︙ |