ump-server

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-server-0.2.0 To ump-server-0.2.1

2024-02-20
11:58
Changelog fixup. check-in: b99bf2d8a9 user: jan tags: trunk
11:50
Release maintenance. check-in: 933f121ec8 user: jan tags: trunk, ump-server-0.2.1
08:06
Up version. check-in: 157c1acc22 user: jan tags: trunk
2024-01-28
14:28
Update doc examples to store a weak client reference within the handler. check-in: 3ea6f918b0 user: jan tags: trunk
13:39
Release maintenance. check-in: 88c41e1742 user: jan tags: trunk, ump-server-0.2.0
13:26
Make the spawm methods take in a closure for constructing the handler, to allow the handler to be created after the channel client. Add thread/task examples to module docs. check-in: 752b1bbc09 user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6
7

8
9

1
2
3
4
5
6
7
8
9
10
11







+


+
Cargo.toml
README.md
www/index.md
www/changelog.md
src/lib.rs
src/thread.rs
src/task.rs
src/wdog.rs
tests/term.rs
tests/common/mod.rs
examples/wdog_timeout.rs

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21


22
23
24

25
26
27


28
29
30
31
32
33
34
35
36
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

30
31
32
33
34
35
36
37
38
39
40


-
+


















+
+



+


-
+
+









[package]
name = "ump-server"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-server"
description = "Server message dispatch loop for ump."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]
tracing = ["dep:tracing"]
watchdog = ["dep:parking_lot"]

[dependencies]
async-trait = { version = "0.1.77", optional = true }
parking_lot = { version = "0.12.1", optional = true }
# ToDo: Shouldn't need "net", but without it the docs will not build.
#       Once this is fixed in tokio, remove "net".
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
tokio = { version = "1.36.0", features = ["net", "rt"], optional = true }
tracing = { version = "0.1.40", optional = true }
ump = { version = "0.12.1" }

[dev-dependencies]
tokio-test = { version = "0.4.3" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

Added examples/wdog_timeout.rs.





















































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use std::ops::ControlFlow;

#[derive(Debug)]
pub enum Request {
  Delay(u64)
}

#[derive(Debug, PartialEq)]
pub enum Reply {
  DelayDone
}

pub struct ThreadedServer {}

impl ump_server::ThreadedHandler<Request, Reply, (), u32> for ThreadedServer {
  fn proc_req(
    &mut self,
    msg: Request,
    rctx: ump_server::ReplyContext<Reply, ()>
  ) -> ControlFlow<u32, ()> {
    match msg {
      Request::Delay(ms) => {
        std::thread::sleep(std::time::Duration::from_millis(ms));
        rctx.reply(Reply::DelayDone).unwrap();
        ControlFlow::Continue(())
      }
    }
  }
}

// Terminate the dispatcher loop by dropping the only client.
fn main() {
  #[cfg(not(feature = "watchdog"))]
  eprintln!("Warning: Example not built with watchdog feature");

  let (clnt, jh) =
    ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap();

  println!("==> Issue request which should not timeout ..");
  clnt.req(Request::Delay(190)).unwrap();

  println!("==> Issue request which should timeout ..");
  clnt.req(Request::Delay(200)).unwrap();

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

29
30
31
32
33
34
35













36
37



38
39
40
41
42
43
44
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60







+
+
+
+
+
+
+
+
+
+
+
+
+


+
+
+







//! - The message queue is empty and all the associated [`Client`]s have been
//!   released.  This would cause the thread to return `None`.
//!
//! # Application message handlers
//! Message handlers are implemented using the [`thread::Handler`] trait (for
//! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch
//! loop).
//!
//! There are cases where the handler needs to store a clone of the client
//! end-point of the message passing channel used to issue requests to the
//! server (so that message handlers can issue new requests).  In order to
//! facilitate this, the application must pass a `Handler`-construction closure
//! to `spawn()`.  The closure will be called after the message passing channel
//! has been created so it can be passed a reference to the client end-point.
//!
//! If the dispatch loop should terminate once all the application's client
//! end-points have been dropped, then the handler can store a [`WeakClient`]
//! instead (as storing a cloned [`Client`] object will preventing the dispatch
//! loop from terminating due to all clients being lost).  The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "watchdog")]
mod wdog;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;

pub mod thread;

Changes to src/task.rs.

1
2

3
4
5
6
7
8
9

10

11
12
13
14
15
16

17

18



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35




36
37


38
39
40
41
42
43
44
1
2
3
4
5
6
7
8
9

10
11
12
13
14
15
16
17
18
19
20
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45


46
47
48
49
50
51
52
53
54


+






-
+

+






+

+
-
+
+
+

















+
+
+
+
-
-
+
+







//! ump server running in an async task.
//!
//! # Example
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_server::{
//!   async_trait,
//!   task::{Handler, spawn},
//!   ump::ReplyContext
//!   ReplyContext, WeakClient
//! };
//!
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {};
//! struct MyHandler {
//!   wclnt: WeakClient<Request, Reply, MyError>
//! };
//! #[async_trait]
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//!   async fn proc_req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a + b));
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   MyHandler { }
//! });
//!   })
//! }).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
98
99
100
101
102
103
104

105
106
107


108
109
110
111
112
113
114
115
116
117




118
119
120
121
122
123
124
125
126




127






128
129
130
131




132
133
134
135

136
137
138
108
109
110
111
112
113
114
115
116


117
118
119
120
121
122
123
124
125
126
127

128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

162
163
164
165







+

-
-
+
+









-
+
+
+
+









+
+
+
+
-
+
+
+
+
+
+




+
+
+
+



-
+



}

/// Run a task which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
#[allow(clippy::type_complexity)]
pub fn spawn<S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<S, R, E>) -> F
) -> (Client<S, R, E>, JoinHandle<Option<RV>>)
  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
) -> Result<(Client<S, R, E>, JoinHandle<Option<RV>>), E>
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);
  let mut handler = hbldr(&client)?;

  #[cfg(feature = "watchdog")]
  let wdog = crate::wdog::run();

  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.async_wait().await {
        Ok(d) => d,
        Err(_) => break None
      };

      #[cfg(feature = "watchdog")]
      wdog.begin_process();

      match handler.proc_req(msg, rctx).await {
      let res = handler.proc_req(msg, rctx).await;

      #[cfg(feature = "watchdog")]
      wdog.end_process();

      match res {
        ControlFlow::Continue(_) => {}
        ControlFlow::Break(rv) => break Some(rv)
      }
    };

    #[cfg(feature = "watchdog")]
    let _ = wdog.kill();

    handler.term(ret)
  });

  (client, jh)
  Ok((client, jh))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/thread.rs.

