ump

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-0.12.1 To ump-0.13.0

2024-09-12
20:23
Move clippy configuration from bacon.toml to Cargo.toml. Leaf check-in: ccc864a567 user: jan tags: trunk
2024-09-10
00:14
Change log updates. check-in: f43f27bdc2 user: jan tags: trunk, ump-0.13.0
00:05
Add test cases for wait context disappearing. check-in: 1857251062 user: jan tags: trunk
2024-01-12
14:31
Remove dev-docs feature. check-in: 2ba8f53f39 user: jan tags: trunk
2023-10-02
13:32
Release maintenance. check-in: 45f4a89db5 user: jan tags: trunk, ump-0.12.1
2023-09-17
10:16
Add changelog separators. check-in: e1451ac681 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5

6
7
8
9
10
11
12
13
14
15
16

17
18
19

20
21


22
23
24
25
26


27
28
29
30

31
32
33
34
35
36
37
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22


23
24
25
26



27
28
29
30
31

32
33
34
35
36
37
38
39


-
+


+











+



+
-
-
+
+


-
-
-
+
+



-
+







[package]
name = "ump"
version = "0.12.1"
version = "0.13.0"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "examples",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[features]
dev-docs = []
[badges]
maintenance = { status = "passively-maintained" }

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.4" }
swctx = { version = "0.2.1" }
sigq = { version = "0.13.5" }
swctx = { version = "0.3.0" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }

[[bench]]
name = "add_server"
harness = false

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Changes to README.md.

1
2
3
4
5
6








1
2
3
4
5
6
7
8
9
10
11
12
13
14






+
+
+
+
+
+
+
+
# Micro-Message Passing Library

The _ump_ crate is a simple client/server message passing library for
intra-process communication.  Its primary purpose is to allow cross
async/non-async communication (for both the server and client endpoints).

[![Crates.io][crates-badge]][crates-url]
[![0BSD licensed][0bsd-badge]][0bsd-url]

[crates-badge]: https://img.shields.io/crates/v/ump.svg
[crates-url]: https://crates.io/crates/ump
[0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg
[0bsd-url]: https://opensource.org/license/0bsd

Added bacon.toml.



































































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
# This is a configuration file for the bacon tool
#
# Bacon repository: https://github.com/Canop/bacon
# Complete help on configuration: https://dystroy.org/bacon/config/
# You can also check bacon's own bacon.toml file
#  as an example: https://github.com/Canop/bacon/blob/main/bacon.toml

# For information about clippy lints, see:
# https://github.com/rust-lang/rust-clippy/blob/master/README.md

#default_job = "check"
default_job = "clippy-all-pedantic"

[jobs.check]
command = ["cargo", "check", "--color", "always"]
need_stdout = false

[jobs.check-all]
command = ["cargo", "check", "--all-targets", "--color", "always"]
need_stdout = false

# Run clippy on the default target
[jobs.clippy]
command = [
    "cargo", "clippy",
    "--color", "always",
]
need_stdout = false

# Run clippy on all targets
# To disable some lints, you may change the job this way:
#    [jobs.clippy-all]
#    command = [
#        "cargo", "clippy",
#        "--all-targets",
#        "--color", "always",
#    	 "--",
#    	 "-A", "clippy::bool_to_int_with_if",
#    	 "-A", "clippy::collapsible_if",
#    	 "-A", "clippy::derive_partial_eq_without_eq",
#    ]
# need_stdout = false
[jobs.clippy-all]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
]
need_stdout = false

