sigq

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From sigq-0.11.0 To sigq-0.12.0

2023-07-25
00:28
Keep track of whether the Puller has disappeared. check-in: 58a487a4a1 user: jan tags: trunk
2023-07-24
21:34
Clippy. check-in: 8c44c91ce4 user: jan tags: sigq-0.12.0, trunk
21:29
Release maintenance. check-in: 470cdcb5d3 user: jan tags: trunk
2022-09-11
15:10
Implement inline-more feature. check-in: ee68191032 user: jan tags: trunk
2022-09-09
16:56
Prepare for release of 0.11.0. check-in: c16d005501 user: jan tags: sigq-0.11.0, trunk
16:51
Cargo.toml maintenance. check-in: 2a9cd0b2bb user: jan tags: trunk

Changes to .efiles.

1
2
3



Cargo.toml
www/changelog.md
src/lib.rs






>
>
>
1
2
3
4
5
6
Cargo.toml
www/changelog.md
src/lib.rs
src/push.rs
src/pull.rs
tests/simple.rs

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

16
17
18
19





20
21
22
23
[package]
name = "sigq"
version = "0.11.0"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2018"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "rustfmt.toml",

  "www"
]
rust-version = "1.36"






[dependencies]
indexmap = { version = "1.9.1" }
parking_lot = { version = "0.12.1" }



|

|










>


|

>
>
>
>
>

|


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[package]
name = "sigq"
version = "0.12.0"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
exclude = [
  ".efiles",
  ".fossil-settings",
  ".fslckout",
  "rustfmt.toml",
  "tests",
  "www"
]
rust-version = "1.64"

[features]
default = ["inline-more"]

inline-more = []

[dependencies]
indexmap = { version = "2.0.0" }
parking_lot = { version = "0.12.1" }

Changes to src/lib.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14



15
16
17
18
19
20

21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
//! Queue which supports pushing and popping nodes from threads/tasks,
//! crossing sync/async boundaries.

use std::collections::VecDeque;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;




/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
  /// Queued nodes; new nodes are pushed at the back and taken from the front.
  q: VecDeque<I>,

  /// Wakers registered by pending `PopFuture`s, keyed by the identifier the
  /// future stored when it registered.
  wakers: IndexMap<usize, Waker>
}


/// State shared between a `Queue` and the `PopFuture`s it hands out.
struct Shared<I> {
  /// Condition variable that threads blocked in `Queue::pop()` wait on.
  signal: Condvar,
  /// The mutex-protected queue and waker table.
  inner: Mutex<Inner<I>>,
  /// Counter used to generate identifiers for registered wakers.
  idgen: AtomicUsize
}

/// Queue on which threads (via `pop()`) and async tasks (via `apop()`) can
/// wait for nodes to arrive.
#[repr(transparent)]
pub struct Queue<I>(Arc<Shared<I>>);


impl<I> Queue<I> {
  /// Construct an empty queue.
  pub fn new() -> Self {
    Self::default()
  }

  /// Report whether the queue held no nodes at the moment it was inspected.
  ///
  /// The lock is released before this returns, so the answer may already be
  /// outdated by the time the caller acts on it.  If that is a surprise, this
  /// is probably not the function you want.
  #[inline(always)]
  pub fn was_empty(&self) -> bool {
    self.0.inner.lock().q.is_empty()
  }

  /// Append a node to the queue and notify one waiting reader, if any.
  ///
  /// Both kinds of waiters are signalled: one registered task waker (if there
  /// is one) and one thread blocked in [`pop()`](#method.pop).
  #[inline(always)]
  pub fn push(&self, item: I) {
    let shared = &*self.0;
    let mut guard = shared.inner.lock();
    guard.q.push_back(item);

    // Wake one pending task, taking its waker out of the table.
    if let Some((_id, waker)) = guard.wakers.pop() {
      waker.wake();
    }

    // Signal one blocked thread; the lock is released when `guard` goes out
    // of scope at the end of the function.
    shared.signal.notify_one();
  }

