Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,11 @@ [package] name = "ump-ng" -version = "0.1.0" +version = "0.2.0" edition = "2021" license = "0BSD" +# https://crates.io/category_slugs categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump-ng" description = "Micro message passing library for threads/tasks communication." exclude = [ @@ -11,24 +12,28 @@ ".fossil-settings", ".efiles", ".fslckout", "examples", "www", + "bacon.toml", "rustfmt.toml" ] +# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section +[badges] +maintenance = { status = "passively-maintained" } + [dependencies] -parking_lot = { version = "0.12.1" } -sigq = { version = "0.13.4" } -swctx = { version = "0.2.1" } +sigq = { version = "0.13.5" } +swctx = { version = "0.3.0" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } -tokio = { version = "1.32.0", features = ["rt-multi-thread"] } +tokio = { version = "1.40.0", features = ["rt-multi-thread"] } [[bench]] name = "add_server" harness = false [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] Index: README.md ================================================================== --- README.md +++ README.md @@ -5,5 +5,13 @@ async/non-async communication (for both the server and client endpoints). ump-ng is similar to [ump](https://crates.io/crates/ump), but it has a uni-directional message passing operation. 
+[![Crates.io][crates-badge]][crates-url] +[![0BSD licensed][0bsd-badge]][0bsd-url] + +[crates-badge]: https://img.shields.io/crates/v/ump-ng.svg +[crates-url]: https://crates.io/crates/ump-ng +[0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg +[0bsd-url]: https://opensource.org/license/0bsd + ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,130 @@ +# This is a configuration file for the bacon tool +# +# Bacon repository: https://github.com/Canop/bacon +# Complete help on configuration: https://dystroy.org/bacon/config/ +# You can also check bacon's own bacon.toml file +# as an example: https://github.com/Canop/bacon/blob/main/bacon.toml + +# For information about clippy lints, see: +# https://github.com/rust-lang/rust-clippy/blob/master/README.md + +#default_job = "check" +default_job = "clippy-all-pedantic" + +[jobs.check] +command = ["cargo", "check", "--color", "always"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets", "--color", "always"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = [ + "cargo", "clippy", + "--color", "always", +] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--color", "always", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", +] +need_stdout = false + +[jobs.clippy-pedantic] +command = [ + "cargo", "clippy", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +[jobs.clippy-all-pedantic] +command = [ + "cargo", "clippy", + 
"--all-targets", + "--color", "always", + "--", + "-Wclippy::all", + "-Wclippy::pedantic", + "-Wclippy::nursery", + "-Wclippy::cargo" +] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = [ + "cargo", "test", "--color", "always", + "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 +] +need_stdout = true + +[jobs.doc] +command = ["cargo", "doc", "--color", "always", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# *if* it makes sense for this crate. +# Don't forget the `--color always` part or the errors won't be +# properly parsed. +# If your program never stops (eg a server), you may set `background` +# to false to have the cargo run output immediately displayed instead +# of waiting for program's end. +[jobs.run] +command = [ + "cargo", "run", + "--color", "always", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--color", "always", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. 
+[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target Index: benches/add_server.rs ================================================================== --- benches/add_server.rs +++ benches/add_server.rs @@ -8,10 +8,11 @@ Die, Add(i32, i32), AddThreaded(i32, i32) } +#[allow(clippy::missing_panics_doc)] pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("send operation"); let (server, client) = channel::<(), Ops, i32, ()>(); @@ -44,11 +45,11 @@ b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); - }) + }); }); p = 0; q = 0; group.bench_function("add (threaded)", |b| { @@ -55,11 +56,11 @@ b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::AddThreaded(p, q)).unwrap(); assert_eq!(result, q + p); - }) + }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async)", |b| { @@ -67,11 +68,11 @@ let p = 1; let q = 2; let result = client.areq(Ops::Add(p, q)).await.unwrap(); assert_eq!(result, q + p); - }) + }); }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async, threaded)", |b| { @@ -79,11 +80,11 @@ let p = 1; let q = 2; let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap(); assert_eq!(result, q + p); - }) + }); }); let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); Index: examples/many_once.rs ================================================================== --- examples/many_once.rs +++ examples/many_once.rs @@ -33,17 +33,17 @@ println!("Server waiting for message .."); let MsgType::Request(data, rctx) = server.wait().unwrap() else { panic!("Unexpected message operation type"); }; - println!("Server received: '{}'", data); + println!("Server received: '{data}'"); // .. process data from client .. 
// Reply to client - let reply = format!("Hello, {}!", data); - println!("Server replying '{}'", reply); + let reply = format!("Hello, {data}!"); + println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); // Increase message counter count += 1; } @@ -54,13 +54,13 @@ for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); - println!("{} sending '{}'", name, msg); + println!("{name} sending '{msg}'"); let reply = client_clone.req(msg).unwrap(); - println!("{} received reply '{}' -- done", name, reply); + println!("{name} received reply '{reply}' -- done"); }); join_handles.push(client_thread); } for n in join_handles { Index: examples/simple.rs ================================================================== --- examples/simple.rs +++ examples/simple.rs @@ -8,32 +8,32 @@ let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); match server.wait().unwrap() { - MsgType::Put(_m) => { - panic!("Unexpected Put message") + MsgType::Post(_m) => { + panic!("Unexpected Post message") } MsgType::Request(data, rctx) => { - println!("Server received: '{}'", data); + println!("Server received: '{data}'"); // Process data from client // Reply to client - let reply = format!("Hello, {}!", data); - println!("Server replying '{}'", reply); + let reply = format!("Hello, {data}!"); + println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); } } println!("Server done"); }); let msg = String::from("Client"); - println!("Client sending '{}'", msg); + println!("Client sending '{msg}'"); let reply = client.req(msg).unwrap(); - println!("Client received reply '{}'", reply); + println!("Client received reply '{reply}'"); println!("Client done"); server_thread.join().unwrap(); } Index: examples/threaded_handler.rs ================================================================== --- 
examples/threaded_handler.rs +++ examples/threaded_handler.rs @@ -29,17 +29,17 @@ }; // Move the received data and reply context into a thread to allow other // messages to be received while processing this message. thread::spawn(move || { - println!("Server received: '{}'", data); + println!("Server received: '{data}'"); // Process data from client // Reply to client - let reply = format!("Hello, {}!", data); - println!("Server replying '{}'", reply); + let reply = format!("Hello, {data}!"); + println!("Server replying '{reply}'"); rctx.reply(reply).unwrap(); }); // Increase message counter count += 1; @@ -51,13 +51,13 @@ for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); - println!("{} sending '{}'", name, msg); + println!("{name} sending '{msg}'"); let reply = client_clone.req(msg).unwrap(); - println!("{} received reply '{}' -- done", name, reply); + println!("{name} received reply '{reply}' -- done"); }); join_handles.push(client_thread); } for n in join_handles { Index: src/client.rs ================================================================== --- src/client.rs +++ src/client.rs @@ -18,14 +18,18 @@ P: 'static + Send, R: 'static + Send, E: 'static + Send { /// Transmit a uni-directional message to the server end-point. + /// + /// # Errors + /// [`Error::ServerDisappeared`] means the [`Server`](super::Server) + /// end-point has been dropped. pub fn post(&self, msg: P) -> Result<(), Error> { self .0 - .push(InnerMsgType::Put(msg)) + .push(InnerMsgType::Post(msg)) .map_err(|_| Error::ServerDisappeared)?; Ok(()) } @@ -42,16 +46,18 @@ /// separate `Client` to each thread. /// /// # Return /// On success the function will return `Ok(msg)`. 
/// + /// # Errors /// If the linked server object has been released, or is released while the - /// message is in the server's queue, `Err(Error:ServerDisappeared)` will be + /// message is in the server's queue, [`Error::ServerDisappeared`] will be /// returned. /// - /// If the server never replied to the message and the reply context was - /// dropped `Err(Error::NoReply)` will be returned. + /// If the server never replied to the message and the + /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will + /// be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn req(&self, out: S) -> Result> { @@ -121,10 +127,14 @@ /// println!("Client received reply '{}'", reply); /// println!("Client done"); /// /// server_thread.join().unwrap(); /// ``` + /// + /// # Errors + /// [`Error::ServerDisappeared`] means the [`Server`](super::Server) + /// end-point has been dropped. pub fn req_async(&self, out: S) -> Result, Error> { let (sctx, wctx) = swctx::mkpair(); self .0 .push(InnerMsgType::Request(out, sctx)) @@ -131,12 +141,16 @@ .map_err(|_| Error::ServerDisappeared)?; Ok(WaitReply(wctx)) } - /// Same as [`Client::req()`] but for use in `async` contexts. - pub async fn areq(&self, out: S) -> Result> { + /// Same as [`Client::req()`], but for use in `async` contexts. + #[allow(clippy::missing_errors_doc)] + pub async fn areq(&self, out: S) -> Result> + where + S: Send + { let (sctx, wctx) = swctx::mkpair(); self .0 .push(InnerMsgType::Request(out, sctx)) @@ -144,12 +158,13 @@ Ok(wctx.wait_async().await?) } /// Create a weak `Client` reference. - pub fn weak(&self) -> WeakClient { - WeakClient(self.0.weak()) + #[must_use] + pub fn weak(&self) -> Weak { + Weak(self.0.weak()) } } impl Clone for Client { /// Clone a client. 
@@ -159,11 +174,11 @@ /// /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { - Client(self.0.clone()) + Self(self.0.clone()) } } /// Context used to wait for a server to reply to a request. @@ -171,36 +186,54 @@ impl WaitReply { /// Block and wait for a reply. /// /// For use in non-`async` threads. + /// + /// # Errors + /// [`Error::ServerDisappeared`] means the linked server object has been + /// released. + /// + /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server + /// before it replies to the message, [`Error::NoReply`] will be returned. + /// + /// If an application specific error occurs it will be returned in + /// [`Error::App`]. pub fn wait(self) -> Result> { Ok(self.0.wait()?) } /// Block and wait for a reply. /// - /// For use in `async` tasks. - pub async fn wait_async(self) -> Result> { + /// Same as [`WaitReply::wait()`], but for use in `async` contexts. + #[allow(clippy::missing_errors_doc)] + pub async fn wait_async(self) -> Result> + where + R: Send, + E: Send + { Ok(self.0.wait_async().await?) } } +/// A weak client reference that can be upgraded to a [`Client`] as long as +/// other `Client` objects still exist. #[repr(transparent)] -pub struct WeakClient( +pub struct Weak( pub(crate) sigq::WeakPusher> ); -impl Clone for WeakClient { +impl Clone for Weak { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl WeakClient { +impl Weak { + #[must_use] pub fn upgrade(&self) -> Option> { self.0.upgrade().map(|x| Client(x)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -10,10 +10,14 @@ /// This happens when clients: /// - attempt to send messages to a server that has been deallocated.
/// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, + + /// A message reply could not be completed because the original requestor + /// disappeared. + OriginDisappeared, /// All the client objects have shut down. /// /// This happens when a server attempts to pull the latest node on off the /// queue, but there are no more nodes on the queue and all the associated @@ -30,45 +34,58 @@ /// [`channel`](crate::channel()). App(E) } impl Error { + /// Get application-specific error. + /// + /// Will return `None` if the error is not [`Error::App`]. pub fn into_apperr(self) -> Option { match self { - Error::App(e) => Some(e), + Self::App(e) => Some(e), _ => None } } + + /// Unwrap application-specific error. + /// + /// # Panics + /// If the error is not [`Error::App`] this function will panic. pub fn unwrap_apperr(self) -> E { match self { - Error::App(e) => e, + Self::App(e) => e, _ => panic!("Not an Error::App") } } } -impl std::error::Error for Error {} +impl std::error::Error for Error where E: std::error::Error {} -impl fmt::Display for Error { +impl fmt::Display for Error +where + E: std::error::Error +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Error::ServerDisappeared => write!(f, "Server disappeared"), - Error::ClientsDisappeared => write!(f, "Clients disappeared"), - Error::NoReply => write!(f, "Server didn't reply"), - Error::App(err) => write!(f, "Application error; {:?}", err) + Self::ServerDisappeared => write!(f, "Server disappeared"), + Self::OriginDisappeared => write!(f, "Requestor disappeared"), + Self::ClientsDisappeared => write!(f, "Clients disappeared"), + Self::NoReply => write!(f, "Server didn't reply"), + Self::App(err) => write!(f, "Application error; {err}") } } } impl From> for Error { fn from(err: swctx::Error) -> Self { match err { swctx::Error::Aborted(state) => match state { - RCtxState::Queued => Error::ServerDisappeared, -
RCtxState::Active => Error::NoReply + RCtxState::Queued => Self::ServerDisappeared, + RCtxState::Active => Self::NoReply }, - swctx::Error::App(e) => Error::App(e) + swctx::Error::LostWaiter => Self::OriginDisappeared, + swctx::Error::App(e) => Self::App(e) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,10 +1,10 @@ //! Micro Message Pass: Next Generation (ump-ng) is a library for passing //! messages between thread/tasks. It is similar to the `ump` library, but //! with an added uni-directional message passing primitive. //! -//! The primary purpose of ump(-ng) is to create simple RPC like designs, but +//! The primary purpose of ump(-ng) is to create simple RPC-like designs, but //! between threads/tasks within a process rather than between processes over //! networks. //! //! # High-level usage overview //! An application calls [`channel`] to create a linked pair of a [`Server`] @@ -41,12 +41,12 @@ //! let server_thread = thread::spawn(move || { //! // Wait for data to arrive from a client //! loop { //! println!("Server waiting for message .."); //! match server.wait().unwrap() { -//! MsgType::Put(data) => { -//! println!("Server received Put: '{}'", data); +//! MsgType::Post(data) => { +//! println!("Server received Post: '{}'", data); //! } //! MsgType::Request(data, rctx) => { //! println!("Server received Request: '{}'", data); //! //! // Process data from client @@ -122,11 +122,11 @@ mod server; pub use err::Error; pub use crate::{ - client::{Client, WaitReply, WeakClient}, + client::{Client, WaitReply, Weak as WeakClient}, rctx::ReplyContext, server::{MsgType, Server} }; /// Create a pair of linked [`Server`] and [`Client`] objects. @@ -146,12 +146,13 @@ /// The `S` type parameter is the "send" data type that clients will transfer /// to the server. 
The `R` type parameter is the "receive" data type that /// clients will receive from the server. The `E` type parameter can be used /// to return application specific errors from the server to the client. #[allow(clippy::type_complexity)] +#[must_use] pub fn channel() -> (Server, Client) { let (qpusher, qpuller) = sigq::new(); (Server(qpuller), Client(qpusher)) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rctx.rs ================================================================== --- src/rctx.rs +++ src/rctx.rs @@ -6,11 +6,11 @@ //! another, where the receiver will block until data has been delivered. use crate::err::Error; #[derive(Clone, Default)] -pub(crate) enum RCtxState { +pub enum RCtxState { #[default] Queued, Active } @@ -37,14 +37,18 @@ /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// + /// # Errors + /// If the originating caller is no longer waiting for a reply (i.e. was + /// dropped) [`Error::OriginDisappeared`] is returned. + /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(self, data: T) -> Result<(), Error> { - self.0.set(data); + self.0.set(data)?; Ok(()) } /// Return an error to originating client. /// This will cause the calling client to return an error. The error passed @@ -79,21 +83,32 @@ /// } /// } /// server_thread.join().unwrap(); /// ``` /// + /// # Errors + /// [`Error::OriginDisappeared`] indicates that the originating caller is no + /// longer waiting for a reply (i.e. it was dropped). + /// /// # Semantics /// This call is safe to make after the server context has been released. 
pub fn fail(self, err: E) -> Result<(), Error> { - self.0.fail(err); + self.0.fail(err)?; Ok(()) } } -impl From> for ReplyContext { - fn from(sctx: swctx::SetCtx) -> Self { - sctx.set_state(RCtxState::Active); - ReplyContext(sctx) +impl TryFrom> for ReplyContext { + type Error = Error; + + /// Convert a `SetCtx` into a `ReplyContext`. + /// + /// Sets the `SetCtx`'s state to _Active_. + fn try_from( + sctx: swctx::SetCtx + ) -> Result { + let _ = sctx.set_state(RCtxState::Active); + Ok(Self(sctx)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/server.rs ================================================================== --- src/server.rs +++ src/server.rs @@ -1,35 +1,37 @@ use crate::{ err::Error, rctx::{RCtxState, ReplyContext} }; -pub(crate) enum InnerMsgType { - Put(P), +pub enum InnerMsgType { + Post(P), Request(S, swctx::SetCtx) } /// Mesage operation types. pub enum MsgType { /// A uni-directional message pass. - Put(P), + Post(P), /// A message pass that expects a reply. Request(S, ReplyContext) } -impl From> for MsgType { - fn from(val: InnerMsgType) -> MsgType { +impl TryFrom> for MsgType { + type Error = Error; + + fn try_from(val: InnerMsgType) -> Result { match val { - InnerMsgType::Put(msg) => MsgType::Put(msg), + InnerMsgType::Post(msg) => Ok(Self::Post(msg)), InnerMsgType::Request(msg, irctx) => { // Create an application reply context from the reply context in the // queue Implicitly changes state of the reply context from // Queued to Waiting - let rctx = ReplyContext::from(irctx); + let rctx = ReplyContext::try_from(irctx)?; - MsgType::Request(msg, rctx) + Ok(Self::Request(msg, rctx)) } } } } @@ -54,38 +56,46 @@ /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client.
+ /// + /// # Errors + /// [`Error::ClientsDisappeared`] indicates that the queue is empty and + /// all the client end-points have been dropped. pub fn wait(&self) -> Result, Error> { let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?; - - Ok(msg.into()) + msg.try_into() } /// Take next next message off queue or return `None` is queue is empty. + /// + /// # Errors + /// [`Error::ClientsDisappeared`] indicates that the queue is empty and + /// all the client end-points have been dropped. #[allow(clippy::type_complexity)] pub fn try_pop(&self) -> Result>, Error> { let msg = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?; if let Some(msg) = msg { - Ok(Some(msg.into())) + Ok(Some(msg.try_into()?)) } else { Ok(None) } } /// Same as [`Server::wait()`], but for use in an `async` context. + #[allow(clippy::missing_errors_doc)] pub async fn async_wait(&self) -> Result, Error> { let msg = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?; - - Ok(msg.into()) + msg.try_into() } /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. 
+ #[must_use] pub fn was_empty(&self) -> bool { self.0.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/async_client.rs ================================================================== --- tests/async_client.rs +++ tests/async_client.rs @@ -46,14 +46,13 @@ } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); - if let Reply::OkICroaked = result { - } else { + let Reply::OkICroaked = result else { panic!("Didn't get a croak"); - } + }; }); server_thread.join().unwrap(); } Index: tests/stress.rs ================================================================== --- tests/stress.rs +++ tests/stress.rs @@ -15,11 +15,11 @@ let server_thread = thread::spawn(move || { let mut croak = false; while !croak { match server.wait().unwrap() { - MsgType::Put(_) => panic!("Unexpected Put message"), + MsgType::Post(()) => panic!("Unexpected Post message"), MsgType::Request(data, rctx) => match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } ADDED tests/wait_disappear.rs Index: tests/wait_disappear.rs ================================================================== --- /dev/null +++ tests/wait_disappear.rs @@ -0,0 +1,45 @@ +use ump_ng::{channel, Error, MsgType}; + +#[test] +fn wait_disappered_on_reply() { + let (server, client) = channel::(); + + // Generate a request that will return a wait context. + let wctx = client.req_async(String::from("hello")); + + // Get the message (and reply context) from server end-point + let MsgType::Request(_msg, rctx) = server.wait().unwrap() else { + panic!("Unexpectedly not MsgType::Request"); + }; + + // nuke the wait context + drop(wctx); + + // Replying should fail, because wctx has been dropped + let Err(Error::OriginDisappeared) = rctx.reply(String::from("ahoy")) else { + panic!("Unexpected error"); + }; +} + +#[test] +fn wait_disappered_on_fail() { + let (server, client) = channel::(); + + // Generate a request that will return a wait context. 
+ let wctx = client.req_async(String::from("hello")); + + // Get the message (and reply context) from server end-point + let MsgType::Request(_msg, rctx) = server.wait().unwrap() else { + panic!("Unexpectedly not MsgType::Request"); + }; + + // nuke the wait context + drop(wctx); + + // Failing should fail, because wctx has been dropped + let Err(Error::OriginDisappeared) = rctx.fail(()) else { + panic!("Unexpected error"); + }; +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,14 +1,28 @@ # Change Log +⚠️ indicates a breaking change. + ## [Unreleased] + +[Details](/vdiff?from=ump-ng-0.1.0&to=trunk) ### Added ### Changed +- Add some `Send` bounds to make `Future`s `Send`. +- Update `swctx` to `0.3.0`, allowing `ReplyContext` to detect if the + originating client has been dropped. +- ⚠️ Require `std::error::Error` bound on application-specific error `E` for + `std::error::Error` implementation on `Error` as well as `fmt::Display` + for `Error`. +- ⚠️ Rename `MsgType::Put` to `MsgType::Post`. + ### Removed + +- Remove superfluous `parking_lot` dependency. --- ## [0.1.0] - 2023-10-02