sigq

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From sigq-0.13.3 To sigq-0.13.4

2024-09-09
18:41
Crate maintenance. check-in: 0bee0469d7 user: jan tags: trunk
2023-09-15
16:55
Release maintenance. check-in: 1dfcd66b82 user: jan tags: sigq-0.13.4, trunk
16:04
Implement Clone manually instead of using derive, because using derive forces the inner type(s) to derive Clone as well. check-in: 1fc050c0ee user: jan tags: trunk
13:32
Implement WeakPusher, a weak reference to Pusher objects. check-in: bd3e071eea user: jan tags: trunk
2023-07-27
01:26
Release maintenance. check-in: 727d7924a6 user: jan tags: sigq-0.13.3, trunk
01:21
Fix premature Puller stale error bug. check-in: 68dd3c42ad user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
11
12
13

14
15
16
17


18
19
20
21
22
23
24
25
26
27
28
29



1
2

3
4
5
6
7
8
9
10
11

12
13
14



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31


-
+








-

+

-
-
-
+
+












+
+
+
[package]
name = "sigq"
version = "0.13.3"
version = "0.13.4"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
exclude = [
  ".efiles",
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "rustfmt.toml",
  "tests",
  "www"
  "www",
  "rustfmt.toml"
]
rust-version = "1.64"

[features]
default = ["inline-more"]

inline-more = []

[dependencies]
indexmap = { version = "2.0.0" }
parking_lot = { version = "0.12.1" }

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Changes to src/lib.rs.

37
38
39
40
41
42
43
44

45
46
47
48
49
50
51
37
38
39
40
41
42
43

44
45
46
47
48
49
50
51







-
+







};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::Puller;
pub use push::Pusher;
pub use push::{Pusher, WeakPusher};

/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more

Changes to src/push.rs.

1

2
3
4
5
6
7
8

1
2
3
4
5
6
7
8
-
+







use std::sync::Arc;
use std::sync::{Arc, Weak};

/// The transmitting end-point of queue.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

34
35
36
37
38
39
40





41
42
43
44
45
46
47
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52







+
+
+
+
+







  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }

  /// Create a weak reference to this `Pusher`.
  pub fn weak(&self) -> WeakPusher<I> {
    WeakPusher(Arc::downgrade(&self.0))
  }
}

impl<I> Clone for Pusher<I> {
  fn clone(&self) -> Self {
    let mut inner = self.0.inner.lock();
    inner.npushers += 1;
    Self(Arc::clone(&self.0))
64
65
66
67
68
69
70
71

























72
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102








+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

      self.0.signal.notify_all();
      for waker in inner.wakers.drain(..).map(|(_k, v)| v) {
        waker.wake()
      }
    }
  }
}

/// A weak reference to a [`Pusher`].
#[repr(transparent)]
pub struct WeakPusher<I>(pub(crate) Weak<super::Shared<I>>);

impl<I> Clone for WeakPusher<I> {
  fn clone(&self) -> Self {
    Self(Weak::clone(&self.0))
  }
}

impl<I> WeakPusher<I> {
  /// Attempt to upgrade `WeakPusher` to a [`Pusher`].
  ///
  /// Returns `None` is all the strong references have been exhausted.
  pub fn upgrade(&self) -> Option<Pusher<I>> {
    if let Some(strong) = self.0.upgrade() {
      let mut inner = strong.inner.lock();
      inner.npushers += 1;
      Some(Pusher(Arc::clone(&strong)))
    } else {
      None
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to tests/simple.rs.

1
















2
3
4
5
6
7
8
9

10
11

12
13
14
15
16
17
18
19
20
21
22
23














24
25
26
27
28
29
30

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21



22


23





24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+





-
-
-
+
-
-
+
-
-
-
-
-







+
+
+
+
+
+
+
+
+
+
+
+
+
+







use sigq::StaleErr;
use sigq::{Puller, Pusher, StaleErr};


/// A reustable test
fn std_test(tx: &Pusher<&str>, rx: &Puller<&str>) {
  // There's a client, no data; try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  tx.push("hello").unwrap();

  // There's a client, data; try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));

  // There's a client; try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));
}

#[test]
fn try_pop_retvals() {
  let (tx, rx) = sigq::new();

  // There's a client, no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  // Run the common tests
  tx.push("hello").unwrap();

  std_test(&tx, &rx);
  // There's a client, data, so try_pop() should return Ok(Some)
  assert_eq!(rx.try_pop(), Ok(Some("hello")));

  // There's a client, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  // Drop the only pusher.
  drop(tx);

  // There are no more clients, so try_pop() should return Err(())
  assert_eq!(rx.try_pop(), Err(StaleErr));
}

#[test]
fn try_pop_with_only_weak_tx() {
  let (tx, rx) = sigq::new::<&str>();

  // Create a weak reference to the pusher
  let _wtx = tx.weak();

  // Drop the only (strong) pusher.
  drop(tx);

  // There are no more clients; try_pop() should return Err(StaleErr)
  assert_eq!(rx.try_pop(), Err(StaleErr));
}

#[test]
fn drop_nonempty() {
  let (tx, rx) = sigq::new();

  // There's a client, no data, so try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));
64
65
66
67
68
69
70
71














































72
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139








+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

fn stale_puller() {
  let (tx, rx) = sigq::new::<&str>();

  drop(rx);

  assert_eq!(tx.push("hello"), Err(StaleErr));
}

/// Make sure that a weakened pusher can be upgraded and used
#[test]
fn weak_to_strong() {
  let (tx, rx) = sigq::new::<&str>();

  // Create a weak reference to the pusher
  let wtx = tx.weak();

  // Create a strong Pusher from the weak reference
  let Some(tx2) = wtx.upgrade() else {
    panic!("Unable to upgrade weak Pusher");
  };

  // Run the common tests
  std_test(&tx2, &rx);

  // Drop the only (strong) pusher.
  drop(tx);

  // There's still a Pusher; try_pop() should return Ok(None)
  assert_eq!(rx.try_pop(), Ok(None));

  drop(tx2);

  // There are no more Pushers; try_pop() should return Err(StaleErr)
  assert_eq!(rx.try_pop(), Err(StaleErr));
}

/// If the last strong `Pusher` is dropped a `WeakPusher` should fail to
/// upgrade to a `Pusher`.
#[test]
fn no_strong_weak_upgrade_fail() {
  let (tx, _rx) = sigq::new::<&str>();

  // Create a weak reference to the pusher
  let wtx = tx.weak();

  // Drop the only (strong) pusher.
  drop(tx);

  // The only strong pusher was released; upgrading the weak one should fail
  let Some(_) = wtx.upgrade() else {
    panic!("Upgrade unexpectedly successful");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10









11
12
13
14
15
16
17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26










+
+
+
+
+
+
+
+
+







# Change Log

## [Unreleased]

### Added

### Changed

### Removed


## [0.13.4] - 2023-09-15

### Added

- `Pusher` objects can spawn downgraded to `WeakPusher` objects, that in turn
  can be upgraded to `Pusher` objects (as long as all the strong `Pusher`
  objects have not been dropped).


## [0.13.3] - 2023-07-27

### Changed

- Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes
  still remain in the queue.