Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-ng-0.1.0 To ump-ng-0.2.0
2024-09-12
| ||
20:25 | Move clippy config from bacon.toml to Cargo.toml. check-in: e3dd8a1bd0 user: jan tags: trunk | |
2024-09-10
| ||
00:30 | Rename `MsgType::Put` to `MsgType::Post`. Dependency maintenance. check-in: 6f4b9d43d1 user: jan tags: trunk, ump-ng-0.2.0 | |
2024-09-09
| ||
20:59 | Make use of updated swctx's ability to report when the waitctx has been dropped. Clippy fixups. check-in: a48d827d1a user: jan tags: trunk | |
2023-10-03
| ||
07:17 | Document WeakClient. check-in: fa626b58be user: jan tags: trunk | |
2023-10-02
| ||
14:47 | Happy clippy. check-in: 6b4a0d3b99 user: jan tags: trunk, ump-ng-0.1.0 | |
14:44 | Export WaitReply. check-in: 196e3c425c user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "ump-ng" | | > > > > > > < | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | [package] name = "ump-ng" version = "0.2.0" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng" description = "Micro message passing library for threads/tasks communication." exclude = [ ".fossil-settings", ".efiles", ".fslckout", "examples", "www", "bacon.toml", "rustfmt.toml" ] # https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section [badges] maintenance = { status = "passively-maintained" } [dependencies] sigq = { version = "0.13.5" } swctx = { version = "0.3.0" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } tokio = { version = "1.40.0", features = ["rt-multi-thread"] } [[bench]] name = "add_server" harness = false [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] |
︙ | ︙ |
Changes to README.md.
1 2 3 4 5 6 7 8 9 | # Micro-Message Passing Library The _ump-ng_ crate is a simple client/server message passing library for intra-process communication. Its primary purpose is to allow cross async/non-async communication (for both the server and client endpoints). ump-ng is similar to [ump](https://crates.io/crates/ump), but it has a uni-directional message passing operation. | > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Micro-Message Passing Library The _ump-ng_ crate is a simple client/server message passing library for intra-process communication. Its primary purpose is to allow cross async/non-async communication (for both the server and client endpoints). ump-ng is similar to [ump](https://crates.io/crates/ump), but it has a uni-directional message passing operation. [![Crates.io][crates-badge]][crates-url] [![0BSD licensed][0bsd-badge]][0bsd-url] [crates-badge]: https://img.shields.io/crates/v/ump-ng.svg [crates-url]: https://crates.io/crates/ump-ng [0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg [0bsd-url]: https://opensource.org/license/0bsd |
Added bacon.toml.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | # This is a configuration file for the bacon tool # # Bacon repository: https://github.com/Canop/bacon # Complete help on configuration: https://dystroy.org/bacon/config/ # You can also check bacon's own bacon.toml file # as an example: https://github.com/Canop/bacon/blob/main/bacon.toml # For information about clippy lints, see: # https://github.com/rust-lang/rust-clippy/blob/master/README.md #default_job = "check" default_job = "clippy-all-pedantic" [jobs.check] command = ["cargo", "check", "--color", "always"] need_stdout = false [jobs.check-all] command = ["cargo", "check", "--all-targets", "--color", "always"] need_stdout = false # Run clippy on the default target [jobs.clippy] command = [ "cargo", "clippy", "--color", "always", ] need_stdout = false # Run clippy on all targets # To disable some lints, you may change the job this way: # [jobs.clippy-all] # command = [ # "cargo", "clippy", # "--all-targets", # "--color", "always", # "--", # "-A", "clippy::bool_to_int_with_if", # "-A", "clippy::collapsible_if", # "-A", "clippy::derive_partial_eq_without_eq", # ] # need_stdout = false [jobs.clippy-all] command = [ "cargo", "clippy", "--all-targets", "--color", "always", ] need_stdout = false [jobs.clippy-pedantic] command = [ "cargo", "clippy", "--color", "always", "--", "-Wclippy::all", "-Wclippy::pedantic", "-Wclippy::nursery", "-Wclippy::cargo" ] need_stdout = false [jobs.clippy-all-pedantic] command = [ "cargo", "clippy", "--all-targets", "--color", "always", "--", "-Wclippy::all", "-Wclippy::pedantic", "-Wclippy::nursery", "-Wclippy::cargo" ] need_stdout = false # This job lets you run # - all tests: bacon test # - a specific test: bacon test -- config::test_default_files # - the tests of a package: bacon test -- -- -p config [jobs.test] command = [ "cargo", "test", "--color", "always", "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 ] need_stdout = true [jobs.doc] command = ["cargo", "doc", "--color", "always", "--no-deps"] need_stdout = false # If the doc compiles, then it opens in your browser and bacon switches # to the previous job [jobs.doc-open] command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] need_stdout = false on_success = "back" # so that we don't open the browser at each change # You can run your application and have the result displayed in bacon, # *if* it makes sense for this crate. # Don't forget the `--color always` part or the errors won't be # properly parsed. # If your program never stops (eg a server), you may set `background` # to false to have the cargo run output immediately displayed instead # of waiting for program's end. [jobs.run] command = [ "cargo", "run", "--color", "always", # put launch parameters for your program behind a `--` separator ] need_stdout = true allow_warnings = true background = true # This parameterized job runs the example of your choice, as soon # as the code compiles. # Call it as # bacon ex -- my-example [jobs.ex] command = ["cargo", "run", "--color", "always", "--example"] need_stdout = true allow_warnings = true # You may define here keybindings that would be specific to # a project, for example a shortcut to launch a specific job. # Shortcuts to internal functions (scrolling, toggling, etc.) # should go in your personal global prefs.toml file instead. [keybindings] # alt-m = "job:my-job" c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target |
Changes to benches/add_server.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump_ng::{channel, MsgType}; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("send operation"); let (server, client) = channel::<(), Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump_ng::{channel, MsgType}; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } #[allow(clippy::missing_panics_doc)] pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("send operation"); let (server, client) = channel::<(), Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; |
︙ | ︙ | |||
42 43 44 45 46 47 48 | group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); | | | | | | 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 | group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); }); }); p = 0; q = 0; group.bench_function("add (threaded)", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::AddThreaded(p, q)).unwrap(); assert_eq!(result, q + p); }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::Add(p, q)).await.unwrap(); assert_eq!(result, q + p); }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async, threaded)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap(); assert_eq!(result, q + p); }); }); let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); server_thread.join().unwrap(); |
︙ | ︙ |
Changes to examples/many_once.rs.
︙ | ︙ | |||
31 32 33 34 35 36 37 | while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); let MsgType::Request(data, rctx) = server.wait().unwrap() else { panic!("Unexpected message operation type"); }; | | | | | | | 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); let MsgType::Request(data, rctx) = server.wait().unwrap() else { panic!("Unexpected message operation type"); }; println!("Server received: '{data}'"); // .. process data from client .. // Reply to client let reply = format!("Hello, {data}!"); println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); // Increase message counter count += 1; } println!("Server done"); }); let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{name} sending '{msg}'"); let reply = client_clone.req(msg).unwrap(); println!("{name} received reply '{reply}' -- done"); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); } server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/simple.rs.
1 2 3 4 5 6 7 8 9 10 11 12 | use std::thread; use ump_ng::{channel, MsgType}; fn main() { let (server, client) = channel::<(), String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); match server.wait().unwrap() { | | | | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | use std::thread; use ump_ng::{channel, MsgType}; fn main() { let (server, client) = channel::<(), String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); match server.wait().unwrap() { MsgType::Post(_m) => { panic!("Unexpected Post message") } MsgType::Request(data, rctx) => { println!("Server received: '{data}'"); // Process data from client // Reply to client let reply = format!("Hello, {data}!"); println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); } } println!("Server done"); }); let msg = String::from("Client"); println!("Client sending '{msg}'"); let reply = client.req(msg).unwrap(); println!("Client received reply '{reply}'"); println!("Client done"); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/threaded_handler.rs.
︙ | ︙ | |||
27 28 29 30 31 32 33 | let MsgType::Request(data, rctx) = server.wait().unwrap() else { panic!("Unexpected message operation type"); }; // Move the received data and reply context into a thread to allow other // messages to be received while processing this message. thread::spawn(move || { | | | | | | | 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | let MsgType::Request(data, rctx) = server.wait().unwrap() else { panic!("Unexpected message operation type"); }; // Move the received data and reply context into a thread to allow other // messages to be received while processing this message. thread::spawn(move || { println!("Server received: '{data}'"); // Process data from client // Reply to client let reply = format!("Hello, {data}!"); println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); }); // Increase message counter count += 1; } println!("Server done"); }); let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{name} sending '{msg}'"); let reply = client_clone.req(msg).unwrap(); println!("{name} received reply '{reply}' -- done"); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); } server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/client.rs.
︙ | ︙ | |||
16 17 18 19 20 21 22 23 24 25 | impl<P, S, R, E> Client<P, S, R, E> where P: 'static + Send, R: 'static + Send, E: 'static + Send { /// Transmit a uni-directional message to the server end-point. pub fn post(&self, msg: P) -> Result<(), Error<E>> { self .0 | > > > > | > | | > | | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | impl<P, S, R, E> Client<P, S, R, E> where P: 'static + Send, R: 'static + Send, E: 'static + Send { /// Transmit a uni-directional message to the server end-point. /// /// # Errors /// [`Error::ServerDisappeared`] means the [`Server`](super::Server) /// end-point has been dropped. pub fn post(&self, msg: P) -> Result<(), Error<E>> { self .0 .push(InnerMsgType::Post(msg)) .map_err(|_| Error::ServerDisappeared)?; Ok(()) } /// Send a message to the server, wait for a reply, and return the reply. /// /// A complete round-trip (the message is delivered to the server, and the /// server sends a reply) must complete before this function returns /// success. /// /// This method is _currently_ reentrant: It is safe to use share a single /// `Client` among multiple threads. _This may change in the future_; it's /// recommended to not rely on this. The recommended way to send messages to /// a server from multiple threads is to clone the `Client` and assign one /// separate `Client` to each thread. /// /// # Return /// On success the function will return `Ok(msg)`. /// /// # Errors /// If the linked server object has been released, or is released while the /// message is in the server's queue, [`Error::ServerDisappeared`] will be /// returned. /// /// If the server never replied to the message and the /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will /// be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn req(&self, out: S) -> Result<R, Error<E>> { // Create a per-call reply context. // This context could be created when the Client object is being created |
︙ | ︙ | |||
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | /// /// let reply = wrctx.wait().unwrap(); /// println!("Client received reply '{}'", reply); /// println!("Client done"); /// /// server_thread.join().unwrap(); /// ``` pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> { let (sctx, wctx) = swctx::mkpair(); self .0 .push(InnerMsgType::Request(out, sctx)) .map_err(|_| Error::ServerDisappeared)?; Ok(WaitReply(wctx)) } | > > > > | > | > > > > | | | > > > > > > > > > > | > | > > > > > > | | | > | 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 | /// /// let reply = wrctx.wait().unwrap(); /// println!("Client received reply '{}'", reply); /// println!("Client done"); /// /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// [`Error::ServerDisappeared`] means the [`Server`](super::Server) /// end-point has been dropped. pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> { let (sctx, wctx) = swctx::mkpair(); self .0 .push(InnerMsgType::Request(out, sctx)) .map_err(|_| Error::ServerDisappeared)?; Ok(WaitReply(wctx)) } /// Same as [`Client::req()`], but for use in `async` contexts. #[allow(clippy::missing_errors_doc)] pub async fn areq(&self, out: S) -> Result<R, Error<E>> where S: Send { let (sctx, wctx) = swctx::mkpair(); self .0 .push(InnerMsgType::Request(out, sctx)) .map_err(|_| Error::ServerDisappeared)?; Ok(wctx.wait_async().await?) } /// Create a weak `Client` reference. #[must_use] pub fn weak(&self) -> Weak<P, S, R, E> { Weak(self.0.weak()) } } impl<P, S, R, E> Clone for Client<P, S, R, E> { /// Clone a client. /// /// When a client is cloned the new object will be linked to the same server, /// but in all other respects the clone is a completely independent client. /// /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { Self(self.0.clone()) } } /// Context used to wait for a server to reply to a request. pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>); impl<R, E> WaitReply<R, E> { /// Block and wait for a reply. /// /// For use in non-`async` threads. /// /// # Errors /// [`Error::ServerDisappeared`] means the linked server object has been /// released. /// /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server /// handler it replies to the message, [`Error::NoReply`] will be returned. /// /// If an application specific error occurs it will be returned in /// [`Error::App`]. pub fn wait(self) -> Result<R, Error<E>> { Ok(self.0.wait()?) } /// Block and wait for a reply. /// /// Same as [`WaitReply::wait()`], but for use in `async` contexts. #[allow(clippy::missing_errors_doc)] pub async fn wait_async(self) -> Result<R, Error<E>> where R: Send, E: Send { Ok(self.0.wait_async().await?) } } /// A weak client reference that can be upgraded to a [`Client`] as long as /// other `Client` objects till exist. #[repr(transparent)] pub struct Weak<P, S, R, E>( pub(crate) sigq::WeakPusher<InnerMsgType<P, S, R, E>> ); impl<P, S, R, E> Clone for Weak<P, S, R, E> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<P, S, R, E> Weak<P, S, R, E> { #[must_use] pub fn upgrade(&self) -> Option<Client<P, S, R, E>> { self.0.upgrade().map(|x| Client(x)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/err.rs.
︙ | ︙ | |||
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | /// The server object has shut down. /// /// This happens when clients: /// - attempt to send messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// All the client objects have shut down. /// /// This happens when a server attempts to pull the latest node on off the /// queue, but there are no more nodes on the queue and all the associated /// client objects have been released (impled: New nodes can never be added /// to the queue). ClientsDisappeared, /// The message was delivered to the server, but the reply context was /// released before sending back a reply. NoReply, /// Application-specific error. /// The `E` type is typically declared as the third generic parameter to /// [`channel`](crate::channel()). App(E) } impl<E> Error<E> { pub fn into_apperr(self) -> Option<E> { match self { | > > > > > > > | > > > > > | | | > > > | > | | | | | > | | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | /// The server object has shut down. /// /// This happens when clients: /// - attempt to send messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// A message reply could not be completed because the original requestor /// disappearing. OriginDisappeared, /// All the client objects have shut down. /// /// This happens when a server attempts to pull the latest node on off the /// queue, but there are no more nodes on the queue and all the associated /// client objects have been released (impled: New nodes can never be added /// to the queue). ClientsDisappeared, /// The message was delivered to the server, but the reply context was /// released before sending back a reply. NoReply, /// Application-specific error. /// The `E` type is typically declared as the third generic parameter to /// [`channel`](crate::channel()). App(E) } impl<E> Error<E> { /// Get application-specific error. /// /// Will return `None` is the error is not [`Error::App`]. pub fn into_apperr(self) -> Option<E> { match self { Self::App(e) => Some(e), _ => None } } /// Unwrap application-specific error. /// /// # Panics /// If the error is not [`Error::App`] this function will panic. pub fn unwrap_apperr(self) -> E { match self { Self::App(e) => e, _ => panic!("Not an Error::App") } } } impl<E> std::error::Error for Error<E> where E: std::error::Error {} impl<E> fmt::Display for Error<E> where E: std::error::Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::ServerDisappeared => write!(f, "Server disappeared"), Self::OriginDisappeared => write!(f, "Requestor disappeared"), Self::ClientsDisappeared => write!(f, "Clients disappeared"), Self::NoReply => write!(f, "Server didn't reply"), Self::App(err) => write!(f, "Application error; {err}") } } } impl<E> From<swctx::Error<RCtxState, E>> for Error<E> { fn from(err: swctx::Error<RCtxState, E>) -> Self { match err { swctx::Error::Aborted(state) => match state { RCtxState::Queued => Self::ServerDisappeared, RCtxState::Active => Self::NoReply }, swctx::Error::LostWaiter => Self::OriginDisappeared, swctx::Error::App(e) => Self::App(e) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/lib.rs.
1 2 3 4 | //! Micro Message Pass: Next Generation (ump-ng) is a library for passing //! messages between thread/tasks. It is similar to the `ump` library, but //! with an added uni-directional message passing primitive. //! | | | 1 2 3 4 5 6 7 8 9 10 11 12 | //! Micro Message Pass: Next Generation (ump-ng) is a library for passing //! messages between thread/tasks. It is similar to the `ump` library, but //! with an added uni-directional message passing primitive. //! //! The primary purpose of ump(-ng) is to create simple RPC-like designs, but //! between threads/tasks within a process rather than between processes over //! networks. //! //! # High-level usage overview //! An application calls [`channel`] to create a linked pair of a [`Server`] //! and a [`Client`]. //! |
︙ | ︙ | |||
39 40 41 42 43 44 45 | //! let (server, client) = channel::<String, String, String, ()>(); //! //! let server_thread = thread::spawn(move || { //! // Wait for data to arrive from a client //! loop { //! println!("Server waiting for message .."); //! match server.wait().unwrap() { | | | | 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | //! let (server, client) = channel::<String, String, String, ()>(); //! //! let server_thread = thread::spawn(move || { //! // Wait for data to arrive from a client //! loop { //! println!("Server waiting for message .."); //! match server.wait().unwrap() { //! MsgType::Post(data) => { //! println!("Server received Post: '{}'", data); //! } //! MsgType::Request(data, rctx) => { //! println!("Server received Request: '{}'", data); //! //! // Process data from client //! //! // Reply to client |
︙ | ︙ | |||
120 121 122 123 124 125 126 | mod err; mod rctx; mod server; pub use err::Error; pub use crate::{ | | | 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | mod err; mod rctx; mod server; pub use err::Error; pub use crate::{ client::{Client, WaitReply, Weak as WeakClient}, rctx::ReplyContext, server::{MsgType, Server} }; /// Create a pair of linked [`Server`] and [`Client`] objects. /// /// The [`Server`] object is used to wait for incoming messages from connected |
︙ | ︙ | |||
144 145 146 147 148 149 150 151 152 153 154 155 156 157 | /// completely independent of the original client. /// /// The `S` type parameter is the "send" data type that clients will transfer /// to the server. The `R` type parameter is the "receive" data type that /// clients will receive from the server. The `E` type parameter can be used /// to return application specific errors from the server to the client. #[allow(clippy::type_complexity)] pub fn channel<P, S, R, E>() -> (Server<P, S, R, E>, Client<P, S, R, E>) { let (qpusher, qpuller) = sigq::new(); (Server(qpuller), Client(qpusher)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > | 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | /// completely independent of the original client. /// /// The `S` type parameter is the "send" data type that clients will transfer /// to the server. The `R` type parameter is the "receive" data type that /// clients will receive from the server. The `E` type parameter can be used /// to return application specific errors from the server to the client. #[allow(clippy::type_complexity)] #[must_use] pub fn channel<P, S, R, E>() -> (Server<P, S, R, E>, Client<P, S, R, E>) { let (qpusher, qpuller) = sigq::new(); (Server(qpuller), Client(qpusher)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/rctx.rs.
1 2 3 4 5 6 7 8 9 10 | //! Allow a thread/task, crossing sync/async boundaries in either direction, to //! deliver an expected piece of data to another thread/task, with //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. use crate::err::Error; #[derive(Clone, Default)] | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | //! Allow a thread/task, crossing sync/async boundaries in either direction, to //! deliver an expected piece of data to another thread/task, with //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. use crate::err::Error; #[derive(Clone, Default)] pub enum RCtxState { #[default] Queued, Active } /// Object used to reply to requests passed to the server. pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>); |
︙ | ︙ | |||
35 36 37 38 39 40 41 42 43 44 | /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(self, data: T) -> Result<(), Error<E>> { | > > > > | | 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// If the originating caller is no longer waiting for a reply (i.e. was /// dropped) [`Error::OriginDisappeared`] is returned. /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(self, data: T) -> Result<(), Error<E>> { self.0.set(data)?; Ok(()) } /// Return an error to originating client. /// This will cause the calling client to return an error. The error passed /// in the `err` parameter will be wrapped in a `Error::App(err)`. /// |
︙ | ︙ | |||
77 78 79 80 81 82 83 84 85 86 | /// _ => { /// panic!("Unexpected return value"); /// } /// } /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(self, err: E) -> Result<(), Error<E>> { | > > > > | | > > > > > > | > | | | 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 | /// _ => { /// panic!("Unexpected return value"); /// } /// } /// server_thread.join().unwrap(); /// ``` /// /// # Errors /// [`Error::OriginDisappeared`] indicates that the originating caller is no /// longer waiting for a reply (i.e. it was dropped). /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(self, err: E) -> Result<(), Error<E>> { self.0.fail(err)?; Ok(()) } } impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> { type Error = Error<E>; /// Convert a `SetCtx` into a `ReplyContext`. /// /// Sets the `SetCtx`'s stat to _Active_. fn try_from( sctx: swctx::SetCtx<T, RCtxState, E> ) -> Result<Self, Self::Error> { let _ = sctx.set_state(RCtxState::Active); Ok(Self(sctx)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/server.rs.
1 2 3 4 5 | use crate::{ err::Error, rctx::{RCtxState, ReplyContext} }; | | | | | > > | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | use crate::{ err::Error, rctx::{RCtxState, ReplyContext} }; pub enum InnerMsgType<P, S, R, E> { Post(P), Request(S, swctx::SetCtx<R, RCtxState, E>) } /// Mesage operation types. pub enum MsgType<P, S, R, E> { /// A uni-directional message pass. Post(P), /// A message pass that expects a reply. Request(S, ReplyContext<R, E>) } impl<P, S, R, E> TryFrom<InnerMsgType<P, S, R, E>> for MsgType<P, S, R, E> { type Error = Error<E>; fn try_from(val: InnerMsgType<P, S, R, E>) -> Result<Self, Self::Error> { match val { InnerMsgType::Post(msg) => Ok(Self::Post(msg)), InnerMsgType::Request(msg, irctx) => { // Create an application reply context from the reply context in the // queue Implicitly changes state of the reply context from // Queued to Waiting let rctx = ReplyContext::try_from(irctx)?; Ok(Self::Request(msg, rctx)) } } } } /// Representation of a server object. /// |
︙ | ︙ | |||
52 53 54 55 56 57 58 59 60 | { /// Block and wait, indefinitely, for an incoming message from a /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. pub fn wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> { let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?; | > > > > < | > > > > | > < | > | 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | { /// Block and wait, indefinitely, for an incoming message from a /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. /// /// # Errors /// [`Error::ClientsDisappeared`] indicates that the queue is empty and /// all the client end-points have been dropped. pub fn wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> { let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?; msg.try_into() } /// Take next next message off queue or return `None` is queue is empty. /// /// # Errors /// [`Error::ClientsDisappeared`] indicates that the queue is empty and /// all the client end-points have been dropped. #[allow(clippy::type_complexity)] pub fn try_pop(&self) -> Result<Option<MsgType<P, S, R, E>>, Error<E>> { let msg = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?; if let Some(msg) = msg { Ok(Some(msg.try_into()?)) } else { Ok(None) } } /// Same as [`Server::wait()`], but for use in an `async` context. #[allow(clippy::missing_errors_doc)] pub async fn async_wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> { let msg = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?; msg.try_into() } /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. #[must_use] pub fn was_empty(&self) -> bool { self.0.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/async_client.rs.
︙ | ︙ | |||
44 45 46 47 48 49 50 | if let Reply::Sum(sum) = result { assert_eq!(sum, a + b); } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); | | < | | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | if let Reply::Sum(sum) = result { assert_eq!(sum, a + b); } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); let Reply::OkICroaked = result else { panic!("Didn't get a croak"); }; }); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/stress.rs.
︙ | ︙ | |||
13 14 15 16 17 18 19 | let (server, client) = channel::<(), Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; while !croak { match server.wait().unwrap() { | | | 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | let (server, client) = channel::<(), Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; while !croak { match server.wait().unwrap() { MsgType::Post(()) => panic!("Unexpected Post message"), MsgType::Request(data, rctx) => match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } Ops::Add(a, b) => { rctx.reply(a + b).unwrap(); |
︙ | ︙ |
Added tests/wait_disappear.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | use ump_ng::{channel, Error, MsgType}; #[test] fn wait_disappered_on_reply() { let (server, client) = channel::<String, String, String, ()>(); // Generate a request that will return a wait context. let wctx = client.req_async(String::from("hello")); // Get the message (and reply context) from server end-point let MsgType::Request(_msg, rctx) = server.wait().unwrap() else { panic!("Unexpectedly not MsgType::Request"); }; // nuke the wait context drop(wctx); // Replying should fail, because wctx has been dropped let Err(Error::OriginDisappeared) = rctx.reply(String::from("ahoy")) else { panic!("Unexpected error"); }; } #[test] fn wait_disappered_on_fail() { let (server, client) = channel::<String, String, String, ()>(); // Generate a request that will return a wait context. let wctx = client.req_async(String::from("hello")); // Get the message (and reply context) from server end-point let MsgType::Request(_msg, rctx) = server.wait().unwrap() else { panic!("Unexpectedly not MsgType::Request"); }; // nuke the wait context drop(wctx); // Failing should fail, because wctx has been dropped let Err(Error::OriginDisappeared) = rctx.fail(()) else { panic!("Unexpected error"); }; } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed --- ## [0.1.0] - 2023-10-02 Initial release. | > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | # Change Log ⚠️ indicates a breaking change. ## [Unreleased] [Details](/vdiff?from=ump-ng-0.1.0&to=trunk) ### Added ### Changed - Add some `Send` bounds to make `Future`s `Send`. - Update to `swctx` to `0.3.0`, allowing `ReplyContext` to detect if the originating client has been dropped. - ⚠️ Require `std::error::Error` bound on application-specific error `E` for `std::error::Error` implementation on `Error<E>` as well as `fmt::Display` for `Error<E>`. - ⚠️ Rename `MsgType::Put` to `MsgType::Post`. ### Removed - Remove superfluous `parking_lot` dependency. --- ## [0.1.0] - 2023-10-02 Initial release. |