ump

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-0.10.1 To ump-0.10.2

2023-07-28
04:12
Include tests in packaging. check-in: bca62b6697 user: jan tags: trunk
01:44
Release maintenance. check-in: 27099be4a4 user: jan tags: trunk, ump-0.10.2
01:25
Rename 'send' message operation to 'request'. Add dev-docs feature for including internal notes to generated documentation. check-in: fb98947448 user: jan tags: trunk
2023-07-27
01:34
Release maintenance. check-in: 7079d4a788 user: jan tags: trunk, ump-0.10.1
01:31
Use sigq 0.13.3 to get puller bugfix. check-in: 3dece6432a user: jan tags: trunk

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23



24
25
26
27
28
29
30
31
32
33
34
35
[package]
name = "ump"
version = "0.10.1"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2018"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.39"

# Can't exclude "benches", because the [[bench]] section will fail.
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "examples",
  "rustfmt.toml",
  "tests",
  "www"
]




[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.3" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.29.1", features = ["full"] }

[[bench]]
name = "add_server"
harness = false



|




















>
>
>












1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[package]
name = "ump"
version = "0.10.2"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2018"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.39"

# Can't exclude "benches", because the [[bench]] section will fail.
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "examples",
  "rustfmt.toml",
  "tests",
  "www"
]

[features]
dev-docs = []

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.3" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.29.1", features = ["full"] }

[[bench]]
name = "add_server"
harness = false

Changes to benches/add_server.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread;

use criterion::{criterion_group, criterion_main, Criterion};

use ump::channel;

enum Ops {
  Die,
  Add(i32, i32),
  AddThreaded(i32, i32)
}

pub fn criterion_benchmark(c: &mut Criterion) {
  let mut group = c.benchmark_group("send operation");

  let (server, client) = channel::<Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;
    while !croak {
      let (data, rctx) = server.wait().unwrap();













|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread;

use criterion::{criterion_group, criterion_main, Criterion};

use ump::channel;

enum Ops {
  Die,
  Add(i32, i32),
  AddThreaded(i32, i32)
}

pub fn criterion_benchmark(c: &mut Criterion) {
  let mut group = c.benchmark_group("req operation");

  let (server, client) = channel::<Ops, i32, ()>();

  let server_thread = thread::spawn(move || {
    let mut croak = false;
    while !croak {
      let (data, rctx) = server.wait().unwrap();
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  let mut p: i32 = 0;
  let mut q: i32 = 0;

  group.bench_function("add", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.send(Ops::Add(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
  });

  p = 0;
  q = 0;
  group.bench_function("add (threaded)", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.send(Ops::AddThreaded(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.asend(Ops::Add(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async, threaded)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.asend(Ops::AddThreaded(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
  });


  let result = client.send(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}


criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|










|











|











|





|










37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  let mut p: i32 = 0;
  let mut q: i32 = 0;

  group.bench_function("add", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::Add(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
  });

  p = 0;
  q = 0;
  group.bench_function("add (threaded)", |b| {
    b.iter(|| {
      p += 2;
      q -= 3;
      let result = client.req(Ops::AddThreaded(p, q)).unwrap();
      assert_eq!(result, q + p);
    })
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::Add(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
  });


  let rt = tokio::runtime::Runtime::new().unwrap();
  group.bench_function("add (async, threaded)", |b| {
    b.to_async(&rt).iter(|| async {
      let p = 1;
      let q = 2;

      let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap();
      assert_eq!(result, q + p);
    })
  });


  let result = client.req(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}


criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/cloneclientserver.rs.

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
        rctx.reply(Reply::OkICroaked).unwrap();
        break;
      }
    }
  });

  if let Reply::ClientClone(cloned_client) =
    client.send(Request::CloneClient).unwrap()
  {
    if let Reply::Sum(x) = cloned_client.send(Request::Add(5, 7)).unwrap() {
      assert_eq!(x, 12);
    } else {
      panic!("Unexpected result");
    }
  } else {
    panic!("Unexpected result");
  }

  let _ = client.send(Request::Croak);

  server_thread.join().unwrap();
}


// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|

|








|



<


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

57
58
        rctx.reply(Reply::OkICroaked).unwrap();
        break;
      }
    }
  });

  if let Reply::ClientClone(cloned_client) =
    client.req(Request::CloneClient).unwrap()
  {
    if let Reply::Sum(x) = cloned_client.req(Request::Add(5, 7)).unwrap() {
      assert_eq!(x, 12);
    } else {
      panic!("Unexpected result");
    }
  } else {
    panic!("Unexpected result");
  }

  let _ = client.req(Request::Croak);

  server_thread.join().unwrap();
}


// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/many_once.rs.

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.send(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();







|







51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.req(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();

Changes to examples/simple.rs.

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    rctx.reply(reply).unwrap();

    println!("Server done");
  });

  let msg = String::from("Client");
  println!("Client sending '{}'", msg);
  let reply = client.send(msg).unwrap();
  println!("Client received reply '{}'", reply);
  println!("Client done");

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|







20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    rctx.reply(reply).unwrap();

    println!("Server done");
  });

  let msg = String::from("Client");
  println!("Client sending '{}'", msg);
  let reply = client.req(msg).unwrap();
  println!("Client received reply '{}'", reply);
  println!("Client done");

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/threaded_handler.rs.

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.send(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();







|







48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  let mut join_handles = Vec::new();
  for i in 0..nclients {
    let client_clone = client.clone();
    let client_thread = thread::spawn(move || {
      let name = format!("Client {}", i + 1);
      let msg = String::from(&name);
      println!("{} sending '{}'", name, msg);
      let reply = client_clone.req(msg).unwrap();
      println!("{} received reply '{}' -- done", name, reply);
    });
    join_handles.push(client_thread);
  }

  for n in join_handles {
    n.join().unwrap();

Changes to src/client.rs.

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped `Err(Error::NoReply)` will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.







|







41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  ///
  /// If the server never replied to the message and the reply context was
  /// dropped `Err(Error::NoReply)` will be returned.
  ///
  /// If an application specific error occurs it will be returned as a
  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
  /// [`channel`](crate::channel).
  pub fn req(&self, out: S) -> Result<R, Error<E>> {
    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
64
65
66
67
68
69
70





71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86





87
88
89
90
91
92
93
      .map_err(|_| Error::ServerDisappeared)?;

    let reply = rctx.get()?;

    Ok(reply)
  }






  /// Same as [`Client::send()`] but for use in `async` contexts.
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
    let rctx = InnerReplyContext::new();

    self
      .qpusher
      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
      })
      .map_err(|_| Error::ServerDisappeared)?;

    let result = rctx.aget().await?;

    Ok(result)
  }





}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,







