sigq

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From sigq-0.13.4 To sigq-0.13.5

2024-09-09
23:17
Do not publish bacon.toml. check-in: 718a7a76ce user: jan tags: trunk
23:14
Release maintenance. check-in: a2b2d5e4e4 user: jan tags: sigq-0.13.5, trunk
23:07
Support 'managed' pop operations, which return queue elements inside a wrapper that must explicitly be marked as handled, or it will return the element to the queue on Drop. check-in: cb0b1b5bbc user: jan tags: trunk
18:41
Crate maintenance. check-in: 0bee0469d7 user: jan tags: trunk
2023-09-15
16:55
Release maintenance. check-in: 1dfcd66b82 user: jan tags: sigq-0.13.4, trunk
16:04
Implement Clone manually instead of using derive, because using derive forces the inner type(s) to derive Clone as well. check-in: 1fc050c0ee user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6

Cargo.toml
www/changelog.md
src/lib.rs
src/push.rs
src/pull.rs
tests/simple.rs







>
1
2
3
4
5
6
7
Cargo.toml
www/changelog.md
src/lib.rs
src/push.rs
src/pull.rs
tests/simple.rs
tests/managed.rs

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[package]
name = "sigq"
version = "0.13.4"
authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"]
edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]
rust-version = "1.64"

[features]
default = ["inline-more"]

inline-more = []

[dependencies]
indexmap = { version = "2.0.0" }
parking_lot = { version = "0.12.1" }

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]



|
<

















<



|
|




1
2
3

4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

21
22
23
24
25
26
27
28
29
[package]
name = "sigq"
version = "0.13.5"

edition = "2021"
license = "0BSD"
categories = [ "asynchronous", "concurrency", "data-structures" ]
keywords = [ "threads", "sync" ]
repository = "https://repos.qrnch.tech/pub/sigq"
description = "Queue that signals waiting consumers about node availability"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]
rust-version = "1.64"

[features]
default = ["inline-more"]

inline-more = []

[dependencies]
indexmap = { version = "2.5.0" }
parking_lot = { version = "0.12.3" }

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

Added bacon.toml.





































































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# This is a configuration file for the bacon tool
#
# Bacon repository: https://github.com/Canop/bacon
# Complete help on configuration: https://dystroy.org/bacon/config/
# You can also check bacon's own bacon.toml file
#  as an example: https://github.com/Canop/bacon/blob/main/bacon.toml

# For information about clippy lints, see:
# https://github.com/rust-lang/rust-clippy/blob/master/README.md

#default_job = "check"
default_job = "clippy-all-pedantic"

[jobs.check]
command = ["cargo", "check", "--color", "always"]
need_stdout = false

[jobs.check-all]
command = ["cargo", "check", "--all-targets", "--color", "always"]
need_stdout = false

# Run clippy on the default target
[jobs.clippy]
command = [
    "cargo", "clippy",
    "--color", "always",
]
need_stdout = false

# Run clippy on all targets
# To disable some lints, you may change the job this way:
#    [jobs.clippy-all]
#    command = [
#        "cargo", "clippy",
#        "--all-targets",
#        "--color", "always",
#    	 "--",
#    	 "-A", "clippy::bool_to_int_with_if",
#    	 "-A", "clippy::collapsible_if",
#    	 "-A", "clippy::derive_partial_eq_without_eq",
#    ]
# need_stdout = false
[jobs.clippy-all]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
]
need_stdout = false

[jobs.clippy-pedantic]
command = [
    "cargo", "clippy",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

[jobs.clippy-all-pedantic]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
    "--",
    "-Wclippy::all",
    "-Wclippy::pedantic",
    "-Wclippy::nursery",
    "-Wclippy::cargo"
]
need_stdout = false

# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = [
    "cargo", "test", "--color", "always",
    "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124
]
need_stdout = true

[jobs.doc]
command = ["cargo", "doc", "--color", "always", "--no-deps"]
need_stdout = false

# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change

# You can run your application and have the result displayed in bacon,
# *if* it makes sense for this crate.
# Don't forget the `--color always` part or the errors won't be
# properly parsed.
# If your program never stops (eg a server), you may set `background`
# to false to have the cargo run output immediately displayed instead
# of waiting for program's end.
[jobs.run]
command = [
    "cargo", "run",
    "--color", "always",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true

# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
#    bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--color", "always", "--example"]
need_stdout = true
allow_warnings = true

# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
# alt-m = "job:my-job"
c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target

Changes to src/lib.rs.

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

75
76
77
78
79
80
81
  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::Puller;
pub use push::{Pusher, WeakPusher};

/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more
/// associated `Puller`'s, which implies that there's nothing to take nodes off
/// the queue any longer.
#[derive(Debug, PartialEq)]
pub struct StaleErr;

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
  q: VecDeque<I>,
  npushers: usize,
  npullers: usize,
  wakers: IndexMap<usize, Waker>
}

/// Inner shared data.
struct Shared<I> {
  signal: Condvar,
  inner: Mutex<Inner<I>>,
  idgen: AtomicUsize
}

/// Create a new queue and return its paired push and pull objects.

pub fn new<T>() -> (Pusher<T>, Puller<T>) {
  let inner = Inner {
    q: VecDeque::new(),
    npushers: 1,
    npullers: 1,
    wakers: IndexMap::new()
  };







|










|




















>







36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
  collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;

pub use pull::{MustHandle, Puller};
pub use push::{Pusher, WeakPusher};

/// Error value used to indicate that there are no remote end-points available.
///
/// If a `Puller` method returns this it means the queue has no more associated
/// `Pusher`'s, which implies that no new nodes can become available.
///
/// If a `Pusher` method returns this it means that the queue has no more
/// associated `Puller`'s, which implies that there's nothing to take nodes off
/// the queue any longer.
#[derive(Debug, PartialEq, Eq)]
pub struct StaleErr;

/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
  q: VecDeque<I>,
  npushers: usize,
  npullers: usize,
  wakers: IndexMap<usize, Waker>
}

/// Inner shared data.
struct Shared<I> {
  signal: Condvar,
  inner: Mutex<Inner<I>>,
  idgen: AtomicUsize
}

/// Create a new queue and return its paired push and pull objects.
#[must_use]
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
  let inner = Inner {
    q: VecDeque::new(),
    npushers: 1,
    npullers: 1,
    wakers: IndexMap::new()
  };

Changes to src/pull.rs.

1
2

3

4
5
6
7
8
9
10
11
12
13
14
15

































































16
17
18
19
20
21

22
23
24
25
26
27
28
29
30
31

32
33
34
35
36
37
38
39
40
41
42











43
44
45
46
47
48
49
50

51
52

53

54
55
56
57
58
59
60
61
62
63



















64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

80
81
82
83
84
85












86
87
88
89
90
91

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{
  future::Future,

  num::NonZeroUsize,

  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

/// The receiving end-point of queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;


































































impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///

  /// Returns `Err(StaleErr)` if there are no more items in queue and there are
  /// no more [`Pusher`](super::Pusher) objects associated with this
  /// `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut inner = self.0.inner.lock();
    loop {
      if inner.q.is_empty() && inner.npushers == 0 {
        break Err(StaleErr);
      } else {

        match inner.q.pop_front() {
          Some(node) => {
            break Ok(node);
          }
          None => {
            self.0.signal.wait(&mut inner);
          }
        }
      }
    }
  }












  /// Pull the oldest node off the queue and return it.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there are no more [`Pusher`](super::Pusher)
  /// objects associated with this `Puller`, then return `Err(StaleErr)`.
  ///

  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.

  #[cfg_attr(feature = "inline-more", inline)]

  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut inner = self.0.inner.lock();
    if let Some(n) = inner.q.pop_front() {
      Ok(Some(n))
    } else if inner.npushers == 0 {
      Err(StaleErr)
    } else {
      Ok(None)
    }
  }




















  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  ///
  /// ```
  /// async fn test() {
  ///   let (tx, rx) = sigq::new();
  ///   tx.push("hello");
  ///   assert_eq!(rx.was_empty(), false);
  ///   let node = rx.apop().await.unwrap();
  ///   assert_eq!(node, "hello");
  ///   assert_eq!(rx.was_empty(), true);
  /// }
  /// ```
  #[cfg_attr(feature = "inline-more", inline)]

  pub fn apop(&self) -> PopFuture<I> {
    PopFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }













  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]

  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}

