sigq

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From sigq-0.13.1 To sigq-0.13.2

2023-07-27
01:21
Fix premature Puller stale error bug. check-in: 68dd3c42ad user: jan tags: trunk
2023-07-26
19:54
Documentation udates. Hide push and pull modules from public. check-in: b485d7b118 user: jan tags: sigq-0.13.2, trunk
2023-07-25
01:49
Release maintenance. check-in: 052111cfd8 user: jan tags: sigq-0.13.1, trunk
01:47
Clear the internal queue when the last Puller is dropped. check-in: 2e05c4ff29 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
1
2

3
4
5
6
7
8
9
10


-
+







[package]
name = "sigq"
version = "0.13.1"
version = "0.13.2"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"

Changes to src/lib.rs.

1
2






























3
4
5


6
7
8
9
10
11
12
13
14
15
16
17








18
19
20
21
22
23
24


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
+
+












+
+
+
+
+
+
+
+







//! Queue which supports pushing and poping nodes from threads/tasks, crossing
//! sync/async boundaries.
//! _sigq_ is a FIFO queue that supports pushing and poping nodes from
//! threads/tasks, crossing sync/async boundaries.  The interface to interact
//! with the queue is a pair of end-points.  The [`Pusher`] is used to add data
//! to the queue, and the [`Puller`] is used to pull data off the queue.
//!
//! The `Pusher` has a [`push()`](Pusher::push) method that is used to push new
//! nodes onto the queue.
//!
//! The `Puller` has a blocking [`pop()`](Puller::pop) and a
//! [`apop()`](Puller::apop) that returns a `Future` for getting the next node
//! off the queue.  These will return immediately with the next node if
//! available, or block and wait for a new node to be pushed onto the queue.
//! [`try_pop()`](Puller::try_pop) can be used as a non-blocking way to get the
//! next node, if available.
//!
//! ```
//! let (pusher, puller) = sigq::new();
//! pusher.push(42).unwrap();
//! assert_eq!(puller.pop(), Ok(42));
//! assert_eq!(puller.try_pop(), Ok(None));
//! ```
//!
//! # Semantics
//! - Dropping the last `Pusher` end-point will cause waiting `Puller`'s to
//!   wake up and return `Err(StaleErr)` if there are no more nodes on the
//!   queue.
//! - Dropping the last `Puller` end-point will:
//!   - Immediately drop all the nodes in the queue.
//!   - Cause the `Puller`'s to return `Err(StaleErr)` if new nodes are
//!     attempted to be added to the queue.

pub mod pull;
pub mod push;
pub(crate) mod pull;
pub(crate) mod push;

use std::{
  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::Puller;
pub use push::Pusher;

/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more
/// associated `Puller`'s, which implies that there's nothing to take nodes off
/// the queue any longer.
#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {

Changes to src/pull.rs.

15
16
17
18
19
20
21
22
23



24
25
26
27
28
29
30
15
16
17
18
19
20
21


22
23
24
25
26
27
28
29
30
31







-
-
+
+
+








impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///
  /// Returns `Err(())` if there are no more items in queue and there are no
  /// more [`Pusher`](super::Pusher) objects associated with this `Puller`.
  /// Returns `Err(StaleErr)` if there are no more items in queue and there are
  /// no more [`Pusher`](super::Pusher) objects associated with this
  /// `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut inner = self.0.inner.lock();
    loop {
      if inner.npushers == 0 {
        break Err(StaleErr);
      } else {
43
44
45
46
47
48
49
50

51
52
53
54
55
56
57
44
45
46
47
48
49
50

51
52
53
54
55
56
57
58







-
+







  /// Pull the oldest node off the queue and return it.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there are no more [`Pusher`](super::Pusher)
  /// objects associated with this `Puller`, then return `Err(StaleErr)`.
  ///
  /// If no nodes were available and there's at least one associated `Pusher`
  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut inner = self.0.inner.lock();
    if let Some(n) = inner.q.pop_front() {
      Ok(Some(n))
    } else if inner.npushers == 0 {
91
92
93
94
95
96
97





98
99
100
101
102
103
104
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110







+
+
+
+
+







  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}

impl<I> Drop for Puller<I> {
  /// Drop a `Puller` instance.
  ///
  /// If this is the last `Puller` end-point of a sigq instance, then the inner
  /// queue will be cleared (i.e. all its elements will be immediately
  /// dropped).
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npullers -= 1;

    // If this is the last puller then remove all thr nodes.
    // The nodes may contain some kind of context that must be notified that
    // the node will never reach its destination.

Changes to src/push.rs.

45
46
47
48
49
50
51




52
53
54
55
56
57
58
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62







+
+
+
+







    let mut inner = self.0.inner.lock();
    inner.npushers += 1;
    Self(Arc::clone(&self.0))
  }
}

impl<I> Drop for Pusher<I> {
  /// Drop a `Pusher` instance.
  ///
  /// When the final instance of a sigq's instance's `Pusher` is dropped, wake
  /// up any `Puller`'s waiting for new nodes to arrive.
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npushers -= 1;

    // If this was the last pusher then wake any pullers that are waiting to
    // receive new items.  (When they discover that no pushers remain they will
    // return None).

Changes to www/changelog.md.

1
2
3
4
5
6
7
8











9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26








+
+
+
+
+
+
+
+
+
+
+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.13.2] - 2023-07-26

### Added

### Changed

- Documentation updates.

### Removed


## [0.13.1] - 2023-07-25

### Added