>
>
>
>
>
|
|














>
>
>
>
>







64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
      .map_err(|_| Error::ServerDisappeared)?;

    let reply = rctx.get()?;

    Ok(reply)
  }

  #[deprecated(since = "0.10.2", note = "Use req() instead.")]
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    self.req(out)
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  pub async fn areq(&self, out: S) -> Result<R, Error<E>> {
    let rctx = InnerReplyContext::new();

    self
      .qpusher
      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
      })
      .map_err(|_| Error::ServerDisappeared)?;

    let result = rctx.aget().await?;

    Ok(result)
  }

  #[deprecated(since = "0.10.2", note = "Use areq() instead.")]
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
    self.areq(out).await
  }
}


impl<S, R, E> Clone for Client<S, R, E> {
  /// Clone a client.
  ///
  /// When a client is cloned the new object will be linked to the same server,

Changes to src/err.rs.

1
2
3
4
5
6


7
8
9
10
11


12
13

14
15
16
17
18
19
20

21
22
23
24
25
26
27
use std::fmt;

/// Module-specific error codes.
#[derive(Debug)]
pub enum Error<E> {
  /// The server object has shut down.  This happens when clients:


  /// - attempt to send messages to a server that has been deallocated.
  /// - have their requests dropped from the serrver's queue because the
  ///   server itself was deallocated.
  ServerDisappeared,



  /// There are no more nodes to pick up in the queue and all client
  /// end-points have been dropped.

  ClientsDisappeared,

  /// The message was delivered to the server, but the reply context was
  /// released before sending back a reply.
  NoReply,

  /// Application-specific error.