impl<I> Drop for Puller<I> {
  /// Drop a `Puller` instance.
  ///
  /// If this is the last `Puller` end-point of a sigq instance, then the inner
  /// queue will be cleared (i.e. all its elements will be immediately
  /// dropped).
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npullers -= 1;

    // If this is the last puller then remove all the nodes.
    // The nodes may contain some kind of context that must be notified that
    // the node will never reach its destination.
    if inner.npullers == 0 {
      inner.q.clear();
    }
  }
}




>

>












>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






>
|
|
<






<
>
|
|
|
|
|
|
<




>
>
>
>
>
>
>
>
>
>
>





|
|

>
|
<
>

>










>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
















>






>
>
>
>
>
>
>
>
>
>
>
>






>


















|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

92
93
94
95
96
97

98
99
100
101
102
103
104

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
use std::{
  future::Future,
  mem::ManuallyDrop,
  num::NonZeroUsize,
  ops::{Deref, DerefMut},
  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

/// The receiving end-point of queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);

use super::StaleErr;

#[derive(Default)]
enum DropAction {
  #[default]
  ReturnToQueue,
  Drop,
  Nothing
}

/// Wrapper around elements that must be handled by the application.
pub struct MustHandle<T> {
  sh: Arc<super::Shared<T>>,
  inner: ManuallyDrop<T>,
  drop_action: DropAction
}

impl<T> MustHandle<T> {
  fn new(sh: Arc<super::Shared<T>>, inner: T) -> Self {
    Self {
      sh,
      inner: ManuallyDrop::new(inner),
      drop_action: DropAction::default()
    }
  }

  /// Mark the inner object as handled and then drop it.
  pub fn handled(mut self) {
    self.drop_action = DropAction::Drop;
  }

  /// Remove the inner object from the `MustHandle` and return it.
  pub fn into_inner(mut self) -> T {
    self.drop_action = DropAction::Nothing;
    unsafe { ManuallyDrop::take(&mut self.inner) }
  }
}

impl<T> Deref for MustHandle<T> {
  type Target = T;

  fn deref(&self) -> &T {
    &self.inner
  }
}

impl<T> DerefMut for MustHandle<T> {
  fn deref_mut(&mut self) -> &mut T {
    &mut self.inner
  }
}

impl<T> Drop for MustHandle<T> {
  fn drop(&mut self) {
    match self.drop_action {
      DropAction::ReturnToQueue => {
        let t = unsafe { ManuallyDrop::take(&mut self.inner) };
        let mut inner = self.sh.inner.lock();
        inner.q.push_front(t);
      }
      DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
      DropAction::Nothing => {}
    }
  }
}


impl<I> Puller<I> {
  /// Pull the oldest node off the queue and return it.
  ///
  /// If no nodes are available on the queue, then block and wait for one to
  /// become available.
  ///
  /// # Errors
  /// `StaleErr` means there are no more items in queue and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.

  #[cfg_attr(feature = "inline-more", inline)]
  pub fn pop(&self) -> Result<I, StaleErr> {
    let mut inner = self.0.inner.lock();
    loop {
      if inner.q.is_empty() && inner.npushers == 0 {
        break Err(StaleErr);

      }
      match inner.q.pop_front() {
        Some(node) => {
          break Ok(node);
        }
        None => {
          self.0.signal.wait(&mut inner);

        }
      }
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// # Errors
  /// `StaleErr` means there are no more items in queue and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  pub fn pop_managed(&self) -> Result<MustHandle<I>, StaleErr> {
    let n = self.pop()?;
    Ok(MustHandle::new(Arc::clone(&self.0), n))
  }

  /// Pull the oldest node off the queue and return it.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.
  ///
  /// # Errors
  /// `StaleErr` is returned if no nodes are available and there are no more

  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  #[cfg_attr(feature = "inline-more", inline)]
  #[allow(clippy::option_if_let_else)]
  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
    let mut inner = self.0.inner.lock();
    if let Some(n) = inner.q.pop_front() {
      Ok(Some(n))
    } else if inner.npushers == 0 {
      Err(StaleErr)
    } else {
      Ok(None)
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// If a node is available on the queue then take it off and return it.
  ///
  /// If no nodes are available and there's at least one associated `Pusher`
  /// exists then return `Ok(None)`.
  ///
  /// # Errors
  /// `StaleErr` is returned if no nodes are available and there are no more
  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
  pub fn try_pop_managed(&self) -> Result<Option<MustHandle<I>>, StaleErr> {
    Ok(
      self
        .try_pop()?
        .map(|n| MustHandle::new(Arc::clone(&self.0), n))
    )
  }

  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  ///
  /// ```
  /// async fn test() {
  ///   let (tx, rx) = sigq::new();
  ///   tx.push("hello");
  ///   assert_eq!(rx.was_empty(), false);
  ///   let node = rx.apop().await.unwrap();
  ///   assert_eq!(node, "hello");
  ///   assert_eq!(rx.was_empty(), true);
  /// }
  /// ```
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop(&self) -> PopFuture<I> {
    PopFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }

  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
  /// but rather than block it returns a `Future` to be used to wait for a node
  /// to arrive in an `async` context.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn apop_managed(&self) -> PopManagedFuture<I> {
    PopManagedFuture {
      ctx: Arc::clone(&self.0),
      id: None
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }
}

impl<I> Drop for Puller<I> {
  /// Drop a `Puller` instance.
  ///
  /// If this is the last `Puller` end-point of a sigq instance, then the inner
  /// queue will be cleared (i.e. all its elements will be immediately
  /// dropped).
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npullers -= 1;

    // If this is the last puller then remove all the nodes.
    // The nodes may contain some kind of context that must be notified that
    // the node will never reach its intended destination.
    if inner.npullers == 0 {
      inner.q.clear();
    }
  }
}


155
156
157
158
159
160
161
162
163
164
165
166




















































167
}

impl<I> Drop for PopFuture<I> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.ctx.inner.lock();
      // Remove this future's waker
      let _ = inner.wakers.remove(&id.get());
    }
  }
}





















































// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|




>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
}

impl<I> Drop for PopFuture<I> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.ctx.inner.lock();
      // Remove this future's waker
      let _ = inner.wakers.swap_remove(&id.get());
    }
  }
}


#[doc(hidden)]
pub struct PopManagedFuture<I> {
  ctx: Arc<super::Shared<I>>,
  id: Option<NonZeroUsize>
}

impl<I: 'static + Send> Future for PopManagedFuture<I> {
  type Output = Result<MustHandle<I>, StaleErr>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.ctx.inner.lock();
    match inner.q.pop_front() {
      Some(node) => {
        Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node)))
      }
      None => {
        if inner.q.is_empty() && inner.npushers == 0 {
          // No more nodes and no more pushers, so return None
          Poll::Ready(Err(StaleErr))
        } else {
          // Generate a unique identifier for this waker
          let id = loop {
            let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
            // Make sure it is non-zero and unique
            if id == 0 || inner.wakers.contains_key(&id) {
              continue;
            }
            break id;
          };
          inner.wakers.insert(id, ctx.waker().clone());
          drop(inner);
          self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
          Poll::Pending
        }
      }
    }
  }
}

impl<I> Drop for PopManagedFuture<I> {
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.ctx.inner.lock();
      // Remove this future's waker
      let _ = inner.wakers.swap_remove(&id.get());
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/push.rs.

8
9
10
11
12
13
14

15
16
17
18
19
20
21
22
23
24
25
26
27

28
29
30
31
32
33
34
35
36

37
38
39
40
41
42

43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

91
92

93
94
95
96
97
98

99
100
101
102

impl<I> Pusher<I> {
  /// Push a node on to the queue and unlock one queue reader, if any.
  ///
  /// If there are any tasks or threads waiting for new nodes to arrive they
  /// will be notified.
  ///