  /// Take the oldest node off the queue and return it.
  ///
  /// Blocks the calling thread until a node is available.
  #[inline(always)]
  pub fn pop(&self) -> I {
    let mut guard = self.0.inner.lock();
    loop {
      if let Some(node) = guard.q.pop_front() {
        return node;
      }
      // Nothing queued yet; sleep until a pusher signals, then re-check.
      self.0.signal.wait(&mut guard);
    }
  }

  /// Take the oldest node off the queue and return it, without blocking.
  ///
  /// Returns `None` if the queue is currently empty.
  #[inline(always)]
  pub fn try_pop(&self) -> Option<I> {
    self.0.inner.lock().q.pop_front()
  }

  /// Asynchronous counterpart of [`pop()`](#method.pop): instead of blocking
  /// it returns a `Future` that resolves to the next node, for use in an
  /// `async` context.
  ///
  /// ```
  /// use sigq::Queue;
  /// async fn test() {
  ///   let q = Queue::new();
  ///   q.push("hello".to_string());
  ///   assert_eq!(q.was_empty(), false);
  ///   let node = q.apop().await;
  ///   assert_eq!(node, "hello");
  ///   assert_eq!(q.was_empty(), true);
  /// }
  /// ```
  #[inline(always)]
  pub fn apop(&self) -> PopFuture<I> {
    let ctx = Arc::clone(&self.0);
    PopFuture { ctx, id: None }
  }
}

impl<I> Default for Queue<I> {
  /// Build an empty queue with no registered wakers.  Waker identifiers
  /// start at 1.
  fn default() -> Self {
    let inner = Inner {
      q: VecDeque::new(),
      wakers: IndexMap::new()
    };
    let shared = Shared {
      signal: Condvar::new(),
      inner: Mutex::new(inner),
      idgen: AtomicUsize::new(1)
    };
    Queue(Arc::new(shared))
  }
}


/// Future returned by `Queue::apop()`; resolves to the next node taken off
/// the queue.
#[doc(hidden)]
pub struct PopFuture<I> {
  /// Shared queue state this future pulls from.
  ctx: Arc<Shared<I>>,
  /// Identifier under which this future last registered a waker, or `None` if
  /// it has not registered one.
  id: Option<NonZeroUsize>
}

impl<I: 'static + Send> Future for PopFuture<I> {
  type Output = I;

  /// Try to take a node off the queue.
  ///
  /// Returns `Poll::Ready(node)` if a node was available.  Otherwise the
  /// task's waker is registered in the shared waker table and `Poll::Pending`
  /// is returned.
  ///
  /// A future that is polled several times keeps using the identifier it was
  /// assigned on its first registration, so a re-poll replaces its previous
  /// waker instead of adding another entry.  That keeps the table bounded and
  /// ensures `Drop` can remove everything this future registered.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();
    if let Some(node) = inner.q.pop_front() {
      return Poll::Ready(node);
    }

    let id = match self.id {
      // Already registered once; overwrite that entry with the new waker.
      Some(id) => id.get(),
      // First registration: generate a unique, non-zero identifier.
      None => loop {
        let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
        if id != 0 && !inner.wakers.contains_key(&id) {
          break id;
        }
      }
    };
    inner.wakers.insert(id, ctx.waker().clone());
    drop(inner);

    // `id` is non-zero on both paths above, so this always stores `Some`.
    self.id = NonZeroUsize::new(id);
    Poll::Pending
  }
}

