Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From sigq-0.13.1 To sigq-0.13.2
2023-07-27
| ||
01:21 | Fix premature Puller stale error bug. check-in: 68dd3c42ad user: jan tags: trunk | |
2023-07-26
| ||
19:54 | Documentation udates. Hide push and pull modules from public. check-in: b485d7b118 user: jan tags: sigq-0.13.2, trunk | |
2023-07-25
| ||
01:49 | Release maintenance. check-in: 052111cfd8 user: jan tags: sigq-0.13.1, trunk | |
01:47 | Clear the internal queue when the last Puller is dropped. check-in: 2e05c4ff29 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "sigq" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "sigq" version = "0.13.2" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" |
︙ | ︙ |
Changes to src/lib.rs.
|
| | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | //! _sigq_ is a FIFO queue that supports pushing and poping nodes from //! threads/tasks, crossing sync/async boundaries. The interface to interact //! with the queue is a pair of end-points. The [`Pusher`] is used to add data //! to the queue, and the [`Puller`] is used to pull data off the queue. //! //! The `Pusher` has a [`push()`](Pusher::push) method that is used to push new //! nodes onto the queue. //! //! The `Puller` has a blocking [`pop()`](Puller::pop) and a //! [`apop()`](Puller::apop) that returns a `Future` for getting the next node //! off the queue. These will return immediately with the next node if //! available, or block and wait for a new node to be pushed onto the queue. //! [`try_pop()`](Puller::try_pop) can be used as a non-blocking way to get the //! next node, if available. //! //! ``` //! let (pusher, puller) = sigq::new(); //! pusher.push(42).unwrap(); //! assert_eq!(puller.pop(), Ok(42)); //! assert_eq!(puller.try_pop(), Ok(None)); //! ``` //! //! # Semantics //! - Dropping the last `Pusher` end-point will cause waiting `Puller`'s to //! wake up and return `Err(StaleErr)` if there are no more nodes on the //! queue. //! - Dropping the last `Puller` end-point will: //! - Immediately drop all the nodes in the queue. //! - Cause the `Puller`'s to return `Err(StaleErr)` if new nodes are //! attempted to be added to the queue. pub(crate) mod pull; pub(crate) mod push; use std::{ collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::Puller; pub use push::Pusher; /// Error value used to indicate that there are no remote end-points available. /// /// If a `Puller` method returns this it means the queue has no more associated /// `Pusher`'s, which implies that no new nodes can become available. /// /// If a `Pusher` method returns this it means that the queue has no more /// associated `Puller`'s, which implies that there's nothing to take nodes off /// the queue any longer. #[derive(Debug, PartialEq)] pub struct StaleErr; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. struct Inner<I> { |
︙ | ︙ |
Changes to src/pull.rs.
︙ | ︙ | |||
15 16 17 18 19 20 21 | impl<I> Puller<I> { /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// | | | > | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | impl<I> Puller<I> { /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// /// Returns `Err(StaleErr)` if there are no more items in queue and there are /// no more [`Pusher`](super::Pusher) objects associated with this /// `Puller`. #[cfg_attr(feature = "inline-more", inline)] pub fn pop(&self) -> Result<I, StaleErr> { let mut inner = self.0.inner.lock(); loop { if inner.npushers == 0 { break Err(StaleErr); } else { |
︙ | ︙ | |||
43 44 45 46 47 48 49 | /// Pull the oldest node off the queue and return it. /// /// If a node is available on the queue then take it off and return it. /// /// If no nodes are available and there are no more [`Pusher`](super::Pusher) /// objects associated with this `Puller`, then return `Err(StaleErr)`. /// | | | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | /// Pull the oldest node off the queue and return it. /// /// If a node is available on the queue then take it off and return it. /// /// If no nodes are available and there are no more [`Pusher`](super::Pusher) /// objects associated with this `Puller`, then return `Err(StaleErr)`. /// /// If no nodes are available and there's at least one associated `Pusher` /// exists then return `Ok(None)`. #[cfg_attr(feature = "inline-more", inline)] pub fn try_pop(&self) -> Result<Option<I>, StaleErr> { let mut inner = self.0.inner.lock(); if let Some(n) = inner.q.pop_front() { Ok(Some(n)) } else if inner.npushers == 0 { |
︙ | ︙ | |||
91 92 93 94 95 96 97 98 99 100 101 102 103 104 | pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } impl<I> Drop for Puller<I> { fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npullers -= 1; // If this is the last puller then remove all thr nodes. // The nodes may contain some kind of context that must be notified that // the node will never reach its destination. | > > > > > | 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } impl<I> Drop for Puller<I> { /// Drop a `Puller` instance. /// /// If this is the last `Puller` end-point of a sigq instance, then the inner /// queue will be cleared (i.e. all its elements will be immediately /// dropped). fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npullers -= 1; // If this is the last puller then remove all thr nodes. // The nodes may contain some kind of context that must be notified that // the node will never reach its destination. |
︙ | ︙ |
Changes to src/push.rs.
︙ | ︙ | |||
45 46 47 48 49 50 51 52 53 54 55 56 57 58 | let mut inner = self.0.inner.lock(); inner.npushers += 1; Self(Arc::clone(&self.0)) } } impl<I> Drop for Pusher<I> { fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npushers -= 1; // If this was the last pusher then wake any pullers that are waiting to // receive new items. (When they discover that no pushers remain they will // return None). | > > > > | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | let mut inner = self.0.inner.lock(); inner.npushers += 1; Self(Arc::clone(&self.0)) } } impl<I> Drop for Pusher<I> { /// Drop a `Pusher` instance. /// /// When the final instance of a sigq's instance's `Pusher` is dropped, wake /// up any `Puller`'s waiting for new nodes to arrive. fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npushers -= 1; // If this was the last pusher then wake any pullers that are waiting to // receive new items. (When they discover that no pushers remain they will // return None). |
︙ | ︙ |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.1] - 2023-07-25 ### Added | > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.2] - 2023-07-26 ### Added ### Changed - Documentation updates. ### Removed ## [0.13.1] - 2023-07-25 ### Added |
︙ | ︙ |