Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From ump-0.10.1 To ump-0.10.2
2023-07-28
| ||
04:12 | Include tests in packaging. check-in: bca62b6697 user: jan tags: trunk | |
01:44 | Release maintenance. check-in: 27099be4a4 user: jan tags: trunk, ump-0.10.2 | |
01:25 | Rename 'send' message operation to 'request'. Add dev-docs feature for including internal notes to generated documentation. check-in: fb98947448 user: jan tags: trunk | |
2023-07-27
| ||
01:34 | Release maintenance. check-in: 7079d4a788 user: jan tags: trunk, ump-0.10.1 | |
01:31 | Use sigq 0.13.3 to get puller bugfix. check-in: 3dece6432a user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "ump" | | > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | [package] name = "ump" version = "0.10.2" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2018" license = "0BSD" categories = [ "concurrency", "asynchronous" ] keywords = [ "channel", "threads", "sync", "message-passing" ] repository = "https://repos.qrnch.tech/pub/ump" description = "Micro message passing library for threads/tasks communication." rust-version = "1.39" # Can't exclude "benches", because the [[bench]] section will fail. exclude = [ ".efiles", ".fossil-settings", ".fslckout", "examples", "rustfmt.toml", "tests", "www" ] [features] dev-docs = [] [dependencies] parking_lot = { version = "0.12.1" } sigq = { version = "0.13.3" } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } tokio = { version = "1.29.1", features = ["full"] } [[bench]] name = "add_server" harness = false |
Changes to benches/add_server.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump::channel; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } pub fn criterion_benchmark(c: &mut Criterion) { | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | use std::thread; use criterion::{criterion_group, criterion_main, Criterion}; use ump::channel; enum Ops { Die, Add(i32, i32), AddThreaded(i32, i32) } pub fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("req operation"); let (server, client) = channel::<Ops, i32, ()>(); let server_thread = thread::spawn(move || { let mut croak = false; while !croak { let (data, rctx) = server.wait().unwrap(); |
︙ | ︙ | |||
37 38 39 40 41 42 43 | let mut p: i32 = 0; let mut q: i32 = 0; group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; | | | | | | | 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | let mut p: i32 = 0; let mut q: i32 = 0; group.bench_function("add", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::Add(p, q)).unwrap(); assert_eq!(result, q + p); }) }); p = 0; q = 0; group.bench_function("add (threaded)", |b| { b.iter(|| { p += 2; q -= 3; let result = client.req(Ops::AddThreaded(p, q)).unwrap(); assert_eq!(result, q + p); }) }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::Add(p, q)).await.unwrap(); assert_eq!(result, q + p); }) }); let rt = tokio::runtime::Runtime::new().unwrap(); group.bench_function("add (async, threaded)", |b| { b.to_async(&rt).iter(|| async { let p = 1; let q = 2; let result = client.areq(Ops::AddThreaded(p, q)).await.unwrap(); assert_eq!(result, q + p); }) }); let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); server_thread.join().unwrap(); } criterion_group!(benches, criterion_benchmark); criterion_main!(benches); // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/cloneclientserver.rs.
︙ | ︙ | |||
35 36 37 38 39 40 41 | rctx.reply(Reply::OkICroaked).unwrap(); break; } } }); if let Reply::ClientClone(cloned_client) = | | | | < | 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | rctx.reply(Reply::OkICroaked).unwrap(); break; } } }); if let Reply::ClientClone(cloned_client) = client.req(Request::CloneClient).unwrap() { if let Reply::Sum(x) = cloned_client.req(Request::Add(5, 7)).unwrap() { assert_eq!(x, 12); } else { panic!("Unexpected result"); } } else { panic!("Unexpected result"); } let _ = client.req(Request::Croak); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/many_once.rs.
︙ | ︙ | |||
51 52 53 54 55 56 57 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); | | | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); let reply = client_clone.req(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); |
︙ | ︙ |
Changes to examples/simple.rs.
︙ | ︙ | |||
20 21 22 23 24 25 26 | rctx.reply(reply).unwrap(); println!("Server done"); }); let msg = String::from("Client"); println!("Client sending '{}'", msg); | | | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | rctx.reply(reply).unwrap(); println!("Server done"); }); let msg = String::from("Client"); println!("Client sending '{}'", msg); let reply = client.req(msg).unwrap(); println!("Client received reply '{}'", reply); println!("Client done"); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/threaded_handler.rs.
︙ | ︙ | |||
48 49 50 51 52 53 54 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); | | | 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | let mut join_handles = Vec::new(); for i in 0..nclients { let client_clone = client.clone(); let client_thread = thread::spawn(move || { let name = format!("Client {}", i + 1); let msg = String::from(&name); println!("{} sending '{}'", name, msg); let reply = client_clone.req(msg).unwrap(); println!("{} received reply '{}' -- done", name, reply); }); join_handles.push(client_thread); } for n in join_handles { n.join().unwrap(); |
︙ | ︙ |
Changes to src/client.rs.
︙ | ︙ | |||
41 42 43 44 45 46 47 | /// /// If the server never replied to the message and the reply context was /// dropped `Err(Error::NoReply)` will be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). | | | 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | /// /// If the server never replied to the message and the reply context was /// dropped `Err(Error::NoReply)` will be returned. /// /// If an application specific error occurs it will be returned as a /// `Err(Error::App(E))`, where `E` is the error type used when creating the /// [`channel`](crate::channel). pub fn req(&self, out: S) -> Result<R, Error<E>> { // Create a per-call reply context. // This context could be created when the Client object is being created // and stored in the context, and thus be reused for each client call. // One side-effect is that some of the state semantics become more // complicated. // The central repo has such an implementation checked in, but it seems to // have some more corner cases that aren't properly handled. |
︙ | ︙ | |||
64 65 66 67 68 69 70 | .map_err(|_| Error::ServerDisappeared)?; let reply = rctx.get()?; Ok(reply) } | > > > > > | | > > > > > | 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | .map_err(|_| Error::ServerDisappeared)?; let reply = rctx.get()?; Ok(reply) } #[deprecated(since = "0.10.2", note = "Use req() instead.")] pub fn send(&self, out: S) -> Result<R, Error<E>> { self.req(out) } /// Same as [`Client::req()`] but for use in `async` contexts. pub async fn areq(&self, out: S) -> Result<R, Error<E>> { let rctx = InnerReplyContext::new(); self .qpusher .push(ServerQueueNode { msg: out, reply: rctx.clone() }) .map_err(|_| Error::ServerDisappeared)?; let result = rctx.aget().await?; Ok(result) } #[deprecated(since = "0.10.2", note = "Use areq() instead.")] pub async fn asend(&self, out: S) -> Result<R, Error<E>> { self.areq(out).await } } impl<S, R, E> Clone for Client<S, R, E> { /// Clone a client. /// /// When a client is cloned the new object will be linked to the same server, |
︙ | ︙ |
Changes to src/err.rs.
1 2 3 4 5 | use std::fmt; /// Module-specific error codes. #[derive(Debug)] pub enum Error<E> { | | > > | > > | > | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | use std::fmt; /// Module-specific error codes. #[derive(Debug)] pub enum Error<E> { /// The server object has shut down. /// /// This happens when clients: /// - attempt to transmit messages to a server that has been deallocated. /// - have their requests dropped from the server's queue because the /// server itself was deallocated. ServerDisappeared, /// No more client end-points remain. /// /// There are no more nodes to pick up in the queue and all client /// end-points have been dropped (implied: no new nodes will ever be added /// to the queue). ClientsDisappeared, /// The message was delivered to the server, but the reply context was /// dropped before transmitting a reply. NoReply, /// Application-specific error. /// /// The `E` type is typically declared as the third generic parameter to /// [`channel`](crate::channel()). App(E) } impl<E> Error<E> { pub fn into_apperr(self) -> Option<E> { |
︙ | ︙ |
Changes to src/lib.rs.
︙ | ︙ | |||
11 12 13 14 15 16 17 | //! An application calls [`channel`] to create a linked pair of a [`Server`] //! and a [`Client`]. //! //! The server calls [`Server::wait()`]/[`Server::async_wait()`], which //! blocks and waits for an incoming message from a client. //! //! A client, in a separate thread or task, calls | | | 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | //! An application calls [`channel`] to create a linked pair of a [`Server`] //! and a [`Client`]. //! //! The server calls [`Server::wait()`]/[`Server::async_wait()`], which //! blocks and waits for an incoming message from a client. //! //! A client, in a separate thread or task, calls //! [`Client::req()`]/[`Client::areq()`] to send a message to the server. //! //! The server's wait call returns two objects: The message sent by the //! client, and a [`ReplyContext`]. //! //! After processing its application-defined message, the server *must* call //! the [`ReplyContext::reply()`] on the returned reply context object to //! return a reply message to the client. |
︙ | ︙ | |||
52 53 54 55 56 57 58 | //! rctx.reply(reply); //! //! println!("Server done"); //! }); //! //! let msg = String::from("Client"); //! println!("Client sending '{}'", msg); | | | | 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | //! rctx.reply(reply); //! //! println!("Server done"); //! }); //! //! let msg = String::from("Client"); //! println!("Client sending '{}'", msg); //! let reply = client.req(msg).unwrap(); //! println!("Client received reply '{}'", reply); //! println!("Client done"); //! //! server_thread.join().unwrap(); //! ``` //! In practice the req/reply types will probably be `enum`s used to //! indicate command/return type with associated data. The third type argument //! to [`channel`] is an error type that can be used to explicitly pass errors //! back to the sender. //! //! # Semantics //! There are some potentially useful semantic quirks that can be good to know //! about, but some of them should be used with caution. This section will |
︙ | ︙ | |||
113 114 115 116 117 118 119 | /// /// The [`Server`] object is used to wait for incoming messages from connected /// clients. Once a message arrives it must reply to it using a /// [`ReplyContext`] that's returned to it in the same call that returned the /// message. /// /// The [`Client`] object can be used to send messages to the [`Server`]. The | | | | | | | 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | /// /// The [`Server`] object is used to wait for incoming messages from connected /// clients. Once a message arrives it must reply to it using a /// [`ReplyContext`] that's returned to it in the same call that returned the /// message. /// /// The [`Client`] object can be used to send messages to the [`Server`]. The /// [`Client::req()`] call will not return until the server has replied. /// /// Clients can be [cloned](Client::clone()); each clone will create a /// new client object that is connected to the same server object, but is /// completely independent of the original client. /// /// The `S` type parameter is the "request" data type that clients will /// transfer to the server. The `R` type parameter is the "receive" data type /// that clients will receive from the server. The `E` type parameter can be /// used to return application specific errors from the server to the client. pub fn channel<S, R, E>() -> (Server<S, R, E>, Client<S, R, E>) { let (qpusher, qpuller) = sigq::new(); let server = Server { qpuller }; let client = Client { qpusher }; (server, client) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/rctx/public.rs.
1 2 | use crate::rctx::{err::Error, inner::State, InnerReplyContext}; | > > > > > | | | | | > > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | use crate::rctx::{err::Error, inner::State, InnerReplyContext}; /// Context used to transmit a reply back to the originating requester. #[cfg_attr( feature = "dev-docs", doc = r#" # Internals Public-facing sender part of the `ReplyContext` object. This, as opposed to `InnerReplyContext`, is safe to pass to applications that are meant to only be able to put a value through the `ReplyContext` channel, but not extract the value from it. "# )] pub struct ReplyContext<I, E> { inner: InnerReplyContext<I, E>, did_handover: bool } impl<I: 'static + Send, E> ReplyContext<I, E> { /// Send a reply back to originating client. /// /// # Example /// ``` /// use std::thread; /// use ump::channel; /// /// let (server, client) = channel::<String, String, ()>(); /// let server_thread = thread::spawn(move || { /// let (data, rctx) = server.wait().unwrap(); /// let reply = format!("Hello, {}!", data); /// rctx.reply(reply).unwrap(); /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg).unwrap(); /// assert_eq!(reply, "Hello, Client!"); /// server_thread.join().unwrap(); /// ``` /// /// # Semantics /// This call is safe to make after the server context has been released. pub fn reply(mut self, data: I) -> Result<(), Error<E>> { |
︙ | ︙ | |||
57 58 59 60 61 62 63 | /// /// let (server, client) = channel::<String, String, MyError>(); /// let server_thread = thread::spawn(move || { /// let (_, rctx) = server.wait().unwrap(); /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); /// }); /// let msg = String::from("Client"); | | | 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | /// /// let (server, client) = channel::<String, String, MyError>(); /// let server_thread = thread::spawn(move || { /// let (_, rctx) = server.wait().unwrap(); /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); /// }); /// let msg = String::from("Client"); /// let reply = client.req(msg); /// match reply { /// Err(Error::App(MyError::SomeError(s))) => { /// assert_eq!(s, "failed"); /// } /// _ => { /// panic!("Unexpected return value"); /// } |
︙ | ︙ |
Changes to tests/async_client.rs.
︙ | ︙ | |||
34 35 36 37 38 39 40 | tokrt.block_on(async { let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..niterations { a += 2; b -= 3; | | | | 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | tokrt.block_on(async { let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..niterations { a += 2; b -= 3; let result = client.areq(Request::Add(a, b)).await.unwrap(); if let Reply::Sum(sum) = result { assert_eq!(sum, a + b); } else { panic!("Didn't get sum"); } } let result = client.areq(Request::Croak).await.unwrap(); if let Reply::OkICroaked = result { } else { panic!("Didn't get a croak"); } }); server_thread.join().unwrap(); |
︙ | ︙ |
Changes to tests/fail.rs.
︙ | ︙ | |||
16 17 18 19 20 21 22 | // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); let msg = String::from("Client"); | | | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | // Wait for data to arrive from a client let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); let msg = String::from("Client"); let reply = client.req(msg); match reply { Err(Error::App(MyError::SomeError(s))) => { assert_eq!(s, "failed"); } _ => { panic!("Unexpected return value"); } |
︙ | ︙ | |||
45 46 47 48 49 50 51 | let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); tokrt.block_on(async { let msg = String::from("Client"); | | | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | let (_, rctx) = server.wait().unwrap(); rctx.fail(MyError::SomeError("failed".to_string())).unwrap(); }); tokrt.block_on(async { let msg = String::from("Client"); let reply = client.areq(msg).await; match reply { Err(Error::App(MyError::SomeError(s))) => { assert_eq!(s, "failed"); } _ => { panic!("Unexpected return value"); } |
︙ | ︙ |
Changes to tests/noreply.rs.
︙ | ︙ | |||
12 13 14 15 16 17 18 | let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); let msg = String::from("Client"); | | | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | let (_, rctx) = server.wait().unwrap(); // Don't do this. drop(rctx); }); let msg = String::from("Client"); let reply = client.req(msg); match reply { Err(Error::NoReply) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ | |||
42 43 44 45 46 47 48 | // Don't do this. drop(rctx); }); tokrt.block_on(async { let msg = String::from("Client"); | | | 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | // Don't do this. drop(rctx); }); tokrt.block_on(async { let msg = String::from("Client"); let reply = client.areq(msg).await; match reply { Err(Error::NoReply) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ |
Changes to tests/queue_cleanup.rs.
︙ | ︙ | |||
12 13 14 15 16 17 18 | // Should be doing something more robust .. let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); let msg = String::from("Client"); | | | 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // Should be doing something more robust .. let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); let msg = String::from("Client"); let reply = client.req(msg); match reply { Err(Error::ServerDisappeared) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ | |||
41 42 43 44 45 46 47 | let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); tokrt.block_on(async { let msg = String::from("Client"); | | | | 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | let one_second = time::Duration::from_secs(1); thread::sleep(one_second); drop(server); }); tokrt.block_on(async { let msg = String::from("Client"); let reply = client.areq(msg).await; //let reply = client.req(msg); match reply { Err(Error::ServerDisappeared) => { // This is the expected error } _ => { panic!("Unexpected return value"); } |
︙ | ︙ |
Changes to tests/stress.rs.
︙ | ︙ | |||
34 35 36 37 38 39 40 | let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..65535 { a += 2; b -= 3; | | | | 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..65535 { a += 2; b -= 3; let result = client.req(Ops::Add(a, b)).unwrap(); assert_eq!(result, a + b); } let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); server_thread.join().unwrap(); } #[test] fn one_at_a_time_threaded_handler() { |
︙ | ︙ | |||
80 81 82 83 84 85 86 | let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..niterations { a += 2; b -= 3; | | | | 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | let mut a: i32 = 0; let mut b: i32 = 0; for _ in 0..niterations { a += 2; b -= 3; let result = client.req(Ops::Sub(a, b)).unwrap(); assert_eq!(result, a - b); } let result = client.req(Ops::Die).unwrap(); assert_eq!(result, 0); server_thread.join().unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.10.1] - 2023-07-27 ### Changed - Runtime dependencies: - Updated `sigq` to `0.13.3`. | > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.10.2] - 2023-07-28 ### Added - Add `send()`/`asend()` wrappers around the new `req()`/`areq()` methods with a deprecation notice. - Add a `dev-docs` feature to allow internal documentation notes to be included in generated documentation. ### Changed - Rename `send()`/`asend()` to `req()`/`areq()`. ## [0.10.1] - 2023-07-27 ### Changed - Runtime dependencies: - Updated `sigq` to `0.13.3`. |
︙ | ︙ |