  /// Returns `Err(StaleErr)` if there are no [`Puller`](super::Puller)'s
  /// available to receive any new nodes.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn push(&self, item: I) -> Result<(), StaleErr> {
    let mut inner = self.0.inner.lock();
    if inner.npullers == 0 {
      Err(StaleErr)
    } else {
      inner.q.push_back(item);
      if let Some((_, n)) = inner.wakers.pop() {
        n.wake();
      }
      self.0.signal.notify_one();

      Ok(())
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]

  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }

  /// Create a weak reference to this `Pusher`.

  pub fn weak(&self) -> WeakPusher<I> {
    WeakPusher(Arc::downgrade(&self.0))
  }
}

impl<I> Clone for Pusher<I> {
  fn clone(&self) -> Self {
    let mut inner = self.0.inner.lock();
    inner.npushers += 1;
    Self(Arc::clone(&self.0))
  }
}

impl<I> Drop for Pusher<I> {
  /// Drop a `Pusher` instance.
  ///
  /// When the final instance of a sigq's instance's `Pusher` is dropped, wake
  /// up any `Puller`'s waiting for new nodes to arrive.
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npushers -= 1;

    // If this was the last pusher then wake any pullers that are waiting to
    // receive new items.  (When they discover that no pushers remain they will
    // return None).
    if inner.npushers == 0 {
      self.0.signal.notify_all();
      for waker in inner.wakers.drain(..).map(|(_k, v)| v) {
        waker.wake()
      }
    }
  }
}

/// A weak reference to a [`Pusher`].
#[repr(transparent)]
pub struct WeakPusher<I>(pub(crate) Weak<super::Shared<I>>);

impl<I> Clone for WeakPusher<I> {
  fn clone(&self) -> Self {
    Self(Weak::clone(&self.0))
  }
}

impl<I> WeakPusher<I> {
  /// Attempt to upgrade `WeakPusher` to a [`Pusher`].
  ///
  /// Returns `None` if all the strong references have been exhausted.