impl<I> Drop for PopFuture<I> {
  /// Unregister the waker stored under this future's identifier, if the
  /// future ever registered one.
  fn drop(&mut self) {
    let id = match self.id {
      Some(id) => id.get(),
      // Never registered a waker; nothing to clean up.
      None => return
    };
    let mut guard = self.ctx.inner.lock();
    let _ = guard.wakers.remove(&id);
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :



|
|
|
|
|
<
|




>
>
>






>



>






<
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
<
<
<
|
<
|
|
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
|
<
<
<
|
|
<
|
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
|

<
<
<
<
<
|
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<



1
2
3
4
5
6
7
8

9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33


34
35












36


37







38

39
40



41











42



43
44

45
46




























47


48












49
50





51




































52
53
54
//! Queue which supports pushing and popping nodes from threads/tasks,
//! crossing sync/async boundaries.

pub mod pull;
pub mod push;

use std::{
  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker

};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::{Puller, StaleErr};
pub use push::Pusher;

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
  /// Queued nodes; new nodes are pushed at the back and taken from the front.
  q: VecDeque<I>,
  /// Number of live `Pusher` objects; starts at 1, is incremented when a
  /// `Pusher` is cloned and decremented when one is dropped.
  npushers: usize,
  /// Wakers registered by pending `PopFuture`s, keyed by the identifier the
  /// future stored when it registered.
  wakers: IndexMap<usize, Waker>
}

/// State shared between the `Pusher` and `Puller` end-points of one queue.
struct Shared<I> {
  /// Condition variable that threads blocked in `Puller::pop()` wait on.
  signal: Condvar,
  /// The mutex-protected queue, pusher count and waker table.
  inner: Mutex<Inner<I>>,
  /// Counter used to generate identifiers for registered wakers.
  idgen: AtomicUsize
}



/// Create a new queue and return its paired push and pull objects.
///
/// The queue starts out empty, and the returned [`Pusher`] is counted as its
/// single initial pusher.
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
  let shared = Arc::new(Shared {
    signal: Condvar::new(),
    inner: Mutex::new(Inner {
      q: VecDeque::new(),
      // Accounts for the one `Pusher` returned below.
      npushers: 1,
      wakers: IndexMap::new()
    }),
    idgen: AtomicUsize::new(1)
  });

  (Pusher(Arc::clone(&shared)), Puller(shared))
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/pull.rs.









































































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use std::{
  future::Future,
  num::NonZeroUsize,
  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

/// Error returned by the pull methods when the queue is empty and no
/// `Pusher` objects remain, meaning no further nodes can ever arrive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StaleErr;

impl std::fmt::Display for StaleErr {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str("queue is empty and no pushers remain")
  }
}

// Implementing `Error` lets callers propagate `StaleErr` with `?` into
// `Box<dyn Error>` and similar error containers.
impl std::error::Error for StaleErr {}

/// The receiving end-point of a queue.
///
/// Wraps a reference to the state shared with the queue's `Pusher` objects.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);

impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///
  /// Nodes that are already queued are always returned, even after every
  /// [`Pusher`](super::Pusher) has been dropped.
  ///
  /// # Errors
  ///
  /// Returns `Err(StaleErr)` if there are no more nodes in the queue and
  /// there are no more [`Pusher`](super::Pusher) objects associated with this
  /// `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut inner = self.0.inner.lock();
    loop {
      // Drain the queue first: nodes pushed before the last pusher went away
      // must still be delivered (same order of checks as `try_pop()` and
      // `PopFuture::poll()`).
      if let Some(node) = inner.q.pop_front() {
        break Ok(node);
      }
      if inner.npushers == 0 {
        break Err(StaleErr);
      }
      // Empty, but pushers remain: sleep until one pushes or the last one is
      // dropped, then re-check.
      self.0.signal.wait(&mut inner);
    }
  }

  /// Pull the oldest node off the queue and return it, without blocking.
  ///
  /// If a node is available on the queue then take it off and return it as
  /// `Ok(Some(node))`.
  ///
  /// If no nodes are available but at least one associated
  /// [`Pusher`](super::Pusher) still exists, then return `Ok(None)`.
  ///
  /// # Errors
  ///
  /// If no nodes are available and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`, then
  /// return `Err(StaleErr)`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut inner = self.0.inner.lock();
    if let Some(n) = inner.q.pop_front() {
      Ok(Some(n))
    } else if inner.npushers == 0 {
      Err(StaleErr)
    } else {
      Ok(None)
    }
  }

  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  ///
  /// The future resolves to `Ok(node)`, or to `Err(StaleErr)` if the queue is
  /// empty and no [`Pusher`](super::Pusher) objects remain.
  ///
  /// ```
  /// async fn test() {
  ///   let (tx, rx) = sigq::new();
  ///   tx.push("hello");
  ///   assert_eq!(rx.was_empty(), false);
  ///   let node = rx.apop().await.unwrap();
  ///   assert_eq!(node, "hello");
  ///   assert_eq!(rx.was_empty(), true);
  /// }
  /// ```
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn apop(&self) -> PopFuture<I> {
    PopFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}


