Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From sigq-0.12.0 To sigq-0.13.0
|
2023-07-25
| ||
| 00:33 | Add missing release date. check-in: 6d69807551 user: jan tags: trunk | |
| 00:30 | Release maintenance. check-in: 3d150dc1c3 user: jan tags: sigq-0.13.0, trunk | |
| 00:28 | Typo. check-in: f60eca957a user: jan tags: trunk | |
| 00:28 | Keep track of whether the Puller has disappeared. check-in: 58a487a4a1 user: jan tags: trunk | |
|
2023-07-24
| ||
| 21:34 | Clippy. check-in: 8c44c91ce4 user: jan tags: sigq-0.12.0, trunk | |
| 21:29 | Release maintenance. check-in: 470cdcb5d3 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "sigq" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "sigq" version = "0.13.0" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" |
| ︙ | ︙ |
Changes to src/lib.rs.
| ︙ | ︙ | |||
8 9 10 11 12 13 14 |
collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};
use parking_lot::{Condvar, Mutex};
use indexmap::IndexMap;
| | > > > > > | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker
};
use parking_lot::{Condvar, Mutex};
use indexmap::IndexMap;
pub use pull::Puller;
pub use push::Pusher;
/// Error indicating that the opposite side of the queue no longer exists.
///
/// `Pusher::push()` returns this when no `Puller` remains to receive the
/// node.
///
/// The type implements [`std::error::Error`], so it can be propagated with
/// `?` into `Box<dyn std::error::Error>` and similar error containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StaleErr;

impl std::fmt::Display for StaleErr {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.write_str("stale queue end-point; the other side has been dropped")
  }
}