1
2

3
4
5
6
7

8

9
10
11
12
13
14

15

16



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32




33
34


35
36
37
38
39
40
41
1
2
3
4
5
6
7

8
9
10
11
12
13
14
15
16
17
18
19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42


43
44
45
46
47
48
49
50
51


+




-
+

+






+

+
-
+
+
+
















+
+
+
+
-
-
+
+







//! ump server running on a thread.
//!
//! # Example
//! ```
//! use std::ops::ControlFlow;
//! use ump_server::{
//!   thread::{Handler, spawn},
//!   ump::ReplyContext
//!   ReplyContext, WeakClient
//! };
//!
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {};
//! struct MyHandler {
//!   wclnt: WeakClient<Request, Reply, MyError>
//! };
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//!   fn proc_req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a + b));
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   MyHandler { }
//! });
//!   })
//! }).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
89
90
91
92
93
94
95

96
97
98


99
100
101
102
103
104
105
106
107
108




109
110
111
112
113
114
115
116
117




118






119
120
121
122




123
124
125
126

127
128
129
99
100
101
102
103
104
105
106
107


108
109
110
111
112
113
114
115
116
117
118

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

153
154
155
156







+

-
-
+
+









-
+
+
+
+









+
+
+
+
-
+
+
+
+
+
+




+
+
+
+



-
+



}

/// Run a thread which will process incoming messages from an ump server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
#[allow(clippy::type_complexity)]
pub fn spawn<S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<S, R, E>) -> F
) -> (Client<S, R, E>, thread::JoinHandle<Option<RV>>)
  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E>
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);
  let mut handler = hbldr(&client)?;

  #[cfg(feature = "watchdog")]
  let wdog = crate::wdog::run();

  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      let (msg, rctx) = match server.wait() {
        Ok(d) => d,
        Err(_) => break None
      };

      #[cfg(feature = "watchdog")]
      wdog.begin_process();

      match handler.proc_req(msg, rctx) {
      let res = handler.proc_req(msg, rctx);

      #[cfg(feature = "watchdog")]
      wdog.end_process();

      match res {
        ControlFlow::Continue(_) => {}
        ControlFlow::Break(rv) => break Some(rv)
      }
    };

    #[cfg(feature = "watchdog")]
    let _ = wdog.kill();

    handler.term(ret)
  });

  (client, jh)
  Ok((client, jh))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/wdog.rs.

























































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use std::{
  sync::Arc,
  thread::JoinHandle,
  time::{Duration, Instant}
};

