Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From sigq-0.12.0 To sigq-0.13.0
2023-07-25
| ||
00:33 | Add missing release date. check-in: 6d69807551 user: jan tags: trunk | |
00:30 | Release maintenance. check-in: 3d150dc1c3 user: jan tags: sigq-0.13.0, trunk | |
00:28 | Typo. check-in: f60eca957a user: jan tags: trunk | |
00:28 | Keep track of whether the Puller has disappeared. check-in: 58a487a4a1 user: jan tags: trunk | |
2023-07-24
| ||
21:34 | Clippy. check-in: 8c44c91ce4 user: jan tags: sigq-0.12.0, trunk | |
21:29 | Release maintenance. check-in: 470cdcb5d3 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "sigq" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "sigq" version = "0.13.0" authors = ["Jan Danielsson <jan.danielsson@qrnch.com>"] edition = "2021" license = "0BSD" categories = [ "asynchronous", "concurrency", "data-structures" ] keywords = [ "threads", "sync" ] repository = "https://repos.qrnch.tech/pub/sigq" description = "Queue that signals waiting consumers about node availability" |
︙ | ︙ |
Changes to src/lib.rs.
︙ | ︙ | |||
8 9 10 11 12 13 14 | collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; | | > > > > > | 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | collections::VecDeque, sync::atomic::AtomicUsize, sync::Arc, task::Waker }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; pub use pull::Puller; pub use push::Pusher; #[derive(Debug, PartialEq)] pub struct StaleErr; /// Inner shared data. /// /// This is read/write data, and hence protected by a mutex. struct Inner<I> { q: VecDeque<I>, npushers: usize, npullers: usize, wakers: IndexMap<usize, Waker> } /// Inner shared data. struct Shared<I> { signal: Condvar, inner: Mutex<Inner<I>>, idgen: AtomicUsize } /// Create a new queue and return its paired push and pull objects. pub fn new<T>() -> (Pusher<T>, Puller<T>) { let inner = Inner { q: VecDeque::new(), npushers: 1, npullers: 1, wakers: IndexMap::new() }; let shared = Shared { signal: Condvar::new(), inner: Mutex::new(inner), idgen: AtomicUsize::new(1) }; |
︙ | ︙ |
Changes to src/pull.rs.
1 2 3 4 5 6 7 8 9 | use std::{ future::Future, num::NonZeroUsize, pin::Pin, sync::atomic::Ordering, sync::Arc, task::{Context, Poll} }; | < < < > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | use std::{ future::Future, num::NonZeroUsize, pin::Pin, sync::atomic::Ordering, sync::Arc, task::{Context, Poll} }; /// The receiving end-point of queue. #[repr(transparent)] pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>); use super::StaleErr; impl<I> Puller<I> { /// Pull the oldest node off the queue and return it. /// /// If no nodes are available on the queue, then block and wait for one to /// become available. /// |
︙ | ︙ | |||
90 91 92 93 94 95 96 97 98 99 100 101 102 103 | /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } #[doc(hidden)] pub struct PopFuture<I> { ctx: Arc<super::Shared<I>>, id: Option<NonZeroUsize> } | > > > > > > > | 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] pub fn was_empty(&self) -> bool { let inner = self.0.inner.lock(); inner.q.is_empty() } } impl<I> Drop for Puller<I> { fn drop(&mut self) { let mut inner = self.0.inner.lock(); inner.npullers -= 1; } } #[doc(hidden)] pub struct PopFuture<I> { ctx: Arc<super::Shared<I>>, id: Option<NonZeroUsize> } |
︙ | ︙ |
Changes to src/push.rs.
1 2 3 4 5 6 7 8 9 10 11 12 | use std::sync::Arc; /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>); impl<I> Pusher<I> { /// Push a node on to the queue and unlock one queue reader, if any. /// /// If there are any tasks or threads waiting for new nodes to arrive they /// will be notified. #[cfg_attr(feature = "inline-more", inline)] | > > > > > | > > > | | | | | | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | use std::sync::Arc; /// The transmitting end-point of queue. #[repr(transparent)] pub struct Pusher<I>(pub(crate) Arc<super::Shared<I>>); use super::StaleErr; impl<I> Pusher<I> { /// Push a node on to the queue and unlock one queue reader, if any. /// /// If there are any tasks or threads waiting for new nodes to arrive they /// will be notified. /// /// Returns `Err(StaleErr)` if there are no [`Puller`](super::Puller)'s /// available to receive any new nodes. #[cfg_attr(feature = "inline-more", inline)] pub fn push(&self, item: I) -> Result<(), StaleErr> { let mut inner = self.0.inner.lock(); if inner.npullers == 0 { Err(StaleErr) } else { inner.q.push_back(item); if let Some((_, n)) = inner.wakers.pop() { n.wake(); } self.0.signal.notify_one(); Ok(()) } } /// Returns a boolean indicating whether the queue was empty or not. /// /// This function is not particularly useful. If you don't understand why, /// then please don't use it. #[cfg_attr(feature = "inline-more", inline)] |
︙ | ︙ |
Changes to tests/simple.rs.
1 2 3 4 5 6 7 8 9 | use sigq::StaleErr; #[test] fn try_pop_retvals() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | use sigq::StaleErr; #[test] fn try_pop_retvals() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); tx.push("hello").unwrap(); // There's a client, data, so try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); // There's a client, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); |
︙ | ︙ | |||
25 26 27 28 29 30 31 | #[test] fn drop_nonempty() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); | | | 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | #[test] fn drop_nonempty() { let (tx, rx) = sigq::new(); // There's a client, no data, so try_pop() should return Ok(None) assert_eq!(rx.try_pop(), Ok(None)); tx.push("hello").unwrap(); // Drop the only pusher. It should still be possible to pull existing nodes // off the queue. drop(tx); // There's a client, data, so try_pop() should return Ok(Some) assert_eq!(rx.try_pop(), Ok(Some("hello"))); |
︙ | ︙ | |||
55 56 57 58 59 60 61 62 63 | std::thread::sleep(std::time::Duration::from_millis(500)); drop(tx); assert!(jh.join().unwrap()); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : | > > > > > > > > > | 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | std::thread::sleep(std::time::Duration::from_millis(500)); drop(tx); assert!(jh.join().unwrap()); } #[test] fn stale_puller() { let (tx, rx) = sigq::new::<&str>(); drop(rx); assert_eq!(tx.push("hello"), Err(StaleErr)); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.12.0] - 2023-07-24 ### Added | > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | # Change Log ## [Unreleased] ### Added ### Changed ### Removed ## [0.13.0] ### Added ### Changed - `Pusher::push()` will return `Err(StaleErr)` if there are no more associated `Puller`s. ### Removed ## [0.12.0] - 2023-07-24 ### Added |
︙ | ︙ |