ump-server

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-server-0.1.0 To ump-server-0.2.0

2024-01-28
14:28
Update doc examples to store a weak client reference within the handler. check-in: 3ea6f918b0 user: jan tags: trunk
13:39
Release maintenance. check-in: 88c41e1742 user: jan tags: trunk, ump-server-0.2.0
13:26
Make the spawm methods take in a closure for constructing the handler, to allow the handler to be created after the channel client. Add thread/task examples to module docs. check-in: 752b1bbc09 user: jan tags: trunk
2024-01-14
10:59
Dependency maintenance. check-in: 21afc9bad0 user: jan tags: trunk
2023-10-03
06:53
Move from prototyping repo. check-in: 8f789dc528 user: jan tags: trunk, ump-server-0.1.0
2023-10-02
13:52
initial empty check-in check-in: a0fa0b7764 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24


25
26
27



28
29
30
31
[package]
name = "ump-server"
version = "0.1.0"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-server"
description = "Server message dispatch loop for ump."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]

[dependencies]
async-trait = { version = "0.1.73", optional = true }


tokio = { version = "1.32.0", features = ["rt"], optional = true }
ump = { version = "0.12.1" }




[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]



|




















|
>
>
|


>
>
>




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[package]
name = "ump-server"
version = "0.2.0"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-server"
description = "Server message dispatch loop for ump."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]

[dependencies]
async-trait = { version = "0.1.77", optional = true }
# ToDo: Shouldn't need "net", but without it the docs will not build.
#       Once this is fixed in tokio, remove "net".
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
ump = { version = "0.12.1" }

[dev-dependencies]
tokio-test = { version = "0.4.3" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

Changes to README.md.

1
2
3

4
# Server message dispatch loop for ump

The _ump-server_ crate is a server message dispatch abstraction for ump.




|
>

1
2
3
4
5
# Server message dispatch loop for ump

The _ump-server_ crate is a server message dispatch abstraction for
[ump](https://crates.io/crates/ump).

Changes to src/task.rs.

1
















































2
3
4
5
6
7
8
//! ump server running in an async task.

















































use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;


>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//! ump server running in an async task.
//!
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_server::{
//!   async_trait,
//!   task::{Handler, spawn},
//!   ump::ReplyContext
//! };
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! enum MyError { }
//! struct MyHandler {};
//! #[async_trait]
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//!   async fn proc_req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a + b));
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   MyHandler { }
//! });
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
//! drop(clnt);
//!
//! let _ = jh.await;
//! # });
//! ```

use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

65
66



67
68
69
70
71
72
73
}

/// Run a task which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV>(
  mut handler: impl Handler<S, R, E, RV> + Send + 'static
) -> (Client<S, R, E>, JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send

{
  let (server, client) = channel();



  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.async_wait().await {
        Ok(d) => d,
        Err(_) => break None







|
|





|
>


>
>
>







98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
}

/// Run a task which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<S, R, E>) -> F
) -> (Client<S, R, E>, JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);

  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.async_wait().await {
        Ok(d) => d,
        Err(_) => break None

Changes to src/thread.rs.

1












































2
3
4
5
6
7
8
//! ump server running on a thread.













































use std::{ops::ControlFlow, thread};

use super::{channel, Client, ReplyContext};

/// Message processing trait for a threaded handler.
pub trait Handler<S, R, E, RV> {

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//! ump server running on a thread.
//!
//! ```
//! use std::ops::ControlFlow;
//! use ump_server::{
//!   thread::{Handler, spawn},
//!   ump::ReplyContext
//! };
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! enum MyError { }
//! struct MyHandler {};
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//!   fn proc_req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a + b));
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   MyHandler { }
//! });
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
//! drop(clnt);
//!
//! let _ = jh.join();
//! ```

use std::{ops::ControlFlow, thread};

use super::{channel, Client, ReplyContext};

/// Message processing trait for a threaded handler.
pub trait Handler<S, R, E, RV> {
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

60
61



62
63
64
65
66
67
68
}

/// Run a thread which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV>(
  mut handler: impl Handler<S, R, E, RV> + Send + 'static
) -> (Client<S, R, E>, thread::JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send

{
  let (server, client) = channel();



  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.wait() {
        Ok(d) => d,
        Err(_) => break None







|
|





|
>


>
>
>







89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
}

/// Run a thread which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<S, R, E>) -> F
) -> (Client<S, R, E>, thread::JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);

  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.wait() {
        Ok(d) => d,
        Err(_) => break None

Changes to tests/term.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
mod common;

use common::{Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
  let handler = ThreadedServer {};
  let (clnt, jh) = ump_server::spawn_thread(handler);

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}

// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
  let handler = ThreadedServer {};
  let (clnt, jh) = ump_server::spawn_thread(handler);

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  assert_eq!(jh.join().unwrap(), Some(42));
}








<
|












<
|







1
2
3
4
5
6
7

8
9
10
11
12
13
14
15
16
17
18
19
20

21
22
23
24
25
26
27
28
mod common;

use common::{Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {

  let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {});

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}

// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {

  let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {});

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  assert_eq!(jh.join().unwrap(), Some(42));
}

Changes to www/changelog.md.

1
2
3
4


5
6
7
8
9
10



















11
12
13
14
15
16
# Change Log

## [Unreleased]



### Added

### Changed

### Removed




















---

## [0.1.0] - 2023-10-03

Initial release.





>
>






>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-server-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2024-01-28

[Details](/vdiff?from=ump-server-0.1.0&to=ump-server-0.2.0)

### Added

- Add `net` feature to `tokio` dependency to work around what appears to be a
  bug in tokio which prohibits doc generation without it.

### Changed

- Instead of taking in an `impl Handler` into the `{thread,task}::spawn()`
  function, take in a closure that returns the handler.  A reference to the
  handler channel's client endpoint is passed to the closure, which makes it
  possible to store `Client`/`WeakClient` in the handler, without involving an
  `Option` (or similar).

---

## [0.1.0] - 2023-10-03

Initial release.

Changes to www/index.md.

1
2
3

4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# ump-server

The _ump-server_ crate is a server message dispatch abstraction for ump.



## Feature labels in documentation

The crate's documentation uses automatically generated feature labels, which
currently requires nightly featuers.  To build the documentation locally use:

```
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
```


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).



|
>








|

<







1
2
3
4
5
6
7
8
9
10
11
12
13
14

15
16
17
18
19
20
21
# ump-server

The _ump-server_ crate is a server message dispatch abstraction for
[ump](https://repos.qrnch.tech/pub/ump).


## Feature labels in documentation

The crate's documentation uses automatically generated feature labels, which
currently requires nightly featuers.  To build the documentation locally use:

```
RUSTFLAGS="--cfg docsrs" RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
```


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).