ump

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ump-0.11.0 To ump-0.12.0

2023-08-31
14:34
Document ReplyContext. check-in: 61295fc1c3 user: jan tags: trunk
2023-08-14
23:03
Release maintenance. check-in: b08bb813bf user: jan tags: trunk, ump-0.12.0
22:55
Update swctx to 0.2.1. check-in: 3eaac7ef46 user: jan tags: trunk
2023-07-29
02:10
Make a note about project status. check-in: eb09b3724b user: jan tags: trunk
2023-07-28
22:19
Release maintenance. check-in: bfeac614fd user: jan tags: trunk, ump-0.11.0
22:11
Return the correct Error type from ReplyContext methods. Documentation updates. check-in: 8ef3a429d7 user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9



10
11
12









-
-
-



Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/server.rs
src/client.rs
src/rctx.rs
src/rctx/err.rs
src/rctx/inner.rs
src/rctx/public.rs
tests/*.rs
examples/*.rs
benches/*.rs

Changes to Cargo.toml.

1
2
3

4
5

6
7
8
9
10
11

12
13
14
15
16
17
18
19
20


21
22
23
24
25
26
27
28

29
30
31
32

33
34
35
36
37



1
2

3


4
5
6
7
8
9

10
11
12
13
14
15
16
17


18
19
20
21
22
23
24
25
26
27
28
29
30
31

32
33
34
35
36
37
38
39
40


-
+
-
-
+





-
+







-
-
+
+








+



-
+





+
+
+
[package]
name = "ump"
version = "0.11.0"
version = "0.12.0"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2018"
edition = "2021"
license = "0BSD"
categories = [ "concurrency", "asynchronous" ]
keywords = [ "channel", "threads", "sync", "message-passing" ]
repository = "https://repos.qrnch.tech/pub/ump"
description = "Micro message passing library for threads/tasks communication."
rust-version = "1.39"
rust-version = "1.56"

# Can't exclude "benches", because the [[bench]] section will fail.
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "examples",
  "rustfmt.toml",
  "www"
  "www",
  "rustfmt.toml"
]

[features]
dev-docs = []

[dependencies]
parking_lot = { version = "0.12.1" }
sigq = { version = "0.13.3" }
swctx = { version = "0.2.1" }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
tokio = { version = "1.29.1", features = ["full"] }
tokio = { version = "1.31.0", features = ["rt-multi-thread"] }

[[bench]]
name = "add_server"
harness = false

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Changes to src/client.rs.

1

2
3
4
5
6
7
8

1
2
3
4
5
6
7
8
-
+







use crate::{err::Error, rctx::InnerReplyContext, server::ServerQueueNode};
use crate::{err::Error, server::ServerQueueNode};

/// Representation of a clonable client object.
///
/// Each instantiation of a `Client` object is itself an isolated client with
/// regards to the server context.  By cloning a client a new independent
/// client is created.  ("Independent" here meaning that it is still tied to
/// the same server object, but the new client can be passed to a separate
49
50
51
52
53
54
55
56

57
58
59
60
61
62

63
64
65
66
67
68


69
70
71
72
73
74
75
76
77
78

79
80
81
82
83
84

85
86
87
88
89

90
91
92
93
94
95
96
97
49
50
51
52
53
54
55

56
57
58
59
60
61

62
63
64
65



66
67
68
69
70
71
72
73
74
75
76

77
78
79
80
81
82

83
84
85
86


87

88
89
90
91
92
93
94







-
+





-
+



-
-
-
+
+









-
+





-
+



-
-
+
-







    // Create a per-call reply context.
    // This context could be created when the Client object is being created
    // and stored in the context, and thus be reused for reach client call.
    // One side-effect is that some of the state semantics becomes more
    // complicated.
    // The central repo has such an implementation checked in, but it seems to
    // have some more corner cases that aren't properly handled.
    let rctx = InnerReplyContext::new();
    let (sctx, wctx) = swctx::mkpair();

    self
      .qpusher
      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    let reply = rctx.get()?;

    Ok(reply)
    // Wait for a reply to arrive
    Ok(wctx.wait()?)
  }

  #[deprecated(since = "0.10.2", note = "Use req() instead.")]
  pub fn send(&self, out: S) -> Result<R, Error<E>> {
    self.req(out)
  }

  /// Same as [`Client::req()`] but for use in `async` contexts.
  pub async fn areq(&self, out: S) -> Result<R, Error<E>> {
    let rctx = InnerReplyContext::new();
    let (sctx, wctx) = swctx::mkpair();

    self
      .qpusher
      .push(ServerQueueNode {
        msg: out,
        reply: rctx.clone()
        reply: sctx
      })
      .map_err(|_| Error::ServerDisappeared)?;

    let result = rctx.aget().await?;

    Ok(wctx.wait_async().await?)
    Ok(result)
  }

  #[deprecated(since = "0.10.2", note = "Use areq() instead.")]
  pub async fn asend(&self, out: S) -> Result<R, Error<E>> {
    self.areq(out).await
  }
}

Changes to src/err.rs.

1


2
3
4
5
6
7
8
1
2
3
4
5
6
7
8
9
10

+
+







use std::fmt;

use crate::rctx::RCtxState;

/// Module-specific error codes.
#[derive(Debug)]
pub enum Error<E> {
  /// The server object has shut down.
  ///
  /// Happens when clients:
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75












76
50
51
52
53
54
55
56










57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80







-
-
-
-
-
-
-
-
-
-











+
+
+
+
+
+
+
+
+
+
+
+

      _ => panic!("Not an Error::App")
    }
  }
}

impl<E: fmt::Debug> std::error::Error for Error<E> {}

impl<E> From<crate::rctx::Error<E>> for Error<E> {
  fn from(err: crate::rctx::Error<E>) -> Self {
    match err {
      crate::rctx::Error::Aborted => Error::ServerDisappeared,
      crate::rctx::Error::NoReply => Error::NoReply,
      crate::rctx::Error::App(e) => Error::App(e)
    }
  }
}

impl<E: fmt::Debug> fmt::Display for Error<E> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Error::ServerDisappeared => write!(f, "Server disappeared"),
      Error::ClientsDisappeared => write!(f, "Clients disappeared"),
      Error::NoReply => write!(f, "Server didn't reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
    }
  }
}

impl<E> From<swctx::Error<RCtxState, E>> for Error<E> {
  fn from(err: swctx::Error<RCtxState, E>) -> Self {
    match err {
      swctx::Error::Aborted(state) => match state {
        RCtxState::Queued => Error::ServerDisappeared,
        RCtxState::Active => Error::NoReply
      },
      swctx::Error::App(e) => Error::App(e)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/rctx.rs.

1
2
3
4
5
6
7
8
9
10
11










12





13
14
15
16





































































17
18
1
2
3
4
5
6
7




8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23




24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94







-
-
-
-
+
+
+
+
+
+
+
+
+
+

+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


//! Allow a thread/task, crossing sync/async boundaries in either direction, to
//! deliver an expected piece of data to another thread/task, with
//! notification.
//!
//! These are simple channels used to deliver data from one endpoint to
//! another, where the receiver will block until data has been delivered.

mod err;
mod inner;

pub mod public;
use crate::err::Error;

#[derive(Clone, Default)]
pub(crate) enum RCtxState {
  #[default]
  Queued,
  Active
}

pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>);

impl<T, E> ReplyContext<T, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```
pub(crate) use err::Error;
pub(crate) use inner::InnerReplyContext;

pub use public::ReplyContext;
  /// use std::thread;
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let server_thread = thread::spawn(move || {
  ///   let (data, rctx) = server.wait().unwrap();
  ///   let reply = format!("Hello, {}!", data);
  ///   rctx.reply(reply).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(self, data: T) -> Result<(), Error<E>> {
    self.0.set(data);
    Ok(())
  }

  /// Return an error to originating client.
  /// This will cause the calling client to return an error.  The error passed
  /// in the `err` parameter will be wrapped in a `Error::App(err)`.
  ///
  /// # Example
  ///
  /// ```
  /// use std::thread;
  /// use ump::{channel, Error};
  ///
  /// #[derive(Debug, PartialEq)]
  /// enum MyError {
  ///   SomeError(String)
  /// }
  ///
  /// let (server, client) = channel::<String, String, MyError>();
  /// let server_thread = thread::spawn(move || {
  ///   let (_, rctx) = server.wait().unwrap();
  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg);
  /// match reply {
  ///   Err(Error::App(MyError::SomeError(s))) => {
  ///     assert_eq!(s, "failed");
  ///   }
  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }
  /// }
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(self, err: E) -> Result<(), Error<E>> {
    self.0.fail(err);
    Ok(())
  }
}

impl<T, E> From<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
  fn from(sctx: swctx::SetCtx<T, RCtxState, E>) -> Self {
    sctx.set_state(RCtxState::Active);
    ReplyContext(sctx)
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Deleted src/rctx/err.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30






























-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
use std::fmt;

/// Module-specific error codes.
#[derive(Debug)]
pub enum Error<E> {
  /// The reply was aborted.
  Aborted,

  /// The public [`ReplyContext`] object is required to reply with a value.
  /// If it does not the endpoint waiting to receive a value will abort and
  /// return this error.
  NoReply,

  /// An application-specific error occurred.
  App(E)
}

impl<E: fmt::Debug> std::error::Error for Error<E> {}

impl<E: fmt::Debug> fmt::Display for Error<E> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Error::Aborted => write!(f, "Aborted call"),
      Error::NoReply => write!(f, "Application failed to reply"),
      Error::App(err) => write!(f, "Application error; {:?}", err)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Deleted src/rctx/inner.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229





































































































































































































































-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll, Waker}
};

use parking_lot::{Condvar, Mutex};

use crate::rctx::err::Error;

pub(crate) enum State<I, E> {
  /// (Still) in queue, waiting to be picked up by the server.
  Queued,

  /// Was picked up, but (still) waiting for a reply to arrive.
  Waiting,

  /// Have a reply, but it wasn't delivered yet.
  Item(I),

  /// The application returned an error.
  AppErr(E),

  /// Reply is being returned to caller.
  Finalized,

  /// The server never received the message; it was dropped while in the
  /// queue.  Most likely this means that the message was still in the queue
  /// when the server was dropped.
  Aborted,

  /// The message was received by the server, but its reply context was
  /// released before sending back a reply.
  NoReply
}

pub struct InnerReplyContext<I, E> {
  pub(crate) signal: Arc<Condvar>,
  pub(crate) data: Arc<Mutex<State<I, E>>>,
  pub(crate) taskwaker: Arc<Mutex<Option<Waker>>>
}

impl<I: 'static + Send, E> InnerReplyContext<I, E> {
  /// Create a new reply context in "Queued" state.
  pub(crate) fn new() -> Self {
    InnerReplyContext {
      signal: Arc::new(Condvar::new()),
      data: Arc::new(Mutex::new(State::Queued)),
      taskwaker: Arc::new(Mutex::new(None))
    }
  }

  /// Store a reply and signal the originator that a reply has arrived.
  pub fn put(&self, item: I) {
    let mut mg = self.data.lock();
    *mg = State::Item(item);
    drop(mg);

    self.signal_waiters();
  }

  /// Store an error and signal the originator that a result has arrived.
  pub fn fail(&self, err: E) {
    let mut mg = self.data.lock();
    *mg = State::AppErr(err);
    drop(mg);

    self.signal_waiters();
  }

  pub(crate) fn signal_waiters(&self) {
    let mut g = self.taskwaker.lock();
    if let Some(waker) = (*g).take() {
      waker.wake();
    }

    self.signal.notify_one();
  }

  /// Retreive reply.  If a reply has not arrived yet then enter a loop that
  /// waits for a reply to arrive.
  pub fn get(&self) -> Result<I, Error<E>> {
    let mut mg = self.data.lock();

    let ret = loop {
      match &*mg {
        State::Queued | State::Waiting => {
          // Still waiting for server to report back with data
          self.signal.wait(&mut mg);
          continue;
        }
        State::Item(_msg) => {
          // Set Finalized state and return item
          if let State::Item(msg) =
            std::mem::replace(&mut *mg, State::Finalized)
          {
            break Ok(msg);
          } else {
            // We're *really* in trouble if this happens ..
            panic!("Unexpected state; not State::Item()");
          }
        }
        State::AppErr(_err) => {
          // Set Finalized state and return error
          if let State::AppErr(err) =
            std::mem::replace(&mut *mg, State::Finalized)
          {
            break Err(Error::App(err));
          } else {
            // We're *really* in trouble if this happens ..
            panic!("Unexpected state; not State::AppErr()");
          }
        }
        State::Finalized => {
          // We're *really* in trouble if this happens at this point ..
          panic!("Unexpected state State::Finalized");
        }
        State::Aborted => {
          // Dropped while in queue
          return Err(Error::Aborted);
        }
        State::NoReply => {
          // Dropped after reply context was picked up, but before replying
          return Err(Error::NoReply);
        }
      }
    };
    drop(mg);

    ret
  }

  pub fn aget(&self) -> WaitReplyFuture<I, E> {
    WaitReplyFuture::new(self)
  }
}

impl<I, E> Clone for InnerReplyContext<I, E> {
  fn clone(&self) -> Self {
    InnerReplyContext {
      signal: Arc::clone(&self.signal),
      data: Arc::clone(&self.data),
      taskwaker: Arc::clone(&self.taskwaker)
    }
  }
}

impl<I, E> Drop for InnerReplyContext<I, E> {
  /// If the reply context never left the server queue before being destroyed
  /// it means that the server has died.  Signal this to the original caller
  /// waiting for a reply.
  fn drop(&mut self) {
    let mut do_signal: bool = false;
    let mut mg = self.data.lock();

    if let State::Queued = *mg {
      *mg = State::Aborted;
      do_signal = true;
    }
    drop(mg);
    if do_signal {
      let mut g = self.taskwaker.lock();
      if let Some(waker) = (*g).take() {
        waker.wake();
      }
      self.signal.notify_one();
    }
  }
}


pub struct WaitReplyFuture<I, E> {
  data: Arc<Mutex<State<I, E>>>,
  waker: Arc<Mutex<Option<Waker>>>
}

impl<I, E> WaitReplyFuture<I, E> {
  fn new(irctx: &InnerReplyContext<I, E>) -> Self {
    WaitReplyFuture {
      data: Arc::clone(&irctx.data),
      waker: Arc::clone(&irctx.taskwaker)
    }
  }
}

impl<I: 'static + Send, E: 'static + Send> Future for WaitReplyFuture<I, E> {
  type Output = Result<I, Error<E>>;
  fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut state = self.data.lock();
    match &*state {
      State::Queued | State::Waiting => {
        let waker = ctx.waker().clone();
        let mut g = self.waker.lock();
        *g = Some(waker);
        drop(g);
        drop(state);
        Poll::Pending
      }
      State::Item(_msg) => {
        if let State::Item(msg) =
          std::mem::replace(&mut *state, State::Finalized)
        {
          Poll::Ready(Ok(msg))
        } else {
          // We're *really* in trouble if this happens ..
          panic!("Unexpected state; not State::Item()");
        }
      }
      State::AppErr(_err) => {
        if let State::AppErr(err) =
          std::mem::replace(&mut *state, State::Finalized)
        {
          Poll::Ready(Err(Error::App(err)))
        } else {
          // We're *really* in trouble if this happens ..
          panic!("Unexpected state; not State::App()");
        }
      }
      State::Finalized => {
        panic!("Unexpected state");
      }
      State::Aborted => Poll::Ready(Err(Error::Aborted)),
      State::NoReply => Poll::Ready(Err(Error::NoReply))
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Deleted src/rctx/public.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149





















































































































































-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
use crate::{
  rctx::{inner::State, InnerReplyContext},
  Error
};

/// Context used to transmit a reply back to the originating requester.
#[cfg_attr(
  feature = "dev-docs",
  doc = r#"
# Internals
Public-facing sender part of the `ReplyContext` object.

This, as opposed to `InnerReplyContext`, is safe to pass to applications that
are meant to only be able to put a value through the `ReplyContext` channel,
but not extract the value from it.
"#
)]
pub struct ReplyContext<I, E> {
  inner: InnerReplyContext<I, E>,
  did_handover: bool
}

impl<I: 'static + Send, E> ReplyContext<I, E> {
  /// Send a reply back to originating client.
  ///
  /// # Example
  /// ```
  /// use std::thread;
  /// use ump::channel;
  ///
  /// let (server, client) = channel::<String, String, ()>();
  /// let server_thread = thread::spawn(move || {
  ///   let (data, rctx) = server.wait().unwrap();
  ///   let reply = format!("Hello, {}!", data);
  ///   rctx.reply(reply).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg).unwrap();
  /// assert_eq!(reply, "Hello, Client!");
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(mut self, data: I) -> Result<(), Error<E>> {
    self.inner.put(data);

    self.did_handover = true;

    Ok(())
  }

  /// Return an error to originating client.
  /// This will cause the calling client to return an error.  The error passed
  /// in the `err` parameter will be wrapped in a `Error::App(err)`.
  ///
  /// # Example
  ///
  /// ```
  /// use std::thread;
  /// use ump::{channel, Error};
  ///
  /// #[derive(Debug, PartialEq)]
  /// enum MyError {
  ///   SomeError(String)
  /// }
  ///
  /// let (server, client) = channel::<String, String, MyError>();
  /// let server_thread = thread::spawn(move || {
  ///   let (_, rctx) = server.wait().unwrap();
  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  /// });
  /// let msg = String::from("Client");
  /// let reply = client.req(msg);
  /// match reply {
  ///   Err(Error::App(MyError::SomeError(s))) => {
  ///     assert_eq!(s, "failed");
  ///   }
  ///   _ => {
  ///     panic!("Unexpected return value");
  ///   }
  /// }
  /// server_thread.join().unwrap();
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(mut self, err: E) -> Result<(), Error<E>> {
    self.inner.fail(err);

    self.did_handover = true;

    Ok(())
  }
}

impl<I, E> Drop for ReplyContext<I, E> {
  /// If the reply context is dropped while still waiting for a reply then
  /// report back to the caller that it should expect no reply.
  fn drop(&mut self) {
    if !self.did_handover {
      let mut do_signal: bool = false;
      let mut mg = self.inner.data.lock();

      if let State::Waiting = *mg {
        *mg = State::NoReply;
        do_signal = true;
      }
      drop(mg);
      if do_signal {
        let mut g = self.inner.taskwaker.lock();
        if let Some(waker) = (*g).take() {
          waker.wake();
        }

        self.inner.signal.notify_one();
      }
    }
  }
}

impl<I, E> From<InnerReplyContext<I, E>> for ReplyContext<I, E> {
  /// Transform an internal reply context into a public one and change the
  /// state from Queued to Waiting to signal that the node has left the
  /// queue.
  fn from(inner: InnerReplyContext<I, E>) -> Self {
    // Switch state from "Queued" to "Waiting", to mark that the reply context
    // has been "picked up".
    let mut mg = inner.data.lock();
    match *mg {
      State::Queued => {
        *mg = State::Waiting;
        drop(mg);
      }
      _ => {
        // Should never happen
        drop(mg);
        panic!("Unexpected node state.");
      }
    }

    ReplyContext {
      inner: inner.clone(),
      did_handover: false
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/server.rs.

1

2
3



4
5
6
7
8
9
10

11
12
13
14
15
16
17

1


2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18
-
+
-
-
+
+
+






-
+







use crate::rctx::{InnerReplyContext, ReplyContext};
use crate::{

use crate::err::Error;
  err::Error,
  rctx::{RCtxState, ReplyContext}
};

pub(crate) struct ServerQueueNode<S, R, E> {
  /// Raw message being sent from the client to the server.
  pub(crate) msg: S,

  /// Keep track of data needed to share reply data.
  pub(crate) reply: InnerReplyContext<R, E>
  pub(crate) reply: swctx::SetCtx<R, RCtxState, E>
}

/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.

Changes to www/changelog.md.

1
2
3
4
5
6
7







8
9
10
11
12
13
14




15
16
17
18
19
20
21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


20
21
22
23
24
25
26
27
28
29
30







+
+
+
+
+
+
+





-
-
+
+
+
+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.12.0] - 2023-08-15

### Changed

- Include tests when publishing crate.
- Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public,
  giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return
  types.

### Removed
- Use the `swctx` crate for sending back the reply rather than use a custom
  in-tree implementation.
- Update `edition` to `2021` and `rust-version` to `1.56`.
- Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml`


## [0.11.0] - 2023-07-29

### Changed

- Include tests when publishing crate.
31
32
33
34
35
36
37
38

39
40
41
42
43
44
45
40
41
42
43
44
45
46

47
48
49
50
51
52
53
54







-
+







- Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with
  a deprecation notice.
- Add a `dev-docs` feature to allow internal documentation notes to be
  included in generated documentation.

### Changed

- Rename `send()`/`asend()` to `req()/`areq()`.
- Rename `send()`/`asend()` to `req()`/`areq()`.


## [0.10.1] - 2023-07-27

### Changed

- Runtime dependencies:

Changes to www/index.md.

1
2
3
4
5
6
7
8
9
10
11
12
13






1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19













+
+
+
+
+
+
# Micro-Message Passing Library

The _ump_ crate is a simple client/server message passing library for
intra-process communication.  Its primary purpose is to allow cross
async/non-async communication (for both the server and client endpoints).


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).


## Project Status

_ump_ is in _maintenance mode_; it is feature-complete, but will receive
bugfixes and improvements to implementation/documentation.