ump-ng-server

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-ng-server-0.1.2 To ump-ng-server-0.2.0

2024-01-28
17:35
Make handler constructor closure fallible. check-in: 1e85d183b2 user: jan tags: trunk
15:19
Release maintenance. check-in: d37659a48c user: jan tags: trunk, ump-ng-server-0.2.0
15:09
Use closure to construct Handler. Remove the spawn_preinit() functions, since using a closure to construct the Handler makes it possible to accomplish the same thing, but less ugly. check-in: d485928620 user: jan tags: trunk
2024-01-21
18:37
Release maintenance. check-in: 5f420344bf user: jan tags: trunk, ump-ng-server-0.1.2
18:31
Add preinit variants of both thread and task spawners. check-in: e3a0f27d4a user: jan tags: trunk

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
[package]
name = "ump-ng-server"
version = "0.1.2"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
description = "Server message dispatch loop for ump-ng."
rust-version = "1.56"


|







1
2
3
4
5
6
7
8
9
10
[package]
name = "ump-ng-server"
version = "0.2.0"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng-server"
description = "Server message dispatch loop for ump-ng."
rust-version = "1.56"
23
24
25
26
27
28
29



30
31
32
33
[dependencies]
async-trait = { version = "0.1.77", optional = true }
# "net" is added as a temporary workaround.  Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
ump-ng = { version = "0.1.0" }




[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]








>
>
>




23
24
25
26
27
28
29
30
31
32
33
34
35
36
[dependencies]
async-trait = { version = "0.1.77", optional = true }
# "net" is added as a temporary workaround.  Without it building the docs fail
# in tokio.
tokio = { version = "1.35.1", features = ["net", "rt"], optional = true }
ump-ng = { version = "0.1.0" }

[dev-dependencies]
tokio-test = { version = "0.4.3" }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

Changes to src/lib.rs.

1
2

3
4
5
6
7
8
9
//! _ump-ng-server_ is an abstraction on top of [`ump-ng`] that is used to hide
//! boilerplate code used to implement intra-process message passing servers.

//!
//! # Dispatch loop
//! The core functionality of _ump-ng-server_ is a dispatch loop, whose role it
//! is to pull messages off the message queue and pass them to the
//! application-supplied message handler.
//!
//! There are two different ways to run the dispatcher loop: On a non-async
|
|
>







1
2
3
4
5
6
7
8
9
10
//! _ump-ng-server_ is an abstraction on top of [`ump-ng`](ump_ng) that is used
//! to hide boilerplate code used to implement intra-process message passing
//! servers.
//!
//! # Dispatch loop
//! The core functionality of _ump-ng-server_ is a dispatch loop, whose role it
//! is to pull messages off the message queue and pass them to the
//! application-supplied message handler.
//!
//! There are two different ways to run the dispatcher loop: On a non-async
29
30
31
32
33
34
35













36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//! - The message queue is empty and all the associated [`Client`]s have been
//!   released.  This would cause the thread to return `None`.
//!
//! # Application message handlers
//! Message handlers are implemented using the [`thread::Handler`] trait (for
//! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch
//! loop).














#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;

pub mod thread;

pub use ump_ng::{
  self, channel, Client, MsgType, ReplyContext, Server, WeakClient
};

pub use thread::{
  spawn as spawn_thread, spawn_preinit as spawn_thread_preinit,
  Handler as ThreadedHandler
};

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use task::{
  spawn as spawn_task, spawn_preinit as spawn_task_preinit,
  Handler as AsyncHandler
};

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use async_trait::async_trait;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







>
>
>
>
>
>
>
>
>
>
>
>
>













|
<
<
<



|
<
<
<






30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63



64
65
66
67