  pub fn upgrade(&self) -> Option<Pusher<I>> {
    if let Some(strong) = self.0.upgrade() {

      let mut inner = strong.inner.lock();
      inner.npushers += 1;
      Some(Pusher(Arc::clone(&strong)))
    } else {
      None
    }

  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







>
|
|











>









>






>







|
<



















|



















>

|
>
|
|
|
<
<
|
>




8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100


101
102
103
104
105
106

impl<I> Pusher<I> {
  /// Push a node on to the queue and unlock one queue reader, if any.
  ///
  /// If there are any tasks or threads waiting for new nodes to arrive they
  /// will be notified.
  ///
  /// # Errors
  /// `StaleErr` means there are no [`Puller`](super::Puller)'s available
  /// to receive any new nodes.
  #[cfg_attr(feature = "inline-more", inline)]
  pub fn push(&self, item: I) -> Result<(), StaleErr> {
    let mut inner = self.0.inner.lock();
    if inner.npullers == 0 {
      Err(StaleErr)
    } else {
      inner.q.push_back(item);
      if let Some((_, n)) = inner.wakers.pop() {
        n.wake();
      }
      self.0.signal.notify_one();
      drop(inner);
      Ok(())
    }
  }

  /// Returns a boolean indicating whether the queue was empty or not.
  ///
  /// This function is not particularly useful.  If you don't understand why,
  /// then please don't use it.
  #[cfg_attr(feature = "inline-more", inline)]
  #[must_use]
  pub fn was_empty(&self) -> bool {
    let inner = self.0.inner.lock();
    inner.q.is_empty()
  }

  /// Create a weak reference to this `Pusher`.
  #[must_use]
  pub fn weak(&self) -> WeakPusher<I> {
    WeakPusher(Arc::downgrade(&self.0))
  }
}

impl<I> Clone for Pusher<I> {
  fn clone(&self) -> Self {
    self.0.inner.lock().npushers += 1;

    Self(Arc::clone(&self.0))
  }
}

impl<I> Drop for Pusher<I> {
  /// Drop a `Pusher` instance.
  ///
  /// When the final instance of a sigq's instance's `Pusher` is dropped, wake
  /// up any `Puller`'s waiting for new nodes to arrive.
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.npushers -= 1;

    // If this was the last pusher then wake any pullers that are waiting to
    // receive new items.  (When they discover that no pushers remain they will
    // return None).
    if inner.npushers == 0 {
      self.0.signal.notify_all();
      for waker in inner.wakers.drain(..).map(|(_k, v)| v) {
        waker.wake();
      }
    }
  }
}

/// A weak reference to a [`Pusher`].
#[repr(transparent)]
pub struct WeakPusher<I>(pub(crate) Weak<super::Shared<I>>);

impl<I> Clone for WeakPusher<I> {
  fn clone(&self) -> Self {
    Self(Weak::clone(&self.0))
  }
}

impl<I> WeakPusher<I> {
  /// Attempt to upgrade `WeakPusher` to a [`Pusher`].
  ///
  /// Returns `None` if all the strong references have been exhausted.
  #[must_use]
  pub fn upgrade(&self) -> Option<Pusher<I>> {
    self.0.upgrade().map_or_else(
      || None,
      |strong| {
        strong.inner.lock().npushers += 1;
        Some(Pusher(Arc::clone(&strong)))


      }
    )
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/managed.rs.

































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#[test]
fn return_to_queue() {
  let (tx, rx) = sigq::new();

  // Add "hello"
  tx.push("hello").unwrap();

  // Add "world"
  tx.push("world").unwrap();

  // Take out "hello" as a managed element
  let s = rx.pop_managed().unwrap();
  assert_eq!(*s, "hello");

  // Drop the managed element, which should put it back on the queue
  drop(s);

  // hello, again
  let s = rx.pop().unwrap();
  assert_eq!(s, "hello");
}

#[test]
fn finalize() {
  let (tx, rx) = sigq::new();

  // Add "hello"
  tx.push("hello").unwrap();

  // Add "world"
  tx.push("world").unwrap();

  // Take out "hello" as a managed element
  let s = rx.pop_managed().unwrap();
  assert_eq!(*s, "hello");

  // Mark element as handled
  s.handled();

  // next pull should yield "world"
  let s = rx.pop().unwrap();
  assert_eq!(s, "world");

  // .. and queue should be empty now
  assert!(rx.was_empty());
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4


5
6
7
8
9
10

11













12


13
14
15
16
17
18
19

20
21


22
23
24
25
26
27

28
29


30
31
32
33
34

35
36


37
38
39
40
41

42
43


44
45
46
47
48
49

50
51


52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

69
70
71
72
73
74
75
# Change Log

## [Unreleased]



### Added

### Changed

### Removed
















## [0.13.4] - 2023-09-15



### Added

- `Pusher` objects can spawn downgraded to `WeakPusher` objects, that in turn
  can be upgraded to `Pusher` objects (as long as all the strong `Pusher`
  objects have not been dropped).



## [0.13.3] - 2023-07-27



### Changed

- Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes
  still remain in the queue.



## [0.13.2] - 2023-07-26



### Changed

- Documentation updates.



## [0.13.1] - 2023-07-25



### Changed

- When the last `Puller` is dropped, clear the queue.



## [0.13.0] - 2023-07-25



### Changed

- `Pusher::push()` will return `Err(StaleErr)` if there are no more associated
  `Puller`s.



## [0.12.0] - 2023-07-24



### Added

- Add an `inline-more` feature (enabled by default).

### Changed

- Split `Queue` into `Pusher` and `Puller` end-points.
  - When taking nodes off the queue using `Puller`, return a `Result` which can
    indicate an error if no nodes remain and there are no more `Pushers`
    associated with the queue (implied: If no pushers remain, no new nodes will
    be added, so the queue is effectively dead).
- Dependencies updated:
  - `indexmap` updated to `2.0.0`
- Use Rust edition 2021.
- Bump MSRV to 1.64 (based on MSRV in `indexmap`)



## [0.11.0] - 2022-09-09

### Added

- Add a `Default` implementation for `Queue`.
- Explicitly set MSRV is `1.36`




>
>






>

>
>
>
>
>
>
>
>
>
>
>
>
>

>
>







>


>
>






>


>
>





>


>
>





>


>
>






>


>
>

















>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Change Log

## [Unreleased]

[Details](/vdiff?from=sigq-0.13.5&to=trunk)

### Added

### Changed

### Removed

---

## [0.13.5] - 2024-09-10

[Details](/vdiff?from=sigq-0.13.4&to=sigq-0.13.5)

### Added

- Implemented `Puller::pop_managed()`, `Puller::try_pop_managed()`,
  `Puller::apop_managed()` which return queue elements wrapped in the
  `MustHandle` wrapper, which will automatically return elements to the queue
  if unhandled when dropped.

---

## [0.13.4] - 2023-09-15

[Details](/vdiff?from=sigq-0.13.3&to=sigq-0.13.4)

### Added

- `Pusher` objects can spawn downgraded to `WeakPusher` objects, that in turn
  can be upgraded to `Pusher` objects (as long as all the strong `Pusher`
  objects have not been dropped).

---

## [0.13.3] - 2023-07-27

[Details](/vdiff?from=sigq-0.13.2&to=sigq-0.13.3)

### Changed

- Fixed bug that caused `Puller` to return `Err(StaleErr)` even though nodes
  still remain in the queue.

---

## [0.13.2] - 2023-07-26

[Details](/vdiff?from=sigq-0.13.1&to=sigq-0.13.2)

### Changed

- Documentation updates.

---

## [0.13.1] - 2023-07-25

[Details](/vdiff?from=sigq-0.13.0&to=sigq-0.13.1)

### Changed

- When the last `Puller` is dropped, clear the queue.

---

## [0.13.0] - 2023-07-25

[Details](/vdiff?from=sigq-0.12.0&to=sigq-0.13.0)

### Changed

- `Pusher::push()` will return `Err(StaleErr)` if there are no more associated
  `Puller`s.

---

## [0.12.0] - 2023-07-24

[Details](/vdiff?from=sigq-0.11.0&to=sigq-0.12.0)

### Added

- Add an `inline-more` feature (enabled by default).

### Changed

- Split `Queue` into `Pusher` and `Puller` end-points.
  - When taking nodes off the queue using `Puller`, return a `Result` which can
    indicate an error if no nodes remain and there are no more `Pushers`
    associated with the queue (implied: If no pushers remain, no new nodes will
    be added, so the queue is effectively dead).
- Dependencies updated:
  - `indexmap` updated to `2.0.0`
- Use Rust edition 2021.
- Bump MSRV to 1.64 (based on MSRV in `indexmap`)

---

## [0.11.0] - 2022-09-09

### Added

- Add a `Default` implementation for `Queue`.
- Explicitly set MSRV is `1.36`