[jobs.clippy-pedantic]
command = [
    "cargo", "clippy",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

[jobs.clippy-all-pedantic]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = [
    "cargo", "test", "--color", "always",
    "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124
]
need_stdout = true

[jobs.doc]
command = ["cargo", "doc", "--color", "always", "--no-deps"]
need_stdout = false

# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change

# You can run your application and have the result displayed in bacon,
# *if* it makes sense for this crate.
# Don't forget the `--color always` part or the errors won't be
# properly parsed.
# If your program never stops (eg a server), you may set `background`
# to false to have the cargo run output immediately displayed instead
# of waiting for program's end.
[jobs.run]
command = [
    "cargo", "run",
    "--color", "always",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true

# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
#    bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--color", "always", "--example"]
need_stdout = true
allow_warnings = true

# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
# alt-m = "job:my-job"
c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target

Changes to benches/add_server.rs.

1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20












+







use std::thread;

use criterion::{criterion_group, criterion_main, Criterion};

use ump::channel;

enum Ops {
  Die,
  Add(i32, i32),
  AddThreaded(i32, i32)
}

#[allow(clippy::missing_panics_doc)]
pub fn criterion_benchmark(c: &mut Criterion) {
  let mut group = c.benchmark_group("req operation");

  let (server, client) = channel::<Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;
39
40
41
42
43
44
45
46

47
48
49
50
51
52
53
54
55
56
57

58
59
60
61
62
63
64
65
66
67
68
69

70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87
88
40
41
42
43
44
45
46

47
48
49
50
51
52
53
54
55
56
57

58
59
60
61
62
63
64
65
66
67
68
69

70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87
88
89







-
+










-
+











-
+











-
+








  group.bench_function("add", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::Add(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
    });
  });

  p = 0;
  q = 0;
  group.bench_function("add (threaded)", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::AddThreaded(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::Add(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async, threaded)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
    });
  });


  let result = client.req(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();

Changes to src/client.rs.

1

2
3
4
5
6
7
8

1
2
3
4
5
6
7
8
-
+







use crate::{err::Error, server::ServerQueueNode};
use crate::{err::Error, server::QueueNode as ServerQueueNode};

use super::rctx::RCtxState;

/// Representation of a clonable client object that can issue requests to
/// [`Server`](super::Server) objects.
///
/// Each instantiation of a `Client` object is itself an isolated client with
20
21
22
23
24
25
26















27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46














47
48
49
50
51
52
53







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+





-
-
-
-
-
-
-
-
-
-
-
-
-
-







{
  /// Send a message to the server, wait for a reply, and return the reply.
  ///
  /// A complete round-trip (the message is delivered to the server, and the
  /// server sends a reply) must complete before this function returns
  /// success.
  ///
  /// # Return
  /// On success the function will return `Ok(msg)`.
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped [`Error::NoReply`] will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// [`Error::App`].
  ///
  /// # Implementation details
  /// This method is _currently_ reentrant: It is safe to use share a single
  /// `Client` among multiple threads.  _This may change in the future_; it's
  /// recommended to not rely on this.  The recommended way to send messages to
  /// a server from multiple threads is to clone the `Client` and assign one
  /// separate `Client` to each thread.
  ///
  /// # Return
  /// On success the function will return `Ok(msg)`.
  ///
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, `Err(Error:ServerDisappeared)` will be
  /// returned.
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped `Err(Error::NoReply)` will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn req(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
110
111
112
113
114
115
116





117




118
119
120
121
122
123
124
125
126
127
128

129
130
131
132
133
134

135




136
137
138
139
140
141
142
143
144
145
146
147
148
149

150




151
152
153
154

155
156


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171

172
173
174
175
176
177
178
179
180
181










182
183
184
185
186
187
188
189







190
191
192
193
194
195
196
197
198

199
200
201
202

203
204
205
206
207
208

209
210
211
212
213
214
215
111
112
113
114
115
116
117
118
119
120
121
122

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145

146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

165
166
167
168
169
170
171
172
173


174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189

190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216


217
218
219
220
221
222
223
224
225
226
227
228
229
230
231

232
233
234
235

236
237
238
239
240
241

242
243
244
245
246
247
248
249







+
+
+
+
+
-
+
+
+
+











+






+
-
+
+
+
+














+
-
+
+
+
+




+
-
-
+
+














-
+










+
+
+
+
+
+
+
+
+
+






-
-
+
+
+
+
+
+
+








-
+



-
+





-
+







  ///
  /// let reply = wrctx.wait().unwrap();
  /// println!("Client received reply '{}'", reply);
  /// println!("Client done");
  ///
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// If the linked server object has been released, or is released while the
  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
  /// returned.
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();
    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;
    Ok(WaitReply(wctx))
  }

  #[allow(clippy::missing_errors_doc)]
  #[deprecated(since = "0.10.2", note = "Use req() instead.")]
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    self.req(out)
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn areq(&self, out: S) -> Result<R, Error<E>> {
  pub async fn areq(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    let (sctx, wctx) = swctx::mkpair();

    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait_async().await?)
  }

  #[deprecated(since = "0.10.2", note = "Use areq() instead.")]
  #[allow(clippy::missing_errors_doc)]
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
  pub async fn asend(&self, out: S) -> Result<R, Error<E>>
  where
    S: Send
  {
    self.areq(out).await
  }

  /// Create a weak `Client` reference.
  #[must_use]
  pub fn weak(&self) -> WeakClient<S, R, E> {
    WeakClient(self.0.weak())
  pub fn weak(&self) -> Weak<S, R, E> {
    Weak(self.0.weak())
  }
}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Client(self.0.clone())
    Self(self.0.clone())
  }
}

