ump

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-0.9.0 To ump-0.10.0

2023-07-27
01:31
Use sigq 0.13.3 to get puller bugfix. check-in: 3dece6432a user: jan tags: trunk
2023-07-26
21:01
Release maintenance. check-in: 95345f9568 user: jan tags: trunk, ump-0.10.0
20:55
Typo. check-in: 0485b4dd20 user: jan tags: trunk
2023-07-25
02:45
Update to sigq 0.13.1 and make server fail to wait for new messages if there are none to pickup and all clients have been dropped. check-in: 8877adb2d3 user: jan tags: trunk
2022-09-09
18:17
0.9.0 release preparation. check-in: 71b2832c90 user: jan tags: trunk, ump-0.9.0
17:55
Hello version 0.9.0. check-in: 0ac544acc3 user: jan tags: trunk

Changes to .efiles.

1
2
3
4








5
6
7
8
9
1
2


3
4
5
6
7
8
9
10
11
12
13
14
15


-
-
+
+
+
+
+
+
+
+





Cargo.toml
README.md
src/*.rs
src/rctx/*.rs
src/err.rs
src/lib.rs
src/server.rs
src/client.rs
src/rctx.rs
src/rctx/err.rs
src/rctx/inner.rs
src/rctx/public.rs
tests/*.rs
examples/*.rs
benches/*.rs
www/index.md
www/changelog.md

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
11
12
13

14
15
16
17
18
19
20
21
22
23
24
25
26

27
28
29
30


31
32
33
34
35
1
2

3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
20
21
22
23
24
25

26
27
28


29
30
31
32
33
34
35


-
+









-
+












-
+


-
-
+
+





[package]
name = "ump"
version = "0.9.0"
version = "0.10.0"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2018"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.39"

# Can't exclude benches", because the [[bench]] section will fail.
# Can't exclude "benches", because the [[bench]] section will fail.
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "examples",
  "rustfmt.toml",
  "tests",
  "www"
]

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.11.0" }
sigq = { version = "0.13.2" }

[dev-dependencies]
criterion = { version = "0.3.6", features = ["async_tokio"] }
tokio = { version = "1.21.0", features = ["full"] }
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.29.1", features = ["full"] }

[[bench]]
name = "add_server"
harness = false

Changes to benches/add_server.rs.

13
14
15
16
17
18
19
20

21
22

23
24
25
26
27
28
29
13
14
15
16
17
18
19

20


21
22
23
24
25
26
27
28







-
+
-
-
+







pub fn criterion_benchmark(c: &mut Criterion) {
  let mut group = c.benchmark_group("send operation");

  let (server, client) = channel::<Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;

    while !croak {
    while croak == false {
      let (data, rctx) = server.wait();
      let (data, rctx) = server.wait().unwrap();
      match data {
        Ops::Die => {
          croak = true;
          rctx.reply(0).unwrap();
        }
        Ops::Add(a, b) => rctx.reply(a + b).unwrap(),
        Ops::AddThreaded(a, b) => {

Changes to examples/cloneclientserver.rs.

21
22
23
24
25
26
27
28

29
30
31
32
33
34
35
21
22
23
24
25
26
27

28
29
30
31
32
33
34
35







-
+







}

fn main() {
  let (server, client) = channel::<Request, Reply, ()>();

  let client_blueprint = client.clone();
  let server_thread = thread::spawn(move || loop {
    let (req, rctx) = server.wait();
    let (req, rctx) = server.wait().unwrap();
    match req {
      Request::CloneClient => rctx
        .reply(Reply::ClientClone(client_blueprint.clone()))
        .unwrap(),
      Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(),
      Request::Croak => {
        rctx.reply(Reply::OkICroaked).unwrap();

Changes to examples/many_once.rs.

1

2
3
4
5
6
7
8
9

1

2
3
4
5
6
7
8
-
+
-







use std::env;
use std::{env, thread};
use std::thread;

use ump::channel;

// Run several clients, but each client iterates only once.
//
// - Get number of requested clients from command line
// - Start a server on a thread
28
29
30
31
32
33
34
35

36
37
38
39
40
41
42
27
28
29
30
31
32
33

34
35
36
37
38
39
40
41







-
+







  let server_thread = thread::spawn(move || {
    let mut count = 0;

    // Keep looping until each client as sent a message
    while count < nclients {
      // Wait for data to arrive from a client
      println!("Server waiting for message ..");
      let (data, rctx) = server.wait();
      let (data, rctx) = server.wait().unwrap();

      println!("Server received: '{}'", data);

      // .. process data from client ..

      // Reply to client
      let reply = format!("Hello, {}!", data);
52
53
54
55
56
57
58
59

60
61
62
63
64
65
66
67
68
69
70
71
51
52
53
54
55
56
57

58
59
60
61
62
63
64
65
66
67
68
69
70







-
+












  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.send(String::from(msg)).unwrap();
      let reply = client_clone.send(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();
  }
  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/simple.rs.

1
2
3
4
5
6
7
8
9
10
11

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

28
29
30
31
32
33
34
1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

27
28
29
30
31
32
33
34










-
+















-
+







use std::thread;

use ump::channel;

fn main() {
  let (server, client) = channel::<String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    println!("Server waiting for message ..");
    let (data, rctx) = server.wait();
    let (data, rctx) = server.wait().unwrap();

    println!("Server received: '{}'", data);

    // Process data from client

    // Reply to client
    let reply = format!("Hello, {}!", data);
    println!("Server replying '{}'", reply);
    rctx.reply(reply).unwrap();

    println!("Server done");
  });

  let msg = String::from("Client");
  println!("Client sending '{}'", msg);
  let reply = client.send(String::from(msg)).unwrap();
  let reply = client.send(msg).unwrap();
  println!("Client received reply '{}'", reply);
  println!("Client done");

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/threaded_handler.rs.

1

2
3
4
5
6
7
8
9

1

2
3
4
5
6
7
8
-
+
-







use std::env;
use std::{env, thread};
use std::thread;

use ump::channel;

// This is basically the same test as many_once, but the server launches a new
// thread to process and reply to client requests.
fn main() {
  // Get number of client threads to kick off.  Default to two.
21
22
23
24
25
26
27
28

29
30
31
32
33
34
35
20
21
22
23
24
25
26

27
28
29
30
31
32
33
34







-
+







  let server_thread = thread::spawn(move || {
    let mut count = 0;

    // Keep looping until each client as sent a message
    while count < nclients {
      // Wait for data to arrive from a client
      println!("Server waiting for message ..");
      let (data, rctx) = server.wait();
      let (data, rctx) = server.wait().unwrap();

      // Move the received data and reply context into a thread to allow other
      // messages to be received while processing this message.
      thread::spawn(move || {
        println!("Server received: '{}'", data);

        // Process data from client
49
50
51
52
53
54
55
56

57
58
59
60
61
62
63
64
65
66
67
68
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67







-
+












  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.send(String::from(msg)).unwrap();
      let reply = client_clone.send(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();
  }
  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/client.rs.

1
2
3
4
5
6

7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

22
23
24
25
26
27
28






1

2
3
4
5
6
7
8
9
10
11
12
13
14

15
16
17
18
19
20
21
22
-
-
-
-
-
-
+
-













-
+







use std::sync::Weak;

use sigq::Queue as NotifyQueue;

use crate::err::Error;
use crate::rctx::InnerReplyContext;
use crate::{err::Error, rctx::InnerReplyContext, server::ServerQueueNode};
use crate::server::ServerQueueNode;

/// Representation of a clonable client object.
///
/// Each instantiation of a `Client` object is itself an isolated client with
/// regards to the server context.  By cloning a client a new independent
/// client is created.  ("Independent" here meaning that it is still tied to
/// the same server object, but the new client can be passed to a separate
/// thread and can independently make calls to the server).
pub struct Client<S, R, E> {
  /// Weak reference to server queue.
  ///
  /// The server context holds the only strong reference to the queue.  This
  /// allows the clients to detect when the server has terminated.
  pub(crate) srvq: Weak<NotifyQueue<ServerQueueNode<S, R, E>>>
  pub(crate) qpusher: sigq::Pusher<ServerQueueNode<S, R, E>>
}

impl<S, R, E> Client<S, R, E>
where
  R: 'static + Send,
  E: 'static + Send
{
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69


70
71
72
73
74





75
76
77
78
79

80
81
82
83
84
85
86
87
88
89
90
91


92
93
94
95
96





97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

120
121
122
123
124
42
43
44
45
46
47
48






49
50
51
52
53
54
55
56
57
58
59





60
61
62
63
64



65
66
67
68
69
70
71
72





73
74
75
76





77
78
79
80
81



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

101
102
103
104
105
106







-
-
-
-
-
-









+
+
-
-
-
-
-
+
+
+
+
+
-
-
-


+





-
-
-
-
-


+
+
-
-
-
-
-
+
+
+
+
+
-
-
-



















-
+





  /// If the server never replied to the message and the reply context was
  /// dropped `Err(Error::NoReply)` will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    // Make sure the server still lives; Weak -> Arc
    let srvq = match self.srvq.upgrade() {
      Some(srvq) => srvq,
      None => return Err(Error::ServerDisappeared)
    };

    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
    let rctx = InnerReplyContext::new();

    self
      .qpusher
    srvq.push(ServerQueueNode {
      msg: out,
      reply: rctx.clone()
    });

      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
      })
      .map_err(|_| Error::ServerDisappeared)?;
    // Drop the strong server queue ref immediately so it's not held as a
    // strong ref while we're waiting for a reply.
    drop(srvq);

    let reply = rctx.get()?;

    Ok(reply)
  }

  /// Same as [`Client::send()`] but for use in `async` contexts.
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
    let srvq = match self.srvq.upgrade() {
      Some(srvq) => srvq,
      None => return Err(Error::ServerDisappeared)
    };

    let rctx = InnerReplyContext::new();

    self
      .qpusher
    srvq.push(ServerQueueNode {
      msg: out,
      reply: rctx.clone()
    });

      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
      })
      .map_err(|_| Error::ServerDisappeared)?;
    // Drop the strong server queue ref immediately so it's not held as a
    // strong ref while we're waiting for a reply.
    drop(srvq);

    let result = rctx.aget().await?;

    Ok(result)
  }
}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,
  /// but in all other respects the clone is a completely independent client.
  ///
  /// This means that a cloned client can be passed to a new thread/task and
  /// make new independent calls to the server without any risk of collision
  /// between clone and the original client object.
  fn clone(&self) -> Self {
    Client {
      srvq: Weak::clone(&self.srvq)
      qpusher: self.qpusher.clone()
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/err.rs.

1
2
3
4
5
6
7
8
9
10




11
12
13
14
15
16
17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21










+
+
+
+







use std::fmt;

/// Module-specific error codes.
#[derive(Debug)]
pub enum Error<E> {
  /// The server object has shut down.  This happens when clients:
  /// - attempt to send messages to a server that has been deallocated.
  /// - have their requests dropped from the serrver's queue because the
  ///   server itself was deallocated.
  ServerDisappeared,

  /// There are no more nodes to pick up in the queue and all client
  /// end-points have been dropped.
  ClientsDisappeared,

  /// The message was delivered to the server, but the reply context was
  /// released before sending back a reply.
  NoReply,

  /// Application-specific error.
  /// The `E` type is typically declared as the third generic parameter to
44
45
46
47
48
49
50
51

52

53
54
55
56
57
58
59
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64







-
+

+







      crate::rctx::Error::App(e) => Error::App(e)
    }
  }
}

impl<E: fmt::Debug> fmt::Display for Error<E> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match &*self {
    match self {
      Error::ServerDisappeared => write!(f, "Server disappeared"),
      Error::ClientsDisappeared => write!(f, "Clients disappeared"),
      Error::NoReply => write!(f, "Server didn't reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

31
32
33
34
35
36
37
38
39

40
41
42
43
44




45
46

47
48

49
50
51
52
53




54
55
56


57
58
59
60
61
62





63
64

65
66
67
68
69
70
71
72
31
32
33
34
35
36
37


38
39




40
41
42
43
44

45
46

47
48




49
50
51
52
53


54
55
56





57
58
59
60
61
62

63

64
65
66
67
68
69
70







-
-
+

-
-
-
-
+
+
+
+

-
+

-
+

-
-
-
-
+
+
+
+

-
-
+
+

-
-
-
-
-
+
+
+
+
+

-
+
-







//!
//! # Example
//! ```
//! use std::thread;
//!
//! use ump::channel;
//!
//! fn main() {
//!  let (server, client) = channel::<String, String, ()>();
//! let (server, client) = channel::<String, String, ()>();
//!
//!  let server_thread = thread::spawn(move || {
//!    // Wait for data to arrive from a client
//!    println!("Server waiting for message ..");
//!    let (data, mut rctx) = server.wait();
//! let server_thread = thread::spawn(move || {
//!   // Wait for data to arrive from a client
//!   println!("Server waiting for message ..");
//!   let (data, mut rctx) = server.wait().unwrap();
//!
//!    println!("Server received: '{}'", data);
//!   println!("Server received: '{}'", data);
//!
//!    // Process data from client
//!   // Process data from client
//!
//!    // Reply to client
//!    let reply = format!("Hello, {}!", data);
//!    println!("Server replying '{}'", reply);
//!    rctx.reply(reply);
//!   // Reply to client
//!   let reply = format!("Hello, {}!", data);
//!   println!("Server replying '{}'", reply);
//!   rctx.reply(reply);
//!
//!    println!("Server done");
//!  });
//!   println!("Server done");
//! });
//!
//!  let msg = String::from("Client");
//!  println!("Client sending '{}'", msg);
//!  let reply = client.send(String::from(msg)).unwrap();
//!  println!("Client received reply '{}'", reply);
//!  println!("Client done");
//! let msg = String::from("Client");
//! println!("Client sending '{}'", msg);
//! let reply = client.send(String::from(msg)).unwrap();
//! println!("Client received reply '{}'", reply);
//! println!("Client done");
//!
//!  server_thread.join().unwrap();
//! server_thread.join().unwrap();
//! }
//! ```
//! In practice the send/reply types will probably be `enum`s used to
//! indicate command/return type with associated data.  The third type argument
//! to [`channel`] is an error type that can be used to explicitly pass errors
//! back to the sender.
//!
//! # Semantics
105
106
107
108
109
110
111
112
113

114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138

139
140


141
142
143
144
145

146
147
148
149
150
151
152
103
104
105
106
107
108
109


110





111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131


132
133


134


135


136
137
138
139
140







-
-
+
-
-
-
-
-




















+
-
-
+
+
-
-

-
-
+
-
-





mod client;
mod err;
mod rctx;
mod server;

pub use err::Error;

use std::sync::Arc;

pub use crate::{client::Client, rctx::ReplyContext, server::Server};
use sigq::Queue as NotifyQueue;

pub use crate::client::Client;
pub use crate::rctx::ReplyContext;
pub use crate::server::Server;

/// Create a pair of linked [`Server`] and [`Client`] objects.
///
/// The [`Server`] object is used to wait for incoming messages from connected
/// clients.  Once a message arrives it must reply to it using a
/// [`ReplyContext`] that's returned to it in the same call that returned the
/// message.
///
/// The [`Client`] object can be used to send messages to the [`Server`].  The
/// [`Client::send()`] call will not return until the server has replied.
///
/// Clients can be [cloned](Client::clone()); each clone will create a
/// new client object that is connected to the same server object, but is
/// completely independent of the original client.
///
/// The `S` type parameter is the "send" data type that clients will transfer
/// to the server.  The `R` type parameter is the "receive" data type that
/// clients will receive from the server.  The `E` type parameter can be used
/// to return application specific errors from the server to the client.
pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) {
  let (qpusher, qpuller) = sigq::new();
  let srvq = Arc::new(NotifyQueue::new());
  let server = Server {

  let server = Server { qpuller };
    srvq: Arc::clone(&srvq)
  };

  // Note: The client stores a weak reference to the server object
  let client = Client {
  let client = Client { qpusher };
    srvq: Arc::downgrade(&srvq)
  };

  (server, client)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/rctx.rs.



















1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
//! Allow a thread/task, crossing sync/async boundaries in either direction, to
//! deliver an expected piece of data to another thread/task, with
//! notification.
//!
//! These are simple channels used to deliver data from one endpoint to
//! another, where the receiver will block until data has been delivered.

mod err;
mod inner;

pub mod public;

pub(crate) use err::Error;
pub(crate) use inner::InnerReplyContext;

pub use public::ReplyContext;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx/err.rs.

15
16
17
18
19
20
21
22

23
24
25
26
27
28
29
15
16
17
18
19
20
21

22
23
24
25
26
27
28
29







-
+







  App(E)
}

impl<E: fmt::Debug> std::error::Error for Error<E> {}

impl<E: fmt::Debug> fmt::Display for Error<E> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match &*self {
    match self {
      Error::Aborted => write!(f, "Aborted call"),
      Error::NoReply => write!(f, "Application failed to reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
    }
  }
}

Changes to src/rctx/inner.rs.

1
2
3
4






5
6
7
8
9
10
11




1
2
3
4
5
6
7
8
9
10
11
12
13
-
-
-
-
+
+
+
+
+
+







use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll, Waker}
};

use parking_lot::{Condvar, Mutex};

use crate::rctx::err::Error;

pub(crate) enum State<I, E> {
  /// (Still) in queue, waiting to be picked up by the server.
65
66
67
68
69
70
71
72

73
74
75
76
77
78
79
67
68
69
70
71
72
73

74
75
76
77
78
79
80
81







-
+







    drop(mg);

    self.signal_waiters();
  }

  pub(crate) fn signal_waiters(&self) {
    let mut g = self.taskwaker.lock();
    if let Some(waker) = std::mem::replace(&mut *g, None) {
    if let Some(waker) = (*g).take() {
      waker.wake();
    }

    self.signal.notify_one();
  }

  /// Retreive reply.  If a reply has not arrived yet then enter a loop that
147
148
149
150
151
152
153
154
155
156
157




158
159
160
161
162
163
164

165
166
167
168
169
170
171
149
150
151
152
153
154
155




156
157
158
159


160
161
162
163

164
165
166
167
168
169
170
171







-
-
-
-
+
+
+
+
-
-




-
+







impl<I, E> Drop for InnerReplyContext<I, E> {
  /// If the reply context never left the server queue before being destroyed
  /// it means that the server has died.  Signal this to the original caller
  /// waiting for a reply.
  fn drop(&mut self) {
    let mut do_signal: bool = false;
    let mut mg = self.data.lock();
    match *mg {
      State::Queued => {
        *mg = State::Aborted;
        do_signal = true;

    if let State::Queued = *mg {
      *mg = State::Aborted;
      do_signal = true;
      }
      _ => {}
    }
    drop(mg);
    if do_signal {
      let mut g = self.taskwaker.lock();
      if let Some(waker) = std::mem::replace(&mut *g, None) {
      if let Some(waker) = (*g).take() {
        waker.wake();
      }
      self.signal.notify_one();
    }
  }
}

Deleted src/rctx/mod.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18


















-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
//! Allow a thread/task, crossing sync/async boundaries in either direction, to
//! deliver an expected piece of data to another thread/task, with
//! notification.
//!
//! These are simple channels used to deliver data from one endpoint to
//! another, where the receiver will block until data has been delivered.

mod err;
mod inner;

pub mod public;

pub(crate) use err::Error;
pub(crate) use inner::InnerReplyContext;

pub use public::ReplyContext;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx/public.rs.

1

2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33










34
35
36
37
38
39
40
41

1


2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20











21
22
23
24
25
26
27
28
29
30

31
32
33
34
35
36
37
-
+
-
-



















-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-







use crate::rctx::err::Error;
use crate::rctx::{err::Error, inner::State, InnerReplyContext};
use crate::rctx::inner::State;
use crate::rctx::InnerReplyContext;

/// Public-facing sender part of the `ReplyContext` object.
///
/// This is safe to pass to applications which are meant to only be able to put
/// a value through the `ReplyContext` channel, but not extract the value from
/// it.
pub struct ReplyContext<I, E> {
  inner: InnerReplyContext<I, E>,
  did_handover: bool
}

impl<I: 'static + Send, E> ReplyContext<I, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```
  /// use std::thread;
  /// use ump::channel;
  ///
  /// fn main() {
  ///   let (server, client) = channel::<String, String, ()>();
  ///   let server_thread = thread::spawn(move || {
  ///     let (data, rctx) = server.wait();
  ///     let reply = format!("Hello, {}!", data);
  ///     rctx.reply(reply).unwrap();
  ///   });
  ///   let msg = String::from("Client");
  ///   let reply = client.send(String::from(msg)).unwrap();
  ///   assert_eq!(reply, "Hello, Client!");
  ///   server_thread.join().unwrap();
  /// let (server, client) = channel::<String, String, ()>();
  /// let server_thread = thread::spawn(move || {
  ///   let (data, rctx) = server.wait().unwrap();
  ///   let reply = format!("Hello, {}!", data);
  ///   rctx.reply(reply).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.send(String::from(msg)).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// }
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(mut self, data: I) -> Result<(), Error<E>> {
    self.inner.put(data);

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
















79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

98
99
100
101
102
103




104
105
106
107
108
109
110

111
112
113
114
115
116
117
51
52
53
54
55
56
57

















58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

91
92
93




94
95
96
97


98
99
100
101

102
103
104
105
106
107
108
109







-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-

















-
+


-
-
-
-
+
+
+
+
-
-




-
+







  /// use ump::{channel, Error};
  ///
  /// #[derive(Debug, PartialEq)]
  /// enum MyError {
  ///   SomeError(String)
  /// }
  ///
  /// fn main() {
  ///   let (server, client) = channel::<String, String, MyError>();
  ///   let server_thread = thread::spawn(move || {
  ///     let (_, rctx) = server.wait();
  ///     rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  ///   });
  ///   let msg = String::from("Client");
  ///   let reply = client.send(String::from(msg));
  ///   match reply {
  ///     Err(Error::App(MyError::SomeError(s))) => {
  ///       assert_eq!(s, "failed");
  ///     }
  ///     _ => {
  ///       panic!("Unexpected return value");
  ///     }
  ///   }
  ///   server_thread.join().unwrap();
  /// let (server, client) = channel::<String, String, MyError>();
  /// let server_thread = thread::spawn(move || {
  ///   let (_, rctx) = server.wait().unwrap();
  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.send(String::from(msg));
  /// match reply {
  ///   Err(Error::App(MyError::SomeError(s))) => {
  ///     assert_eq!(s, "failed");
  ///   }
  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }
  /// }
  /// server_thread.join().unwrap();
  /// }
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(mut self, err: E) -> Result<(), Error<E>> {
    self.inner.fail(err);

    self.did_handover = true;

    Ok(())
  }
}

impl<I, E> Drop for ReplyContext<I, E> {
  /// If the reply context is dropped while still waiting for a reply then
  /// report back to the caller that it should expect no reply.
  fn drop(&mut self) {
    if self.did_handover == false {
    if !self.did_handover {
      let mut do_signal: bool = false;
      let mut mg = self.inner.data.lock();
      match *mg {
        State::Waiting => {
          *mg = State::NoReply;
          do_signal = true;

      if let State::Waiting = *mg {
        *mg = State::NoReply;
        do_signal = true;
        }
        _ => {}
      }
      drop(mg);
      if do_signal {
        let mut g = self.inner.taskwaker.lock();
        if let Some(waker) = std::mem::replace(&mut *g, None) {
        if let Some(waker) = (*g).take() {
          waker.wake();
        }

        self.inner.signal.notify_one();
      }
    }
  }

Changes to src/server.rs.

1
2

3
4
5

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37


38
39
40
41
42
43
44
45
46

47
48
49
50
51






52
53
54
55
56
57
58
59
60

61
62
63
64
65
66
67

68
69
70
71


1

2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33


34
35
36
37
38
39
40
41
42
43

44
45
46
47


48
49
50
51
52
53
54
55
56
57
58
59
60
61

62
63
64
65
66
67
68

69
70
71
72
73
-
-
+
-

-
+















-
+














-
-
+
+








-
+



-
-
+
+
+
+
+
+








-
+






-
+




use std::sync::Arc;

use crate::rctx::{InnerReplyContext, ReplyContext};
use sigq::Queue as NotifyQueue;

use crate::rctx::{InnerReplyContext, ReplyContext};
use crate::err::Error;

pub(crate) struct ServerQueueNode<S, R, E> {
  /// Raw message being sent from the client to the server.
  pub(crate) msg: S,

  /// Keep track of data needed to share reply data.
  pub(crate) reply: InnerReplyContext<R, E>
}

/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.
pub struct Server<S, R, E> {
  pub(crate) srvq: Arc<NotifyQueue<ServerQueueNode<S, R, E>>>
  pub(crate) qpuller: sigq::Puller<ServerQueueNode<S, R, E>>
}

impl<S, R, E> Server<S, R, E>
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Block and wait, indefinitely, for an incoming message from a
  /// [`Client`](crate::Client).
  ///
  /// Returns the message sent by the client and a reply context.  The server
  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
  /// value to the client.
  pub fn wait(&self) -> (S, ReplyContext<R, E>) {
    let node = self.srvq.pop();
  pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self.qpuller.pop().map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);

    (msg, rctx)
    Ok((msg, rctx))
  }

  /// Same as [`Server::wait()`], but for use in an `async` context.
  pub async fn async_wait(&self) -> (S, ReplyContext<R, E>) {
    let node = self.srvq.apop().await;
  pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
    let node = self
      .qpuller
      .apop()
      .await
      .map_err(|_| Error::ClientsDisappeared)?;

    // Extract the data from the node
    let msg = node.msg;

    // Create an application reply context from the reply context in the queue
    // Implicitly changes state of the reply context from Queued to Waiting
    let rctx = ReplyContext::from(node.reply);

    (msg, rctx)
    Ok((msg, rctx))
  }

  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
  /// really useful unless used in very specific situations.  It mostly exists
  /// for test cases.
  pub fn was_empty(&self) -> bool {
    self.srvq.was_empty()
    self.qpuller.was_empty()
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/async_client.rs.

17
18
19
20
21
22
23
24

25
26
27
28
29
30
31
17
18
19
20
21
22
23

24
25
26
27
28
29
30
31







-
+







  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let niterations = 256;

  let (server, client) = channel::<Request, Reply, ()>();

  let server_thread = thread::spawn(move || loop {
    let (req, rctx) = server.wait();
    let (req, rctx) = server.wait().unwrap();
    match req {
      Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(),
      Request::Croak => {
        rctx.reply(Reply::OkICroaked).unwrap();
        break;
      }
    }

Changes to tests/fail.rs.

10
11
12
13
14
15
16
17

18
19
20
21
22
23

24
25
26
27
28
29
30
10
11
12
13
14
15
16

17
18
19
20
21
22

23
24
25
26
27
28
29
30







-
+





-
+








#[test]
fn sync_expect_noreply() {
  let (server, client) = channel::<String, String, MyError>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait();
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  let msg = String::from("Client");
  let reply = client.send(String::from(msg));
  let reply = client.send(msg);
  match reply {
    Err(Error::App(MyError::SomeError(s))) => {
      assert_eq!(s, "failed");
    }
    _ => {
      panic!("Unexpected return value");
    }
38
39
40
41
42
43
44
45

46
47
48
49
50
51
52
38
39
40
41
42
43
44

45
46
47
48
49
50
51
52







-
+







fn async_expect_noreply() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, MyError>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait();
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.asend(msg).await;

Changes to tests/noreply.rs.

1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19

20
21
22
23
24
25
26
1
2
3
4
5
6
7
8
9
10
11

12
13
14
15
16
17
18

19
20
21
22
23
24
25
26











-
+






-
+







// Make sure that the ReplyContext aborts on Drop of no reply was sent.
use std::thread;

use ump::{channel, Error};

#[test]
fn sync_expect_noreply() {
  let (server, client) = channel::<String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait();
    let (_, rctx) = server.wait().unwrap();

    // Don't do this.
    drop(rctx);
  });

  let msg = String::from("Client");
  let reply = client.send(String::from(msg));
  let reply = client.send(msg);
  match reply {
    Err(Error::NoReply) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }
34
35
36
37
38
39
40
41

42
43
44
45
46
47
48
34
35
36
37
38
39
40

41
42
43
44
45
46
47
48







-
+







fn async_expect_noreply() {
  let tokrt = tokio::runtime::Runtime::new().unwrap();

  let (server, client) = channel::<String, String, ()>();

  let server_thread = thread::spawn(move || {
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait();
    let (_, rctx) = server.wait().unwrap();

    // Don't do this.
    drop(rctx);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");

Changes to tests/queue_cleanup.rs.

12
13
14
15
16
17
18
19

20
21
22
23
24
25
26
12
13
14
15
16
17
18

19
20
21
22
23
24
25
26







-
+







    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  let msg = String::from("Client");
  let reply = client.send(String::from(msg));
  let reply = client.send(msg);
  match reply {
    Err(Error::ServerDisappeared) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }

Changes to tests/stress.rs.

11
12
13
14
15
16
17
18
19


20
21
22
23
24
25
26
11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26







-
-
+
+







#[test]
fn one_at_a_time() {
  let (server, client) = channel::<Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;

    while croak == false {
      let (data, rctx) = server.wait();
    while !croak {
      let (data, rctx) = server.wait().unwrap();
      match data {
        Ops::Die => {
          croak = true;
          rctx.reply(0).unwrap();
        }
        Ops::Add(a, b) => {
          rctx.reply(a + b).unwrap();
54
55
56
57
58
59
60
61

62
63
64
65
66
67
68
54
55
56
57
58
59
60

61
62
63
64
65
66
67
68







-
+







  let niterations = 256;

  let server_thread = thread::spawn(move || {
    let mut count = 0;
    let mut handles = Vec::new();
    // +1 because we want to wait for the croak message as well
    while count < niterations + 1 {
      let (data, rctx) = server.wait();
      let (data, rctx) = server.wait().unwrap();
      let h = thread::spawn(move || match data {
        Ops::Die => {
          rctx.reply(0).unwrap();
        }
        Ops::Add(a, b) => {
          rctx.reply(a + b).unwrap();
        }

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10















11
12
13
14
15
16
17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32










+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.10.0] - 2023-07-26

### Added

- Server's receive methods will fail with `Error::ClientsDisappeared` if all
  the associated Client objects have been dropped.

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.2`.
- Development dependencies:
  - Updated `criterion` to `0.5.1`


## [0.9.0] - 2022-09-09

### Added

- Explicitly set MSRV is `1.36`