Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-0.9.0 To ump-0.10.0
2023-07-27
| ||
01:31 | Use sigq 0.13.3 to get puller bugfix. check-in: 3dece6432a user: jan tags: trunk | |
2023-07-26
| ||
21:01 | Release maintenance. check-in: 95345f9568 user: jan tags: trunk, ump-0.10.0 | |
20:55 | Typo. check-in: 0485b4dd20 user: jan tags: trunk | |
2023-07-25
| ||
02:45 | Update to sigq 0.13.1 and make server fail to wait for new messages if there are none to pickup and all clients have been dropped. check-in: 8877adb2d3 user: jan tags: trunk | |
2022-09-09
| ||
18:17 | 0.9.0 release preparation. check-in: 71b2832c90 user: jan tags: trunk, ump-0.9.0 | |
17:55 | Hello version 0.9.0. check-in: 0ac544acc3 user: jan tags: trunk | |
Changes to .efiles.
1 2 | Cargo.toml README.md | | > > > > > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | Cargo.toml README.md src/err.rs src/lib.rs src/server.rs src/client.rs src/rctx.rs src/rctx/err.rs src/rctx/inner.rs src/rctx/public.rs tests/*.rs examples/*.rs benches/*.rs www/index.md www/changelog.md |
Changes to Cargo.toml.
1 2 | [package] name = "ump" | | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | [package] name = "ump" version = "0.10.0" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2018" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." rust-version = "1.39" # Can't exclude "benches", because the [[bench]] section will fail. exclude = [ ".efiles", ".fossil-settings", ".fslckout", "examples", "rustfmt.toml", "tests", "www" ] [dependencies] parking_lot = { version = "0.12.1" } sigq = { version = "0.13.2" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } tokio = { version = "1.29.1", features = ["full"] } [[bench]] name = "add_server" harness = false |
Changes to benches/add_server.rs.
︙ | ︙ | |||
13 14 15 16 17 18 19 | pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("send operation"); let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; | | < | | 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("send operation"); let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; while !croak { let (data, rctx) = server.wait().unwrap(); match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } Ops::Add(a, b) => rctx.reply(a + b).unwrap(), Ops::AddThreaded(a, b) => { |
︙ | ︙ |
Changes to examples/cloneclientserver.rs.
︙ | ︙ | |||
21 22 23 24 25 26 27 | } fn main() { let (server, client) = channel::<Request, Reply, ()>(); let client_blueprint = client.clone(); let server_thread = thread::spawn(move || loop { | | | 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | } fn main() { let (server, client) = channel::<Request, Reply, ()>(); let client_blueprint = client.clone(); let server_thread = thread::spawn(move || loop { let (req, rctx) = server.wait().unwrap(); match req { Request::CloneClient => rctx .reply(Reply::ClientClone(client_blueprint.clone())) .unwrap(), Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(), Request::Croak => { rctx.reply(Reply::OkICroaked).unwrap(); |
︙ | ︙ |
Changes to examples/many_once.rs.
|
| | < | 1 2 3 4 5 6 7 8 | use std::{env, thread}; use ump::channel; // Run several clients, but each client iterates only once. // // - Get number of requested clients from command line // - Start a server on a thread |
︙ | ︙ | |||
28 29 30 31 32 33 34 | let server_thread = thread::spawn(move || { let mut count = 0; // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); | | | 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | let server_thread = thread::spawn(move || { let mut count = 0; // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); let (data, rctx) = server.wait().unwrap(); println!("Server received: '{}'", data); // .. process data from client .. // Reply to client let reply = format!("Hello, {}!", data); |
︙ | ︙ | |||
52 53 54 55 56 57 58 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); | | | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); let reply = client_clone.send(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); } server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/simple.rs.
1 2 3 4 5 6 7 8 9 10 | use std::thread; use ump::channel; fn main() { let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | use std::thread; use ump::channel; fn main() { let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client println!("Server waiting for message .."); let (data, rctx) = server.wait().unwrap(); println!("Server received: '{}'", data); // Process data from client // Reply to client let reply = format!("Hello, {}!", data); println!("Server replying '{}'", reply); rctx.reply(reply).unwrap(); println!("Server done"); }); let msg = String::from("Client"); println!("Client sending '{}'", msg); let reply = client.send(msg).unwrap(); println!("Client received reply '{}'", reply); println!("Client done"); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/threaded_handler.rs.
|
| | < | 1 2 3 4 5 6 7 8 | use std::{env, thread}; use ump::channel; // This is basically the same test as many_once, but the server launches a new // thread to process and reply to client requests. fn main() { // Get number of client threads to kick off. Default to two. |
︙ | ︙ | |||
21 22 23 24 25 26 27 | let server_thread = thread::spawn(move || { let mut count = 0; // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); | | | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | let server_thread = thread::spawn(move || { let mut count = 0; // Keep looping until each client as sent a message while count < nclients { // Wait for data to arrive from a client println!("Server waiting for message .."); let (data, rctx) = server.wait().unwrap(); // Move the received data and reply context into a thread to allow other // messages to be received while processing this message. thread::spawn(move || { println!("Server received: '{}'", data); // Process data from client |
︙ | ︙ | |||
49 50 51 52 53 54 55 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); let reply = client_clone.send(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); } server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/client.rs.
|
| < < < < < | < | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | use crate::{err::Error, rctx::InnerReplyContext, server::ServerQueueNode}; /// Representation of a clonable client object. /// /// Each instantiation of a `Client` object is itself an isolated client with /// regards to the server context. By cloning a client a new independent /// client is created. ("Independent" here meaning that it is still tied to /// the same server object, but the new client can be passed to a separate /// thread and can independently make calls to the server). pub struct Client<S, R, E> { /// Weak reference to server queue. /// /// The server context holds the only strong reference to the queue. This /// allows the clients to detect when the server has terminated. pub(crate) qpusher: sigq::Pusher<ServerQueueNode<S, R, E>> } impl<S, R, E> Client<S, R, E> where R: 'static + Send, E: 'static + Send { |
︙ | ︙ | |||
48 49 50 51 52 53 54 | /// If the server never replied to the message and the reply context was /// dropped `Err(Error::NoReply)` will be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn send(&self, out: S) -> Result<R, Error<E>> { | < < < < < < > > | | | | | < < < > < < < < < > > | | | | | < < < | | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | /// If the server never replied to the message and the reply context was /// dropped `Err(Error::NoReply)` will be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn send(&self, out: S) -> Result<R, Error<E>> { // Create a per-call reply context. // This context could be created when the Client object is being created // and stored in the context, and thus be reused for reach client call. // One side-effect is that some of the state semantics becomes more // complicated. // The central repo has such an implementation checked in, but it seems to // have some more corner cases that aren't properly handled. let rctx = InnerReplyContext::new(); self .qpusher .push(ServerQueueNode { msg: out, reply: rctx.clone() }) .map_err(|_| Error::ServerDisappeared)?; let reply = rctx.get()?; Ok(reply) } /// Same as [`Client::send()`] but for use in `async` contexts. pub async fn asend(&self, out: S) -> Result<R, Error<E>> { let rctx = InnerReplyContext::new(); self .qpusher .push(ServerQueueNode { msg: out, reply: rctx.clone() }) .map_err(|_| Error::ServerDisappeared)?; let result = rctx.aget().await?; Ok(result) } } impl<S, R, E> Clone for Client<S, R, E> { /// Clone a client. /// /// When a client is cloned the new object will be linked to the same server, /// but in all other respects the clone is a completely independent client. /// /// This means that a cloned client can be passed to a new thread/task and /// make new independent calls to the server without any risk of collision /// between clone and the original client object. fn clone(&self) -> Self { Client { qpusher: self.qpusher.clone() } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/err.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | use std::fmt; /// Module-specific error codes. #[derive(Debug)] pub enum Error<E> { /// The server object has shut down. This happens when clients: /// - attempt to send messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// The message was delivered to the server, but the reply context was /// released before sending back a reply. NoReply, /// Application-specific error. /// The `E` type is typically declared as the third generic parameter to | > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | use std::fmt; /// Module-specific error codes. #[derive(Debug)] pub enum Error<E> { /// The server object has shut down. This happens when clients: /// - attempt to send messages to a server that has been deallocated. /// - have their requests dropped from the serrver's queue because the /// server itself was deallocated. ServerDisappeared, /// There are no more nodes to pick up in the queue and all client /// end-points have been dropped. ClientsDisappeared, /// The message was delivered to the server, but the reply context was /// released before sending back a reply. NoReply, /// Application-specific error. /// The `E` type is typically declared as the third generic parameter to |
︙ | ︙ | |||
44 45 46 47 48 49 50 | crate::rctx::Error::App(e) => Error::App(e) } } } impl<E: fmt::Debug> fmt::Display for Error<E> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | | > | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | crate::rctx::Error::App(e) => Error::App(e) } } } impl<E: fmt::Debug> fmt::Display for Error<E> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::ServerDisappeared => write!(f, "Server disappeared"), Error::ClientsDisappeared => write!(f, "Clients disappeared"), Error::NoReply => write!(f, "Server didn't reply"), Error::App(err) => write!(f, "Application error; {:?}", err) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/lib.rs.
︙ | ︙ | |||
31 32 33 34 35 36 37 | //! //! # Example //! ``` //! use std::thread; //! //! use ump::channel; //! | < | | | | | | | | | | | | | | | | | | | < | 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | //! //! # Example //! ``` //! use std::thread; //! //! use ump::channel; //! //! let (server, client) = channel::<String, String, ()>(); //! //! let server_thread = thread::spawn(move || { //! // Wait for data to arrive from a client //! println!("Server waiting for message .."); //! let (data, mut rctx) = server.wait().unwrap(); //! //! println!("Server received: '{}'", data); //! //! // Process data from client //! //! // Reply to client //! let reply = format!("Hello, {}!", data); //! println!("Server replying '{}'", reply); //! rctx.reply(reply); //! //! println!("Server done"); //! }); //! //! let msg = String::from("Client"); //! println!("Client sending '{}'", msg); //! let reply = client.send(String::from(msg)).unwrap(); //! println!("Client received reply '{}'", reply); //! println!("Client done"); //! //! server_thread.join().unwrap(); //! ``` //! In practice the send/reply types will probably be `enum`s used to //! indicate command/return type with associated data. The third type argument //! to [`channel`] is an error type that can be used to explicitly pass errors //! back to the sender. //! //! # Semantics |
︙ | ︙ | |||
105 106 107 108 109 110 111 | mod client; mod err; mod rctx; mod server; pub use err::Error; | < | < < < < < > | | < < < | < < | 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | mod client; mod err; mod rctx; mod server; pub use err::Error; pub use crate::{client::Client, rctx::ReplyContext, server::Server}; /// Create a pair of linked [`Server`] and [`Client`] objects. /// /// The [`Server`] object is used to wait for incoming messages from connected /// clients. Once a message arrives it must reply to it using a /// [`ReplyContext`] that's returned to it in the same call that returned the /// message. /// /// The [`Client`] object can be used to send messages to the [`Server`]. The /// [`Client::send()`] call will not return until the server has replied. /// /// Clients can be [cloned](Client::clone()); each clone will create a /// new client object that is connected to the same server object, but is /// completely independent of the original client. /// /// The `S` type parameter is the "send" data type that clients will transfer /// to the server. The `R` type parameter is the "receive" data type that /// clients will receive from the server. The `E` type parameter can be used /// to return application specific errors from the server to the client. pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) { let (qpusher, qpuller) = sigq::new(); let server = Server { qpuller }; let client = Client { qpusher }; (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Added src/rctx.rs.
> > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | //! Allow a thread/task, crossing sync/async boundaries in either direction, to //! deliver an expected piece of data to another thread/task, with //! notification. //! //! These are simple channels used to deliver data from one endpoint to //! another, where the receiver will block until data has been delivered. mod err; mod inner; pub mod public; pub(crate) use err::Error; pub(crate) use inner::InnerReplyContext; pub use public::ReplyContext; // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/rctx/err.rs.
︙ | ︙ | |||
15 16 17 18 19 20 21 | App(E) } impl<E: fmt::Debug> std::error::Error for Error<E> {} impl<E: fmt::Debug> fmt::Display for Error<E> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | | | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | App(E) } impl<E: fmt::Debug> std::error::Error for Error<E> {} impl<E: fmt::Debug> fmt::Display for Error<E> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::Aborted => write!(f, "Aborted call"), Error::NoReply => write!(f, "Application failed to reply"), Error::App(err) => write!(f, "Application error; {:?}", err) } } } |
︙ | ︙ |
Changes to src/rctx/inner.rs.
|
| | > | | | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 | use std::{ future::Future, pin::Pin, sync::Arc, task::{Context, Poll, Waker} }; use parking_lot::{Condvar, Mutex}; use crate::rctx::err::Error; pub(crate) enum State<I, E> { /// (Still) in queue, waiting to be picked up by the server. |
︙ | ︙ | |||
65 66 67 68 69 70 71 | drop(mg); self.signal_waiters(); } pub(crate) fn signal_waiters(&self) { let mut g = self.taskwaker.lock(); | | | 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 | drop(mg); self.signal_waiters(); } pub(crate) fn signal_waiters(&self) { let mut g = self.taskwaker.lock(); if let Some(waker) = (*g).take() { waker.wake(); } self.signal.notify_one(); } /// Retreive reply. If a reply has not arrived yet then enter a loop that |
︙ | ︙ | |||
147 148 149 150 151 152 153 | impl<I, E> Drop for InnerReplyContext<I, E> { /// If the reply context never left the server queue before being destroyed /// it means that the server has died. Signal this to the original caller /// waiting for a reply. fn drop(&mut self) { let mut do_signal: bool = false; let mut mg = self.data.lock(); | | | | | < < | | 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 | impl<I, E> Drop for InnerReplyContext<I, E> { /// If the reply context never left the server queue before being destroyed /// it means that the server has died. Signal this to the original caller /// waiting for a reply. fn drop(&mut self) { let mut do_signal: bool = false; let mut mg = self.data.lock(); if let State::Queued = *mg { *mg = State::Aborted; do_signal = true; } drop(mg); if do_signal { let mut g = self.taskwaker.lock(); if let Some(waker) = (*g).take() { waker.wake(); } self.signal.notify_one(); } } } |
︙ | ︙ |
Deleted src/rctx/mod.rs.
|
| < < < < < < < < < < < < < < < < < < |
Changes to src/rctx/public.rs.
|
| | < < < | | | | | | | | | | < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | use crate::rctx::{err::Error, inner::State, InnerReplyContext}; /// Public-facing sender part of the `ReplyContext` object. /// /// This is safe to pass to applications which are meant to only be able to put /// a value through the `ReplyContext` channel, but not extract the value from /// it. pub struct ReplyContext<I, E> { inner: InnerReplyContext<I, E>, did_handover: bool } impl<I: 'static + Send, E> ReplyContext<I, E> { /// Send a reply back to originating client. /// /// # Example /// ``` /// use std::thread; /// use ump::channel; /// /// let (server, client) = channel::<String, String, ()>(); /// let server_thread = thread::spawn(move || { /// let (data, rctx) = server.wait().unwrap(); /// let reply = format!("Hello, {}!", data); /// rctx.reply(reply).unwrap(); /// }); /// let msg = String::from("Client"); /// let reply = client.send(String::from(msg)).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(mut self, data: I) -> Result<(), Error<E>> { self.inner.put(data); |
︙ | ︙ | |||
55 56 57 58 59 60 61 | /// use ump::{channel, Error}; /// /// #[derive(Debug, PartialEq)] /// enum MyError { /// SomeError(String) /// } /// | < | | | | | | | | | | | | | | | | < | | | | | < < | | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | /// use ump::{channel, Error}; /// /// #[derive(Debug, PartialEq)] /// enum MyError { /// SomeError(String) /// } /// /// let (server, client) = channel::<String, String, MyError>(); /// let server_thread = thread::spawn(move || { /// let (_, rctx) = server.wait().unwrap(); /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); /// }); /// let msg = String::from("Client"); /// let reply = client.send(String::from(msg)); /// match reply { /// Err(Error::App(MyError::SomeError(s))) => { /// assert_eq!(s, "failed"); /// } /// _ => { /// panic!("Unexpected return value"); /// } /// } /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn fail(mut self, err: E) -> Result<(), Error<E>> { self.inner.fail(err); self.did_handover = true; Ok(()) } } impl<I, E> Drop for ReplyContext<I, E> { /// If the reply context is dropped while still waiting for a reply then /// report back to the caller that it should expect no reply. fn drop(&mut self) { if !self.did_handover { let mut do_signal: bool = false; let mut mg = self.inner.data.lock(); if let State::Waiting = *mg { *mg = State::NoReply; do_signal = true; } drop(mg); if do_signal { let mut g = self.inner.taskwaker.lock(); if let Some(waker) = (*g).take() { waker.wake(); } self.inner.signal.notify_one(); } } } |
︙ | ︙ |
Changes to src/server.rs.
|
| < | < | | | | | | | > > > > | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | use crate::rctx::{InnerReplyContext, ReplyContext}; use crate::err::Error; pub(crate) struct ServerQueueNode<S, R, E> { /// Raw message being sent from the client to the server. pub(crate) msg: S, /// Keep track of data needed to share reply data. pub(crate) reply: InnerReplyContext<R, E> } /// Representation of a server object. /// /// Each instantiation of a [`Server`] object represents an end-point which /// will be used to receive messages from connected [`Client`](crate::Client) /// objects. pub struct Server<S, R, E> { pub(crate) qpuller: sigq::Puller<ServerQueueNode<S, R, E>> } impl<S, R, E> Server<S, R, E> where S: 'static + Send, R: 'static + Send, E: 'static + Send { /// Block and wait, indefinitely, for an incoming message from a /// [`Client`](crate::Client). /// /// Returns the message sent by the client and a reply context. The server /// must call [`ReplyContext::reply()`] on the reply context to pass a return /// value to the client. pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> { let node = self.qpuller.pop().map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::from(node.reply); Ok((msg, rctx)) } /// Same as [`Server::wait()`], but for use in an `async` context. pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> { let node = self .qpuller .apop() .await .map_err(|_| Error::ClientsDisappeared)?; // Extract the data from the node let msg = node.msg; // Create an application reply context from the reply context in the queue // Implicitly changes state of the reply context from Queued to Waiting let rctx = ReplyContext::from(node.reply); Ok((msg, rctx)) } /// Returns a boolean indicating whether the queue is/was empty. This isn't /// really useful unless used in very specific situations. It mostly exists /// for test cases. pub fn was_empty(&self) -> bool { self.qpuller.was_empty() } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/async_client.rs.
︙ | ︙ | |||
17 18 19 20 21 22 23 | let tokrt = tokio::runtime::Runtime::new().unwrap(); let niterations = 256; let (server, client) = channel::<Request, Reply, ()>(); let server_thread = thread::spawn(move || loop { | | | 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | let tokrt = tokio::runtime::Runtime::new().unwrap(); let niterations = 256; let (server, client) = channel::<Request, Reply, ()>(); let server_thread = thread::spawn(move || loop { let (req, rctx) = server.wait().unwrap(); match req { Request::Add(a, b) => rctx.reply(Reply::Sum(a + b)).unwrap(), Request::Croak => { rctx.reply(Reply::OkICroaked).unwrap(); break; } } |
︙ | ︙ |
Changes to tests/fail.rs.
︙ | ︙ | |||
10 11 12 13 14 15 16 | #[test] fn sync_expect_noreply() { let (server, client) = channel::<String, String, MyError>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client | | | | 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | #[test] fn sync_expect_noreply() { let (server, client) = channel::<String, String, MyError>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); let msg = String::from("Client"); let reply = client.send(msg); match reply { Err(Error::App(MyError::SomeError(s))) => { assert_eq!(s, "failed"); } _ => { panic!("Unexpected return value"); } |
︙ | ︙ | |||
38 39 40 41 42 43 44 | fn async_expect_noreply() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (server, client) = channel::<String, String, MyError>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client | | | 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | fn async_expect_noreply() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (server, client) = channel::<String, String, MyError>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); tokrt.block_on(async { let msg = String::from("Client"); let reply = client.asend(msg).await; |
︙ | ︙ |
Changes to tests/noreply.rs.
1 2 3 4 5 6 7 8 9 10 11 | // Make sure that the ReplyContext aborts on Drop of no reply was sent. use std::thread; use ump::{channel, Error}; #[test] fn sync_expect_noreply() { let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // Make sure that the ReplyContext aborts on Drop of no reply was sent. use std::thread; use ump::{channel, Error}; #[test] fn sync_expect_noreply() { let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); let msg = String::from("Client"); let reply = client.send(msg); match reply { Err(Error::NoReply) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ | |||
34 35 36 37 38 39 40 | fn async_expect_noreply() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client | | | 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | fn async_expect_noreply() { let tokrt = tokio::runtime::Runtime::new().unwrap(); let (server, client) = channel::<String, String, ()>(); let server_thread = thread::spawn(move || { // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); tokrt.block_on(async { let msg = String::from("Client"); |
︙ | ︙ |
Changes to tests/queue_cleanup.rs.
︙ | ︙ | |||
12 13 14 15 16 17 18 | // Should be doing something more robust .. let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); let msg = String::from("Client"); | | | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // Should be doing something more robust .. let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); let msg = String::from("Client"); let reply = client.send(msg); match reply { Err(Error::ServerDisappeared) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ |
Changes to tests/stress.rs.
︙ | ︙ | |||
11 12 13 14 15 16 17 | #[test] fn one_at_a_time() { let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; | | | | 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #[test] fn one_at_a_time() { let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; while !croak { let (data, rctx) = server.wait().unwrap(); match data { Ops::Die => { croak = true; rctx.reply(0).unwrap(); } Ops::Add(a, b) => { rctx.reply(a + b).unwrap(); |
︙ | ︙ | |||
54 55 56 57 58 59 60 | let niterations = 256; let server_thread = thread::spawn(move || { let mut count = 0; let mut handles = Vec::new(); // +1 because we want to wait for the croak message as well while count < niterations + 1 { | | | 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | let niterations = 256; let server_thread = thread::spawn(move || { let mut count = 0; let mut handles = Vec::new(); // +1 because we want to wait for the croak message as well while count < niterations + 1 { let (data, rctx) = server.wait().unwrap(); let h = thread::spawn(move || match data { Ops::Die => { rctx.reply(0).unwrap(); } Ops::Add(a, b) => { rctx.reply(a + b).unwrap(); } |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.9.0] - 2022-09-09 ### Added - Explicitly set MSRV is `1.36` | > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.10.0] - 2023-07-26 ### Added - Server's receive methods will fail with `Error::ClientsDisappeared` if all the associated Client objects have been dropped. ### Changed - Runtime dependencies: - Updated `sigq` to `0.13.2`. - Development dependencies: - Updated `criterion` to `0.5.1` ## [0.9.0] - 2022-09-09 ### Added - Explicitly set MSRV is `1.36` |
︙ | ︙ |