sigq

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From sigq-0.12.0 To sigq-0.13.0

2023-07-25
00:33
Add missing release date. check-in: 6d69807551 user: jan tags: trunk
00:30
Release maintenance. check-in: 3d150dc1c3 user: jan tags: sigq-0.13.0, trunk
00:28
Typo. check-in: f60eca957a user: jan tags: trunk
00:28
Keep track of whether the Puller has disappeared. check-in: 58a487a4a1 user: jan tags: trunk
2023-07-24
21:34
Clippy. check-in: 8c44c91ce4 user: jan tags: sigq-0.12.0, trunk
21:29
Release maintenance. check-in: 470cdcb5d3 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
1
2

3
4
5
6
7
8
9
10


-
+







[package]
name = "sigq"
version = "0.12.0"
version = "0.13.0"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"

Changes to src/lib.rs.

8
9
10
11
12
13
14
15

16



17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

39
40
41
42
43
44
45
8
9
10
11
12
13
14

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50







-
+

+
+
+







+















+







  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::{Puller, StaleErr};
pub use pull::Puller;
pub use push::Pusher;

#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
  q: VecDeque<I>,
  npushers: usize,
  npullers: usize,
  wakers: IndexMap<usize, Waker>
}

/// Inner shared data.
struct Shared<I> {
  signal: Condvar,
  inner: Mutex<Inner<I>>,
  idgen: AtomicUsize
}

/// Create a new queue and return its paired push and pull objects.
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
  let inner = Inner {
    q: VecDeque::new(),
    npushers: 1,
    npullers: 1,
    wakers: IndexMap::new()
  };
  let shared = Shared {
    signal: Condvar::new(),
    inner: Mutex::new(inner),
    idgen: AtomicUsize::new(1)
  };

Changes to src/pull.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15


16
17
18
19
20
21
22
1
2
3
4
5
6
7
8
9



10
11
12
13
14
15
16
17
18
19
20
21









-
-
-



+
+







use std::{
  future::Future,
  num::NonZeroUsize,
  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// The receiving end-point of queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///
90
91
92
93
94
95
96







97
98
99
100
101
102
103
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109







+
+
+
+
+
+
+







  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}

impl<I> Drop for Puller<I> {
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npullers -= 1;
  }
}


#[doc(hidden)]
pub struct PopFuture<I> {
  ctx: Arc<super::Shared<I>>,
  id: Option<NonZeroUsize>
}

Changes to src/push.rs.

1
2
3
4
5
6


7
8
9
10
11



12
13

14



15
16
17
18
19
20







21
22
23
24
25
26
27
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

18
19
20
21
22






23
24
25
26
27
28
29
30
31
32
33
34
35
36






+
+





+
+
+

-
+

+
+
+
-
-
-
-
-
-
+
+
+
+
+
+
+







use std::sync::Arc;

/// The transmitting end-point of queue.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

impl<I> Pusher<I> {
  /// Push a node on to the queue and unlock one queue reader, if any.
  ///
  /// If there are any tasks or threads waiting for new nodes to arrive they
  /// will be notified.
  ///
  /// Returns `Err(StaleErr)` is there are no [`Puller`](super::Puller)'s
  /// available to receive any new nodes.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn push(&self, item: I) {
  pub fn push(&self, item: I) -> Result<(), StaleErr> {
    let mut inner = self.0.inner.lock();
    if inner.npullers == 0 {
      Err(StaleErr)
    } else {
    inner.q.push_back(item);
    if let Some((_, n)) = inner.wakers.pop() {
      n.wake();
    }
    self.0.signal.notify_one();
    drop(inner);
      inner.q.push_back(item);
      if let Some((_, n)) = inner.wakers.pop() {
        n.wake();
      }
      self.0.signal.notify_one();
      Ok(())
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]

Changes to tests/simple.rs.

1
2
3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
1
2
3
4
5
6
7
8
9

10
11
12
13
14
15
16
17









-
+







use sigq::StaleErr;

#[test]
fn try_pop_retvals() {
  let (tx, rx) = sigq::new();

  // There's a client, no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  tx.push("hello");
  tx.push("hello").unwrap();

  // There's a client, data, so try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));

  // There's a client, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

25
26
27
28
29
30
31
32

33
34
35
36
37
38
39
25
26
27
28
29
30
31

32
33
34
35
36
37
38
39







-
+







#[test]
fn drop_nonempty() {
  let (tx, rx) = sigq::new();

  // There's a client, no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  tx.push("hello");
  tx.push("hello").unwrap();

  // Drop the only pusher.  It should still be possible to pull existing nodes
  // off the stack.
  drop(tx);

  // There's a client, data, so try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));
55
56
57
58
59
60
61
62









63
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72








+
+
+
+
+
+
+
+
+


  std::thread::sleep(std::time::Duration::from_millis(500));

  drop(tx);

  assert!(jh.join().unwrap());
}

#[test]
fn stale_puller() {
  let (tx, rx) = sigq::new::<&str>();

  drop(rx);

  assert_eq!(tx.push("hello"), Err(StaleErr));
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7
8












9
10
11
12
13
14
15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27








+
+
+
+
+
+
+
+
+
+
+
+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.13.0]

### Added

### Changed

- `Pusher::push()` will return `Err(StaleErr)` if there are no more associated
  `Puller`s.

### Removed


## [0.12.0] - 2023-07-24

### Added