use parking_lot::{Condvar, Mutex};

/// Maximum amount of milliseconds allowed
const MAX_PROC_MILLIS: u64 = 200;

enum State {
  /// Waiting for a message to arrive.
  Idle,

  /// A message has arrived and is being processed.
  Processing {
    start_time: Instant
  },

  /// Message processing has timed out.
  Timeout,

  Term
}

struct Inner {
  state: State
}

struct Shared {
  inner: Mutex<Inner>,
  signal: Condvar
}

pub(crate) fn run() -> WatchDog {
  let inner = Inner { state: State::Idle };

  let shared = Shared {
    inner: Mutex::new(inner),
    signal: Condvar::new()
  };

  let sh = Arc::new(shared);
  let shared = Arc::clone(&sh);
  let jh = std::thread::spawn(|| monitor_thread(shared));

  WatchDog { sh, jh }
}

pub(crate) struct WatchDog {
  sh: Arc<Shared>,
  jh: JoinHandle<()>
}

impl WatchDog {
  pub(crate) fn begin_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Processing {
      start_time: Instant::now()
    };
    self.sh.signal.notify_one();
  }

  pub(crate) fn end_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Idle;
    self.sh.signal.notify_one();
  }

  pub(crate) fn kill(self) -> std::thread::Result<()> {
    let mut g = self.sh.inner.lock();
    g.state = State::Term;
    self.sh.signal.notify_one();
    drop(g);
    self.jh.join()
  }
}


fn monitor_thread(sh: Arc<Shared>) {
  let mut g = sh.inner.lock();
  loop {
    match g.state {
      State::Idle => {
        // Wait to be notified about a state change
        sh.signal.wait(&mut g);
      }
      State::Processing { start_time } => {
        let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
        if sh.signal.wait_until(&mut g, timeout).timed_out() {
          g.state = State::Timeout;
          continue;
        }
      }
      State::Timeout => {
        #[cfg(feature = "tracing")]
        tracing::warn!(
          "Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        #[cfg(not(feature = "tracing"))]
        eprintln!(
          "Warning: Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        // Retutn to idle state
        g.state = State::Idle;
        continue;
      }
      State::Term => {
        break;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/term.rs.

1
2
3
4
5
6
7
8


9
10
11
12
13
14
15
16
17
18
19
20
21


22
23
24
25
26
27
28
1
2
3
4
5
6
7

8
9
10
11
12
13
14
15
16
17
18
19
20
21

22
23
24
25
26
27
28
29
30







-
+
+












-
+
+







mod common;

use common::{Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
  let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {});
  let (clnt, jh) =
    ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap();

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}

// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
  let (clnt, jh) = ump_server::spawn_thread(|_clnt| ThreadedServer {});
  let (clnt, jh) =
    ump_server::spawn_thread(|_clnt| Ok(ThreadedServer {})).unwrap();

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  assert_eq!(jh.join().unwrap(), Some(42));
}

Changes to www/changelog.md.

1
2
3
4
5

6
7
8
9
10
11
12

















13
14
15
16
17
18
19
1
2
3
4

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36




-
+







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-server-0.2.0&to=trunk)
[Details](/vdiff?from=ump-server-0.2.1&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.1]

[Details](/vdiff?from=ump-server-0.2.0&to=ump-server-0.2.1)

### Added

- Add a watchdog that will warn if a msgproc takes more than 200ms to return.
  This is intended for dev builds only and must be enabled using the
  `watchdog` feature.  Will output to stderr by default, but will output to
  `tracing::warn!()` if the `tracing` feature is enabled as well.

### Changed

- Make `Handler` builder closure fallible.

---

## [0.2.0] - 2024-01-28

[Details](/vdiff?from=ump-server-0.1.0&to=ump-server-0.2.0)

### Added