/// Future returned by `Puller::apop()`; resolves to the next node taken off
/// the queue, or to `StaleErr` once the queue is empty and no pushers remain.
#[doc(hidden)]
pub struct PopFuture<I> {
  /// Shared queue state this future pulls from.
  ctx: Arc<super::Shared<I>>,
  /// Identifier under which this future last registered a waker, or `None` if
  /// it has not registered one.
  id: Option<NonZeroUsize>
}

impl<I: 'static + Send> Future for PopFuture<I> {
  type Output = Result<I, StaleErr>;

  /// Try to take a node off the queue.
  ///
  /// - Returns `Poll::Ready(Ok(node))` if a node was available.
  /// - Returns `Poll::Ready(Err(StaleErr))` if the queue is empty and no
  ///   pushers remain.
  /// - Otherwise registers the task's waker in the shared waker table and
  ///   returns `Poll::Pending`.
  ///
  /// A future that is polled several times keeps using the identifier it was
  /// assigned on its first registration, so a re-poll replaces its previous
  /// waker instead of adding another entry.  That keeps the table bounded and
  /// ensures `Drop` can remove everything this future registered.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();
    if let Some(node) = inner.q.pop_front() {
      return Poll::Ready(Ok(node));
    }
    if inner.npushers == 0 {
      // No more nodes and no more pushers, so report the queue as stale.
      return Poll::Ready(Err(StaleErr));
    }

    let id = match self.id {
      // Already registered once; overwrite that entry with the new waker.
      Some(id) => id.get(),
      // First registration: generate a unique, non-zero identifier.
      None => loop {
        let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
        if id != 0 && !inner.wakers.contains_key(&id) {
          break id;
        }
      }
    };
    inner.wakers.insert(id, ctx.waker().clone());
    drop(inner);

    // `id` is non-zero on both paths above, so this always stores `Some`.
    self.id = NonZeroUsize::new(id);
    Poll::Pending
  }
}

impl<I> Drop for PopFuture<I> {
  /// Unregister the waker stored under this future's identifier, if the
  /// future ever registered one.
  fn drop(&mut self) {
    let id = match self.id {
      Some(id) => id.get(),
      // Never registered a waker; nothing to clean up.
      None => return
    };
    let mut guard = self.ctx.inner.lock();
    let _ = guard.wakers.remove(&id);
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/push.rs.























































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::sync::Arc;

/// The transmitting end-point of a queue.
///
/// Wraps a reference to the state shared with the queue's `Puller`.  Cloning
/// and dropping a `Pusher` adjusts the queue's pusher count.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);

impl<I> Pusher<I> {
  /// Append a node to the queue and notify one waiting reader, if any.
  ///
  /// Both kinds of waiters are signalled: one registered task waker (if there
  /// is one) and one thread blocked waiting for a node.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn push(&self, item: I) {
    let shared = &*self.0;
    let mut guard = shared.inner.lock();
    guard.q.push_back(item);

    // Wake one pending task, taking its waker out of the table.
    if let Some((_id, waker)) = guard.wakers.pop() {
      waker.wake();
    }

    // Signal one blocked thread; the lock is released when `guard` goes out
    // of scope at the end of the function.
    shared.signal.notify_one();
  }

  /// Report whether the queue held no nodes at the moment it was inspected.
  ///
  /// The lock is released before this returns, so the answer may already be
  /// outdated by the time the caller acts on it.  If that is a surprise, this
  /// is probably not the function you want.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn was_empty(&self) -> bool {
    self.0.inner.lock().q.is_empty()
  }
}