68
69
70
71
72
73
//! - The message queue is empty and all the associated [`Client`]s have been
//!   released.  This would cause the thread to return `None`.
//!
//! # Application message handlers
//! Message handlers are implemented using the [`thread::Handler`] trait (for
//! the threaded dispatch loop) and [`task::Handler`] (for the async dispatch
//! loop).
//!
//! There are cases where the handler needs to store a clone of the client
//! end-point of the message passing channel used to issue requests to the
//! server (so that message handlers can issue new requests).  In order to
//! facilitate this, the application must pass a `Handler`-construction closure
//! to `spawn()`.  The closure will be called after the message passing channel
//! has been created so it can be passed a reference to the client end-point.
//!
//! If the dispatch loop should terminate once all the application's client
//! end-points have been dropped, then the handler can store a [`WeakClient`]
//! instead (as storing a cloned [`Client`] object will preventing the dispatch
//! loop from terminating due to all clients being lost).  The examples in the
//! [`task`] and [`thread`] modules illustrate how to do this.

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod task;

pub mod thread;

pub use ump_ng::{
  self, channel, Client, MsgType, ReplyContext, Server, WeakClient
};

pub use thread::{spawn as spawn_thread, Handler as ThreadedHandler};




#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use task::{spawn as spawn_task, Handler as AsyncHandler};




#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use async_trait::async_trait;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/task.rs.










































































1
2
3
4
5
6
7
8
9
10
11
12
13
14









































































use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{channel, Client, MsgType, ReplyContext, Server};

#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
//! ump-ng dispatch server running on a thread.
//!
//! # Example
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//!   async_trait,
//!   task::{Handler, spawn},
//!   ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//!   ShoutIntoVoid
//! }
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! #[async_trait]
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//!   async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//!     match msg {
//!       Post::ShoutIntoVoid => {
//!         // No reply .. but keep on trudging on
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//!   async fn req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a+b)).unwrap();
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.await;
//! # });
//! ```

use std::ops::ControlFlow;

use tokio::task::{self, JoinHandle};

use async_trait::async_trait;

use super::{channel, Client, MsgType, ReplyContext};

#[async_trait]
pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher task before the main message
  /// processing loop is entered.
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

59
60



61
62
63
64
65
66
67
}

/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV>(
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send

{
  let (server, client) = channel();



  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      match server.async_wait().await {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m).await {







|
|






|
>


>
>
>







116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
}

/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);

  let weak_client = client.weak();
  let jh = task::spawn(async move {
    handler.init(weak_client);
    let ret = loop {
      match server.async_wait().await {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m).await {
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
    };
    handler.term(ret)
  });

  (client, jh)
}


/// Spawn a task to run a pre-initialized handler.
///
/// It is assumed that the caller has initialized the handler, thus its
/// `init()` method will not be called.
pub fn spawn_preinit<P, S, R, E, RV>(
  server: Server<P, S, R, E>,
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> JoinHandle<Option<RV>>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send
{
  task::spawn(async move {
    let ret = loop {
      match server.async_wait().await {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m).await {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          },
          MsgType::Request(m, rctx) => match handler.req(m, rctx).await {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }
        },
        Err(_) => break None
      }
    };
    handler.term(ret)
  })
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

155
156
157
158
159
160
161




































162
    };
    handler.term(ret)
  });

  (client, jh)
}





































// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/thread.rs.



































































1
2
3
4
5
6
7
8
9
10
11
12
13


































































use std::{
  ops::ControlFlow,
  thread::{self, JoinHandle}
};

use super::{channel, Client, MsgType, ReplyContext, Server};

pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher thread before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>





