ump-ng-server

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-ng-server-0.2.0 To ump-ng-server-0.2.1

2024-02-20
16:13
(Re)release maintenance. check-in: 8ce88d28b5 user: jan tags: trunk, ump-ng-server-0.3.0
11:56
Happy clippy. check-in: 959380d872 user: jan tags: trunk, ump-ng-server-0.2.1
11:54
Release maintenance. check-in: 0ebc081337 user: jan tags: trunk
2024-01-28
17:35
Make handler constructor closure fallible. check-in: 1e85d183b2 user: jan tags: trunk
15:19
Release maintenance. check-in: d37659a48c user: jan tags: trunk, ump-ng-server-0.2.0
15:09
Use closure to construct Handler. Remove the spawn_preinit() functions, since using a closure to construct the Handler makes it possible to accomplish the same thing, but less ugly. check-in: d485928620 user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6
7

8
9
10
Cargo.toml
README.md
www/index.md
www/changelog.md
src/lib.rs
src/thread.rs
src/task.rs

tests/common/mod.rs
tests/simple.rs
tests/term.rs







>



1
2
3
4
5
6
7
8
9
10
11
Cargo.toml
README.md
www/index.md
www/changelog.md
src/lib.rs
src/thread.rs
src/task.rs
src/wdog.rs
tests/common/mod.rs
tests/simple.rs
tests/term.rs

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21


22
23
24

25
26
27

28
29
30
31
32
33
34
35
36
[package]
name = "ump-ng-server"
version = "0.2.0"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
description = "Server message dispatch loop for ump-ng."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]



[dependencies]
async-trait = { version = "0.1.77", optional = true }

# "net" is added as a temporary workaround.  Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }

ump-ng = { version = "0.1.0" }

[dev-dependencies]
tokio-test = { version = "0.4.3" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]



|


















>
>



>



>









1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
[package]
name = "ump-ng-server"
version = "0.2.1"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
description = "Server message dispatch loop for ump-ng."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:async-trait"]
tracing = ["dep:tracing"]
watchdog = ["dep:parking_lot"]

[dependencies]
async-trait = { version = "0.1.77", optional = true }
parking_lot = { version = "0.12.1", optional = true }
# "net" is added as a temporary workaround.  Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
tracing = { version = "0.1.40", optional = true }
ump-ng = { version = "0.1.0" }

[dev-dependencies]
tokio-test = { version = "0.4.3" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

Changes to src/lib.rs.

45
46
47
48
49
50
51



52
53
54
55
56
57
58
//! If the dispatch loop should terminate once all the application's client
//! end-points have been dropped, then the handler can store a [`WeakClient`]
//! instead (as storing a cloned [`Client`] object will prevent the dispatch
//! loop from terminating due to all clients being lost).  The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.

#![cfg_attr(docsrs, feature(doc_cfg))]




#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;

pub mod thread;








>
>
>







45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//! If the dispatch loop should terminate once all the application's client
//! end-points have been dropped, then the handler can store a [`WeakClient`]
//! instead (as storing a cloned [`Client`] object will prevent the dispatch
//! loop from terminating due to all clients being lost).  The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "watchdog")]
mod wdog;

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;

pub mod thread;

Changes to src/task.rs.

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);







|

|
|







48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   })
//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
75
76
77
78
79
80
81

82
83
84
85
86
87
88

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{channel, Client, MsgType, ReplyContext};


#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]







>







75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{channel, Client, MsgType, ReplyContext};

/// Message processing trait for an async handler.
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]
116
117
118
119
120
121
122

123
124
125
126
127
128
129
130
131
132
133
134
135
136



137
138
139
140
141
142
143




144
145
146
147
148




149
150
151
152

153
154
155




156
157
158
159
160
161
162
}

/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).

pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);




  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      match server.async_wait().await {
        Ok(msg) => match msg {




          MsgType::Put(m) => match handler.post(m).await {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          },
          MsgType::Request(m, rctx) => match handler.req(m, rctx).await {




            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }
        },

        Err(_) => break None
      }
    };




    handler.term(ret)
  });

  (client, jh)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







