Difference From sigq-0.13.1 To sigq-0.13.2
|
2023-07-27
| ||
| 01:21 | Fix premature Puller stale error bug. check-in: 68dd3c42ad user: jan tags: trunk | |
|
2023-07-26
| ||
| 19:54 | Documentation updates. Hide push and pull modules from public. check-in: b485d7b118 user: jan tags: sigq-0.13.2, trunk | |
|
2023-07-25
| ||
| 01:49 | Release maintenance. check-in: 052111cfd8 user: jan tags: sigq-0.13.1, trunk | |
| 01:47 | Clear the internal queue when the last Puller is dropped. check-in: 2e05c4ff29 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 |
[package]
name = "sigq"
| | | 1 2 3 4 5 6 7 8 9 10 |
[package]
name = "sigq"
version = "0.13.2"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
| ︙ | ︙ |
Changes to src/lib.rs.
|
| | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
//! _sigq_ is a FIFO queue that supports pushing and popping nodes from
//! threads/tasks, crossing sync/async boundaries. The interface to interact
//! with the queue is a pair of end-points. The [`Pusher`] is used to add data
//! to the queue, and the [`Puller`] is used to pull data off the queue.
//!
//! The `Pusher` has a [`push()`](Pusher::push) method that is used to push new
//! nodes onto the queue.
//!
//! The `Puller` has a blocking [`pop()`](Puller::pop) and a
//! [`apop()`](Puller::apop) that returns a `Future` for getting the next node
//! off the queue. These will return immediately with the next node if
//! available, or block and wait for a new node to be pushed onto the queue.
//! [`try_pop()`](Puller::try_pop) can be used as a non-blocking way to get the
//! next node, if available.
//!
//! ```
//! let (pusher, puller) = sigq::new();
//! pusher.push(42).unwrap();
//! assert_eq!(puller.pop(), Ok(42));
//! assert_eq!(puller.try_pop(), Ok(None));
//! ```
//!
//! # Semantics
//! - Dropping the last `Pusher` end-point will cause waiting `Puller`'s to
//! wake up and return `Err(StaleErr)` if there are no more nodes on the
//! queue.
//! - Dropping the last `Puller` end-point will:
//! - Immediately drop all the nodes in the queue.
//! - Cause the `Pusher`'s to return `Err(StaleErr)` if an attempt is made to
//! add new nodes to the queue.
pub(crate) mod pull;
pub(crate) mod push;
use std::{
collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};
use parking_lot::{Condvar, Mutex};
use indexmap::IndexMap;
pub use pull::Puller;
pub use push::Pusher;
/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more
/// associated `Puller`'s, which implies that there's nothing to take nodes off
/// the queue any longer.
#[derive(Debug, PartialEq)]
pub struct StaleErr;
/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
|
| ︙ | ︙ |
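The crate documentation in this hunk describes three possible outcomes for a non-blocking pop. As a rough illustration only, the std-only sketch below models that documented contract. `Model` and its fields are hypothetical names invented for this example; they are not sigq's internal types, and the real crate keeps this state behind a mutex.

```rust
use std::collections::VecDeque;

/// Stand-in for sigq's `StaleErr` (a local copy for this sketch).
#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// Hypothetical model of the state a `Puller` inspects: the queued nodes
/// and the number of `Pusher` end-points that are still alive.
pub struct Model<I> {
    pub q: VecDeque<I>,
    pub npushers: usize,
}

impl<I> Model<I> {
    /// Non-blocking pop that follows the documented contract:
    /// - a queued node is always returned first,
    /// - an empty queue with no pushers left is stale,
    /// - an empty queue with live pushers yields `Ok(None)`.
    pub fn try_pop(&mut self) -> Result<Option<I>, StaleErr> {
        match self.q.pop_front() {
            Some(node) => Ok(Some(node)),
            None if self.npushers == 0 => Err(StaleErr),
            None => Ok(None),
        }
    }
}
```

In this model, nodes that were queued before the last pusher went away are still handed out; `Err(StaleErr)` only appears once the queue has drained.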
Changes to src/pull.rs.
| ︙ | ︙ | |||
15 16 17 18 19 20 21 |
impl<I> Puller<I> {
/// Pull the oldest node off the queue and return it.
///
/// If no nodes are available on the queue, then block and wait for one to
/// become available.
///
| | | > | 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
impl<I> Puller<I> {
/// Pull the oldest node off the queue and return it.
///
/// If no nodes are available on the queue, then block and wait for one to
/// become available.
///
/// Returns `Err(StaleErr)` if there are no more items in the queue and there
/// are no more [`Pusher`](super::Pusher) objects associated with this
/// `Puller`.
#[cfg_attr(feature = "inline-more", inline)]
pub fn pop(&self) -> Result<I, StaleErr> {
let mut inner = self.0.inner.lock();
loop {
if inner.npushers == 0 {
break Err(StaleErr);
} else {
|
| ︙ | ︙ | |||
43 44 45 46 47 48 49 |
/// Pull the oldest node off the queue and return it.
///
/// If a node is available on the queue then take it off and return it.
///
/// If no nodes are available and there are no more [`Pusher`](super::Pusher)
/// objects associated with this `Puller`, then return `Err(StaleErr)`.
///
| | | 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
/// Pull the oldest node off the queue and return it.
///
/// If a node is available on the queue then take it off and return it.
///
/// If no nodes are available and there are no more [`Pusher`](super::Pusher)
/// objects associated with this `Puller`, then return `Err(StaleErr)`.
///
/// If no nodes are available and at least one associated `Pusher` exists,
/// then return `Ok(None)`.
#[cfg_attr(feature = "inline-more", inline)]
pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
let mut inner = self.0.inner.lock();
if let Some(n) = inner.q.pop_front() {
Ok(Some(n))
} else if inner.npushers == 0 {
|
| ︙ | ︙ | |||
91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
pub fn was_empty(&self) -> bool {
let inner = self.0.inner.lock();
inner.q.is_empty()
}
}
impl<I> Drop for Puller<I> {
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.npullers -= 1;
// If this is the last puller then remove all the nodes.
// The nodes may contain some kind of context that must be notified that
// the node will never reach its destination.
| > > > > > | 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
pub fn was_empty(&self) -> bool {
let inner = self.0.inner.lock();
inner.q.is_empty()
}
}
impl<I> Drop for Puller<I> {
/// Drop a `Puller` instance.
///
/// If this is the last `Puller` end-point of a sigq instance, then the inner
/// queue will be cleared (i.e. all its elements will be immediately
/// dropped).
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.npullers -= 1;
// If this is the last puller then remove all the nodes.
// The nodes may contain some kind of context that must be notified that
// the node will never reach its destination.
|
| ︙ | ︙ |
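The `Drop` documentation added to `Puller` says that dropping the last `Puller` clears the inner queue, and the code comment notes that nodes may carry context that needs to learn they will never be delivered. The following is a minimal std-only sketch of that idea, not the crate's implementation: `new_pair`, `Job` and `dropped_after_puller_gone` are hypothetical names, and `std::sync::Mutex` is used here in place of the `parking_lot` types the crate imports.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Stand-in for sigq's `StaleErr` (a local copy for this sketch).
#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// Shared state: the queue plus the number of live pullers.
struct Inner<I> {
    q: VecDeque<I>,
    npullers: usize,
}

pub struct Pusher<I>(Arc<Mutex<Inner<I>>>);
pub struct Puller<I>(Arc<Mutex<Inner<I>>>);

/// Create one connected pusher/puller pair (hypothetical constructor).
pub fn new_pair<I>() -> (Pusher<I>, Puller<I>) {
    let inner = Arc::new(Mutex::new(Inner { q: VecDeque::new(), npullers: 1 }));
    (Pusher(Arc::clone(&inner)), Puller(inner))
}

impl<I> Pusher<I> {
    /// Refuse new nodes once nothing is left to take them off the queue.
    pub fn push(&self, node: I) -> Result<(), StaleErr> {
        let mut inner = self.0.lock().unwrap();
        if inner.npullers == 0 {
            return Err(StaleErr);
        }
        inner.q.push_back(node);
        Ok(())
    }
}

impl<I> Drop for Puller<I> {
    fn drop(&mut self) {
        let mut inner = self.0.lock().unwrap();
        inner.npullers -= 1;
        if inner.npullers == 0 {
            // Last puller: drop every queued node right away.
            inner.q.clear();
        }
    }
}

/// Hypothetical node type that records when it is dropped.
pub struct Job(pub Arc<AtomicUsize>);

impl Drop for Job {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Queue two jobs, drop the only puller, and report how many jobs
/// have been dropped at that point.
pub fn dropped_after_puller_gone() -> usize {
    let dropped = Arc::new(AtomicUsize::new(0));
    let (pusher, puller) = new_pair();
    pusher.push(Job(Arc::clone(&dropped))).unwrap();
    pusher.push(Job(Arc::clone(&dropped))).unwrap();
    drop(puller);
    dropped.load(Ordering::SeqCst)
}
```

Clearing the queue inside `drop` means each node's own `Drop` runs as soon as delivery becomes impossible, rather than when the last `Pusher` eventually goes away.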
Changes to src/push.rs.
| ︙ | ︙ | |||
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
let mut inner = self.0.inner.lock();
inner.npushers += 1;
Self(Arc::clone(&self.0))
}
}
impl<I> Drop for Pusher<I> {
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.npushers -= 1;
// If this was the last pusher then wake any pullers that are waiting to
// receive new items. (When they discover that no pushers remain they will
// return `Err(StaleErr)`).
| > > > > | 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
let mut inner = self.0.inner.lock();
inner.npushers += 1;
Self(Arc::clone(&self.0))
}
}
impl<I> Drop for Pusher<I> {
/// Drop a `Pusher` instance.
///
/// When the last `Pusher` of a sigq instance is dropped, wake up any
/// `Puller`'s waiting for new nodes to arrive.
fn drop(&mut self) {
let mut inner = self.0.inner.lock();
inner.npushers -= 1;
// If this was the last pusher then wake any pullers that are waiting to
// receive new items. (When they discover that no pushers remain they will
// return `Err(StaleErr)`).
|
| ︙ | ︙ |
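The new `Drop` documentation for `Pusher` states that dropping the last `Pusher` wakes any waiting `Puller`. A sketch of how such a wake-up can work is given below, under stated assumptions: it uses `std::sync::{Mutex, Condvar}` rather than the `parking_lot` types the crate imports, covers only the blocking path (no `Waker` handling for `apop()`), and every name in it (`Shared`, `new_pair`, `pop_after_last_pusher_dropped`) is hypothetical. The `pop` here follows the documented contract by checking the queue before the pusher count.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Stand-in for sigq's `StaleErr` (a local copy for this sketch).
#[derive(Debug, PartialEq)]
pub struct StaleErr;

struct Inner<I> {
    q: VecDeque<I>,
    npushers: usize,
}

/// Mutex-protected state plus the condition variable pullers block on.
struct Shared<I> {
    inner: Mutex<Inner<I>>,
    signal: Condvar,
}

pub struct Pusher<I>(Arc<Shared<I>>);
pub struct Puller<I>(Arc<Shared<I>>);

pub fn new_pair<I>() -> (Pusher<I>, Puller<I>) {
    let shared = Arc::new(Shared {
        inner: Mutex::new(Inner { q: VecDeque::new(), npushers: 1 }),
        signal: Condvar::new(),
    });
    (Pusher(Arc::clone(&shared)), Puller(shared))
}

impl<I> Pusher<I> {
    /// Queue a node and wake one waiting puller.
    pub fn push(&self, node: I) {
        self.0.inner.lock().unwrap().q.push_back(node);
        self.0.signal.notify_one();
    }
}

impl<I> Drop for Pusher<I> {
    fn drop(&mut self) {
        let mut inner = self.0.inner.lock().unwrap();
        inner.npushers -= 1;
        if inner.npushers == 0 {
            // Wake every blocked puller so it can observe the stale state.
            self.0.signal.notify_all();
        }
    }
}

impl<I> Puller<I> {
    /// Block until a node arrives, or until the queue is empty and no
    /// pusher remains.
    pub fn pop(&self) -> Result<I, StaleErr> {
        let mut inner = self.0.inner.lock().unwrap();
        loop {
            if let Some(node) = inner.q.pop_front() {
                break Ok(node);
            }
            if inner.npushers == 0 {
                break Err(StaleErr);
            }
            inner = self.0.signal.wait(inner).unwrap();
        }
    }
}

/// A consumer thread pops twice while the producer pushes once and then
/// drops its only `Pusher`.
pub fn pop_after_last_pusher_dropped() -> (Result<u32, StaleErr>, Result<u32, StaleErr>) {
    let (pusher, puller) = new_pair::<u32>();
    let consumer = thread::spawn(move || (puller.pop(), puller.pop()));
    pusher.push(7);
    drop(pusher);
    consumer.join().unwrap()
}
```

Without the `notify_all()` in `drop`, the consumer's second `pop()` would stay blocked forever, since no further push could ever signal it.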
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
# Change Log

## [Unreleased]

### Added

### Changed

### Removed

## [0.13.1] - 2023-07-25

### Added
| > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# Change Log

## [Unreleased]

### Added

### Changed

### Removed

## [0.13.2] - 2023-07-26

### Added

### Changed

- Documentation updates.

### Removed

## [0.13.1] - 2023-07-25

### Added
| ︙ | ︙ |