impl std::error::Error for StaleErr {}
/// Inner shared data.
///
/// This is read/write data, and hence protected by a mutex.
struct Inner<I> {
/// The queued nodes. `Pusher::push()` appends new nodes to the back.
q: VecDeque<I>,
/// Pusher counter; `new()` starts it at 1.
/// NOTE(review): presumably the number of live `Pusher`s -- the code that
/// maintains it is outside this view, confirm.
npushers: usize,
/// Puller counter; `new()` starts it at 1 and `Puller`'s `Drop` decrements
/// it. `Pusher::push()` refuses new nodes once it reaches 0.
npullers: usize,
/// Registered task wakers. `Pusher::push()` pops one entry and wakes it.
/// NOTE(review): keys are presumably ids drawn from `Shared::idgen` --
/// confirm against the future implementation.
wakers: IndexMap<usize, Waker>
}
/// State shared between the queue end-points.
///
/// Both `Pusher` and `Puller` wrap an `Arc` of this type.
struct Shared<I> {
/// Condition variable; `Pusher::push()` calls `notify_one()` on it after
/// queueing a node.
signal: Condvar,
/// The mutex-protected read/write state (queue, counters and wakers).
inner: Mutex<Inner<I>>,
/// Atomic counter; `new()` initialises it to 1.
/// NOTE(review): presumably hands out the ids used as keys in
/// `Inner::wakers` -- confirm.
idgen: AtomicUsize
}
/// Create a new queue and return its paired push and pull objects.
pub fn new<T>() -> (Pusher<T>, Puller<T>) {
let inner = Inner {
q: VecDeque::new(),
npushers: 1,
npullers: 1,
wakers: IndexMap::new()
};
let shared = Shared {
signal: Condvar::new(),
inner: Mutex::new(inner),
idgen: AtomicUsize::new(1)
};
|
| ︙ | ︙ |
Changes to src/pull.rs.
1 2 3 4 5 6 7 8 9 |
use std::{
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::atomic::Ordering,
sync::Arc,
task::{Context, Poll}
};
| < < < > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
use std::{
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::atomic::Ordering,
sync::Arc,
task::{Context, Poll}
};
/// The receiving end-point of a queue.
///
/// A transparent wrapper around the `Arc` that refers to the queue's shared
/// state.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);
use super::StaleErr;
impl<I> Puller<I> {
/// Pull the oldest node off the queue and return it.
///
/// If no nodes are available on the queue, then block and wait for one to
/// become available.
///
|
| ︙ | ︙ | |||
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
/// then please don't use it.
#[cfg_attr(feature = "inline-more", inline)]
pub fn was_empty(&self) -> bool {
  // Only a snapshot: the lock is released when this call returns, so the
  // queue may already have changed when the caller looks at the result.
  self.0.inner.lock().q.is_empty()
}
}
// NOTE(review): presumably the future type behind an asynchronous pop; its
// `Future` implementation is outside this view -- confirm.
#[doc(hidden)]
pub struct PopFuture<I> {
// Handle to the queue's shared state.
ctx: Arc<super::Shared<I>>,
// Optional non-zero identifier.
// NOTE(review): presumably the key of this future's waker registration --
// confirm.
id: Option<NonZeroUsize>
}
| > > > > > > > | 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
/// then please don't use it.
#[cfg_attr(feature = "inline-more", inline)]
pub fn was_empty(&self) -> bool {
  // Only a snapshot: the lock is released when this call returns, so the
  // queue may already have changed when the caller looks at the result.
  self.0.inner.lock().q.is_empty()
}
}
// Keeps the shared puller count in sync so that the pushing side can tell
// when nobody is left to receive nodes.
impl<I> Drop for Puller<I> {
  fn drop(&mut self) {
    // The lock guard is a temporary that is released at the end of this
    // statement.
    self.0.inner.lock().npullers -= 1;
  }
}
// NOTE(review): presumably the future type behind an asynchronous pop; its
// `Future` implementation is outside this view -- confirm.
#[doc(hidden)]
pub struct PopFuture<I> {
// Handle to the queue's shared state.
ctx: Arc<super::Shared<I>>,
// Optional non-zero identifier.
// NOTE(review): presumably the key of this future's waker registration --
// confirm.
id: Option<NonZeroUsize>
}
|
| ︙ | ︙ |
Changes to src/push.rs.
1 2 3 4 5 6 7 8 9 10 11 12 |
use std::sync::Arc;
/// The transmitting end-point of a queue.
///
/// A transparent wrapper around the `Arc` that refers to the queue's shared
/// state.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);
impl<I> Pusher<I> {
/// Push a node on to the queue and unlock one queue reader, if any.
///
/// If there are any tasks or threads waiting for new nodes to arrive they
/// will be notified.
#[cfg_attr(feature = "inline-more", inline)]
| > > > > > | > > > | | | | | | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
use std::sync::Arc;
/// The transmitting end-point of a queue.
///
/// A transparent wrapper around the `Arc` that refers to the queue's shared
/// state.
#[repr(transparent)]
pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>);
use super::StaleErr;
impl<I> Pusher<I> {
/// Append a node to the back of the queue and notify one waiting consumer,
/// if there is one.
///
/// After the node has been queued, one registered task waker (if any) is
/// woken and the queue's condition variable is signalled once.
///
/// # Errors
/// Returns `Err(StaleErr)` if there are no [`Puller`](super::Puller)s left
/// to receive new nodes. In that case the node is not queued.
#[cfg_attr(feature = "inline-more", inline)]
pub fn push(&self, item: I) -> Result<(), StaleErr> {
  let mut inner = self.0.inner.lock();

  // Without any puller left the node could never be delivered.
  if inner.npullers == 0 {
    return Err(StaleErr);
  }

  inner.q.push_back(item);

  // Wake at most one pending task ...
  if let Some((_, waker)) = inner.wakers.pop() {
    waker.wake();
  }
  // ... and signal the condition variable once.
  self.0.signal.notify_one();

  Ok(())
}
/// Returns a boolean indicating whether the queue was empty or not.
///
/// This function is not particularly useful. If you don't understand why,
/// then please don't use it.
#[cfg_attr(feature = "inline-more", inline)]
|
| ︙ | ︙ |
Changes to tests/simple.rs.
1 2 3 4 5 6 7 8 9 |
use sigq::StaleErr;
#[test]
fn try_pop_retvals() {
let (tx, rx) = sigq::new();
// There's a client, no data, so try_pop() should return Ok(None)
assert_eq!(rx.try_pop(), Ok(None));
| | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
use sigq::StaleErr;
#[test]
fn try_pop_retvals() {
let (tx, rx) = sigq::new();
// There's a client, no data, so try_pop() should return Ok(None)
assert_eq!(rx.try_pop(), Ok(None));
tx.push("hello").unwrap();
// There's a client, data, so try_pop() should return Ok(Some)
assert_eq!(rx.try_pop(), Ok(Some("hello")));
// There's a client, no data, so try_pop() should return Ok(None)
assert_eq!(rx.try_pop(), Ok(None));
|
| ︙ | ︙ | |||
25 26 27 28 29 30 31 |
#[test]
fn drop_nonempty() {
let (tx, rx) = sigq::new();
// There's a client, no data, so try_pop() should return Ok(None)
assert_eq!(rx.try_pop(), Ok(None));
| | | 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
#[test]
fn drop_nonempty() {
let (tx, rx) = sigq::new();
// There's a client, no data, so try_pop() should return Ok(None)
assert_eq!(rx.try_pop(), Ok(None));
tx.push("hello").unwrap();
// Drop the only pusher. It should still be possible to pull existing nodes
// off the queue.
drop(tx);
// There's a client, data, so try_pop() should return Ok(Some)
assert_eq!(rx.try_pop(), Ok(Some("hello")));
|
| ︙ | ︙ | |||
55 56 57 58 59 60 61 62 63 | std::thread::sleep(std::time::Duration::from_millis(500)); drop(tx); assert!(jh.join().unwrap()); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > > > > > > > > > | 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
std::thread::sleep(std::time::Duration::from_millis(500));
drop(tx);
assert!(jh.join().unwrap());
}
#[test]
fn stale_puller() {
  let (tx, rx) = sigq::new::<&str>();

  // Drop the only puller; nobody is left to receive nodes.
  drop(rx);

  // Pushing must now be rejected rather than queueing the node.
  let res = tx.push("hello");
  assert_eq!(res, Err(StaleErr));
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.12.0] - 2023-07-24 ### Added | > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.0] ### Added ### Changed - `Pusher::push()` will return `Err(StaleErr)` if there are no more associated `Puller`s. ### Removed ## [0.12.0] - 2023-07-24 ### Added |
| ︙ | ︙ |