ump

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-0.12.0 To ump-0.12.1

2024-01-12
14:31
Remove dev-docs feature. check-in: 2ba8f53f39 user: jan tags: trunk
2023-10-02
13:32
Release maintenance. check-in: 45f4a89db5 user: jan tags: trunk, ump-0.12.1
2023-09-17
10:16
Add changelog separators. check-in: e1451ac681 user: jan tags: trunk
2023-08-31
14:34
Document ReplyContext. check-in: 61295fc1c3 user: jan tags: trunk
2023-08-14
23:03
Release maintenance. check-in: b08bb813bf user: jan tags: trunk, ump-0.12.0
22:55
Update swctx to 0.2.1. check-in: 3eaac7ef46 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
11
12
13
14
15

16
17
18
19
20
21
22
23
24
25
26
27

28
29
30
31
32

33
34
35
36
37
38
39
1
2

3
4
5
6
7
8
9
10


11

12
13
14
15
16
17
18
19
20
21
22
23
24

25
26
27
28
29

30
31
32
33
34
35
36
37


-
+







-
-

-

+











-
+




-
+







[package]
name = "ump"
version = "0.12.0"
version = "0.12.1"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.56"

# Can't exclude "benches", because the [[bench]] section will fail.
exclude = [
  ".efiles",
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "examples",
  "www",
  "rustfmt.toml"
]

[features]
dev-docs = []

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.3" }
sigq = { version = "0.13.4" }
swctx = { version = "0.2.1" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.31.0", features = ["rt-multi-thread"] }
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }

[[bench]]
name = "add_server"
harness = false

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Changes to src/client.rs.

1
2


3


4
5
6
7
8
9

10

11
12
13
14
15
16
17
18
19
20
21
22
23
1
2
3
4

5
6
7
8
9
10
11
12
13

14






15
16
17
18
19
20
21


+
+
-
+
+






+
-
+
-
-
-
-
-
-







use crate::{err::Error, server::ServerQueueNode};

use super::rctx::RCtxState;

/// Representation of a clonable client object.
/// Representation of a clonable client object that can issue requests to
/// [`Server`](super::Server) objects.
///
/// Each instantiation of a `Client` object is itself an isolated client with
/// regards to the server context.  By cloning a client a new independent
/// client is created.  ("Independent" here meaning that it is still tied to
/// the same server object, but the new client can be passed to a separate
/// thread and can independently make calls to the server).
#[repr(transparent)]
pub struct Client<S, R, E> {
pub struct Client<S, R, E>(pub(crate) sigq::Pusher<ServerQueueNode<S, R, E>>);
  /// Weak reference to server queue.
  ///
  /// The server context holds the only strong reference to the queue.  This
  /// allows the clients to detect when the server has terminated.
  pub(crate) qpusher: sigq::Pusher<ServerQueueNode<S, R, E>>
}

impl<S, R, E> Client<S, R, E>
where
  R: 'static + Send,
  E: 'static + Send
{
  /// Send a message to the server, wait for a reply, and return the reply.
52
53
54
55
56
57
58
59

60
61
62
63
64
65
66
67
68





























































69
70
71
72
73
74
75
76
77
78
79
80

81
82
83
84
85
86
87
88
89
90
91
92
93





94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109













110
111
112

























































113
50
51
52
53
54
55
56

57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170



171
172
173
174
175
176
177
178
179
180
181
182
183
184


185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242







-
+









+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+











-
+













+
+
+
+
+













-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
    let (sctx, wctx) = swctx::mkpair();

    self
      .qpusher
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    // Wait for a reply to arrive
    Ok(wctx.wait()?)
  }

  /// Issue a request, immediately returning a context that is used to wait for
  /// the server's reply.
  ///
  /// The `_async` naming is slightly misleading -- this method isn't an
  /// `async` in a language/`Future` sense, but rather it doesn't block and
  /// wait for a reply before returning.  Instead it returns a [`WaitReply`]
  /// object that is used to wait for the reply.
  ///
  /// This can be useful (in place of [`req()`](Client::req) or
  /// [`areq()`](Client::areq()) methods) if the caller knows that the server
  /// will take some time to respond to the request and the caller has other
  /// tasks it can perform in the meantime.
  ///
  /// ```
  /// use std::thread;
  ///
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  ///
  /// let server_thread = thread::spawn(move || {
  ///   // Wait for data to arrive from a client
  ///   println!("Server waiting for message ..");
  ///   let (data, mut rctx) = server.wait().unwrap();
  ///
  ///   println!("Server received: '{}'", data);
  ///
  ///   // Long processing of data from client
  ///
  ///   // Reply to client
  ///   let reply = format!("Hello, {}!", data);
  ///   println!("Server replying '{}'", reply);
  ///   rctx.reply(reply);
  ///
  ///   println!("Server done");
  /// });
  ///
  /// let msg = String::from("Client");
  /// println!("Client sending '{}'", msg);
  /// let wrctx = client.req_async(msg).unwrap();
  ///
  /// // .. perform some operation while server is processing the request ..
  ///
  /// let reply = wrctx.wait().unwrap();
  /// println!("Client received reply '{}'", reply);
  /// println!("Client done");
  ///
  /// server_thread.join().unwrap();
  /// ```
  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
    let (sctx, wctx) = swctx::mkpair();
    self
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;
    Ok(WaitReply(wctx))
  }

  #[deprecated(since = "0.10.2", note = "Use req() instead.")]
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    self.req(out)
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  pub async fn areq(&self, out: S) -> Result<R, Error<E>> {
    let (sctx, wctx) = swctx::mkpair();

    self
      .qpusher
      .0
      .push(ServerQueueNode {
        msg: out,
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    Ok(wctx.wait_async().await?)
  }

  #[deprecated(since = "0.10.2", note = "Use areq() instead.")]
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
    self.areq(out).await
  }

  /// Create a weak `Client` reference.
  pub fn weak(&self) -> WeakClient<S, R, E> {
    WeakClient(self.0.weak())
  }
}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Client {
      qpusher: self.qpusher.clone()
    }
    Client(self.0.clone())
  }
}

