ump-ng

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-ng-0.1.0 To ump-ng-0.2.0

2024-09-12
20:25
Move clippy config from bacon.toml to Cargo.toml. check-in: e3dd8a1bd0 user: jan tags: trunk
2024-09-10
00:30
Rename `MsgType::Put` to `MsgType::Post`. Dependency maintenance. check-in: 6f4b9d43d1 user: jan tags: trunk, ump-ng-0.2.0
2024-09-09
20:59
Make use of updated swctx's ability to report when the waitctx has been dropped. Clippy fixups. check-in: a48d827d1a user: jan tags: trunk
2023-10-03
07:17
Document WeakClient. check-in: fa626b58be user: jan tags: trunk
2023-10-02
14:47
Happy clippy. check-in: 6b4a0d3b99 user: jan tags: trunk, ump-ng-0.1.0
14:44
Export WaitReply. check-in: 196e3c425c user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5

6
7
8
9
10
11
12
13
14
15

16
17
18




19
20
21
22


23
24
25
26

27
28
29
30
31
32
33
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25



26
27
28
29
30

31
32
33
34
35
36
37
38


-
+


+










+



+
+
+
+

-
-
-
+
+



-
+







[package]
name = "ump-ng"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump-ng"
description = "Micro message passing library for threads/tasks communication."
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "examples",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "passively-maintained" }

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.4" }
swctx = { version = "0.2.1" }
sigq = { version = "0.13.5" }
swctx = { version = "0.3.0" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }

[[bench]]
name = "add_server"
harness = false

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Changes to README.md.

1
2
3
4
5
6
7
8
9








1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17









+
+
+
+
+
+
+
+
# Micro-Message Passing Library

The _ump-ng_ crate is a simple client/server message passing library for
intra-process communication.  Its primary purpose is to allow cross
async/non-async communication (for both the server and client endpoints).