|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//! ump-ng dispatch server running on a thread.
//!
//! # Example
//! ```
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//!   thread::{Handler, spawn},
//!   ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//!   ShoutIntoVoid
//! }
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//!   fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//!     match msg {
//!       Post::ShoutIntoVoid => {
//!         // No reply .. but keep on trudging on
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//!   fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
//!     -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a+b)).unwrap();
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   MyHandler {
//!     wclnt: clnt.weak()
//!   }
//! });
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.join();
//! ```

use std::{
  ops::ControlFlow,
  thread::{self, JoinHandle}
};

use super::{channel, Client, MsgType, ReplyContext};

pub trait Handler<P, S, R, E, RV> {
  /// Optional initialization callback.
  ///
  /// This is called on the dispatcher thread before the main message
  /// processing loop is entered.
  #[allow(unused_variables)]
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

53
54



55
56
57
58
59
60
61
}

/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV>(
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send

{
  let (server, client) = channel();



  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      match server.wait() {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m) {







|
|






|
>


>
>
>







103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
}

/// Launch a thread that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<P, S, R, E>) -> F
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<P, S, R, E, RV> + Send + 'static
{
  let (server, client) = channel();

  let mut handler = hbldr(&client);

  let weak_client = client.weak();
  let jh = thread::spawn(move || {
    handler.init(weak_client);
    let ret = loop {
      match server.wait() {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m) {
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
    };
    handler.term(ret)
  });

  (client, jh)
}


/// Spawn a thread to run a pre-initialized handler.
///
/// It is assumed that the caller has initialized the handler, thus its
/// `init()` method will not be called.
///
/// ```
/// use std::ops::ControlFlow;
/// use ump_ng_server::{ump_ng, spawn_thread_preinit, ThreadedHandler,
///   ReplyContext};
/// let (server, client) = ump_ng::channel::<(), (), (), ()>();
///
/// struct MyHandler {
///   wclnt: ump_ng::WeakClient<(), (), (), ()>
/// }
/// impl ThreadedHandler<(), (), (), (), ()> for MyHandler {
///   fn post(&mut self, _: ()) -> ControlFlow<(), ()> {
///     ControlFlow::Continue(())
///   }
///   fn req(&mut self, _: (), rctx: ReplyContext<(), ()>)
///     -> ControlFlow<(), ()> {
///     ControlFlow::Continue(())
///   }
/// }
/// let handler = MyHandler {
///   wclnt: client.weak()
/// };
/// let jh = spawn_thread_preinit(server, handler);
///
/// // drop client to force dispatch loop to terminate
/// drop(client);
///
/// jh.join();
/// ```
pub fn spawn_preinit<P, S, R, E, RV>(
  server: Server<P, S, R, E>,
  mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> JoinHandle<Option<RV>>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send
{
  thread::spawn(move || {
    let ret = loop {
      match server.wait() {
        Ok(msg) => match msg {
          MsgType::Put(m) => match handler.post(m) {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          },
          MsgType::Request(m, rctx) => match handler.req(m, rctx) {
            ControlFlow::Continue(_) => {}
            ControlFlow::Break(rv) => break Some(rv)
          }
        },
        Err(_) => break None
      }
    };
    handler.term(ret)
  })
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<

142
143
144
145
146
147
148

































































149
    };
    handler.term(ret)
  });

  (client, jh)
}


































































// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/simple.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mod common;

use common::{Put, Reply, Request, ThreadedServer};

// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  let reply = clnt.req(Request::GetSignalState).unwrap();
  assert_eq!(reply, Reply::SignalState(false));

  clnt.post(Put::Signal).unwrap();

  let reply = clnt.req(Request::GetSignalState).unwrap();









<
|
|







1
2
3
4
5
6
7
8
9

10
11
12
13
14
15
16
17
18
mod common;

use common::{Put, Reply, Request, ThreadedServer};

// Check the signalled state, which should be false.
// Then Signal.
// And finally check signalled state again, which should be true.
#[test]
fn no_clients() {

  let (clnt, jh) =
    ump_ng_server::spawn_thread(|_clnt| ThreadedServer::default());

  let reply = clnt.req(Request::GetSignalState).unwrap();
  assert_eq!(reply, Reply::SignalState(false));

  clnt.post(Put::Signal).unwrap();

  let reply = clnt.req(Request::GetSignalState).unwrap();

Changes to tests/term.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
mod common;

use std::time::Duration;

use common::{Put, Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}


// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}


#[test]
fn handler_put_term() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  clnt.post(Put::Signal).unwrap();
  clnt.post(Put::Croak).unwrap();

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}

// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  // Push a bunch of sleep nodes on the queue
  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }

  // Drop the client
  drop(clnt);

  let rv = jh.join().unwrap();

  assert_eq!(rv, None);
}

// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over terminating
// over all clients disappearing.
#[test]
fn handler_delay_term_count() {
  let handler = ThreadedServer::default();

  let (clnt, jh) = ump_ng_server::spawn_thread(handler);

  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }
  clnt.post(Put::CroakSleepCount).unwrap();

  drop(clnt);









<
<
|













<
<
|












<
<
|













<
<
|



















<
<
|







1
2
3
4
5
6
7
8
9


10
11
12
13
14
15
16
17
18
19
20
21
22
23


24
25
26
27
28
29
30
31
32
33
34
35
36


37
38
39
40
41
42
43
44
45
46
47
48
49
50


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70


71
72
73
74
75
76
77
78
mod common;

use std::time::Duration;

use common::{Put, Reply, Request, ThreadedServer};

// Terminate the dispatcher loop by dropping the only client.
#[test]
fn no_clients() {


  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());

  // Drop the (only) client, which should cause dispatch loop to terminate.
  drop(clnt);

  // Termination by clients disappearing should return None
  assert_eq!(jh.join().unwrap(), None);
}


// Terminate the dispatcher loop by explicitly requesting it to terminate from
// its handler.
#[test]
fn handler_req_term() {


  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());

  assert_eq!(clnt.req(Request::Add(2, 4)).unwrap(), Reply::Sum(6));
  assert_eq!(clnt.req(Request::Croak).unwrap(), Reply::OkIWillCroak);

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}


#[test]
fn handler_put_term() {


  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());

  clnt.post(Put::Signal).unwrap();
  clnt.post(Put::Croak).unwrap();

  let rv = jh.join().unwrap();

  assert_eq!(rv, Some(42));
}

// Populate the queue with a bunch of nodes, then drop the client while there
// are still nodes in the queue
#[test]
fn handler_delay_term() {


  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());

  // Push a bunch of sleep nodes on the queue
  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }

  // Drop the client
  drop(clnt);

  let rv = jh.join().unwrap();

  assert_eq!(rv, None);
}

// Make sure all the sleepers are processed.
// The explicit termination request should take precedence over terminating
// over all clients disappearing.
#[test]
fn handler_delay_term_count() {


  let (clnt, jh) = ump_ng_server::spawn_thread(|_| ThreadedServer::default());

  for _ in 0..10 {
    clnt.post(Put::Sleep(Duration::from_millis(100))).unwrap();
  }
  clnt.post(Put::CroakSleepCount).unwrap();

  drop(clnt);

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10
11
12




















13
14
15
16
17
18
19
# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-ng-server-0.1.2&to=trunk)

### Added

### Changed

### Removed





















---

## [0.1.2] - 2024-01-21

[Details](/vdiff?from=ump-ng-server-0.1.1&to=ump-ng-server-0.1.2)

### Added




|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Change Log

## [Unreleased]

[Details](/vdiff?from=ump-ng-server-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2024-01-28

[Details](/vdiff?from=ump-ng-server-0.1.2&to=ump-ng-server-0.2.0)

### Changed

- Instead of taking in an `impl Handler` into the `{thread,task}::spawn()`
  function, take in a closure that returns the handler.  A reference to the
  handler channel's client endpoint is passed to the closure, which makes it
  possible to store `Client`/`WeakClient` in the handler, without involving an
  `Option` (or similar).

### Removed

- Removed the `{thread,task}::spawn_preinit()` functions.  These no longer
  really serve a purpose since the deferred handler construction was
  introduced.

---

## [0.1.2] - 2024-01-21

[Details](/vdiff?from=ump-ng-server-0.1.1&to=ump-ng-server-0.1.2)

### Added