impl<I> Clone for Pusher<I> {
  fn clone(&self) -> Self {
    let mut inner = self.0.inner.lock();
    inner.npushers += 1;
    Self(Arc::clone(&self.0))
  }
}

impl<I> Drop for Pusher<I> {
  /// Deregister this pusher from the queue.
  ///
  /// When the last pusher goes away, every waiting puller is woken so it can
  /// notice that no more nodes will arrive.
  fn drop(&mut self) {
    let mut guard = self.0.inner.lock();
    guard.npushers -= 1;
    if guard.npushers != 0 {
      // Other pushers remain; waiting pullers stay asleep.
      return;
    }

    // This was the last pusher: wake all blocked threads and all pending
    // tasks.  (When they discover that no pushers remain and the queue is
    // empty they will return `Err(StaleErr)`.)
    self.0.signal.notify_all();
    for (_id, waker) in guard.wakers.drain(..) {
      waker.wake();
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/simple.rs.































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use sigq::StaleErr;

// Walks try_pop() through each of its return values: Ok(None), Ok(Some) and
// Err(StaleErr).
#[test]
fn try_pop_retvals() {
  let (tx, rx) = sigq::new();

  // There's a pusher but no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  tx.push("hello");

  // There's a pusher and data, so try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));

  // The queue is empty again but the pusher remains, so try_pop() should
  // return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  // Drop the only pusher.
  drop(tx);

  // There are no more pushers and no data, so try_pop() should return
  // Err(StaleErr)
  assert_eq!(rx.try_pop(), Err(StaleErr));
}

// Nodes that were queued before the last pusher was dropped must still be
// retrievable; only after that does the queue report itself as stale.
#[test]
fn drop_nonempty() {
  let (tx, rx) = sigq::new();

  // There's a pusher but no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  tx.push("hello");

  // Drop the only pusher.  It should still be possible to pull existing nodes
  // off the queue.
  drop(tx);

  // No pushers remain, but there is data, so try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));

  // There are no more pushers and no data, so try_pop() should return
  // Err(StaleErr)
  assert_eq!(rx.try_pop(), Err(StaleErr));
}

// A thread blocked in pop() on an empty queue must be released with
// Err(StaleErr) when the last pusher is dropped.
#[test]
fn unblock_on_stale() {
  let (tx, rx) = sigq::new::<&str>();

  // Kick off a thread with a Puller; blocking and waiting for a node or
  // termination.  When the Pusher is dropped this should return Err(StaleErr).
  let jh = std::thread::spawn(move || match rx.pop() {
    Ok(_) => false,
    Err(StaleErr) => true
  });

  // Give the spawned thread time to reach the blocking pop() before the
  // pusher goes away.
  std::thread::sleep(std::time::Duration::from_millis(500));

  drop(tx);

  // The thread returns true only if pop() reported StaleErr.
  assert!(jh.join().unwrap());
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7
8





















9
10
11
12
13
14
15
# Change Log

## [Unreleased]

### Added

### Changed






















### Removed


## [0.11.0] - 2022-09-09

### Added









>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.12.0] - 2023-07-24

### Added

- Add an `inline-more` feature (enabled by default).

### Changed

- Split `Queue` into `Pusher` and `Puller` end-points.
  - When taking nodes off the queue using `Puller`, return a `Result` which can
    indicate an error if no nodes remain and there are no more `Pushers`
    associated with the queue (implied: If no pushers remain, no new nodes will
    be added, so the queue is effectively dead).
- Dependencies updated:
  - `indexmap` updated to `2.0.0`
- Use Rust edition 2021.
- Bump MSRV to 1.64 (based on MSRV in `indexmap`)

### Removed


## [0.11.0] - 2022-09-09

### Added