ump-ng is similar to [ump](https://crates.io/crates/ump), but it has a
uni-directional message passing operation.

[![Crates.io][crates-badge]][crates-url]
[![0BSD licensed][0bsd-badge]][0bsd-url]

[crates-badge]: https://img.shields.io/crates/v/ump-ng.svg
[crates-url]: https://crates.io/crates/ump-ng
[0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg
[0bsd-url]: https://opensource.org/license/0bsd

Added bacon.toml.



































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
# This is a configuration file for the bacon tool
#
# Bacon repository: https://github.com/Canop/bacon
# Complete help on configuration: https://dystroy.org/bacon/config/
# You can also check bacon's own bacon.toml file
#  as an example: https://github.com/Canop/bacon/blob/main/bacon.toml

# For information about clippy lints, see:
# https://github.com/rust-lang/rust-clippy/blob/master/README.md

#default_job = "check"
default_job = "clippy-all-pedantic"

[jobs.check]
command = ["cargo", "check", "--color", "always"]
need_stdout = false

[jobs.check-all]
command = ["cargo", "check", "--all-targets", "--color", "always"]
need_stdout = false

# Run clippy on the default target
[jobs.clippy]
command = [
    "cargo", "clippy",
    "--color", "always",
]
need_stdout = false

# Run clippy on all targets
# To disable some lints, you may change the job this way:
#    [jobs.clippy-all]
#    command = [
#        "cargo", "clippy",
#        "--all-targets",
#        "--color", "always",
#    	 "--",
#    	 "-A", "clippy::bool_to_int_with_if",
#    	 "-A", "clippy::collapsible_if",
#    	 "-A", "clippy::derive_partial_eq_without_eq",
#    ]
# need_stdout = false
[jobs.clippy-all]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
]
need_stdout = false

[jobs.clippy-pedantic]
command = [
    "cargo", "clippy",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

[jobs.clippy-all-pedantic]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = [
    "cargo", "test", "--color", "always",
    "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124
]
need_stdout = true

[jobs.doc]
command = ["cargo", "doc", "--color", "always", "--no-deps"]
need_stdout = false

# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change

# You can run your application and have the result displayed in bacon,
# *if* it makes sense for this crate.
# Don't forget the `--color always` part or the errors won't be
# properly parsed.
# If your program never stops (eg a server), you may set `background`
# to false to have the cargo run output immediately displayed instead
# of waiting for program's end.
[jobs.run]
command = [
    "cargo", "run",
    "--color", "always",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true

# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
#    bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--color", "always", "--example"]
need_stdout = true
allow_warnings = true

# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
# alt-m = "job:my-job"
c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target

Changes to benches/add_server.rs.

1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20












+







use std::thread;

use criterion::{criterion_group, criterion_main, Criterion};

use ump_ng::{channel, MsgType};

enum Ops {
  Die,
  Add(i32, i32),
  AddThreaded(i32, i32)
}

#[allow(clippy::missing_panics_doc)]
pub fn criterion_benchmark(c: &mut Criterion) {
  let mut group = c.benchmark_group("send operation");

  let (server, client) = channel::<(), Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;
42
43
44
45
46
47
48
49

50
51
52
53
54
55
56
57
58
59
60

61
62
63
64
65
66
67
68
69
70
71
72

73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90
91
43
44
45
46
47
48
49

50
51
52
53
54
55
56
57
58
59
60

61
62
63
64
65
66
67
68
69
70
71
72

73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90
91
92







-
+










-
+











-
+











-
+








  group.bench_function("add", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::Add(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
    });
  });

  p = 0;
  q = 0;
  group.bench_function("add (threaded)", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::AddThreaded(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::Add(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async, threaded)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let result = client.req(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();

Changes to examples/many_once.rs.

31
32
33
34
35
36
37
38

39
40
41
42
43
44


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

60
61

62
63
64
65
66
67
68
69
70
71
72
31
32
33
34
35
36
37

38
39
40
41
42


43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

59
60

61
62
63
64
65
66
67
68
69
70
71
72







-
+




-
-
+
+














-
+

-
+











    while count < nclients {
      // Wait for data to arrive from a client
      println!("Server waiting for message ..");
      let MsgType::Request(data, rctx) = server.wait().unwrap() else {
        panic!("Unexpected message operation type");
      };

      println!("Server received: '{}'", data);
      println!("Server received: '{data}'");

      // .. process data from client ..

      // Reply to client
      let reply = format!("Hello, {}!", data);
      println!("Server replying '{}'", reply);
      let reply = format!("Hello, {data}!");
      println!("Server replying '{reply}'");
      rctx.reply(reply).unwrap();

      // Increase message counter
      count += 1;
    }
    println!("Server done");
  });

  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      println!("{name} sending '{msg}'");
      let reply = client_clone.req(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
      println!("{name} received reply '{reply}' -- done");
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();
  }
  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/simple.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14


15
16
17

18
19
20
21
22
23


24
25
26
27
28
29
30
31
32

33
34

35
36
37
38
39
40
1
2
3
4
5
6
7
8
9
10
11
12


13
14
15
16

17
18
19
20
21


22
23
24
25
26
27
28
29
30
31

32
33

34
35
36
37
38
39
40












-
-
+
+


-
+




-
-
+
+








-
+

-
+






use std::thread;

use ump_ng::{channel, MsgType};

fn main() {
  let (server, client) = channel::<(), String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    println!("Server waiting for message ..");

    match server.wait().unwrap() {
      MsgType::Put(_m) => {
        panic!("Unexpected Put message")
      MsgType::Post(_m) => {
        panic!("Unexpected Post message")
      }
      MsgType::Request(data, rctx) => {
        println!("Server received: '{}'", data);
        println!("Server received: '{data}'");

        // Process data from client

        // Reply to client
        let reply = format!("Hello, {}!", data);
        println!("Server replying '{}'", reply);
        let reply = format!("Hello, {data}!");
        println!("Server replying '{reply}'");
        rctx.reply(reply).unwrap();
      }
    }

    println!("Server done");
  });

  let msg = String::from("Client");
  println!("Client sending '{}'", msg);
  println!("Client sending '{msg}'");
  let reply = client.req(msg).unwrap();
  println!("Client received reply '{}'", reply);
  println!("Client received reply '{reply}'");
  println!("Client done");

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/threaded_handler.rs.

27
28
29
30
31
32
33
34

35
36
37
38
39
40


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

57
58

59
60
61
62
63
64
65
66
67
68
69
27
28
29
30
31
32
33

34
35
36
37
38


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

56
57

58
59
60
61
62
63
64
65
66
67
68
69







-
+




-
-
+
+















-
+

-
+











      let MsgType::Request(data, rctx) = server.wait().unwrap() else {
        panic!("Unexpected message operation type");
      };

      // Move the received data and reply context into a thread to allow other
      // messages to be received while processing this message.
      thread::spawn(move || {
        println!("Server received: '{}'", data);
        println!("Server received: '{data}'");

        // Process data from client

        // Reply to client
        let reply = format!("Hello, {}!", data);
        println!("Server replying '{}'", reply);
        let reply = format!("Hello, {data}!");
        println!("Server replying '{reply}'");
        rctx.reply(reply).unwrap();
      });

      // Increase message counter
      count += 1;
    }
    println!("Server done");
  });

  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      println!("{name} sending '{msg}'");
      let reply = client_clone.req(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
      println!("{name} received reply '{reply}' -- done");
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();
  }
  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/client.rs.