>

|
|










|
>
>
>






|
>
>
>
>
|
|
<
|
|
>
>
>
>



<
>



>
>
>
>



|



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154

155
156
157
158
159
160
161
162
163

164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
}

/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// The `hbldr` closure is called on the launching task (not the spawned one)
/// with a reference to the client end-point, and must return the message
/// [`Handler`] to use for the dispatch loop.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
///
/// # Errors
/// Returns `Err(E)` if the `hbldr` closure fails to construct the handler.
/// In that case no task is spawned.
#[allow(clippy::type_complexity)]
pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  // Construct the handler before spawning; a builder failure is returned
  // directly to the caller.
  let mut handler = hbldr(&client)?;

  #[cfg(feature = "watchdog")]
  let wdog = crate::wdog::run();

  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      match server.async_wait().await {
        Ok(msg) => {
          // Tell the watchdog that message processing has begun so it can
          // warn if the handler holds up the dispatcher too long.
          #[cfg(feature = "watchdog")]
          wdog.begin_process();

          let res = match msg {
            MsgType::Put(m) => handler.post(m).await,
            MsgType::Request(m, rctx) => handler.req(m, rctx).await

          };

          #[cfg(feature = "watchdog")]
          wdog.end_process();

          // Break out of the dispatch loop if the handler requested it.
          match res {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }

        }
        // All clients are gone; terminate with no return value.
        Err(_) => break None
      }
    };

    #[cfg(feature = "watchdog")]
    let _ = wdog.kill();

    // Give the handler a chance to run termination code and map the loop's
    // result into the task's final return value.
    handler.term(ret)
  });

  Ok((client, jh))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/thread.rs.

42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);







|

|
|







42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   })
//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
67
68
69
70
71
72
73

74
75
76
77
78
79
80
use std::{
  ops::ControlFlow,
  thread::{self, JoinHandle}
};

use super::{channel, Client, MsgType, ReplyContext};


pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher thread before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}







>







67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use std::{
  ops::ControlFlow,
  thread::{self, JoinHandle}
};

use super::{channel, Client, MsgType, ReplyContext};

/// Message processing trait for a threaded handler.
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher thread before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
103
104
105
106
107
108
109

110
111
112
113
114
115
116
117
118
119
120
121
122
123



124
125
126
127
128
129
130




131
132

133


134
135
136
137
138
139

140
141
142




143
144
145
146
147
148
149
}

/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).

pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);




  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      match server.wait() {
        Ok(msg) => match msg {




          MsgType::Put(m) => match handler.post(m) {
            ControlFlow::Continue(_) => {}

            ControlFlow::Break(rv) => break Some(rv)


          },
          MsgType::Request(m, rctx) => match handler.req(m, rctx) {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }
        },

        Err(_) => break None
      }
    };




    handler.term(ret)
  });

  (client, jh)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







>

|
|










|
>
>
>






|
>
>
>
>
|
|
>
|
>
>
|
|



<
>



>
>
>
>



|



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
}

/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// The `hbldr` closure is called on the launching thread (not the spawned
/// one) with a reference to the client end-point, and must return the
/// message [`Handler`] to use for the dispatch loop.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
///
/// # Errors
/// Returns `Err(E)` if the `hbldr` closure fails to construct the handler.
/// In that case no thread is spawned.
#[allow(clippy::type_complexity)]
pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  // Construct the handler before spawning; a builder failure is returned
  // directly to the caller.
  let mut handler = hbldr(&client)?;

  #[cfg(feature = "watchdog")]
  let wdog = crate::wdog::run();

  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      match server.wait() {
        Ok(msg) => {
          // Tell the watchdog that message processing has begun so it can
          // warn if the handler holds up the dispatcher too long.
          #[cfg(feature = "watchdog")]
          wdog.begin_process();

          let res = match msg {
            MsgType::Put(m) => handler.post(m),
            MsgType::Request(m, rctx) => handler.req(m, rctx)
          };

          #[cfg(feature = "watchdog")]
          wdog.end_process();

          // Break out of the dispatch loop if the handler requested it.
          match res {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }

        }
        // All clients are gone; terminate with no return value.
        Err(_) => break None
      }
    };

    #[cfg(feature = "watchdog")]
    let _ = wdog.kill();

    // Give the handler a chance to run termination code and map the loop's
    // result into the thread's final return value.
    handler.term(ret)
  });

  Ok((client, jh))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/wdog.rs.

















































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::{
  sync::Arc,
  thread::JoinHandle,
  time::{Duration, Instant}
};

