1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
-
-
-
-
-
+
+
+
+
+
-
-
+
+
+
+
+
+
-
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
+
-
-
-
+
+
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
+
+
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
+
-
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
|
//! Queue which supports pushing and poping nodes from threads/tasks, crossing
//! sync/async boundaries.
use std::collections::VecDeque;
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
pub mod pull;
pub mod push;
use std::{
collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
};
use parking_lot::{Condvar, Mutex};
use indexmap::IndexMap;
pub use pull::{Puller, StaleErr};
pub use push::Pusher;
/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
q: VecDeque<I>,
npushers: usize,
wakers: IndexMap<usize, Waker>
}
/// Inner shared data.
struct Shared<I> {
signal: Condvar,
inner: Mutex<Inner<I>>,
idgen: AtomicUsize
}
#[repr(transparent)]
pub struct Queue<I>(Arc<Shared<I>>);
/// Create a new queue and return its paired push and pull objects.
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
impl<I> Queue<I> {
/// Create, and return, a new queue.
pub fn new() -> Self {
Queue::default()
}
/// Returns a boolean indicating whether the queue was empty or not.
///
/// This function is not particularly useful. If you don't understand why,
/// then please don't use it.
#[inline(always)]
pub fn was_empty(&self) -> bool {
let inner = self.0.inner.lock();
let inner = Inner {
inner.q.is_empty()
}
q: VecDeque::new(),
/// Push a node on to the queue and unlock one queue reader, if any.
///
/// If there are any tasks or threads waiting for new nodes to arrive they
/// will be notified.
#[inline(always)]
pub fn push(&self, item: I) {
let mut inner = self.0.inner.lock();
inner.q.push_back(item);
npushers: 1,
if let Some((_, n)) = inner.wakers.pop() {
n.wake();
}
wakers: IndexMap::new()
};
self.0.signal.notify_one();
drop(inner);
}
let shared = Shared {
/// Pull the oldest node off the queue and return it. If no nodes are
/// available on the queue, then block and wait for one to become available.
#[inline(always)]
pub fn pop(&self) -> I {
let mut inner = self.0.inner.lock();
let node = loop {
match inner.q.pop_front() {
Some(node) => {
break node;
}
None => {
self.0.signal.wait(&mut inner);
signal: Condvar::new(),
}
}
};
drop(inner);
inner: Mutex::new(inner),
idgen: AtomicUsize::new(1)
node
}
};
let shared = Arc::new(shared);
/// Pull the oldest node off the queue and return it. If no nodes are
/// available on the queue, then return `None`.
#[inline(always)]
pub fn try_pop(&self) -> Option<I> {
let mut inner = self.0.inner.lock();
inner.q.pop_front()
}
/// This method serves the same purpose as the [`pop()`](#method.pop) method,
/// but rather than block it returns a `Future` to be used to wait for a node
/// to arrive in an `async` context.
///
/// ```
/// use sigq::Queue;
/// async fn test() {
/// let q = Queue::new();
/// q.push("hello".to_string());
/// assert_eq!(q.was_empty(), false);
/// let node = q.apop().await;
/// assert_eq!(node, "hello");
/// assert_eq!(q.was_empty(), true);
/// }
/// ```
#[inline(always)]
pub fn apop(&self) -> PopFuture<I> {
PopFuture {
ctx: Arc::clone(&self.0),
id: None
}
}
}
let pusher = Pusher(Arc::clone(&shared));
impl<I> Default for Queue<I> {
fn default() -> Self {
Queue(Arc::new(Shared {
signal: Condvar::new(),
inner: Mutex::new(Inner {
q: VecDeque::new(),
wakers: IndexMap::new()
}),
idgen: AtomicUsize::new(1)
}))
}
}
let puller = Puller(shared);
#[doc(hidden)]
pub struct PopFuture<I> {
ctx: Arc<Shared<I>>,
id: Option<NonZeroUsize>
}
(pusher, puller)
impl<I: 'static + Send> Future for PopFuture<I> {
type Output = I;
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let mut inner = self.ctx.inner.lock();
match inner.q.pop_front() {
Some(node) => Poll::Ready(node),
None => {
// Generate a unique identifier for this waker
let id = loop {
let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
// Make sure if is non-zero and unique
if id == 0 || inner.wakers.contains_key(&id) {
continue;
}
break id;
};
inner.wakers.insert(id, ctx.waker().clone());
drop(inner);
self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
Poll::Pending
}
}
}
}
impl<I> Drop for PopFuture<I> {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut inner = self.ctx.inner.lock();
// Remove this future's waker
let _ = inner.wakers.remove(&id.get());
}
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|