Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From sigq-0.13.3 To sigq-0.13.4
2024-09-09
| ||
18:41 | Crate maintenance. check-in: 0bee0469d7 user: jan tags: trunk | |
2023-09-15
| ||
16:55 | Release maintenance. check-in: 1dfcd66b82 user: jan tags: sigq-0.13.4, trunk | |
16:04 | Implement Clone manually instead of using derive, because using derive forces the inner type(s) to derive Clone as well. check-in: 1fc050c0ee user: jan tags: trunk | |
13:32 | Implement WeakPusher, a weak reference to Pusher objects. check-in: bd3e071eea user: jan tags: trunk | |
2023-07-27
| ||
01:26 | Release maintenance. check-in: 727d7924a6 user: jan tags: sigq-0.13.3, trunk | |
01:21 | Fix premature Puller stale error bug. check-in: 68dd3c42ad user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "sigq" | | < > < < | > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | [package] name = "sigq" version = "0.13.4" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", "rustfmt.toml" ] rust-version = "1.64" [features] default = ["inline-more"] inline-more = [] [dependencies] indexmap = { version = "2.0.0" } parking_lot = { version = "0.12.1" } [package.metadata.docs.rs] rustdoc-args = ["--generate-link-to-definition"] |
Changes to src/lib.rs.
︙ | ︙ | |||
37 38 39 40 41 42 43 | }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::Puller; | | | 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::Puller; pub use push::{Pusher, WeakPusher}; /// Error value used to indicate that there are no remote end-points available. /// /// If a `Puller` method returns this it means the queue has no more associated /// `Pusher`'s, which implies that no new nodes can become available. /// /// If a `Pusher` method returns this it means that the queue has no more |
︙ | ︙ |
Changes to src/push.rs.
|
| | | 1 2 3 4 5 6 7 8 | use std::sync::{Arc, Weak}; /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>); use super::StaleErr; |
︙ | ︙ | |||
34 35 36 37 38 39 40 41 42 43 44 45 46 47 | /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } impl<I> Clone for Pusher<I> { fn clone(&self) -> Self { let mut inner = self.0.inner.lock(); inner.npushers += 1; Self(Arc::clone(&self.0)) | > > > > > | 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 | /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } /// Create a weak reference to this `Pusher`. pub fn weak(&self) -> WeakPusher<I> { WeakPusher(Arc::downgrade(&self.0)) } } impl<I> Clone for Pusher<I> { fn clone(&self) -> Self { let mut inner = self.0.inner.lock(); inner.npushers += 1; Self(Arc::clone(&self.0)) |
︙ | ︙ | |||
64 65 66 67 68 69 70 71 72 | self.0.signal.notify_all(); for waker in inner.wakers.drain(..).map(|(_k, v)| v) { waker.wake() } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > > > > > > > > > > > > > > > > > > > > > > > > > | 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | self.0.signal.notify_all(); for waker in inner.wakers.drain(..).map(|(_k, v)| v) { waker.wake() } } } } /// A weak reference to a [`Pusher`]. #[repr(transparent)] pub struct WeakPusher<I>(pub(crate) Weak<super::Shared<I>>); impl<I> Clone for WeakPusher<I> { fn clone(&self) -> Self { Self(Weak::clone(&self.0)) } } impl<I> WeakPusher<I> { /// Attempt to upgrade `WeakPusher` to a [`Pusher`]. /// /// Returns `None` is all the strong references have been exhausted. pub fn upgrade(&self) -> Option<Pusher<I>> { if let Some(strong) = self.0.upgrade() { let mut inner = strong.inner.lock(); inner.npushers += 1; Some(Pusher(Arc::clone(&strong))) } else { None } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to tests/simple.rs.
|
| | > > > > > > > > > > > > > > > < < | < | < < < < < > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | use sigq::{Puller, Pusher, StaleErr}; /// A reustable test fn std_test(tx: &Pusher<&str>, rx: &Puller<&str>) { // There's a client, no data; try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); tx.push("hello").unwrap(); // There's a client, data; try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); // There's a client; try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); } #[test] fn try_pop_retvals() { let (tx, rx) = sigq::new(); // Run the common tests std_test(&tx, &rx); // Drop the only pusher. drop(tx); // There are no more clients, so try_pop() should return Err(()) assert_eq!(rx.try_pop(), Err(StaleErr)); } #[test] fn try_pop_with_only_weak_tx() { let (tx, rx) = sigq::new::<&str>(); // Create a weak reference to the pusher let _wtx = tx.weak(); // Drop the only (strong) pusher. drop(tx); // There are no more clients; try_pop() should return Err(StaleErr) assert_eq!(rx.try_pop(), Err(StaleErr)); } #[test] fn drop_nonempty() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); |
︙ | ︙ | |||
64 65 66 67 68 69 70 71 72 | fn stale_puller() { let (tx, rx) = sigq::new::<&str>(); drop(rx); assert_eq!(tx.push("hello"), Err(StaleErr)); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | fn stale_puller() { let (tx, rx) = sigq::new::<&str>(); drop(rx); assert_eq!(tx.push("hello"), Err(StaleErr)); } /// Make sure that a weakened pusher can be upgraded and used #[test] fn weak_to_strong() { let (tx, rx) = sigq::new::<&str>(); // Create a weak reference to the pusher let wtx = tx.weak(); // Create a strong Pusher from the weak reference let Some(tx2) = wtx.upgrade() else { panic!("Unable to upgrade weak Pusher"); }; // Run the common tests std_test(&tx2, &rx); // Drop the only (strong) pusher. drop(tx); // There's still a Pusher; try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); drop(tx2); // There are no more Pushers; try_pop() should return Err(StaleErr) assert_eq!(rx.try_pop(), Err(StaleErr)); } /// If the last strong `Pusher` is dropped a `WeakPusher` should fail to /// upgrade to a `Pusher`. #[test] fn no_strong_weak_upgrade_fail() { let (tx, _rx) = sigq::new::<&str>(); // Create a weak reference to the pusher let wtx = tx.weak(); // Drop the only (strong) pusher. drop(tx); // The only strong pusher was released; upgrading the weak one should fail let Some(_) = wtx.upgrade() else { panic!("Upgrade unexpectedly successful"); }; } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.3] - 2023-07-27 ### Changed - Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes still remain in the queue. | > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.4] - 2023-09-15 ### Added - `Pusher` objects can spawn downgraded to `WeakPusher` objects, that in turn can be upgraded to `Pusher` objects (as long as all the strong `Pusher` objects have not been dropped). ## [0.13.3] - 2023-07-27 ### Changed - Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes still remain in the queue. |
︙ | ︙ |