Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-0.12.1 To ump-0.13.0
2024-09-12
| ||
20:23 | Move clippy configuration from bacon.toml to Cargo.toml. Leaf check-in: ccc864a567 user: jan tags: trunk | |
2024-09-10
| ||
00:14 | Change log updates. check-in: f43f27bdc2 user: jan tags: trunk, ump-0.13.0 | |
00:05 | Add test cases for wait context disappearing. check-in: 1857251062 user: jan tags: trunk | |
2024-01-12
| ||
14:31 | Remove dev-docs feature. check-in: 2ba8f53f39 user: jan tags: trunk | |
2023-10-02
| ||
13:32 | Release maintenance. check-in: 45f4a89db5 user: jan tags: trunk, ump-0.12.1 | |
2023-09-17
| ||
10:16 | Add changelog separators. check-in: e1451ac681 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "ump" | | > > > | | < | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | [package] name = "ump" version = "0.13.0" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." rust-version = "1.56" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "examples", "www", "bacon.toml", "rustfmt.toml" ] # https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section [badges] maintenance = { status = "passively-maintained" } [dependencies] sigq = { version = "0.13.5" } swctx = { version = "0.3.0" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } tokio = { version = "1.40.0", features = ["rt-multi-thread"] } [[bench]] name = "add_server" harness = false [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] |
︙ | ︙ |
Changes to README.md.
1 2 3 4 5 6 | # Micro-Message Passing Library The _ump_ crate is a simple client/server message passing library for intra-process communication. Its primary purpose is to allow cross async/non-async communication (for both the server and client endpoints). | > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # Micro-Message Passing Library The _ump_ crate is a simple client/server message passing library for intra-process communication. Its primary purpose is to allow cross async/non-async communication (for both the server and client endpoints). [![Crates.io][crates-badge]][crates-url] [![0BSD licensed][0bsd-badge]][0bsd-url] [crates-badge]: https://img.shields.io/crates/v/ump.svg [crates-url]: https://crates.io/crates/ump [0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg [0bsd-url]: https://opensource.org/license/0bsd |
Added bacon.toml.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | # This is a configuration file for the bacon tool # # Bacon repository: https://github.com/Canop/bacon # Complete help on configuration: https://dystroy.org/bacon/config/ # You can also check bacon's own bacon.toml file # as an example: https://github.com/Canop/bacon/blob/main/bacon.toml # For information about clippy lints, see: # https://github.com/rust-lang/rust-clippy/blob/master/README.md #default_job = "check" default_job = "clippy-all-pedantic" [jobs.check] command = ["cargo", "check", "--color", "always"] need_stdout = false [jobs.check-all] command = ["cargo", "check", "--all-targets", "--color", "always"] need_stdout = false # Run clippy on the default target [jobs.clippy] command = [ "cargo", "clippy", "--color", "always", ] need_stdout = false # Run clippy on all targets # To disable some lints, you may change the job this way: # [jobs.clippy-all] # command = [ # "cargo", "clippy", # "--all-targets", # "--color", "always", # "--", # "-A", "clippy::bool_to_int_with_if", # "-A", "clippy::collapsible_if", # "-A", "clippy::derive_partial_eq_without_eq", # ] # need_stdout = false [jobs.clippy-all] command = [ "cargo", "clippy", "--all-targets", "--color", "always", ] need_stdout = false [jobs.clippy-pedantic] command = [ "cargo", "clippy", "--color", "always", "--", "-Wclippy::all", "-Wclippy::pedantic", "-Wclippy::nursery", "-Wclippy::cargo" ] need_stdout = false [jobs.clippy-all-pedantic] command = [ "cargo", "clippy", "--all-targets", "--color", "always", "--", "-Wclippy::all", "-Wclippy::pedantic", "-Wclippy::nursery", "-Wclippy::cargo" ] need_stdout = false # This job lets you run # - all tests: bacon test # - a specific test: bacon test -- config::test_default_files # - the tests of a package: bacon test -- -- -p config [jobs.test] command = [ "cargo", "test", "--color", "always", "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 ] need_stdout = true [jobs.doc] command = ["cargo", "doc", "--color", "always", "--no-deps"] need_stdout = false # If the doc compiles, then it opens in your browser and bacon switches # to the previous job [jobs.doc-open] command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] need_stdout = false on_success = "back" # so that we don't open the browser at each change # You can run your application and have the result displayed in bacon, # *if* it makes sense for this crate. # Don't forget the `--color always` part or the errors won't be # properly parsed. # If your program never stops (eg a server), you may set `background` # to false to have the cargo run output immediately displayed instead # of waiting for program's end. [jobs.run] command = [ "cargo", "run", "--color", "always", # put launch parameters for your program behind a `--` separator ] need_stdout = true allow_warnings = true background = true # This parameterized job runs the example of your choice, as soon # as the code compiles. # Call it as # bacon ex -- my-example [jobs.ex] command = ["cargo", "run", "--color", "always", "--example"] need_stdout = true allow_warnings = true # You may define here keybindings that would be specific to # a project, for example a shortcut to launch a specific job. # Shortcuts to internal functions (scrolling, toggling, etc.) # should go in your personal global prefs.toml file instead. [keybindings] # alt-m = "job:my-job" c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target |
Changes to benches/add_server.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump::channel; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("req operation"); let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump::channel; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } #[allow(clippy::missing_panics_doc)] pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("req operation"); let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; |
︙ | ︙ | |||
39 40 41 42 43 44 45 | group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); | | | | | | 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); }); }); p = 0; q = 0; group.bench_function("add (threaded)", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::AddThreaded(p, q)).unwrap(); assert_eq!(result, q + p); }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::Add(p, q)).await.unwrap(); assert_eq!(result, q + p); }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async, threaded)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap(); assert_eq!(result, q + p); }); }); let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); server_thread.join().unwrap(); |
︙ | ︙ |
Changes to src/client.rs.
|
| | | 1 2 3 4 5 6 7 8 | use crate::{err::Error, server::QueueNode as ServerQueueNode}; use super::rctx::RCtxState; /// Representation of a clonable client object that can issue requests to /// [`Server`](super::Server) objects. /// /// Each instantiation of a `Client` object is itself an isolated client with |
︙ | ︙ | |||
20 21 22 23 24 25 26 27 28 29 30 31 | { /// Send a message to the server, wait for a reply, and return the reply. /// /// A complete round-trip (the message is delivered to the server, and the /// server sends a reply) must complete before this function returns /// success. /// /// This method is _currently_ reentrant: It is safe to use share a single /// `Client` among multiple threads. _This may change in the future_; it's /// recommended to not rely on this. The recommended way to send messages to /// a server from multiple threads is to clone the `Client` and assign one /// separate `Client` to each thread. | > > > > > > > > > > > > > > > < < < < < < < < < < < < < < | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | { /// Send a message to the server, wait for a reply, and return the reply. /// /// A complete round-trip (the message is delivered to the server, and the /// server sends a reply) must complete before this function returns /// success. /// /// # Return /// On success the function will return `Ok(msg)`. /// /// # Errors /// If the linked server object has been released, or is released while the /// message is in the server's queue, [`Error::ServerDisappeared`] will be /// returned. /// /// If the server never replied to the message and the reply context was /// dropped [`Error::NoReply`] will be returned. /// /// If an application specific error occurs it will be returned as a /// [`Error::App`]. /// /// # Implementation details /// This method is _currently_ reentrant: It is safe to use share a single /// `Client` among multiple threads. _This may change in the future_; it's /// recommended to not rely on this. The recommended way to send messages to /// a server from multiple threads is to clone the `Client` and assign one /// separate `Client` to each thread. pub fn req(&self, out: S) -> Result<R, Error<E>> { // Create a per-call reply context. // This context could be created when the Client object is being created // and stored in the context, and thus be reused for reach client call. // One side-effect is that some of the state semantics becomes more // complicated. // The central repo has such an implementation checked in, but it seems to |
︙ | ︙ | |||
110 111 112 113 114 115 116 | /// /// let reply = wrctx.wait().unwrap(); /// println!("Client received reply '{}'", reply); /// println!("Client done"); /// /// server_thread.join().unwrap(); /// ``` | > > > > > | > > > > > | > > > > | > > > > | | | > > > > > > > > > > | > | > > > > | | | | 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 | /// /// let reply = wrctx.wait().unwrap(); /// println!("Client received reply '{}'", reply); /// println!("Client done"); /// /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// If the linked server object has been released, or is released while the /// message is in the server's queue, [`Error::ServerDisappeared`] will be /// returned. pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> where S: Send { let (sctx, wctx) = swctx::mkpair(); self .0 .push(ServerQueueNode { msg: out, reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; Ok(WaitReply(wctx)) } #[allow(clippy::missing_errors_doc)] #[deprecated(since = "0.10.2", note = "Use req() instead.")] pub fn send(&self, out: S) -> Result<R, Error<E>> { self.req(out) } /// Same as [`Client::req()`] but for use in `async` contexts. #[allow(clippy::missing_errors_doc)] pub async fn areq(&self, out: S) -> Result<R, Error<E>> where S: Send { let (sctx, wctx) = swctx::mkpair(); self .0 .push(ServerQueueNode { msg: out, reply: sctx }) .map_err(|_| Error::ServerDisappeared)?; Ok(wctx.wait_async().await?) } #[deprecated(since = "0.10.2", note = "Use areq() instead.")] #[allow(clippy::missing_errors_doc)] pub async fn asend(&self, out: S) -> Result<R, Error<E>> where S: Send { self.areq(out).await } /// Create a weak `Client` reference. #[must_use] pub fn weak(&self) -> Weak<S, R, E> { Weak(self.0.weak()) } } impl<S, R, E> Clone for Client<S, R, E> { /// Clone a client. /// /// When a client is cloned the new object will be linked to the same server, /// but in all other respects the clone is a completely independent client. /// /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { Self(self.0.clone()) } } /// Context used to wait for a server to reply to a request. pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>); impl<R, E> WaitReply<R, E> { /// Block and wait for a reply. /// /// For use in non-`async` threads. /// /// # Errors /// [`Error::ServerDisappeared`] means the linked server object has been /// released. /// /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server /// handler it replies to the message, [`Error::NoReply`] will be returned. /// /// If an application specific error occurs it will be returned in /// [`Error::App`]. pub fn wait(self) -> Result<R, Error<E>> { Ok(self.0.wait()?) } /// Block and wait for a reply. /// /// Same as [`WaitReply::wait()`], but for use in `async` contexts. #[allow(clippy::missing_errors_doc)] pub async fn wait_async(self) -> Result<R, Error<E>> where R: Send, E: Send { Ok(self.0.wait_async().await?) } } /// A weak client reference that can be upgraded to a [`Client`] as long as /// other `Client` objects till exist. #[repr(transparent)] pub struct Weak<S, R, E>( pub(crate) sigq::WeakPusher<ServerQueueNode<S, R, E>> ); impl<S, R, E> Clone for Weak<S, R, E> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<S, R, E> Weak<S, R, E> { /// Upgrade a `WeakClient` to a [`Client`]. /// /// If no strong `Client` objects still exist then `None` is returned. /// /// # Examples /// /// Upgrading a weak client while stong clients exists works: |
︙ | ︙ | |||
230 231 232 233 234 235 236 237 238 239 240 241 242 | /// let (server, client) = channel::<String, String, ()>(); /// let weak_client = client.weak(); /// drop(client); /// let Some(_) = weak_client.upgrade() else { /// panic!("Unexpectedly able to upgrade weak client"); /// }; /// ``` pub fn upgrade(&self) -> Option<Client<S, R, E>> { self.0.upgrade().map(|x| Client(x)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > | 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | /// let (server, client) = channel::<String, String, ()>(); /// let weak_client = client.weak(); /// drop(client); /// let Some(_) = weak_client.upgrade() else { /// panic!("Unexpectedly able to upgrade weak client"); /// }; /// ``` #[must_use] pub fn upgrade(&self) -> Option<Client<S, R, E>> { self.0.upgrade().map(|x| Client(x)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/err.rs.
︙ | ︙ | |||
8 9 10 11 12 13 14 15 16 17 18 19 20 21 | /// The server object has shut down. /// /// Happens when clients: /// - attempt to transmit messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// No more client end-points remain. /// /// There are no more nodes to pick up in the queue and all client /// end-points have been dropped (implied: no new nodes will ever be added /// to the queue). ClientsDisappeared, | > > > > | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | /// The server object has shut down. /// /// Happens when clients: /// - attempt to transmit messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// A message reply could not be completed because the original requestor /// disappearing. OriginDisappeared, /// No more client end-points remain. /// /// There are no more nodes to pick up in the queue and all client /// end-points have been dropped (implied: no new nodes will ever be added /// to the queue). ClientsDisappeared, |
︙ | ︙ | |||
31 32 33 34 35 36 37 | App(E) } impl<E> Error<E> { /// Attempt to convert [`Error`] into application-specific error. pub fn into_apperr(self) -> Option<E> { match self { | | | | | | > > > | > | | | | | > | | 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | App(E) } impl<E> Error<E> { /// Attempt to convert [`Error`] into application-specific error. pub fn into_apperr(self) -> Option<E> { match self { Self::App(e) => Some(e), _ => None } } /// Unwrap application-specific error from an [`Error`]. /// /// # Panics /// Panics if `Error` variant is not `Error::App()`. pub fn unwrap_apperr(self) -> E { match self { Self::App(e) => e, _ => panic!("Not an Error::App") } } } impl<E: std::error::Error> std::error::Error for Error<E> {} impl<E> fmt::Display for Error<E> where E: std::error::Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::ServerDisappeared => write!(f, "Server disappeared"), Self::OriginDisappeared => write!(f, "Requestor disappeared"), Self::ClientsDisappeared => write!(f, "Clients disappeared"), Self::NoReply => write!(f, "Server didn't reply"), Self::App(err) => write!(f, "Application error; {}", err) } } } impl<E> From<swctx::Error<RCtxState, E>> for Error<E> { fn from(err: swctx::Error<RCtxState, E>) -> Self { match err { swctx::Error::Aborted(state) => match state { RCtxState::Queued => Self::ServerDisappeared, RCtxState::Active => Self::NoReply }, swctx::Error::LostWaiter => Self::OriginDisappeared, swctx::Error::App(e) => Self::App(e) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/lib.rs.
︙ | ︙ | |||
103 104 105 106 107 108 109 | mod err; mod rctx; mod server; pub use err::Error; pub use crate::{ | | | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | mod err; mod rctx; mod server; pub use err::Error; pub use crate::{ client::{Client, WaitReply, Weak as WeakClient}, rctx::ReplyContext, server::Server }; /// Create a pair of linked [`Server`] and [`Client`] objects. /// /// The [`Server`] object is used to wait for incoming messages from connected |
︙ | ︙ | |||
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | /// new client object that is connected to the same server object, but is /// completely independent of the original client. /// /// The `S` type parameter is the "request" data type that clients will /// transfer to the server. The `R` type parameter is the "receive" data type /// that clients will receive from the server. The `E` type parameter can be /// used to return application specific errors from the server to the client. pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) { let (qpusher, qpuller) = sigq::new(); let server = Server(qpuller); let client = Client(qpusher); (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > | 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | /// new client object that is connected to the same server object, but is /// completely independent of the original client. /// /// The `S` type parameter is the "request" data type that clients will /// transfer to the server. The `R` type parameter is the "receive" data type /// that clients will receive from the server. The `E` type parameter can be /// used to return application specific errors from the server to the client. #[must_use] pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) { let (qpusher, qpuller) = sigq::new(); let server = Server(qpuller); let client = Client(qpusher); (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/rctx.rs.
1 2 3 4 5 6 7 8 9 10 | //! Allow a thread/task, crossing sync/async boundaries in either direction, to //! deliver an expected piece of data to another thread/task, with //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. use crate::err::Error; #[derive(Clone, Default)] | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | //! Allow a thread/task, crossing sync/async boundaries in either direction, to //! deliver an expected piece of data to another thread/task, with //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. use crate::err::Error; #[derive(Clone, Default)] pub enum RCtxState { #[default] Queued, Active } /// Object used to respond to requests that have been received by a /// [`Server`](super::Server). |
︙ | ︙ | |||
34 35 36 37 38 39 40 41 42 43 | /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(self, data: T) -> Result<(), Error<E>> { | > > > > | | 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// If the originating caller is no longer waiting for a reply (i.e. was /// dropped) [`Error::OriginDisappeared`] is returned. /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(self, data: T) -> Result<(), Error<E>> { self.0.set(data)?; Ok(()) } /// Return an error to originating client. /// This will cause the calling client to return an error. The error passed /// in the `err` parameter will be wrapped in a `Error::App(err)`. /// |
︙ | ︙ | |||
74 75 76 77 78 79 80 81 82 83 | /// _ => { /// panic!("Unexpected return value"); /// } /// } /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(self, err: E) -> Result<(), Error<E>> { | > > > > | | > > > > > > | > | | | 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | /// _ => { /// panic!("Unexpected return value"); /// } /// } /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// If the originating caller is no longer waiting for a reply (i.e. was /// dropped) [`Error::OriginDisappeared`] is returned. /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(self, err: E) -> Result<(), Error<E>> { self.0.fail(err)?; Ok(()) } } impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> { type Error = Error<E>; /// Convert a `SetCtx` into a `ReplyContext`. /// /// Sets the `SetCtx`'s stat to _Active_. fn try_from( sctx: swctx::SetCtx<T, RCtxState, E> ) -> Result<Self, Self::Error> { let _ = sctx.set_state(RCtxState::Active); Ok(Self(sctx)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/server.rs.
1 2 3 4 5 | use crate::{ err::Error, rctx::{RCtxState, ReplyContext} }; | | | > > > > | > > > > | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | use crate::{ err::Error, rctx::{RCtxState, ReplyContext} }; pub struct QueueNode<S, R, E> { /// Raw message being sent from the client to the server. pub(crate) msg: S, /// Keep track of data needed to share reply data. pub(crate) reply: swctx::SetCtx<R, RCtxState, E> } /// Representation of a server object. /// /// Each instantiation of a [`Server`] object represents an end-point which /// will be used to receive messages from connected [`Client`](crate::Client) /// objects. #[repr(transparent)] pub struct Server<S, R, E>(pub(crate) sigq::Puller<QueueNode<S, R, E>>); impl<S, R, E> Server<S, R, E> where S: 'static + Send, R: 'static + Send, E: 'static + Send { /// Block and wait, indefinitely, for an incoming message from a /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. /// /// # Errors /// `Err(Error::ClientsDisappeared)` indicates that the queue is empty and /// all the client end-points have been dropped. pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> { let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::try_from(node.reply)?; Ok((msg, rctx)) } /// Take next next message off queue or return `None` is queue is empty. /// /// # Errors /// [`Error::ClientsDisappeared`] indicates that the queue is empty and /// all the client end-points have been dropped. #[allow(clippy::type_complexity)] pub fn try_pop(&self) -> Result<Option<(S, ReplyContext<R, E>)>, Error<E>> { let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?; if let Some(node) = node { // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the // queue Implicitly changes state of the reply context from Queued // to Waiting let rctx = ReplyContext::try_from(node.reply)?; Ok(Some((msg, rctx))) } else { Ok(None) } } /// Same as [`Server::wait()`], but for use in an `async` context. #[allow(clippy::missing_errors_doc)] pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> { let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::try_from(node.reply)?; Ok((msg, rctx)) } /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. #[must_use] pub fn was_empty(&self) -> bool { self.0.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/async_client.rs.
︙ | ︙ | |||
42 43 44 45 46 47 48 | if let Reply::Sum(sum) = result { assert_eq!(sum, a + b); } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); | | < | | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | if let Reply::Sum(sum) = result { assert_eq!(sum, a + b); } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); let Reply::OkICroaked = result else { panic!("Didn't get a croak"); }; }); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added tests/wait_disappear.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | use ump::{channel, Error}; #[test] fn wait_disappered_on_reply() { let (server, client) = channel::<String, String, ()>(); // Generate a request that will return a wait context. let wctx = client.req_async(String::from("hello")); // Get the message (and reply context) from server end-point let (_msg, rctx) = server.wait().unwrap(); // nuke the wait context drop(wctx); // Replying should fail, because wctx has been dropped let Err(Error::OriginDisappeared) = rctx.reply(String::from("ahoy")) else { panic!("Unexpected error"); }; } #[test] fn wait_disappered_on_fail() { let (server, client) = channel::<String, String, ()>(); // Generate a request that will return a wait context. let wctx = client.req_async(String::from("hello")); // Get the message (and reply context) from server end-point let (_msg, rctx) = server.wait().unwrap(); // nuke the wait context drop(wctx); // Failing should fail, because wctx has been dropped let Err(Error::OriginDisappeared) = rctx.fail(()) else { panic!("Unexpected error"); }; } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed --- ## [0.12.1] - 2023-10-02 ### Added - Add `Client::req_async()`. - Add `Server::try_pop()`. - `Client` objects can spawn downgraded to `WeakClient` objects, that in turn can be upgraded to `Client` objects (as long as all the strong `Client` objects have not been dropped). --- ## [0.12.0] - 2023-08-15 | > > > > > > > > > > > > > > > > > > > > > > > > > | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | # Change Log ⚠️ indicates a breaking change. ## [Unreleased] [Details](/vdiff?from=ump-0.13.0&to=trunk) ### Added ### Changed ### Removed --- ## [0.13.0] - 2024-09-10 [Details](/vdiff?from=ump-0.12.1&to=ump-0.13.0) ### Changed - Update to `swctx` to `0.3.0`, allowing `ReplyContext` to detect if the originating client has been dropped. - ⚠️ Require `std::error::Error` bound on application-specific error `E` for `std::error::Error` implementation on `Error<E>` as well as `fmt::Display` for `Error<E>`. ### Removed - Remove `dev-docs` feature - Remove superfluous `parking_lot` dependency. --- ## [0.12.1] - 2023-10-02 [Details](/vdiff?from=ump-0.12.0&to=ump-0.12.1) ### Added - Add `Client::req_async()`. - Add `Server::try_pop()`. - `Client` objects can spawn downgraded to `WeakClient` objects, that in turn can be upgraded to `Client` objects (as long as all the strong `Client` objects have not been dropped). --- ## [0.12.0] - 2023-08-15 [Details](/vdiff?from=ump-0.11.0&to=ump-0.12.0) ### Changed - Include tests when publishing crate. - Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public, giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return types. - Use the `swctx` crate for sending back the reply rather than use a custom in-tree implementation. - Update `edition` to `2021` and `rust-version` to `1.56`. - Add `--generate-link-to-definition` to `rustdoc-args` in `Cargo.toml` --- ## [0.11.0] - 2023-07-29 [Details](/vdiff?from=ump-0.10.2&to=ump-0.11.0) ### Changed - Include tests when publishing crate. - Bugfix: Use `err::Error` rather than `rctx::err::Error` in rctx::public, giving `ReplyContext::reply()` and `ReplyContext::fail()` the correct return types. --- ## [0.10.2] - 2023-07-28 [Details](/vdiff?from=ump-0.10.1&to=ump-0.10.2) ### Added - Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with a deprecation notice. - Add a `dev-docs` feature to allow internal documentation notes to be included in generated documentation. ### Changed - Rename `send()`/`asend()` to `req()`/`areq()`. --- ## [0.10.1] - 2023-07-27 [Details](/vdiff?from=ump-0.10.0&to=ump-0.10.1) ### Changed - Runtime dependencies: - Updated `sigq` to `0.13.3`. --- ## [0.10.0] - 2023-07-26 [Details](/vdiff?from=ump-0.9.0&to=ump-0.10.0) ### Added - Server's receive methods will fail with `Error::ClientsDisappeared` if all the associated Client objects have been dropped. ### Changed |
︙ | ︙ |