use parking_lot::{Condvar, Mutex};

/// Maximum number of milliseconds a single message is allowed to occupy the
/// dispatcher before the watchdog emits a warning.
const MAX_PROC_MILLIS: u64 = 200;

/// Watchdog monitor state, driven by the dispatcher via the [`WatchDog`]
/// handle and consumed by [`monitor_thread()`].
enum State {
  /// Waiting for a message to arrive.
  Idle,

  /// A message has arrived and is being processed.
  Processing {
    /// When processing of the current message began; used to compute the
    /// warning deadline.
    start_time: Instant
  },

  /// Message processing has timed out.
  Timeout,

  /// The watchdog has been asked to shut down.
  Term
}

/// Mutex-protected watchdog state.
struct Inner {
  /// Current monitor state.
  state: State
}

/// State shared between the dispatcher-facing [`WatchDog`] handle and the
/// monitor thread.
struct Shared {
  /// The watchdog state, protected by a mutex.
  inner: Mutex<Inner>,
  /// Condition variable used to signal state changes to the monitor thread.
  signal: Condvar
}

/// Spawn the watchdog monitor thread and return a [`WatchDog`] handle used to
/// report message-processing activity to it.
pub(crate) fn run() -> WatchDog {
  let shared = Arc::new(Shared {
    inner: Mutex::new(Inner { state: State::Idle }),
    signal: Condvar::new()
  });

  // Hand one reference to the monitor thread; keep the other in the handle.
  let monitor_shared = Arc::clone(&shared);
  let jh = std::thread::spawn(move || monitor_thread(monitor_shared));

  WatchDog { sh: shared, jh }
}

/// Handle used by the dispatch loop to report message-processing activity to
/// the watchdog monitor thread.
pub(crate) struct WatchDog {
  /// State shared with the monitor thread.
  sh: Arc<Shared>,
  /// Join handle for the monitor thread; joined by [`WatchDog::kill()`].
  jh: JoinHandle<()>
}

impl WatchDog {
  /// Mark the beginning of message processing.
  ///
  /// Records the current time so the monitor thread can warn if processing
  /// runs longer than [`MAX_PROC_MILLIS`].
  pub(crate) fn begin_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Processing {
      start_time: Instant::now()
    };
    self.sh.signal.notify_one();
  }

  /// Mark the end of message processing, returning the monitor to idle.
  pub(crate) fn end_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Idle;
    self.sh.signal.notify_one();
  }

  /// Request monitor termination and wait for the monitor thread to exit.
  ///
  /// Returns the result of joining the monitor thread.
  pub(crate) fn kill(self) -> std::thread::Result<()> {
    let mut g = self.sh.inner.lock();
    g.state = State::Term;
    self.sh.signal.notify_one();
    // Release the lock before joining; the monitor thread must be able to
    // reacquire it to observe the Term state and exit.
    drop(g);
    self.jh.join()
  }
}


