Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -1,9 +1,15 @@ Cargo.toml README.md -src/*.rs -src/rctx/*.rs +src/err.rs +src/lib.rs +src/server.rs +src/client.rs +src/rctx.rs +src/rctx/err.rs +src/rctx/inner.rs +src/rctx/public.rs tests/*.rs examples/*.rs benches/*.rs www/index.md www/changelog.md Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,18 +1,18 @@ [package] name = "ump" -version = "0.9.0" +version = "0.10.0" authors = ["Jan Danielsson "] edition = "2018" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." rust-version = "1.39" -# Can't exclude benches", because the [[bench]] section will fail. +# Can't exclude "benches", because the [[bench]] section will fail. exclude = [ ".efiles", ".fossil-settings", ".fslckout", "examples", @@ -21,15 +21,15 @@ "www" ] [dependencies] parking_lot = { version = "0.12.1" } -sigq = { version = "0.11.0" } +sigq = { version = "0.13.2" } [dev-dependencies] -criterion = { version = "0.3.6", features = ["async_tokio"] } -tokio = { version = "1.21.0", features = ["full"] } +criterion = { version = "0.5.1", features = ["async_tokio"] } +tokio = { version = "1.29.1", features = ["full"] } [[bench]] name = "add_server" harness = false Index: benches/add_server.rs ================================================================== --- benches/add_server.rs +++ benches/add_server.rs @@ -15,13 +15,12 @@ let (server, client) = channel::(); let server_thread = thread::spawn(move || { let mut croak = false; - - while croak == false { - let (data, rctx) = server.wait(); + while !croak { + let (data, rctx) = server.wait().unwrap(); match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } Index: examples/cloneclientserver.rs ================================================================== --- examples/cloneclientserver.rs +++ examples/cloneclientserver.rs @@ -23,11 +23,11 @@ fn main() { let (server, client) = channel::(); let client_blueprint = client.clone(); let server_thread = thread::spawn(move || loop { - let (req, rctx) = server.wait(); + let (req, rctx) = server.wait().unwrap(); match req { Request::CloneClient => rctx .reply(Reply::ClientClone(client_blueprint.clone())) .unwrap(), Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(), Index: examples/many_once.rs ================================================================== --- examples/many_once.rs +++ examples/many_once.rs @@ -1,7 +1,6 @@ -use std::env; -use std::thread; +use std::{env, thread}; use ump::channel; // Run several clients, but each client iterates only once. // @@ -30,11 +29,11 @@ // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); - let (data, rctx) = server.wait(); + let (data, rctx) = server.wait().unwrap(); println!("Server received: '{}'", data); // .. process data from client .. @@ -54,11 +53,11 @@ let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); - let reply = client_clone.send(String::from(msg)).unwrap(); + let reply = client_clone.send(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } Index: examples/simple.rs ================================================================== --- examples/simple.rs +++ examples/simple.rs @@ -6,11 +6,11 @@ let (server, client) = channel::(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); - let (data, rctx) = server.wait(); + let (data, rctx) = server.wait().unwrap(); println!("Server received: '{}'", data); // Process data from client @@ -22,13 +22,13 @@ println!("Server done"); }); let msg = String::from("Client"); println!("Client sending '{}'", msg); - let reply = client.send(String::from(msg)).unwrap(); + let reply = client.send(msg).unwrap(); println!("Client received reply '{}'", reply); println!("Client done"); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: examples/threaded_handler.rs ================================================================== --- examples/threaded_handler.rs +++ examples/threaded_handler.rs @@ -1,7 +1,6 @@ -use std::env; -use std::thread; +use std::{env, thread}; use ump::channel; // This is basically the same test as many_once, but the server launches a new // thread to process and reply to client requests. @@ -23,11 +22,11 @@ // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); - let (data, rctx) = server.wait(); + let (data, rctx) = server.wait().unwrap(); // Move the received data and reply context into a thread to allow other // messages to be received while processing this message. thread::spawn(move || { println!("Server received: '{}'", data); @@ -51,11 +50,11 @@ let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); - let reply = client_clone.send(String::from(msg)).unwrap(); + let reply = client_clone.send(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } Index: src/client.rs ================================================================== --- src/client.rs +++ src/client.rs @@ -1,12 +1,6 @@ -use std::sync::Weak; - -use sigq::Queue as NotifyQueue; - -use crate::err::Error; -use crate::rctx::InnerReplyContext; -use crate::server::ServerQueueNode; +use crate::{err::Error, rctx::InnerReplyContext, server::ServerQueueNode}; /// Representation of a clonable client object. /// /// Each instantiation of a `Client` object is itself an isolated client with /// regards to the server context. By cloning a client a new independent @@ -16,11 +10,11 @@ pub struct Client { /// Weak reference to server queue. /// /// The server context holds the only strong reference to the queue. This /// allows the clients to detect when the server has terminated. - pub(crate) srvq: Weak>> + pub(crate) qpusher: sigq::Pusher> } impl Client where R: 'static + Send, @@ -50,55 +44,43 @@ /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn send(&self, out: S) -> Result> { - // Make sure the server still lives; Weak -> Arc - let srvq = match self.srvq.upgrade() { - Some(srvq) => srvq, - None => return Err(Error::ServerDisappeared) - }; - // Create a per-call reply context. // This context could be created when the Client object is being created // and stored in the context, and thus be reused for reach client call. // One side-effect is that some of the state semantics becomes more // complicated. // The central repo has such an implementation checked in, but it seems to // have some more corner cases that aren't properly handled. let rctx = InnerReplyContext::new(); - srvq.push(ServerQueueNode { - msg: out, - reply: rctx.clone() - }); - - // Drop the strong server queue ref immediately so it's not held as a - // strong ref while we're waiting for a reply. - drop(srvq); + self + .qpusher + .push(ServerQueueNode { + msg: out, + reply: rctx.clone() + }) + .map_err(|_| Error::ServerDisappeared)?; let reply = rctx.get()?; + Ok(reply) } /// Same as [`Client::send()`] but for use in `async` contexts. pub async fn asend(&self, out: S) -> Result> { - let srvq = match self.srvq.upgrade() { - Some(srvq) => srvq, - None => return Err(Error::ServerDisappeared) - }; - let rctx = InnerReplyContext::new(); - srvq.push(ServerQueueNode { - msg: out, - reply: rctx.clone() - }); - - // Drop the strong server queue ref immediately so it's not held as a - // strong ref while we're waiting for a reply. - drop(srvq); + self + .qpusher + .push(ServerQueueNode { + msg: out, + reply: rctx.clone() + }) + .map_err(|_| Error::ServerDisappeared)?; let result = rctx.aget().await?; Ok(result) } @@ -114,11 +96,11 @@ /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { Client { - srvq: Weak::clone(&self.srvq) + qpusher: self.qpusher.clone() } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -6,10 +6,14 @@ /// The server object has shut down. This happens when clients: /// - attempt to send messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, + + /// There are no more nodes to pick up in the queue and all client + /// end-points have been dropped. + ClientsDisappeared, /// The message was delivered to the server, but the reply context was /// released before sending back a reply. NoReply, @@ -46,14 +50,15 @@ } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &*self { + match self { Error::ServerDisappeared => write!(f, "Server disappeared"), + Error::ClientsDisappeared => write!(f, "Clients disappeared"), Error::NoReply => write!(f, "Server didn't reply"), Error::App(err) => write!(f, "Application error; {:?}", err) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -33,38 +33,36 @@ //! ``` //! use std::thread; //! //! use ump::channel; //! -//! fn main() { -//! let (server, client) = channel::(); -//! -//! let server_thread = thread::spawn(move || { -//! // Wait for data to arrive from a client -//! println!("Server waiting for message .."); -//! let (data, mut rctx) = server.wait(); -//! -//! println!("Server received: '{}'", data); -//! -//! // Process data from client -//! -//! // Reply to client -//! let reply = format!("Hello, {}!", data); -//! println!("Server replying '{}'", reply); -//! rctx.reply(reply); -//! -//! println!("Server done"); -//! }); -//! -//! let msg = String::from("Client"); -//! println!("Client sending '{}'", msg); -//! let reply = client.send(String::from(msg)).unwrap(); -//! println!("Client received reply '{}'", reply); -//! println!("Client done"); -//! -//! server_thread.join().unwrap(); -//! } +//! let (server, client) = channel::(); +//! +//! let server_thread = thread::spawn(move || { +//! // Wait for data to arrive from a client +//! println!("Server waiting for message .."); +//! let (data, mut rctx) = server.wait().unwrap(); +//! +//! println!("Server received: '{}'", data); +//! +//! // Process data from client +//! +//! // Reply to client +//! let reply = format!("Hello, {}!", data); +//! println!("Server replying '{}'", reply); +//! rctx.reply(reply); +//! +//! println!("Server done"); +//! }); +//! +//! let msg = String::from("Client"); +//! println!("Client sending '{}'", msg); +//! let reply = client.send(String::from(msg)).unwrap(); +//! println!("Client received reply '{}'", reply); +//! println!("Client done"); +//! +//! server_thread.join().unwrap(); //! ``` //! In practice the send/reply types will probably be `enum`s used to //! indicate command/return type with associated data. The third type argument //! to [`channel`] is an error type that can be used to explicitly pass errors //! back to the sender. @@ -107,17 +105,11 @@ mod rctx; mod server; pub use err::Error; -use std::sync::Arc; - -use sigq::Queue as NotifyQueue; - -pub use crate::client::Client; -pub use crate::rctx::ReplyContext; -pub use crate::server::Server; +pub use crate::{client::Client, rctx::ReplyContext, server::Server}; /// Create a pair of linked [`Server`] and [`Client`] objects. /// /// The [`Server`] object is used to wait for incoming messages from connected /// clients. Once a message arrives it must reply to it using a @@ -134,19 +126,15 @@ /// The `S` type parameter is the "send" data type that clients will transfer /// to the server. The `R` type parameter is the "receive" data type that /// clients will receive from the server. The `E` type parameter can be used /// to return application specific errors from the server to the client. pub fn channel() -> (Server, Client) { - let srvq = Arc::new(NotifyQueue::new()); - let server = Server { - srvq: Arc::clone(&srvq) - }; - - // Note: The client stores a weak reference to the server object - let client = Client { - srvq: Arc::downgrade(&srvq) - }; + let (qpusher, qpuller) = sigq::new(); + + let server = Server { qpuller }; + + let client = Client { qpusher }; (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rctx.rs Index: src/rctx.rs ================================================================== --- /dev/null +++ src/rctx.rs @@ -0,0 +1,18 @@ +//! Allow a thread/task, crossing sync/async boundaries in either direction, to +//! deliver an expected piece of data to another thread/task, with +//! notification. +//! +//! These are simple channels used to deliver data from one endpoint to +//! another, where the receiver will block until data has been delivered. + +mod err; +mod inner; + +pub mod public; + +pub(crate) use err::Error; +pub(crate) use inner::InnerReplyContext; + +pub use public::ReplyContext; + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rctx/err.rs ================================================================== --- src/rctx/err.rs +++ src/rctx/err.rs @@ -17,11 +17,11 @@ impl std::error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &*self { + match self { Error::Aborted => write!(f, "Aborted call"), Error::NoReply => write!(f, "Application failed to reply"), Error::App(err) => write!(f, "Application error; {:?}", err) } } Index: src/rctx/inner.rs ================================================================== --- src/rctx/inner.rs +++ src/rctx/inner.rs @@ -1,9 +1,11 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll, Waker}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker} +}; use parking_lot::{Condvar, Mutex}; use crate::rctx::err::Error; @@ -67,11 +69,11 @@ self.signal_waiters(); } pub(crate) fn signal_waiters(&self) { let mut g = self.taskwaker.lock(); - if let Some(waker) = std::mem::replace(&mut *g, None) { + if let Some(waker) = (*g).take() { waker.wake(); } self.signal.notify_one(); } @@ -149,21 +151,19 @@ /// it means that the server has died. Signal this to the original caller /// waiting for a reply. fn drop(&mut self) { let mut do_signal: bool = false; let mut mg = self.data.lock(); - match *mg { - State::Queued => { - *mg = State::Aborted; - do_signal = true; - } - _ => {} + + if let State::Queued = *mg { + *mg = State::Aborted; + do_signal = true; } drop(mg); if do_signal { let mut g = self.taskwaker.lock(); - if let Some(waker) = std::mem::replace(&mut *g, None) { + if let Some(waker) = (*g).take() { waker.wake(); } self.signal.notify_one(); } } DELETED src/rctx/mod.rs Index: src/rctx/mod.rs ================================================================== --- src/rctx/mod.rs +++ /dev/null @@ -1,18 +0,0 @@ -//! Allow a thread/task, crossing sync/async boundaries in either direction, to -//! deliver an expected piece of data to another thread/task, with -//! notification. -//! -//! These are simple channels used to deliver data from one endpoint to -//! another, where the receiver will block until data has been delivered. - -mod err; -mod inner; - -pub mod public; - -pub(crate) use err::Error; -pub(crate) use inner::InnerReplyContext; - -pub use public::ReplyContext; - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/rctx/public.rs ================================================================== --- src/rctx/public.rs +++ src/rctx/public.rs @@ -1,8 +1,6 @@ -use crate::rctx::err::Error; -use crate::rctx::inner::State; -use crate::rctx::InnerReplyContext; +use crate::rctx::{err::Error, inner::State, InnerReplyContext}; /// Public-facing sender part of the `ReplyContext` object. /// /// This is safe to pass to applications which are meant to only be able to put /// a value through the `ReplyContext` channel, but not extract the value from @@ -18,22 +16,20 @@ /// # Example /// ``` /// use std::thread; /// use ump::channel; /// - /// fn main() { - /// let (server, client) = channel::(); - /// let server_thread = thread::spawn(move || { - /// let (data, rctx) = server.wait(); - /// let reply = format!("Hello, {}!", data); - /// rctx.reply(reply).unwrap(); - /// }); - /// let msg = String::from("Client"); - /// let reply = client.send(String::from(msg)).unwrap(); - /// assert_eq!(reply, "Hello, Client!"); - /// server_thread.join().unwrap(); - /// } + /// let (server, client) = channel::(); + /// let server_thread = thread::spawn(move || { + /// let (data, rctx) = server.wait().unwrap(); + /// let reply = format!("Hello, {}!", data); + /// rctx.reply(reply).unwrap(); + /// }); + /// let msg = String::from("Client"); + /// let reply = client.send(String::from(msg)).unwrap(); + /// assert_eq!(reply, "Hello, Client!"); + /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(mut self, data: I) -> Result<(), Error> { @@ -57,28 +53,26 @@ /// #[derive(Debug, PartialEq)] /// enum MyError { /// SomeError(String) /// } /// - /// fn main() { - /// let (server, client) = channel::(); - /// let server_thread = thread::spawn(move || { - /// let (_, rctx) = server.wait(); - /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); - /// }); - /// let msg = String::from("Client"); - /// let reply = client.send(String::from(msg)); - /// match reply { - /// Err(Error::App(MyError::SomeError(s))) => { - /// assert_eq!(s, "failed"); - /// } - /// _ => { - /// panic!("Unexpected return value"); - /// } + /// let (server, client) = channel::(); + /// let server_thread = thread::spawn(move || { + /// let (_, rctx) = server.wait().unwrap(); + /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); + /// }); + /// let msg = String::from("Client"); + /// let reply = client.send(String::from(msg)); + /// match reply { + /// Err(Error::App(MyError::SomeError(s))) => { + /// assert_eq!(s, "failed"); + /// } + /// _ => { + /// panic!("Unexpected return value"); /// } - /// server_thread.join().unwrap(); /// } + /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(mut self, err: E) -> Result<(), Error> { @@ -92,24 +86,22 @@ impl Drop for ReplyContext { /// If the reply context is dropped while still waiting for a reply then /// report back to the caller that it should expect no reply. fn drop(&mut self) { - if self.did_handover == false { + if !self.did_handover { let mut do_signal: bool = false; let mut mg = self.inner.data.lock(); - match *mg { - State::Waiting => { - *mg = State::NoReply; - do_signal = true; - } - _ => {} + + if let State::Waiting = *mg { + *mg = State::NoReply; + do_signal = true; } drop(mg); if do_signal { let mut g = self.inner.taskwaker.lock(); - if let Some(waker) = std::mem::replace(&mut *g, None) { + if let Some(waker) = (*g).take() { waker.wake(); } self.inner.signal.notify_one(); } Index: src/server.rs ================================================================== --- src/server.rs +++ src/server.rs @@ -1,10 +1,8 @@ -use std::sync::Arc; - -use sigq::Queue as NotifyQueue; - use crate::rctx::{InnerReplyContext, ReplyContext}; + +use crate::err::Error; pub(crate) struct ServerQueueNode { /// Raw message being sent from the client to the server. pub(crate) msg: S, @@ -16,11 +14,11 @@ /// /// Each instantiation of a [`Server`] object represents an end-point which /// will be used to receive messages from connected [`Client`](crate::Client) /// objects. pub struct Server { - pub(crate) srvq: Arc>> + pub(crate) qpuller: sigq::Puller> } impl Server where S: 'static + Send, @@ -31,41 +29,45 @@ /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. - pub fn wait(&self) -> (S, ReplyContext) { - let node = self.srvq.pop(); + pub fn wait(&self) -> Result<(S, ReplyContext), Error> { + let node = self.qpuller.pop().map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::from(node.reply); - (msg, rctx) + Ok((msg, rctx)) } /// Same as [`Server::wait()`], but for use in an `async` context. - pub async fn async_wait(&self) -> (S, ReplyContext) { - let node = self.srvq.apop().await; + pub async fn async_wait(&self) -> Result<(S, ReplyContext), Error> { + let node = self + .qpuller + .apop() + .await + .map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::from(node.reply); - (msg, rctx) + Ok((msg, rctx)) } /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. pub fn was_empty(&self) -> bool { - self.srvq.was_empty() + self.qpuller.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: tests/async_client.rs ================================================================== --- tests/async_client.rs +++ tests/async_client.rs @@ -19,11 +19,11 @@ let niterations = 256; let (server, client) = channel::(); let server_thread = thread::spawn(move || loop { - let (req, rctx) = server.wait(); + let (req, rctx) = server.wait().unwrap(); match req { Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(), Request::Croak => { rctx.reply(Reply::OkICroaked).unwrap(); break; Index: tests/fail.rs ================================================================== --- tests/fail.rs +++ tests/fail.rs @@ -12,17 +12,17 @@ fn sync_expect_noreply() { let (server, client) = channel::(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client - let (_, rctx) = server.wait(); + let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); let msg = String::from("Client"); - let reply = client.send(String::from(msg)); + let reply = client.send(msg); match reply { Err(Error::App(MyError::SomeError(s))) => { assert_eq!(s, "failed"); } _ => { @@ -40,11 +40,11 @@ let (server, client) = channel::(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client - let (_, rctx) = server.wait(); + let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); tokrt.block_on(async { Index: tests/noreply.rs ================================================================== --- tests/noreply.rs +++ tests/noreply.rs @@ -7,18 +7,18 @@ fn sync_expect_noreply() { let (server, client) = channel::(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client - let (_, rctx) = server.wait(); + let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); let msg = String::from("Client"); - let reply = client.send(String::from(msg)); + let reply = client.send(msg); match reply { Err(Error::NoReply) => { // This is the expected error } _ => { @@ -36,11 +36,11 @@ let (server, client) = channel::(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client - let (_, rctx) = server.wait(); + let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); Index: tests/queue_cleanup.rs ================================================================== --- tests/queue_cleanup.rs +++ tests/queue_cleanup.rs @@ -14,11 +14,11 @@ thread::sleep(one_second); drop(server); }); let msg = String::from("Client"); - let reply = client.send(String::from(msg)); + let reply = client.send(msg); match reply { Err(Error::ServerDisappeared) => { // This is the expected error } _ => { Index: tests/stress.rs ================================================================== --- tests/stress.rs +++ tests/stress.rs @@ -13,12 +13,12 @@ let (server, client) = channel::(); let server_thread = thread::spawn(move || { let mut croak = false; - while croak == false { - let (data, rctx) = server.wait(); + while !croak { + let (data, rctx) = server.wait().unwrap(); match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } @@ -56,11 +56,11 @@ let server_thread = thread::spawn(move || { let mut count = 0; let mut handles = Vec::new(); // +1 because we want to wait for the croak message as well while count < niterations + 1 { - let (data, rctx) = server.wait(); + let (data, rctx) = server.wait().unwrap(); let h = thread::spawn(move || match data { Ops::Die => { rctx.reply(0).unwrap(); } Ops::Add(a, b) => { Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -6,10 +6,25 @@ ### Changed ### Removed + +## [0.10.0] - 2023-07-26 + +### Added + +- Server's receive methods will fail with `Error::ClientsDisappeared` if all + the associated Client objects have been dropped. + +### Changed + +- Runtime dependencies: + - Updated `sigq` to `0.13.2`. +- Development dependencies: + - Updated `criterion` to `0.5.1` + ## [0.9.0] - 2022-09-09 ### Added