16
17
18
19
20
21
22




23
24
25
26

27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

47
48

49
50
51
52



53
54
55
56
57
58
59
16
17
18
19
20
21
22
23
24
25
26
27
28
29

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

53
54
55


56
57
58
59
60
61
62
63
64
65







+
+
+
+



-
+




















+

-
+


-
-
+
+
+







impl<P, S, R, E> Client<P, S, R, E>
where
  P: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Transmit a uni-directional message to the server end-point.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
  /// end-point has been dropped.
  pub fn post(&self, msg: P) -> Result<(), Error<E>> {
    self
      .0
      .push(InnerMsgType::Put(msg))
      .push(InnerMsgType::Post(msg))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(())
  }

  /// Send a message to the server, wait for a reply, and return the reply.
  ///
  /// A complete round-trip (the message is delivered to the server, and the
  /// server sends a reply) must complete before this function returns
  /// success.
  ///
  /// This method is _currently_ reentrant: It is safe to use share a single
  /// `Client` among multiple threads.  _This may change in the future_; it's
  /// recommended to not rely on this.  The recommended way to send messages to
  /// a server from multiple threads is to clone the `Client` and assign one
  /// separate `Client` to each thread.
  ///
  /// # Return
  /// On success the function will return `Ok(msg)`.
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, `Err(Error:ServerDisappeared)` will be
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped `Err(Error::NoReply)` will be returned.
  /// If the server never replied to the message and the
  /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will
  /// be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn req(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
119
120
121
122
123
124
125




126
127
128
129
130
131
132
133
134
135
136
137






138
139
140
141
142
143
144
145
146
147
148

149
150


151
152
153
154
155
156
157
158
159
160
161
162
163
164

165
166
167
168
169
170
171
172
173
174
175










176
177
178
179
180
181
182
183







184
185
186
187
188


189
190

191
192
193
194

195
196
197
198
199
200


201
202
203
204
205
206
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145


146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163


164
165
166
167
168
169
170
171
172
173
174
175
176
177
178

179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206


207
208
209
210
211
212
213
214
215
216
217
218
219
220
221

222
223
224
225

226
227
228
229
230
231

232
233
234
235
236
237
238
239







+
+
+
+










-
-
+
+
+
+
+
+











+
-
-
+
+













-
+











+
+
+
+
+
+
+
+
+
+






-
-
+
+
+
+
+
+
+





+
+

-
+



-
+





-
+
+






  ///
  /// let reply = wrctx.wait().unwrap();
  /// println!("Client received reply '{}'", reply);
  /// println!("Client done");
  ///
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
  /// end-point has been dropped.
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
    let (sctx, wctx) = swctx::mkpair();
    self
      .0
      .push(InnerMsgType::Request(out, sctx))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(WaitReply(wctx))
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  pub async fn areq(&self, out: S) -> Result<R, Error<E>> {
  /// Same as [`Client::req()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn areq(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(InnerMsgType::Request(out, sctx))
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait_async().await?)
  }

  /// Create a weak `Client` reference.
  #[must_use]
  pub fn weak(&self) -> WeakClient<P, S, R, E> {
    WeakClient(self.0.weak())
  pub fn weak(&self) -> Weak<P, S, R, E> {
    Weak(self.0.weak())
  }
}

impl<P, S, R, E> Clone for Client<P, S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Client(self.0.clone())
    Self(self.0.clone())
  }
}


/// Context used to wait for a server to reply to a request.
pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);

impl<R, E> WaitReply<R, E> {
  /// Block and wait for a reply.
  ///
  /// For use in non-`async` threads.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the linked server object has been
  /// released.
  ///
  /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
  /// handler it replies to the message, [`Error::NoReply`] will be returned.
  ///
  /// If an application specific error occurs it will be returned in
  /// [`Error::App`].
  pub fn wait(self) -> Result<R, Error<E>> {
    Ok(self.0.wait()?)
  }