  /// The `E` type is typically declared as the third generic parameter to
  /// [`channel`](crate::channel()).
  App(E)
}

impl<E> Error<E> {
  pub fn into_apperr(self) -> Option<E> {





|
>
>
|




>
>

|
>



|



>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::fmt;

/// Module-specific error codes.
#[derive(Debug)]
pub enum Error<E> {
  /// The server object has shut down.
  ///
  /// This happens when clients:
  /// - attempt to transmit messages to a server that has been deallocated.
  /// - have their requests dropped from the serrver's queue because the
  ///   server itself was deallocated.
  ServerDisappeared,

  /// No more client end-points remain.
  ///
  /// There are no more nodes to pick up in the queue and all client
  /// end-points have been dropped (implied: no new nodes will ever be added
  /// to the queue).
  ClientsDisappeared,

  /// The message was delivered to the server, but the reply context was
  /// dropped before transmitting a reply.
  NoReply,

  /// Application-specific error.
  ///
  /// The `E` type is typically declared as the third generic parameter to
  /// [`channel`](crate::channel()).
  App(E)
}

impl<E> Error<E> {
  pub fn into_apperr(self) -> Option<E> {

Changes to src/lib.rs.

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//! An application calls [`channel`] to create a linked pair of a [`Server`]
//! and a [`Client`].
//!
//! The server calls [`Server::wait()`]/[`Server::async_wait()`], which
//! blocks and waits for an incoming message from a client.
//!
//! A client, in a separate thread or task, calls
//! [`Client::send()`]/[`Client::asend()`] to send a message to the server.
//!
//! The server's wait call returns two objects:  The message sent by the
//! client, and a [`ReplyContext`].
//!
//! After processing its application-defined message, the server *must* call
//! the [`ReplyContext::reply()`] on the returned reply context object to
//! return a reply message to the client.







|







11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//! An application calls [`channel`] to create a linked pair of a [`Server`]
//! and a [`Client`].
//!
//! The server calls [`Server::wait()`]/[`Server::async_wait()`], which
//! blocks and waits for an incoming message from a client.
//!
//! A client, in a separate thread or task, calls
//! [`Client::req()`]/[`Client::areq()`] to send a message to the server.
//!
//! The server's wait call returns two objects:  The message sent by the
//! client, and a [`ReplyContext`].
//!
//! After processing its application-defined message, the server *must* call
//! the [`ReplyContext::reply()`] on the returned reply context object to
//! return a reply message to the client.
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//!   rctx.reply(reply);
//!
//!   println!("Server done");
//! });
//!
//! let msg = String::from("Client");
//! println!("Client sending '{}'", msg);
//! let reply = client.send(String::from(msg)).unwrap();
//! println!("Client received reply '{}'", reply);
//! println!("Client done");
//!
//! server_thread.join().unwrap();
//! ```
//! In practice the send/reply types will probably be `enum`s used to
//! indicate command/return type with associated data.  The third type argument
//! to [`channel`] is an error type that can be used to explicitly pass errors
//! back to the sender.
//!
//! # Semantics
//! There are some potentially useful semantic quirks that can be good to know
//! about, but some of them should be used with caution.  This section will







|





|







52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//!   rctx.reply(reply);
//!
//!   println!("Server done");
//! });
//!
//! let msg = String::from("Client");
//! println!("Client sending '{}'", msg);
//! let reply = client.req(msg).unwrap();
//! println!("Client received reply '{}'", reply);
//! println!("Client done");
//!
//! server_thread.join().unwrap();
//! ```
//! In practice the req/reply types will probably be `enum`s used to
//! indicate command/return type with associated data.  The third type argument
//! to [`channel`] is an error type that can be used to explicitly pass errors
//! back to the sender.
//!
//! # Semantics
//! There are some potentially useful semantic quirks that can be good to know
//! about, but some of them should be used with caution.  This section will
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
///
/// The [`Server`] object is used to wait for incoming messages from connected
/// clients.  Once a message arrives it must reply to it using a
/// [`ReplyContext`] that's returned to it in the same call that returned the
/// message.
///
/// The [`Client`] object can be used to send messages to the [`Server`].  The
/// [`Client::send()`] call will not return until the server has replied.
///
/// Clients can be [cloned](Client::clone()); each clone will create a
/// new client object that is connected to the same server object, but is
/// completely independent of the original client.
///
/// The `S` type parameter is the "send" data type that clients will transfer
/// to the server.  The `R` type parameter is the "receive" data type that
/// clients will receive from the server.  The `E` type parameter can be used
/// to return application specific errors from the server to the client.
pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) {
  let (qpusher, qpuller) = sigq::new();

  let server = Server { qpuller };

  let client = Client { qpusher };

  (server, client)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|





|
|
|
|











113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
///
/// The [`Server`] object is used to wait for incoming messages from connected
/// clients.  Once a message arrives it must reply to it using a
/// [`ReplyContext`] that's returned to it in the same call that returned the
/// message.
///
/// The [`Client`] object can be used to send messages to the [`Server`].  The
/// [`Client::req()`] call will not return until the server has replied.
///
/// Clients can be [cloned](Client::clone()); each clone will create a
/// new client object that is connected to the same server object, but is
/// completely independent of the original client.
///
/// The `S` type parameter is the "request" data type that clients will
/// transfer to the server.  The `R` type parameter is the "receive" data type
/// that clients will receive from the server.  The `E` type parameter can be
/// used to return application specific errors from the server to the client.
pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) {
  let (qpusher, qpuller) = sigq::new();

  let server = Server { qpuller };

  let client = Client { qpusher };

  (server, client)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx/public.rs.

1
2





3
4
5
6
7


8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use crate::rctx::{err::Error, inner::State, InnerReplyContext};






/// Public-facing sender part of the `ReplyContext` object.
///
/// This is safe to pass to applications which are meant to only be able to put
/// a value through the `ReplyContext` channel, but not extract the value from
/// it.


pub struct ReplyContext<I, E> {
  inner: InnerReplyContext<I, E>,
  did_handover: bool
}

impl<I: 'static + Send, E> ReplyContext<I, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```
  /// use std::thread;
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let server_thread = thread::spawn(move || {
  ///   let (data, rctx) = server.wait().unwrap();
  ///   let reply = format!("Hello, {}!", data);
  ///   rctx.reply(reply).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.send(String::from(msg)).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(mut self, data: I) -> Result<(), Error<E>> {


>
>
>
>
>
|
|
|
|
|
>
>




















|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use crate::rctx::{err::Error, inner::State, InnerReplyContext};

/// Context used to transmit a reply back to the originating requester.
#[cfg_attr(
  feature = "dev-docs",
  doc = r#"
# Internals
Public-facing sender part of the `ReplyContext` object.

This, as opposed to `InnerReplyContext`, is safe to pass to applications that
are meant to only be able to put a value through the `ReplyContext` channel,
but not extract the value from it.
"#
)]
pub struct ReplyContext<I, E> {
  inner: InnerReplyContext<I, E>,
  did_handover: bool
}

impl<I: 'static + Send, E> ReplyContext<I, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```
  /// use std::thread;
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let server_thread = thread::spawn(move || {
  ///   let (data, rctx) = server.wait().unwrap();
  ///   let reply = format!("Hello, {}!", data);
  ///   rctx.reply(reply).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(mut self, data: I) -> Result<(), Error<E>> {
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  ///
  /// let (server, client) = channel::<String, String, MyError>();
  /// let server_thread = thread::spawn(move || {
  ///   let (_, rctx) = server.wait().unwrap();
  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.send(String::from(msg));
  /// match reply {
  ///   Err(Error::App(MyError::SomeError(s))) => {
  ///     assert_eq!(s, "failed");
  ///   }
  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }







|







64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
  ///
  /// let (server, client) = channel::<String, String, MyError>();
  /// let server_thread = thread::spawn(move || {
  ///   let (_, rctx) = server.wait().unwrap();
  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg);
  /// match reply {
  ///   Err(Error::App(MyError::SomeError(s))) => {
  ///     assert_eq!(s, "failed");
  ///   }
  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }

Changes to tests/async_client.rs.

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  tokrt.block_on(async {
    let mut a: i32 = 0;
    let mut b: i32 = 0;

    for _ in 0..niterations {
      a += 2;
      b -= 3;
      let result = client.asend(Request::Add(a, b)).await.unwrap();
      if let Reply::Sum(sum) = result {
        assert_eq!(sum, a + b);
      } else {
        panic!("Didn't get sum");
      }
    }
    let result = client.asend(Request::Croak).await.unwrap();
    if let Reply::OkICroaked = result {
    } else {
      panic!("Didn't get a croak");
    }
  });

  server_thread.join().unwrap();







|






|







34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  tokrt.block_on(async {
    let mut a: i32 = 0;
    let mut b: i32 = 0;

    for _ in 0..niterations {
      a += 2;
      b -= 3;
      let result = client.areq(Request::Add(a, b)).await.unwrap();
      if let Reply::Sum(sum) = result {
        assert_eq!(sum, a + b);
      } else {
        panic!("Didn't get sum");
      }
    }
    let result = client.areq(Request::Croak).await.unwrap();
    if let Reply::OkICroaked = result {
    } else {
      panic!("Didn't get a croak");
    }
  });

  server_thread.join().unwrap();

Changes to tests/fail.rs.

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  let msg = String::from("Client");
  let reply = client.send(msg);
  match reply {
    Err(Error::App(MyError::SomeError(s))) => {
      assert_eq!(s, "failed");
    }
    _ => {
      panic!("Unexpected return value");
    }







|







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    // Wait for data to arrive from a client
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  let msg = String::from("Client");
  let reply = client.req(msg);
  match reply {
    Err(Error::App(MyError::SomeError(s))) => {
      assert_eq!(s, "failed");
    }
    _ => {
      panic!("Unexpected return value");
    }
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.asend(msg).await;
    match reply {
      Err(Error::App(MyError::SomeError(s))) => {
        assert_eq!(s, "failed");
      }
      _ => {
        panic!("Unexpected return value");
      }







|







45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
    let (_, rctx) = server.wait().unwrap();

    rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;
    match reply {
      Err(Error::App(MyError::SomeError(s))) => {
        assert_eq!(s, "failed");
      }
      _ => {
        panic!("Unexpected return value");
      }

Changes to tests/noreply.rs.

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    let (_, rctx) = server.wait().unwrap();

    // Don't do this.
    drop(rctx);
  });

  let msg = String::from("Client");
  let reply = client.send(msg);
  match reply {
    Err(Error::NoReply) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }







|







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    let (_, rctx) = server.wait().unwrap();

    // Don't do this.
    drop(rctx);
  });

  let msg = String::from("Client");
  let reply = client.req(msg);
  match reply {
    Err(Error::NoReply) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

    // Don't do this.
    drop(rctx);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.asend(msg).await;
    match reply {
      Err(Error::NoReply) => {
        // This is the expected error
      }
      _ => {
        panic!("Unexpected return value");
      }







|







42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

    // Don't do this.
    drop(rctx);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;
    match reply {
      Err(Error::NoReply) => {
        // This is the expected error
      }
      _ => {
        panic!("Unexpected return value");
      }

Changes to tests/queue_cleanup.rs.

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  let msg = String::from("Client");
  let reply = client.send(msg);
  match reply {
    Err(Error::ServerDisappeared) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }







|







12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    // Should be doing something more robust ..
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  let msg = String::from("Client");
  let reply = client.req(msg);
  match reply {
    Err(Error::ServerDisappeared) => {
      // This is the expected error
    }
    _ => {
      panic!("Unexpected return value");
    }
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.asend(msg).await;
    //let reply = client.send(msg);
    match reply {
      Err(Error::ServerDisappeared) => {
        // This is the expected error
      }
      _ => {
        panic!("Unexpected return value");
      }







|
|







41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    let one_second = time::Duration::from_secs(1);
    thread::sleep(one_second);
    drop(server);
  });

  tokrt.block_on(async {
    let msg = String::from("Client");
    let reply = client.areq(msg).await;
    //let reply = client.req(msg);
    match reply {
      Err(Error::ServerDisappeared) => {
        // This is the expected error
      }
      _ => {
        panic!("Unexpected return value");
      }

Changes to tests/stress.rs.

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

  let mut a: i32 = 0;
  let mut b: i32 = 0;

  for _ in 0..65535 {
    a += 2;
    b -= 3;
    let result = client.send(Ops::Add(a, b)).unwrap();
    assert_eq!(result, a + b);
  }
  let result = client.send(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}

#[test]
fn one_at_a_time_threaded_handler() {







|


|







34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

  let mut a: i32 = 0;
  let mut b: i32 = 0;

  for _ in 0..65535 {
    a += 2;
    b -= 3;
    let result = client.req(Ops::Add(a, b)).unwrap();
    assert_eq!(result, a + b);
  }
  let result = client.req(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}

#[test]
fn one_at_a_time_threaded_handler() {
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

  let mut a: i32 = 0;
  let mut b: i32 = 0;

  for _ in 0..niterations {
    a += 2;
    b -= 3;
    let result = client.send(Ops::Sub(a, b)).unwrap();
    assert_eq!(result, a - b);
  }
  let result = client.send(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|


|






80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96

  let mut a: i32 = 0;
  let mut b: i32 = 0;

  for _ in 0..niterations {
    a += 2;
    b -= 3;
    let result = client.req(Ops::Sub(a, b)).unwrap();
    assert_eq!(result, a - b);
  }
  let result = client.req(Ops::Die).unwrap();
  assert_eq!(result, 0);

  server_thread.join().unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10














11
12
13
14
15
16
17
# Change Log

## [Unreleased]

### Added

### Changed

### Removed
















## [0.10.1] - 2023-07-27

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.3`.










>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.10.2] - 2023-07-28

### Added

- Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with
  a deprecation notice.
- Add a `dev-docs` feature to allow internal documentation notes to be
  included in generated documentation.

### Changed

- Rename `send()`/`asend()` to `req()/`areq()`.


## [0.10.1] - 2023-07-27

### Changed

- Runtime dependencies:
  - Updated `sigq` to `0.13.3`.