/// Context used to wait for a server to reply to a request.
pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);

impl<R, E> WaitReply<R, E> {
  /// Block and wait for a reply.
  ///
  /// For use in non-`async` threads.
  ///
  /// # Errors
  /// [`Error::ServerDisappeared`] means the linked server object has been
  /// released.
  ///
  /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
  /// handler it replies to the message, [`Error::NoReply`] will be returned.
  ///
  /// If an application specific error occurs it will be returned in
  /// [`Error::App`].
  pub fn wait(self) -> Result<R, Error<E>> {
    Ok(self.0.wait()?)
  }

  /// Block and wait for a reply.
  ///
  /// For use in `async` tasks.
  pub async fn wait_async(self) -> Result<R, Error<E>> {
  /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
  #[allow(clippy::missing_errors_doc)]
  pub async fn wait_async(self) -> Result<R, Error<E>>
  where
    R: Send,
    E: Send
  {
    Ok(self.0.wait_async().await?)
  }
}


/// A weak client reference that can be upgraded to a [`Client`] as long as
/// other `Client` objects till exist.
#[repr(transparent)]
pub struct WeakClient<S, R, E>(
pub struct Weak<S, R, E>(
  pub(crate) sigq::WeakPusher<ServerQueueNode<S, R, E>>
);

impl<S, R, E> Clone for WeakClient<S, R, E> {
impl<S, R, E> Clone for Weak<S, R, E> {
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}

impl<S, R, E> WeakClient<S, R, E> {
impl<S, R, E> Weak<S, R, E> {
  /// Upgrade a `WeakClient` to a [`Client`].
  ///
  /// If no strong `Client` objects still exist then `None` is returned.
  ///
  /// # Examples
  ///
  /// Upgrading a weak client while stong clients exists works:
230
231
232
233
234
235
236

237
238
239
240
241
242
264
265
266
267
268
269
270
271
272
273
274
275
276
277







+






  /// let (server, client) = channel::<String, String, ()>();
  /// let weak_client = client.weak();
  /// drop(client);
  /// let Some(_) = weak_client.upgrade() else {
  ///   panic!("Unexpectedly able to upgrade weak client");
  /// };
  /// ```
  #[must_use]
  pub fn upgrade(&self) -> Option<Client<S, R, E>> {
    self.0.upgrade().map(|x| Client(x))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/err.rs.

8
9
10
11
12
13
14




15
16
17
18
19
20
21
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25







+
+
+
+







  /// The server object has shut down.
  ///
  /// Happens when clients:
  /// - attempt to transmit messages to a server that has been deallocated.
  /// - have their requests dropped from the serrver's queue because the
  ///   server itself was deallocated.
  ServerDisappeared,

  /// A message reply could not be completed because the original requestor
  /// disappearing.
  OriginDisappeared,

  /// No more client end-points remain.
  ///
  /// There are no more nodes to pick up in the queue and all client
  /// end-points have been dropped (implied: no new nodes will ever be added
  /// to the queue).
  ClientsDisappeared,
31
32
33
34
35
36
37
38

39
40
41
42
43
44
45

46
47
48
49

50
51
52
53
54
55

56
57




58
59
60
61
62
63





64
65
66
67
68
69
70
71
72
73


74

75

76
77
78
79
80
35
36
37
38
39
40
41

42
43
44
45
46
47
48

49
50
51
52

53
54
55
56
57
58

59
60

61
62
63
64
65
66




67
68
69
70
71
72
73
74
75
76
77
78
79


80
81
82
83

84
85
86
87
88
89







-
+






-
+



-
+





-
+

-
+
+
+
+


-
-
-
-
+
+
+
+
+








-
-
+
+

+
-
+





  App(E)
}

impl<E> Error<E> {
  /// Attempt to convert [`Error`] into application-specific error.
  pub fn into_apperr(self) -> Option<E> {
    match self {
      Error::App(e) => Some(e),
      Self::App(e) => Some(e),
      _ => None
    }
  }

  /// Unwrap application-specific error from an [`Error`].
  ///
  /// # Panic
  /// # Panics
  /// Panics if `Error` variant is not `Error::App()`.
  pub fn unwrap_apperr(self) -> E {
    match self {
      Error::App(e) => e,
      Self::App(e) => e,
      _ => panic!("Not an Error::App")
    }
  }
}

impl<E: fmt::Debug> std::error::Error for Error<E> {}
impl<E: std::error::Error> std::error::Error for Error<E> {}

impl<E: fmt::Debug> fmt::Display for Error<E> {
impl<E> fmt::Display for Error<E>
where
  E: std::error::Error
{
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Error::ServerDisappeared => write!(f, "Server disappeared"),
      Error::ClientsDisappeared => write!(f, "Clients disappeared"),
      Error::NoReply => write!(f, "Server didn't reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
      Self::ServerDisappeared => write!(f, "Server disappeared"),
      Self::OriginDisappeared => write!(f, "Requestor disappeared"),
      Self::ClientsDisappeared => write!(f, "Clients disappeared"),
      Self::NoReply => write!(f, "Server didn't reply"),
      Self::App(err) => write!(f, "Application error; {}", err)
    }
  }
}

impl<E> From<swctx::Error<RCtxState, E>> for Error<E> {
  fn from(err: swctx::Error<RCtxState, E>) -> Self {
    match err {
      swctx::Error::Aborted(state) => match state {
        RCtxState::Queued => Error::ServerDisappeared,
        RCtxState::Active => Error::NoReply
        RCtxState::Queued => Self::ServerDisappeared,
        RCtxState::Active => Self::NoReply
      },
      swctx::Error::LostWaiter => Self::OriginDisappeared,
      swctx::Error::App(e) => Error::App(e)
      swctx::Error::App(e) => Self::App(e)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

103
104
105
106
107
108
109
110

111
112
113
114
115
116
117
103
104
105
106
107
108
109

110
111
112
113
114
115
116
117







-
+







mod err;
mod rctx;
mod server;

pub use err::Error;

pub use crate::{
  client::{Client, WaitReply, WeakClient},
  client::{Client, WaitReply, Weak as WeakClient},
  rctx::ReplyContext,
  server::Server
};

/// Create a pair of linked [`Server`] and [`Client`] objects.
///
/// The [`Server`] object is used to wait for incoming messages from connected
126
127
128
129
130
131
132

133
134
135
136
137
138
139
140
141
142
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143







+










/// new client object that is connected to the same server object, but is
/// completely independent of the original client.
///
/// The `S` type parameter is the "request" data type that clients will
/// transfer to the server.  The `R` type parameter is the "receive" data type
/// that clients will receive from the server.  The `E` type parameter can be
/// used to return application specific errors from the server to the client.
#[must_use]
pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) {
  let (qpusher, qpuller) = sigq::new();

  let server = Server(qpuller);
  let client = Client(qpusher);

  (server, client)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx.rs.

1
2
3
4
5
6
7
8
9
10
11

12
13
14
15
16
17
18
1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18










-
+







//! Allow a thread/task, crossing sync/async boundaries in either direction, to
//! deliver an expected piece of data to another thread/task, with
//! notification.
//!
//! These are simple channels used to deliver data from one endpoint to
//! another, where the receiver will block until data has been delivered.

use crate::err::Error;

#[derive(Clone, Default)]
pub(crate) enum RCtxState {
pub enum RCtxState {
  #[default]
  Queued,
  Active
}

/// Object used to respond to requests that have been received by a
/// [`Server`](super::Server).
34
35
36
37
38
39
40




41
42
43
44

45
46
47
48
49
50
51
34
35
36
37
38
39
40
41
42
43
44
45
46
47

48
49
50
51
52
53
54
55







+
+
+
+



-
+







  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// If the originating caller is no longer waiting for a reply (i.e. was
  /// dropped) [`Error::OriginDisappeared`] is returned.
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(self, data: T) -> Result<(), Error<E>> {
    self.0.set(data);
    self.0.set(data)?;
    Ok(())
  }

  /// Return an error to originating client.
  /// This will cause the calling client to return an error.  The error passed
  /// in the `err` parameter will be wrapped in a `Error::App(err)`.
  ///
74
75
76
77
78
79
80




81
82
83
84

85
86
87
88
89
90
91
92











93
94
95
96
78
79
80
81
82
83
84
85
86
87
88
89
90
91

92
93
94
95
96




97
98
99
100
101
102
103
104
105
106
107
108
109
110
111







+
+
+
+



-
+




-
-
-
-
+
+
+
+
+
+
+
+
+
+
+




  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }
  /// }
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Errors
  /// If the originating caller is no longer waiting for a reply (i.e. was
  /// dropped) [`Error::OriginDisappeared`] is returned.
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(self, err: E) -> Result<(), Error<E>> {
    self.0.fail(err);
    self.0.fail(err)?;
    Ok(())
  }
}

impl<T, E> From<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
  fn from(sctx: swctx::SetCtx<T, RCtxState, E>) -> Self {
    sctx.set_state(RCtxState::Active);
    ReplyContext(sctx)
impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
  type Error = Error<E>;

  /// Convert a `SetCtx` into a `ReplyContext`.
  ///
  /// Sets the `SetCtx`'s stat to _Active_.
  fn try_from(
    sctx: swctx::SetCtx<T, RCtxState, E>
  ) -> Result<Self, Self::Error> {
    let _ = sctx.set_state(RCtxState::Active);
    Ok(Self(sctx))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/server.rs.

1
2
3
4
5
6

7
8
9
10
11
12
13
14
15
16
17
18
19
20

21
22
23
24
25
26
27
28
29
30
31
32
33




34
35
36
37
38
39
40
41
42

43
44
45
46
47




48
49
50
51
52
53
54
55
56
57
58
59

60
61
62
63
64
65
66
67

68
69
70
71
72
73
74
75
76

77
78
79
80
81
82
83

84
85
86
87
88
89
1
2
3
4
5

6
7
8
9
10
11
12
13
14
15
16
17
18
19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99





-
+













-
+













+
+
+
+








-
+





+
+
+
+











-
+








+








-
+







+






use crate::{
  err::Error,
  rctx::{RCtxState, ReplyContext}
};

pub(crate) struct ServerQueueNode<S, R, E> {
pub struct QueueNode<S, R, E> {
  /// Raw message being sent from the client to the server.
  pub(crate) msg: S,

  /// Keep track of data needed to share reply data.
  pub(crate) reply: swctx::SetCtx<R, RCtxState, E>
}

/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.
#[repr(transparent)]
pub struct Server<S, R, E>(pub(crate) sigq::Puller<ServerQueueNode<S, R, E>>);
pub struct Server<S, R, E>(pub(crate) sigq::Puller<QueueNode<S, R, E>>);

impl<S, R, E> Server<S, R, E>
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Block and wait, indefinitely, for an incoming message from a
  /// [`Client`](crate::Client).
  ///
  /// Returns the message sent by the client and a reply context.  The server
  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
  /// value to the client.
  ///
  /// # Errors
  /// `Err(Error::ClientsDisappeared)` indicates that the queue is empty and
  /// all the client end-points have been dropped.
  pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);
    let rctx = ReplyContext::try_from(node.reply)?;

    Ok((msg, rctx))
  }

  /// Take next next message off queue or return `None` is queue is empty.
  ///
  /// # Errors
  /// [`Error::ClientsDisappeared`] indicates that the queue is empty and
  /// all the client end-points have been dropped.
  #[allow(clippy::type_complexity)]
  pub fn try_pop(&self) -> Result<Option<(S, ReplyContext<R, E>)>, Error<E>> {
    let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;

    if let Some(node) = node {
      // Extract the data from the node
      let msg = node.msg;

      // Create an application reply context from the reply context in the
      // queue Implicitly changes state of the reply context from Queued
      // to Waiting
      let rctx = ReplyContext::from(node.reply);
      let rctx = ReplyContext::try_from(node.reply)?;

      Ok(Some((msg, rctx)))
    } else {
      Ok(None)
    }
  }

  /// Same as [`Server::wait()`], but for use in an `async` context.
  #[allow(clippy::missing_errors_doc)]
  pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);
    let rctx = ReplyContext::try_from(node.reply)?;

    Ok((msg, rctx))
  }

  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
  /// really useful unless used in very specific situations.  It mostly exists
  /// for test cases.
  #[must_use]
  pub fn was_empty(&self) -> bool {
    self.0.was_empty()
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/async_client.rs.

42
43
44
45
46
47
48
49

50
51
52

53
54
55
56
57
58
42
43
44
45
46
47
48

49

50

51
52
53
54
55
56
57







-
+
-

-
+






      if let Reply::Sum(sum) = result {
        assert_eq!(sum, a + b);
      } else {
        panic!("Didn't get sum");
      }
    }
    let result = client.areq(Request::Croak).await.unwrap();
    if let Reply::OkICroaked = result {
    let Reply::OkICroaked = result else {
    } else {
      panic!("Didn't get a croak");
    }
    };
  });

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/wait_disappear.rs.










































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use ump::{channel, Error};

#[test]
fn wait_disappered_on_reply() {
  let (server, client) = channel::<String, String, ()>();

  // Generate a request that will return a wait context.
  let wctx = client.req_async(String::from("hello"));

  // Get the message (and reply context) from server end-point
  let (_msg, rctx) = server.wait().unwrap();

  // nuke the wait context
  drop(wctx);

  // Replying should fail, because wctx has been dropped
  let Err(Error::OriginDisappeared) = rctx.reply(String::from("ahoy")) else {
    panic!("Unexpected error");
  };
}

#[test]
fn wait_disappered_on_fail() {
  let (server, client) = channel::<String, String, ()>();

  // Generate a request that will return a wait context.
  let wctx = client.req_async(String::from("hello"));

  // Get the message (and reply context) from server end-point
  let (_msg, rctx) = server.wait().unwrap();

  // nuke the wait context
  drop(wctx);

  // Failing should fail, because wctx has been dropped
  let Err(Error::OriginDisappeared) = rctx.fail(()) else {
    panic!("Unexpected error");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2


3


4
5
6
7
8
9
10
11
12



















13


14
15
16
17
18
19
20
21
22
23
24
25
26
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

45
46
47
48
49
50
51
52
53
54
55
56
57

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

75
76
77
78
79
80
81
82
83
84
85

86
87
88
89
90
91
92
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

69
70
71
72
73
74
75
76
77
78
79
80
81

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

99
100
101
102
103
104
105
106
107
108
109

110
111
112
113
114
115
116
117


+
+

+
+









+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+
+













-
+
















-
+












-
+
















-
+










-
+







# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=ump-0.13.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.13.0] - 2024-09-10

[Details](/vdiff?from=ump-0.12.1&to=ump-0.13.0)

### Changed

- Update to `swctx` to `0.3.0`, allowing `ReplyContext` to detect if the
  originating client has been dropped.
- ⚠️ Require `std::error::Error` bound on application-specific error `E` for
  `std::error::Error` implementation on `Error<E>` as well as `fmt::Display`
  for `Error<E>`.

### Removed

- Remove `dev-docs` feature
- Remove superfluous `parking_lot` dependency.

---

## [0.12.1] - 2023-10-02

[Details](/vdiff?from=ump-0.12.0&to=ump-0.12.1)

### Added

- Add `Client::req_async()`.
- Add `Server::try_pop()`.
- `Client` objects can spawn downgraded to `WeakClient` objects, that in turn
  can be upgraded to `Client` objects (as long as all the strong `Client`
  objects have not been dropped).

---

## [0.12.0] - 2023-08-15

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.11.0&to=ump-0.12.0)
[Details](/vdiff?from=ump-0.11.0&to=ump-0.12.0)

### Changed

- Include tests when publishing crate.
- Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public,
  giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return
  types.
- Use the `swctx` crate for sending back the reply rather than use a custom
  in-tree implementation.
- Update `edition` to `2021` and `rust-version` to `1.56`.
- Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml`

---

## [0.11.0] - 2023-07-29

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.2&to=ump-0.11.0)
[Details](/vdiff?from=ump-0.10.2&to=ump-0.11.0)

### Changed

- Include tests when publishing crate.
- Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public,
  giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return
  types.

---

## [0.10.2] - 2023-07-28

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.1&to=ump-0.10.2)
[Details](/vdiff?from=ump-0.10.1&to=ump-0.10.2)

### Added

- Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with
  a deprecation notice.
- Add a `dev-docs` feature to allow internal documentation notes to be
  included in generated documentation.

### Changed

- Rename `send()`/`asend()` to `req()`/`areq()`.

---

## [0.10.1] - 2023-07-27

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.0&to=ump-0.10.1)
[Details](/vdiff?from=ump-0.10.0&to=ump-0.10.1)

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.3`.

---

## [0.10.0] - 2023-07-26

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.9.0&to=ump-0.10.0)
[Details](/vdiff?from=ump-0.9.0&to=ump-0.10.0)

### Added

- Server's receive methods will fail with `Error::ClientsDisappeared` if all
  the associated Client objects have been dropped.

### Changed