  /// Block and wait for a reply.
  ///
  /// For use in `async` tasks.
  pub async fn wait_async(self) -> Result<R, Error<E>> {
  /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn wait_async(self) -> Result<R, Error<E>>
  where
    R: Send,
    E: Send
  {
    Ok(self.0.wait_async().await?)
  }
}


/// A weak client reference that can be upgraded to a [`Client`] as long as
/// other `Client` objects till exist.
#[repr(transparent)]
pub struct WeakClient<P, S, R, E>(
pub struct Weak<P, S, R, E>(
  pub(crate) sigq::WeakPusher<InnerMsgType<P, S, R, E>>
);

impl<P, S, R, E> Clone for WeakClient<P, S, R, E> {
impl<P, S, R, E> Clone for Weak<P, S, R, E> {
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}

impl<P, S, R, E> WeakClient<P, S, R, E> {
impl<P, S, R, E> Weak<P, S, R, E> {
  #[must_use]
  pub fn upgrade(&self) -> Option<Client<P, S, R, E>> {
    self.0.upgrade().map(|x| Client(x))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/err.rs.

8
9
10
11
12
13
14




15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34



35
36
37

38
39
40





41
42
43

44
45
46
47
48
49

50
51




52
53
54
55
56
57





58
59
60
61
62
63
64
65
66
67


68

69

70
71
72
73
74
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60

61
62

63
64
65
66
67
68




69
70
71
72
73
74
75
76
77
78
79
80
81


82
83
84
85

86
87
88
89
90
91







+
+
+
+




















+
+
+


-
+



+
+
+
+
+


-
+





-
+

-
+
+
+
+


-
-
-
-
+
+
+
+
+








-
-
+
+

+
-
+





  /// The server object has shut down.
  ///
  /// This happens when clients:
  /// - attempt to send messages to a server that has been deallocated.
  /// - have their requests dropped from the serrver's queue because the
  ///   server itself was deallocated.
  ServerDisappeared,

  /// A message reply could not be completed because the original requestor
  /// disappearing.
  OriginDisappeared,

  /// All the client objects have shut down.
  ///
  /// This happens when a server attempts to pull the latest node on off the
  /// queue, but there are no more nodes on the queue and all the associated
  /// client objects have been released (impled:  New nodes can never be added
  /// to the queue).
  ClientsDisappeared,

  /// The message was delivered to the server, but the reply context was
  /// released before sending back a reply.
  NoReply,

  /// Application-specific error.
  /// The `E` type is typically declared as the third generic parameter to
  /// [`channel`](crate::channel()).
  App(E)
}

impl<E> Error<E> {
  /// Get application-specific error.
  ///
  /// Will return `None` is the error is not [`Error::App`].
  pub fn into_apperr(self) -> Option<E> {
    match self {
      Error::App(e) => Some(e),
      Self::App(e) => Some(e),
      _ => None
    }
  }

  /// Unwrap application-specific error.
  ///
  /// # Panics
  /// If the error is not [`Error::App`] this function will panic.
  pub fn unwrap_apperr(self) -> E {
    match self {
      Error::App(e) => e,
      Self::App(e) => e,
      _ => panic!("Not an Error::App")
    }
  }
}

impl<E: fmt::Debug> std::error::Error for Error<E> {}
impl<E> std::error::Error for Error<E> where E: std::error::Error {}

impl<E: fmt::Debug> fmt::Display for Error<E> {
impl<E> fmt::Display for Error<E>
where
  E: std::error::Error
{
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Error::ServerDisappeared => write!(f, "Server disappeared"),
      Error::ClientsDisappeared => write!(f, "Clients disappeared"),
      Error::NoReply => write!(f, "Server didn't reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
      Self::ServerDisappeared => write!(f, "Server disappeared"),
      Self::OriginDisappeared => write!(f, "Requestor disappeared"),
      Self::ClientsDisappeared => write!(f, "Clients disappeared"),
      Self::NoReply => write!(f, "Server didn't reply"),
      Self::App(err) => write!(f, "Application error; {err}")
    }
  }
}

impl<E> From<swctx::Error<RCtxState, E>> for Error<E> {
  fn from(err: swctx::Error<RCtxState, E>) -> Self {
    match err {
      swctx::Error::Aborted(state) => match state {
        RCtxState::Queued => Error::ServerDisappeared,
        RCtxState::Active => Error::NoReply
        RCtxState::Queued => Self::ServerDisappeared,
        RCtxState::Active => Self::NoReply
      },
      swctx::Error::LostWaiter => Self::OriginDisappeared,
      swctx::Error::App(e) => Error::App(e)
      swctx::Error::App(e) => Self::App(e)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

1
2
3
4
5

6
7
8
9
10
11
12
1
2
3
4

5
6
7
8
9
10
11
12




-
+







//! Micro Message Pass: Next Generation (ump-ng) is a library for passing
//! messages between thread/tasks.  It is similar to the `ump` library, but
//! with an added uni-directional message passing primitive.
//!
//! The primary purpose of ump(-ng) is to create simple RPC like designs, but
//! The primary purpose of ump(-ng) is to create simple RPC-like designs, but
//! between threads/tasks within a process rather than between processes over
//! networks.
//!
//! # High-level usage overview
//! An application calls [`channel`] to create a linked pair of a [`Server`]
//! and a [`Client`].
//!
39
40
41
42
43
44
45
46
47


48
49
50
51
52
53
54
39
40
41
42
43
44
45


46
47
48
49
50
51
52
53
54







-
-
+
+







//! let (server, client) = channel::<String, String, String, ()>();
//!
//! let server_thread = thread::spawn(move || {
//!   // Wait for data to arrive from a client
//!   loop {
//!     println!("Server waiting for message ..");
//!     match server.wait().unwrap() {
//!       MsgType::Put(data) => {
//!         println!("Server received Put: '{}'", data);
//!       MsgType::Post(data) => {
//!         println!("Server received Post: '{}'", data);
//!       }
//!       MsgType::Request(data, rctx) => {
//!         println!("Server received Request: '{}'", data);
//!
//!         // Process data from client
//!
//!         // Reply to client
120
121
122
123
124
125
126
127

128
129
130
131
132
133
134
120
121
122
123
124
125
126

127
128
129
130
131
132
133
134







-
+







mod err;
mod rctx;
mod server;

pub use err::Error;

pub use crate::{
  client::{Client, WaitReply, WeakClient},
  client::{Client, WaitReply, Weak as WeakClient},
  rctx::ReplyContext,
  server::{MsgType, Server}
};

/// Create a pair of linked [`Server`] and [`Client`] objects.
///
/// The [`Server`] object is used to wait for incoming messages from connected
144
145
146
147
148
149
150

151
152
153
154
155
156
157
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158







+







/// completely independent of the original client.
///
/// The `S` type parameter is the "send" data type that clients will transfer
/// to the server.  The `R` type parameter is the "receive" data type that
/// clients will receive from the server.  The `E` type parameter can be used
/// to return application specific errors from the server to the client.
#[allow(clippy::type_complexity)]
#[must_use]
pub fn channel<P, S, R, E>() -> (Server<P, S, R, E>, Client<P, S, R, E>) {
  let (qpusher, qpuller) = sigq::new();

  (Server(qpuller), Client(qpusher))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx.rs.

1
2
3
4
5
6
7
8
9
10
11

12
13
14
15
16
17
18
1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18










-
+







//! Allow a thread/task, crossing sync/async boundaries in either direction, to
//! deliver an expected piece of data to another thread/task, with
//! notification.
//!
//! These are simple channels used to deliver data from one endpoint to
//! another, where the receiver will block until data has been delivered.

use crate::err::Error;

#[derive(Clone, Default)]
pub(crate) enum RCtxState {
pub enum RCtxState {
  #[default]
  Queued,
  Active
}

/// Object used to reply to requests passed to the server.
pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>);
35
36
37
38
39
40
41




42
43
44
45

46
47
48
49
50
51
52
35
36
37
38
39
40
41
42
43
44
45
46
47
48

49
50
51
52
53
54
55
56







+
+
+
+



-
+







  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// If the originating caller is no longer waiting for a reply (i.e. was
  /// dropped) [`Error::OriginDisappeared`] is returned.
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(self, data: T) -> Result<(), Error<E>> {
    self.0.set(data);
    self.0.set(data)?;
    Ok(())
  }

  /// Return an error to originating client.
  /// This will cause the calling client to return an error.  The error passed
  /// in the `err` parameter will be wrapped in a `Error::App(err)`.
  ///
77
78
79
80
81
82
83




84
85
86
87

88
89
90
91
92
93
94
95











96
97
98
99
81
82
83
84
85
86
87
88
89
90
91
92
93
94

95
96
97
98
99




100
101
102
103
104
105
106
107
108
109
110
111
112
113
114







+
+
+
+



-
+




-
-
-
-
+
+
+
+
+
+
+
+
+
+
+




  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }
  /// }
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// [`Error::OriginDisappeared`] indicates that the originating caller is no
  /// longer waiting for a reply (i.e. it was dropped).
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(self, err: E) -> Result<(), Error<E>> {
    self.0.fail(err);
    self.0.fail(err)?;
    Ok(())
  }
}

impl<T, E> From<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
  fn from(sctx: swctx::SetCtx<T, RCtxState, E>) -> Self {
    sctx.set_state(RCtxState::Active);
    ReplyContext(sctx)
impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
  type Error = Error<E>;

  /// Convert a `SetCtx` into a `ReplyContext`.
  ///
  /// Sets the `SetCtx`'s stat to _Active_.
  fn try_from(
    sctx: swctx::SetCtx<T, RCtxState, E>
  ) -> Result<Self, Self::Error> {
    let _ = sctx.set_state(RCtxState::Active);
    Ok(Self(sctx))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/server.rs.

1
2
3
4
5
6
7


8
9
10
11
12
13
14

15
16
17
18
19
20
21




22
23

24
25
26
27
28

29
30

31
32
33
34
35
36
37
1
2
3
4
5


6
7
8
9
10
11
12
13

14
15
16
17
18
19


20
21
22
23
24

25
26
27
28
29

30
31

32
33
34
35
36
37
38
39





-
-
+
+






-
+





-
-
+
+
+
+

-
+




-
+

-
+







use crate::{
  err::Error,
  rctx::{RCtxState, ReplyContext}
};

pub(crate) enum InnerMsgType<P, S, R, E> {
  Put(P),
pub enum InnerMsgType<P, S, R, E> {
  Post(P),
  Request(S, swctx::SetCtx<R, RCtxState, E>)
}

/// Mesage operation types.
pub enum MsgType<P, S, R, E> {
  /// A uni-directional message pass.
  Put(P),
  Post(P),

  /// A message pass that expects a reply.
  Request(S, ReplyContext<R, E>)
}

impl<P, S, R, E> From<InnerMsgType<P, S, R, E>> for MsgType<P, S, R, E> {
  fn from(val: InnerMsgType<P, S, R, E>) -> MsgType<P, S, R, E> {
impl<P, S, R, E> TryFrom<InnerMsgType<P, S, R, E>> for MsgType<P, S, R, E> {
  type Error = Error<E>;

  fn try_from(val: InnerMsgType<P, S, R, E>) -> Result<Self, Self::Error> {
    match val {
      InnerMsgType::Put(msg) => MsgType::Put(msg),
      InnerMsgType::Post(msg) => Ok(Self::Post(msg)),
      InnerMsgType::Request(msg, irctx) => {
        // Create an application reply context from the reply context in the
        // queue Implicitly changes state of the reply context from
        // Queued to Waiting
        let rctx = ReplyContext::from(irctx);
        let rctx = ReplyContext::try_from(irctx)?;

        MsgType::Request(msg, rctx)
        Ok(Self::Request(msg, rctx))
      }
    }
  }
}

/// Representation of a server object.
///
52
53
54
55
56
57
58




59
60
61
62

63
64
65




66
67
68
69
70

71
72
73
74
75
76

77
78
79
80

81
82
83
84
85

86
87
88
89
90
91
54
55
56
57
58
59
60
61
62
63
64
65
66


67
68
69
70
71
72
73
74
75
76
77
78

79
80
81
82
83
84
85
86
87
88


89
90
91
92
93
94
95
96
97
98
99
100
101







+
+
+
+


-
-
+



+
+
+
+




-
+






+


-
-
+





+






{
  /// Block and wait, indefinitely, for an incoming message from a
  /// [`Client`](crate::Client).
  ///
  /// Returns the message sent by the client and a reply context.  The server
  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
  /// value to the client.
  ///
  /// # Errors
  /// [`Error::ClientsDisappeared`] indicates that the queue is empty and
  /// all the client end-points have been dropped.
  pub fn wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
    let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;

    Ok(msg.into())
    msg.try_into()
  }

  /// Take next next message off queue or return `None` is queue is empty.
  ///
  /// # Errors
  /// [`Error::ClientsDisappeared`] indicates that the queue is empty and
  /// all the client end-points have been dropped.
  #[allow(clippy::type_complexity)]
  pub fn try_pop(&self) -> Result<Option<MsgType<P, S, R, E>>, Error<E>> {
    let msg = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;
    if let Some(msg) = msg {
      Ok(Some(msg.into()))
      Ok(Some(msg.try_into()?))
    } else {
      Ok(None)
    }
  }

  /// Same as [`Server::wait()`], but for use in an `async` context.
  #[allow(clippy::missing_errors_doc)]
  pub async fn async_wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
    let msg = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;

    Ok(msg.into())
    msg.try_into()
  }

  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
  /// really useful unless used in very specific situations.  It mostly exists
  /// for test cases.
  #[must_use]
  pub fn was_empty(&self) -> bool {
    self.0.was_empty()
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/async_client.rs.

44
45
46
47
48
49
50
51

52
53
54

55
56
57
58
59
60
44
45
46
47
48
49
50

51

52

53
54
55
56
57
58
59







-
+
-

-
+






      if let Reply::Sum(sum) = result {
        assert_eq!(sum, a + b);
      } else {
        panic!("Didn't get sum");
      }
    }
    let result = client.areq(Request::Croak).await.unwrap();
    if let Reply::OkICroaked = result {
    let Reply::OkICroaked = result else {
    } else {
      panic!("Didn't get a croak");
    }
    };
  });

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/stress.rs.

13
14
15
16
17
18
19
20

21
22
23
24
25
26
27
13
14
15
16
17
18
19

20
21
22
23
24
25
26
27







-
+







  let (server, client) = channel::<(), Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;

    while !croak {
      match server.wait().unwrap() {
        MsgType::Put(_) => panic!("Unexpected Put message"),
        MsgType::Post(()) => panic!("Unexpected Post message"),
        MsgType::Request(data, rctx) => match data {
          Ops::Die => {
            croak = true;
            rctx.reply(0).unwrap();
          }
          Ops::Add(a, b) => {
            rctx.reply(a + b).unwrap();

Added tests/wait_disappear.rs.














































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use ump_ng::{channel, Error, MsgType};

#[test]
fn wait_disappered_on_reply() {
  let (server, client) = channel::<String, String, String, ()>();

  // Generate a request that will return a wait context.
  let wctx = client.req_async(String::from("hello"));

  // Get the message (and reply context) from server end-point
  let MsgType::Request(_msg, rctx) = server.wait().unwrap() else {
    panic!("Unexpectedly not MsgType::Request");
  };

  // nuke the wait context
  drop(wctx);

  // Replying should fail, because wctx has been dropped
  let Err(Error::OriginDisappeared) = rctx.reply(String::from("ahoy")) else {
    panic!("Unexpected error");
  };
}

#[test]
fn wait_disappered_on_fail() {
  let (server, client) = channel::<String, String, String, ()>();

  // Generate a request that will return a wait context.
  let wctx = client.req_async(String::from("hello"));

  // Get the message (and reply context) from server end-point
  let MsgType::Request(_msg, rctx) = server.wait().unwrap() else {
    panic!("Unexpectedly not MsgType::Request");
  };

  // nuke the wait context
  drop(wctx);

  // Failing should fail, because wctx has been dropped
  let Err(Error::OriginDisappeared) = rctx.fail(()) else {
    panic!("Unexpected error");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2


3


4
5
6
7
8








9


10
11
12
13
14
15
16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30


+
+

+
+





+
+
+
+
+
+
+
+

+
+







# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=ump-ng-0.1.0&to=trunk)

### Added

### Changed

- Add some `Send` bounds to make `Future`s `Send`.
- Update to `swctx` to `0.3.0`, allowing `ReplyContext` to detect if the
  originating client has been dropped.
- ⚠️ Require `std::error::Error` bound on application-specific error `E` for
  `std::error::Error` implementation on `Error<E>` as well as `fmt::Display`
  for `Error<E>`.
- ⚠️ Rename `MsgType::Put` to `MsgType::Post`.

### Removed

- Remove superfluous `parking_lot` dependency.

---

## [0.1.0] - 2023-10-02

Initial release.