/// Context used to wait for a server to reply to a request.
pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);

impl<R, E> WaitReply<R, E> {
  /// Block and wait for a reply.
  ///
  /// For use in non-`async` threads.
  pub fn wait(self) -> Result<R, Error<E>> {
    Ok(self.0.wait()?)
  }
}


  /// Block and wait for a reply.
  ///
  /// For use in `async` tasks.
  pub async fn wait_async(self) -> Result<R, Error<E>> {
    Ok(self.0.wait_async().await?)
  }
}


/// A weak client reference that can be upgraded to a [`Client`] as long as
/// other `Client` objects till exist.
#[repr(transparent)]
pub struct WeakClient<S, R, E>(
  pub(crate) sigq::WeakPusher<ServerQueueNode<S, R, E>>
);

impl<S, R, E> Clone for WeakClient<S, R, E> {
  fn clone(&self) -> Self {
    Self(self.0.clone())
  }
}

impl<S, R, E> WeakClient<S, R, E> {
  /// Upgrade a `WeakClient` to a [`Client`].
  ///
  /// If no strong `Client` objects still exist then `None` is returned.
  ///
  /// # Examples
  ///
  /// Upgrading a weak client while stong clients exists works:
  /// ```
  /// use ump::{channel, Error};
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let weak_client = client.weak();
  /// let Some(client2) = weak_client.upgrade() else {
  ///   panic!("Unable to upgrade weak client");
  /// };
  /// ```
  ///
  /// Upgrading a weak client when no stong clients exists fails:
  /// ```
  /// use ump::{channel, Error};
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let weak_client = client.weak();
  /// drop(client);
  /// let Some(_) = weak_client.upgrade() else {
  ///   panic!("Unexpectedly able to upgrade weak client");
  /// };
  /// ```
  pub fn upgrade(&self) -> Option<Client<S, R, E>> {
    self.0.upgrade().map(|x| Client(x))
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

1
2
3
4
5
6
7
8
9
10
11
12


13
14
15


16
17
18
19
20
21








22
23
24
25



26
27
28


29
30

31
32
33
34
35
36
37
1
2
3
4
5
6
7
8
9
10


11
12



13
14






15
16
17
18
19
20
21
22




23
24
25
26


27
28


29
30
31
32
33
34
35
36










-
-
+
+
-
-
-
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+

-
-
+
+
-
-
+







//! Micro Message Pass (ump) is a library for passing messages between
//! thread/tasks.  It has some similarities with the common mpsc channel
//! libraries, but with the most notable difference that each time a client
//! sends a message the server must send back a reply.
//!
//! The primary purpose of ump is to create simple RPC like designs, but
//! between threads/tasks within a process rather than between processes over
//! networks.
//!
//! # High-level usage overview
//! An application calls [`channel`] to create a linked pair of a [`Server`]
//! and a [`Client`].
//! 1. An application calls [`channel`] to create a linked pair of a [`Server`]
//!    and a [`Client`].
//!
//! The server calls [`Server::wait()`]/[`Server::async_wait()`], which
//! blocks and waits for an incoming message from a client.
//! 2. The server calls [`Server::wait()`]/[`Server::async_wait()`], which
//!    blocks and waits for an incoming message from a client.
//!
//! A client, in a separate thread or task, calls
//! [`Client::req()`]/[`Client::areq()`] to send a message to the server.
//!
//! The server's wait call returns two objects:  The message sent by the
//! client, and a [`ReplyContext`].
//! 3. A client, in a separate thread or task, sends a message to the server
//!    and wait for a reply using:
//!    - [`Client::req()`] for non-`async` contexts.
//!    - [`Client::areq()`] to `async` contexts.
//!    - [`Client::req_async()`] (and wait for a reply using the returned
//!      [`WaitReply`])
//! 4. The server's wait call returns two objects:  The message sent by the
//!    client, and a [`ReplyContext`].
//!
//! After processing its application-defined message, the server *must* call
//! the [`ReplyContext::reply()`] on the returned reply context object to
//! return a reply message to the client.
//! 5. After processing its application-defined message, the server *must* call
//!    the [`ReplyContext::reply()`] on the returned reply context object to
//!    return a reply message to the client.
//!
//! Typically the server calls wait again to wait for next message from a
//! client.
//!    Typically the server calls wait again to wait for next message from a
//!    client.
//!
//! The client receives the reply from the server and processes it.
//! 6. The client receives the reply from the server and processes it.
//!
//! # Example
//! ```
//! use std::thread;
//!
//! use ump::channel;
//!
71
72
73
74
75
76
77
78

79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110





111
112
113
114
115
116
117
70
71
72
73
74
75
76

77
78
79
80
81
82
83
84
85
86
87
88
89
90
91



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108

109
110
111
112
113
114
115
116
117
118
119
120







-
+














-
-
-
+
+
+














-
+
+
+
+
+







//! There are some potentially useful semantic quirks that can be good to know
//! about, but some of them should be used with caution.  This section will
//! describe some semantics that you can rely on, and others that you should be
//! careful about relying on.
//!
//! ## Stable invariants
//!
//! These are behaviors which should not change in future versions.
//! Semantics that should not change in future versions.
//!
//! - The reply contexts are independent of the `Server` context.  This has
//!   some useful implications for server threads that spawn separate threads
//!   to process messages and return replies:  *The server can safely terminate
//!   while there are clients waiting for replies* (implied: the server can
//!   safely terminate while there are reply contexts in-flight).
//! - A cloned client is paired with the same server as its origin, but in all
//!   other respects the clone and its origin are independent of each other.
//! - A client can be moved to a new thread.
//! - Any permutation of sync/async server/clients can be combined.  `async`
//!   code must use the async method variants when available.
//!
//! ## Unstable invariants
//!
//! These are invariants you can trust will work in the current version, but
//! they exist merely as a side-effect of the current implementation.  Avoid
//! relying on these if possible.
//! Semantics you can trust will work in the current version, but they exist
//! merely as a side-effect of the current implementation.  Avoid relying on
//! these if possible.
//!
//! - A single client can be used from two different threads.  If a `Client`
//!   object in placed in an Arc, is cloned and passed to another thread/task
//!   then both the clone and the original can be used simultaneously.  In the
//!   future this may not be allowed. It is recommended that a new clone of the
//!   client be created instead.

mod client;
mod err;
mod rctx;
mod server;

pub use err::Error;

pub use crate::{client::Client, rctx::ReplyContext, server::Server};
pub use crate::{
  client::{Client, WaitReply, WeakClient},
  rctx::ReplyContext,
  server::Server
};

/// Create a pair of linked [`Server`] and [`Client`] objects.
///
/// The [`Server`] object is used to wait for incoming messages from connected
/// clients.  Once a message arrives it must reply to it using a
/// [`ReplyContext`] that's returned to it in the same call that returned the
/// message.
126
127
128
129
130
131
132
133

134
135

136
137
138
139
140
129
130
131
132
133
134
135

136


137
138
139
140
141
142







-
+
-
-
+





/// The `S` type parameter is the "request" data type that clients will
/// transfer to the server.  The `R` type parameter is the "receive" data type
/// that clients will receive from the server.  The `E` type parameter can be
/// used to return application specific errors from the server to the client.
pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) {
  let (qpusher, qpuller) = sigq::new();

  let server = Server { qpuller };
  let server = Server(qpuller);

  let client = Client { qpusher };
  let client = Client(qpusher);

  (server, client)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx.rs.

10
11
12
13
14
15
16


17
18
19
20
21
22
23
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25







+
+







#[derive(Clone, Default)]
pub(crate) enum RCtxState {
  #[default]
  Queued,
  Active
}

/// Object used to respond to requests that have been received by a
/// [`Server`](super::Server).
pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>);

impl<T, E> ReplyContext<T, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```

Changes to src/server.rs.

12
13
14
15
16
17
18

19

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

37
38
39
40
41
42
43
44
45
46
47




















48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

71
72
73
74
12
13
14
15
16
17
18
19

20


21
22
23
24
25
26
27
28
29
30
31
32
33
34

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68





69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89







+
-
+
-
-














-
+











+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


-
-
-
-
-
+















-
+




}

/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.
#[repr(transparent)]
pub struct Server<S, R, E> {
pub struct Server<S, R, E>(pub(crate) sigq::Puller<ServerQueueNode<S, R, E>>);
  pub(crate) qpuller: sigq::Puller<ServerQueueNode<S, R, E>>
}

impl<S, R, E> Server<S, R, E>
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Block and wait, indefinitely, for an incoming message from a
  /// [`Client`](crate::Client).
  ///
  /// Returns the message sent by the client and a reply context.  The server
  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
  /// value to the client.
  pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self.qpuller.pop().map_err(|_| Error::ClientsDisappeared)?;
    let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);

    Ok((msg, rctx))
  }

  /// Take next next message off queue or return `None` is queue is empty.
  #[allow(clippy::type_complexity)]
  pub fn try_pop(&self) -> Result<Option<(S, ReplyContext<R, E>)>, Error<E>> {
    let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;

    if let Some(node) = node {
      // Extract the data from the node
      let msg = node.msg;

      // Create an application reply context from the reply context in the
      // queue Implicitly changes state of the reply context from Queued
      // to Waiting
      let rctx = ReplyContext::from(node.reply);

      Ok(Some((msg, rctx)))
    } else {
      Ok(None)
    }
  }

  /// Same as [`Server::wait()`], but for use in an `async` context.
  pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self
      .qpuller
      .apop()
      .await
      .map_err(|_| Error::ClientsDisappeared)?;
    let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);

    Ok((msg, rctx))
  }

  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
  /// really useful unless used in very specific situations.  It mostly exists
  /// for test cases.
  pub fn was_empty(&self) -> bool {
    self.qpuller.was_empty()
    self.0.was_empty()
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/clnt_disappear.rs.
































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use ump::{channel, Error};

#[test]
fn sync_expect_err_if_server_dropped() {
  let (server, client) = channel::<String, String, ()>();

  // nuke the only client
  drop(client);

  let Err(Error::ClientsDisappeared) = server.wait() else {
    panic!("Unexpected error");
  };
}

#[test]
fn async_expect_err_if_server_dropped() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  // nuke the only client
  drop(client);

  tokrt.block_on(async {
    let Err(Error::ClientsDisappeared) = server.async_wait().await else {
      panic!("Unexpected error");
    };
  });
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Deleted tests/queue_cleanup.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63































































-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
// Make sure that the InnerReplyContext aborts on Drop if object is still
// queued.
use std::{thread, time};

use ump::{channel, Error};

#[test]
fn sync_expect_server_death() {
  let (server, client) = channel::<String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  let msg = String::from("Client");
  let reply = client.req(msg);
  match reply {
    Err(Error::ServerDisappeared) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }
  }

  server_thread.join().unwrap();
}


#[test]
fn async_expect_server_death() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;
    //let reply = client.req(msg);
    match reply {
      Err(Error::ServerDisappeared) => {
        // This is the expected error
      }
      _ => {
        panic!("Unexpected return value");
      }
    }
  });

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/svc_disappear.rs.






































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
//! Tests for cases that would case the client to return
//! `Error::ServerDisappeared`:
//! - When a client issues a request after the server end-point has already
//!   been dropped.
//! - If a server end-point is dropped while there are requests in the queue,
//!   all those requests should abort with a `ServerDisappeared` error.

use std::{thread, time};

use ump::{channel, Error};

/// Issuing a request should fail immediately if the server end-point has been
/// dropped.
#[test]
fn sync_expect_err_if_server_dropped() {
  let (server, client) = channel::<String, String, ()>();

  // nuke the server end-point
  drop(server);

  let msg = String::from("Client");
  let reply = client.req(msg);
  let Err(Error::ServerDisappeared) = reply else {
    panic!("Unexpected return value");
  };
}

/// Issuing a request should fail immediately if the server end-point has been
/// dropped.
#[test]
fn async_expect_err_if_server_dropped() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  // nuke the server end-point
  drop(server);

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;
    let Err(Error::ServerDisappeared) = reply else {
      panic!("Unexpected return value");
    };
  });
}

/// If a request is still in the queue when the server end-point is dropped,
/// the client shoull return `Error::ServerDisappeared`
#[test]
fn sync_expect_err_if_queue_dropped() {
  let (server, client) = channel::<String, String, ()>();

  // Don't actually take any requests off queue -- just terminate the server
  // end-point.
  let server_thread = thread::spawn(move || {
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  let msg = String::from("Client");
  let reply = client.req(msg);
  let Err(Error::ServerDisappeared) = reply else {
    panic!("Unexpected return value");
  };

  server_thread.join().unwrap();
}

/// If a request is still in the queue when the server end-point is dropped,
/// the client shoull return `Error::ServerDisappeared`
#[test]
fn async_expect_err_if_queue_dropped() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  // Don't actually take any requests off queue -- just terminate the server
  // end-point.
  let server_thread = thread::spawn(move || {
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;

    let Err(Error::ServerDisappeared) = reply else {
      panic!("Unexpected return value");
    };
  });

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/weak.rs.






















































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use ump::{channel, Error};

#[test]
fn sync_weak_should_not_count() {
  let (server, client) = channel::<String, String, ()>();

  let _weak_client = client.weak();

  // nuke the only (strong) client
  drop(client);

  let Err(Error::ClientsDisappeared) = server.wait() else {
    panic!("Unexpected error");
  };
}

#[test]
fn async_weak_should_not_count() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  let _weak_client = client.weak();

  // nuke the only client
  drop(client);

  tokrt.block_on(async {
    let Err(Error::ClientsDisappeared) = server.async_wait().await else {
      panic!("Unexpected error");
    };
  });
}

#[test]
fn upgraded_should_count() {
  let (server, client) = channel::<String, String, ()>();

  let weak_client = client.weak();

  let Some(_client2) = weak_client.upgrade() else {
    panic!("Unable to upgrade weak_client");
  };

  // nuke original client
  drop(client);

  let Ok(None) = server.try_pop() else {
    panic!("Unexpected error");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10

11












12


13
14
15
16
17
18
19
20
21
22
23
24

25
26


27
28
29
30
31
32
33
34

35
36


37
38
39
40
41
42
43
44
45
46
47
48

49
50


51
52
53
54
55
56

57
58


59
60
61
62
63
64
65
66
67
68
69
70
71

72
73
74
75
76
77
78
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106










+

+
+
+
+
+
+
+
+
+
+
+
+

+
+












+


+
+








+


+
+












+


+
+






+


+
+













+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed

---

## [0.12.1] - 2023-10-02

### Added

- Add `Client::req_async()`.
- Add `Server::try_pop()`.
- `Client` objects can spawn downgraded to `WeakClient` objects, that in turn
  can be upgraded to `Client` objects (as long as all the strong `Client`
  objects have not been dropped).

---

## [0.12.0] - 2023-08-15

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.11.0&to=ump-0.12.0)

### Changed

- Include tests when publishing crate.
- Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public,
  giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return
  types.
- Use the `swctx` crate for sending back the reply rather than use a custom
  in-tree implementation.
- Update `edition` to `2021` and `rust-version` to `1.56`.
- Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml`

---

## [0.11.0] - 2023-07-29

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.2&to=ump-0.11.0)

### Changed

- Include tests when publishing crate.
- Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public,
  giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return
  types.

---

## [0.10.2] - 2023-07-28

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.1&to=ump-0.10.2)

### Added

- Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with
  a deprecation notice.
- Add a `dev-docs` feature to allow internal documentation notes to be
  included in generated documentation.

### Changed

- Rename `send()`/`asend()` to `req()`/`areq()`.

---

## [0.10.1] - 2023-07-27

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.10.0&to=ump-0.10.1)

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.3`.

---

## [0.10.0] - 2023-07-26

[Details](https://repos.qrnch.tech/pub/ump/vdiff?from=ump-0.9.0&to=ump-0.10.0)

### Added

- Server's receive methods will fail with `Error::ClientsDisappeared` if all
  the associated Client objects have been dropped.

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.2`.
- Development dependencies:
  - Updated `criterion` to `0.5.1`

---

## [0.9.0] - 2022-09-09

### Added

- Explicitly set MSRV is `1.36`