/// Watchdog monitor loop; runs on its own thread until [`State::Term`] is
/// observed.
///
/// The mutex is held for the lifetime of the loop; the condvar waits release
/// it while blocked, so the [`WatchDog`] handle can update the state.
fn monitor_thread(sh: Arc<Shared>) {
  let mut g = sh.inner.lock();
  loop {
    match g.state {
      State::Idle => {
        // Wait to be notified about a state change.  A spurious wakeup is
        // harmless: the loop simply re-examines the state.
        sh.signal.wait(&mut g);
      }
      State::Processing { start_time } => {
        // Wait until either the state changes (processing ended) or the
        // allowed processing time has elapsed.
        let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
        if sh.signal.wait_until(&mut g, timeout).timed_out() {
          g.state = State::Timeout;
          continue;
        }
      }
      State::Timeout => {
        #[cfg(feature = "tracing")]
        tracing::warn!(
          "Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        #[cfg(not(feature = "tracing"))]
        eprintln!(
          "Warning: Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        // Return to idle state
        g.state = State::Idle;
        continue;
      }
      State::Term => {
        break;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/simple.rs.

1
2
3
4
5
6
7
8
9
10
11

12
13
14
15
16
17
18
mod common;

use common::{Put, Reply, Request, ThreadedServer};

// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default());


  let reply = clnt.req(Request::GetSignalState).unwrap();
  assert_eq!(reply, Reply::SignalState(false));

  clnt.post(Put::Signal).unwrap();

  let reply = clnt.req(Request::GetSignalState).unwrap();










|
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mod common;

use common::{Put, Reply, Request, ThreadedServer};

// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_clnt| Ok(ThreadedServer::default()))
      .unwrap();

  let reply = clnt.req(Request::GetSignalState).unwrap();
  assert_eq!(reply, Reply::SignalState(false));

  clnt.post(Put::Signal).unwrap();

  let reply = clnt.req(Request::GetSignalState).unwrap();

Changes to tests/term.rs.

1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18
19
20
21
22
23
24

25
26
27
28
29
30
31
32
33
34
35
36
37

38
39
40
41
42
43
44
45
46
47
48
49
50
51

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

72
73
74
75
76
77
78
mod common;

use std::time::Duration;

use common::{Put, Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());


  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}


// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());


  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}


#[test]
fn handler_put_term() {
  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());


  clnt.post(Put::Signal).unwrap();
  clnt.post(Put::Croak).unwrap();

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}

// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {
  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());


  // Push a bunch of sleep nodes on the queue
  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }

  // Drop the client
  drop(clnt);

  let rv = jh.join().unwrap();

  assert_eq!(rv, None);
}

// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over termination
// due to all clients disappearing.
#[test]
fn handler_delay_term_count() {
  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());


  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }
  clnt.post(Put::CroakSleepCount).unwrap();

  drop(clnt);









|
>













|
>












|
>













|
>



















|
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
mod common;

use std::time::Duration;

use common::{Put, Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}


// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}


#[test]
fn handler_put_term() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();

  clnt.post(Put::Signal).unwrap();
  clnt.post(Put::Croak).unwrap();

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}

// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();

  // Push a bunch of sleep nodes on the queue
  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }

  // Drop the client
  drop(clnt);

  let rv = jh.join().unwrap();

  assert_eq!(rv, None);
}

// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over termination
// due to all clients disappearing.
#[test]
fn handler_delay_term_count() {
  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_| Ok(ThreadedServer::default())).unwrap();

  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }
  clnt.post(Put::CroakSleepCount).unwrap();

  drop(clnt);

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10
11
12

















13
14
15
16
17
18
19
# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk)

### Added

### Changed

### Removed


















---

## [0.2.0] - 2024-01-28

[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)

### Changed




|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-ng-server-0.2.1&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.1] - 2024-02-20

[Details](/vdiff?from=ump-ng-server-0.2.0&to=ump-ng-server-0.2.1)

### Added

- Add a watchdog that will warn if a msgproc takes more than 200ms to return.
  This is intended for dev builds only and must be enabled using the
  `watchdog` feature.  Will output to stderr by default, but will output to
  `tracing::warn!()` if the `tracing` feature is enabled as well.

### Changed

- Make `Handler` builder closure fallible.

---

## [0.2.0] - 2024-01-28

[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)

### Changed