Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From limqch-0.1.0 To limqch-0.2.0
|
2025-04-12
| ||
| 20:35 | Use limq 0.3.1 to get bugfix. check-in: ef18178262 user: jan tags: trunk | |
|
2025-04-10
| ||
| 23:43 | Release maintenance. check-in: 51ff61f471 user: jan tags: limqch-0.2.0, trunk | |
| 23:24 | Docs. check-in: 8c996a0bf2 user: jan tags: trunk | |
| 23:03 | Merge. check-in: 7429ab4ba0 user: jan tags: trunk | |
|
2025-04-04
| ||
| 01:59 | Redesign for limq 0.2.0. check-in: 1024ad1186 user: jan tags: limq-0.2.0-updates | |
|
2025-04-01
| ||
| 01:48 | Link to limq. check-in: c40e4c6151 user: jan tags: limqch-0.1.0, trunk | |
| 01:47 | Move from old repo. check-in: 984b708d78 user: jan tags: trunk | |
Changes to .efiles.
1 2 3 4 5 6 7 8 | Cargo.toml README.md www/index.md www/changelog.md src/lib.rs src/tx.rs src/rx.rs tests/basic.rs | > > > | 1 2 3 4 5 6 7 8 9 10 11 | Cargo.toml README.md www/index.md www/changelog.md src/err.rs src/lib.rs src/tx.rs src/rx.rs tests/basic.rs tests/closed.rs tests/ctrl.rs |
Changes to Cargo.toml.
1 2 | [package] name = "limqch" | | | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
[package]
name = "limqch"
version = "0.2.0"
edition = "2024"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency" ]
keywords = [ "channel", "bounded" ]
repository = "https://repos.qrnch.tech/pub/limqch"
description = "A channel built on top of limq."
rust-version = "1.85"
exclude = [
".fossil-settings",
".efiles",
".fslckout",
"www",
"bacon.toml",
"rustfmt.toml"
]
# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "actively-developed" }
[dependencies]
limq = { version = "0.3.0" }
parking_lot = { version = "0.12.3" }
wakerizer = { version = "0.1.0" }
[dev-dependencies]
tokio = { version = "1.44.1", features = ["macros", "rt", "rt-multi-thread"] }
[package.metadata.docs.rs]
|
| ︙ | ︙ |
Added rustfmt.toml.
> > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | blank_lines_upper_bound = 2 comment_width = 79 edition = "2024" format_strings = true max_width = 79 match_block_trailing_comma = false # merge_imports = true newline_style = "Unix" tab_spaces = 2 trailing_comma = "Never" unstable_features = true wrap_comments = true #reorder_imports = false #reorder_modules = false |
Added src/err.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
use std::fmt;
/// Errors that [`Sender`](super::Sender) and [`Receiver`](super::Receiver)
/// methods can return.
#[derive(PartialEq, Eq)]
pub enum Error<T> {
/// No remote channel end-points remain.
Closed,
/// The node will currently not fit in the queue.
WontFit(T),
/// The node can't fit in the queue (unless reconfigured to allow larger
/// nodes).
CantFit(T)
}
impl<T> std::error::Error for Error<T> {}
impl<T> fmt::Debug for Error<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Closed => write!(f, "Error::Closed"),
Self::WontFit(_) => write!(f, "Error::WontFit"),
Self::CantFit(_) => write!(f, "Error::CantFit")
}
}
}
impl<T> fmt::Display for Error<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Closed => write!(f, "Remote end-points closed"),
Self::WontFit(_) => write!(f, "Won't fit"),
Self::CantFit(_) => write!(f, "Can't fit")
}
}
}
impl<T> From<limq::Error<T>> for Error<T> {
fn from(err: limq::Error<T>) -> Self {
match err {
limq::Error::WontFit(n) => Self::WontFit(n),
limq::Error::CantFit(n) => Self::CantFit(n)
}
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Changes to src/lib.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//! Channel based on [`LimQ`].
mod rx;
mod tx;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use limq::LimQ;
use wakerizer::Wakers;
| > > | | > > > | < | > > | > > > | > | > > > | > > > | | > > > | | | | | | < > > > > > > > > > > > > > > > > > > > | > > > > | | | > > | > > > > | | | | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
//! Channel based on [`LimQ`].
mod err;
mod rx;
mod tx;
use std::sync::Arc;
use parking_lot::{Condvar, Mutex};
use limq::LimQ;
use wakerizer::Wakers;
pub use err::Error;
pub use limq::{Controller, Overflow};
pub use rx::{Receiver, RecvFuture};
pub use tx::Sender;
struct Inner<C, T>
where
C: Controller<Item = T>
{
q: LimQ<C, T>,
tx_count: usize,
rx_count: usize
}
impl<C, T> Inner<C, T>
where
C: Controller<Item = T>
{
fn new(lc: C) -> Self {
Self {
q: LimQ::new(lc),
tx_count: 0,
rx_count: 0
}
}
}
struct Shared<C, T>
where
C: Controller<Item = T>
{
inner: Mutex<Inner<C, T>>,
/// Used from receivers in order to wait for senders to add new elements to
/// the queue.
tx_wakers: Wakers,
tx_signal: Condvar,
/// Used from senders in order to wait for receivers to make room available
/// in the queue.
rx_wakers: Wakers,
rx_signal: Condvar
}
impl<C, T> Shared<C, T>
where
C: Controller<Item = T>
{
fn new(lc: C) -> Self {
let inner = Inner::new(lc);
let inner = Mutex::new(inner);
Self {
inner,
tx_wakers: Wakers::new(),
rx_wakers: Wakers::new(),
tx_signal: Condvar::new(),
rx_signal: Condvar::new()
}
}
/// Called by the receiver end-point when a node has been takwn off the
/// queue, in case any sendera are waiting for space to become available.
fn wake_senders(&self) {
self.tx_wakers.wake_all();
self.tx_signal.notify_one();
}
/// Called by the sender end-point when a node has been added to the queue in
/// case any receivers are waiting for nodes to become available.
fn wake_receivers(&self) {
self.rx_wakers.wake_all();
self.rx_signal.notify_one();
}
}
/// Create a channel pair, with an optional internal queue limit.
///
/// ```
/// use limqch::{channel, Error};
/// use limq::{LimQ, LengthLimit};
///
/// // Construct a channel which uses an internal queue that is limited to
/// // 2 elements.
/// let lenlim = LengthLimit::new(2);
/// let (tx, rx) = channel(lenlim);
///
/// tx.try_send(1).unwrap();
/// tx.try_send(2).unwrap();
/// let Err(Error::WontFit(n)) = tx.try_send(3) else {
/// panic!("Unexpectedly not Error::WontFit");
/// };
///
/// let n = rx.try_recv().unwrap().unwrap();
/// assert_eq!(n, 1);
/// ```
#[must_use]
pub fn channel<C, T>(lc: C) -> (Sender<C, T>, Receiver<C, T>)
where
C: Controller<Item = T>,
T: Send + Sync
{
let sh = Shared::new(lc);
let sh = Arc::new(sh);
let tx = Sender::new(Arc::clone(&sh));
let rx = Receiver::new(sh);
(tx, rx)
}
/*
/// Object that can be used to spawn sender and receiver end-points.
pub struct Spawner<C, T>(Arc<Shared<C, T>>)
where
C: Controller<Item = T>;
impl<C, T> Spawner<C, T>
where
C: Controller<Item = T>,
T: Send + Sync
{
#[must_use]
pub fn new(lc: C) -> Self {
let sh = Shared::new(lc);
let sh = Arc::new(sh);
Self(sh)
}
#[must_use]
pub fn sender(&self) -> Sender<C, T> {
Sender::new(Arc::clone(&self.0))
}
#[must_use]
pub fn receiver(&self) -> Receiver<C, T> {
Receiver::new(Arc::clone(&self.0))
}
}
*/
/// Proxy object to allow [`Sender`] and [`Receiver`] instances to
/// access the internal queue's [`Controller`].
#[derive(Clone)]
#[repr(transparent)]
pub struct Ctrl<C, T>(Arc<Shared<C, T>>)
where
C: Controller<Item = T>;
impl<C, T> Ctrl<C, T>
where
C: Controller<Item = T>
{
/// Access the underlying [`Controller`] in a closure.
///
/// This can be used to access the `Controller`.
///
/// # Caveat Utilitor
/// An internal lock is held while the closure is called. The application
/// must return quickly to avoid holding up the channel end-points (or,
/// worse, holding up an an async runtime).
pub fn with_ctrl<F, R>(&self, f: F) -> R
where
F: FnOnce(&C) -> R
{
let inner = self.0.inner.lock();
f(inner.q.controller())
}
/// Access the underlying [`Controller`] mutably in a closure.
///
/// This can be used to reconfigure the `Controller`, if it support
/// reconfiguration.
///
/// # Caveat Utilitor
/// An internal lock is held while the closure is called. The application
/// must return quickly to avoid holding up the channel end-points (or,
/// worse, holding up an an async runtime).
pub fn with_ctrl_mut<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut C) -> R
{
let mut inner = self.0.inner.lock();
f(inner.q.controller_mut())
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Changes to src/rx.rs.
1 2 3 4 5 6 7 8 9 |
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use wakerizer::Waiter;
| | < | > > > | > > > > > > > > > > > > > > | > > > | > > > > > > > > > > > > > > > | > > > > | | < | < | > > | | > > > > > > > | > > > | < | < > | < > > > | > > > > > > > > > > > > > | > > > | | > > > | | > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use wakerizer::Waiter;
use super::{Controller, Ctrl, Error, Shared};
/// Channel receiver end-point.
pub struct Receiver<C, T>
where
C: Controller<Item = T>
{
sh: Arc<Shared<C, T>>
}
impl<C, T> Clone for Receiver<C, T>
where
C: Controller<Item = T>
{
fn clone(&self) -> Self {
let mut inner = self.sh.inner.lock();
inner.rx_count += 1;
drop(inner);
let sh = Arc::clone(&self.sh);
Self { sh }
}
}
impl<C, T> Receiver<C, T>
where
C: Controller<Item = T>
{
#[must_use]
pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
let mut inner = sh.inner.lock();
inner.rx_count += 1;
drop(inner);
Self { sh }
}
/// Return a [`Ctrl`], which can be used to access the internal
/// [`Controller`].
#[must_use]
pub fn ctrl(&self) -> Ctrl<C, T> {
Ctrl(Arc::clone(&self.sh))
}
/// Receive a node from queue. If the queue is empty, block and wait for a
/// new node to arrive.
///
/// # Errors
/// [`Error::Closed`] means that the queue is empty and all the
/// [`Sender`](super::Sender)s have been dropped.
pub fn recv_blocking(&self) -> Result<T, Error<T>> {
let mut inner = self.sh.inner.lock();
let n = loop {
if inner.q.is_empty() && inner.tx_count == 0 {
return Err(Error::Closed);
}
if let Some(n) = inner.q.pop() {
break n;
}
self.sh.rx_signal.wait(&mut inner);
};
// Wake up any async senders that are waiting for space to become
// available.
self.sh.wake_senders();
drop(inner);
Ok(n)
}
/// Returns a `Future` that is the `async` equivalent of
/// [`Receiver::recv_blocking()`].
#[must_use]
pub fn recv_async(&self) -> RecvFuture<C, T> {
RecvFuture {
sh: Arc::clone(&self.sh),
waiter: self.sh.rx_wakers.waiter()
}
}
/// Attempt to retreive a node from the queue.
///
/// Returns `Ok(Some(T))` if there's a node available for immediate pickup.
/// Returns `Ok(None)` is there are no nodes to pick up.
///
/// # Errors
/// [`Error::Closed`] means that the queue is empty and all the
/// [`Sender`](super::Sender)s have been dropped.
pub fn try_recv(&self) -> Result<Option<T>, Error<T>> {
let mut inner = self.sh.inner.lock();
if inner.q.is_empty() && inner.tx_count == 0 {
return Err(Error::Closed);
}
Ok(inner.q.pop().map_or_else(
|| None,
|n| {
// Wake up any async senders that are waiting for space to become
// available.
self.sh.wake_senders();
drop(inner);
Some(n)
}
))
}
}
impl<C, T> Drop for Receiver<C, T>
where
C: Controller<Item = T>
{
fn drop(&mut self) {
let mut inner = self.sh.inner.lock();
inner.rx_count -= 1;
drop(inner);
self.sh.wake_senders();
}
}
/// A `Future` that will will resolve when there's data that can be returned
/// from the channel, of it the internal queue is empty but all the
/// [`Sender`](super::Sender) end-points have been dropped.
pub struct RecvFuture<C, T>
where
C: Controller<Item = T>
{
sh: Arc<Shared<C, T>>,
waiter: Waiter
}
impl<C, T> Future for RecvFuture<C, T>
where
C: Controller<Item = T>
{
type Output = Result<T, Error<T>>;
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let mut inner = self.sh.inner.lock();
if let Some(n) = inner.q.pop() {
Poll::Ready(Ok(n))
} else if inner.tx_count == 0 {
Poll::Ready(Err(Error::Closed))
} else {
drop(inner); // happy borrow-checker
self.waiter.prime(ctx);
Poll::Pending
}
}
}
|
| ︙ | ︙ |
Changes to src/tx.rs.
1 2 3 4 5 6 7 8 9 |
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use wakerizer::Waiter;
| | < | > > > | > > > > > > > > > > > > > > | > > > > | > > > > > > > > > > > > > | < | > > | > > | | > > > > > > > > | | | > > > | > > | > > > | < | > > > > > | > | > > | < < > > | < | > > > > > > > > > > | | > | | > > | < > > > | < < | | < > > > | > > > > | > | | | < < < | > | > > > > > | | < | > > > > > > | > > > > | < | > > > > > > > > > > > > > > > | > > > > | | > | > > > > | < | | | > | | | | < | > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll}
};
use wakerizer::Waiter;
use super::{Controller, Ctrl, Error, Overflow, Shared};
/// Channel transmitter end-point.
pub struct Sender<C, T>
where
C: Controller<Item = T>
{
sh: Arc<Shared<C, T>>
}
impl<C, T> Clone for Sender<C, T>
where
C: Controller<Item = T>
{
fn clone(&self) -> Self {
let mut inner = self.sh.inner.lock();
inner.tx_count += 1;
drop(inner);
let sh = Arc::clone(&self.sh);
Self { sh }
}
}
impl<C, T> Sender<C, T>
where
C: Controller<Item = T>,
T: Send + Sync
{
pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
let mut inner = sh.inner.lock();
inner.tx_count += 1;
drop(inner);
Self { sh }
}
/// Return a [`Ctrl`], which can be used to access the internal
/// [`Controller`].
#[must_use]
pub fn ctrl(&self) -> Ctrl<C, T> {
Ctrl(Arc::clone(&self.sh))
}
/// Send an element over channel.
///
/// If the channel has a limit, and the limit has been reached, then block
/// and wait until a [`Receiver`](super::Receiver) has make more room
/// available on the queue.
///
/// # Errors
/// [`Error::CantFit`] means the node was rejected by the
/// [`Controller`]. [`Error::Closed`] means there are no more
/// [`Receiver`](super::Receiver)s available.
pub fn send_blocking(&self, mut n: T) -> Result<(), Error<T>> {
let mut inner = self.sh.inner.lock();
// Keep trying to `try_push()` until successful.
loop {
if inner.rx_count == 0 {
return Err(Error::Closed);
}
n = match inner.q.try_push(n) {
Ok(()) => break,
Err(e) => match e {
limq::Error::WontFit(n) => n,
limq::Error::CantFit(n) => {
return Err(Error::CantFit(n));
}
}
};
self.sh.tx_signal.wait(&mut inner);
}
drop(inner);
// Have a new element in queue -- wake up a waiting receiver
self.sh.wake_receivers();
Ok(())
}
/// This exists only because the compiler thinks `send_async()` is not `Send`
/// (because it thinks `inner` is held past the `await`, even though it
/// isn't.
///
/// # Errors
/// [`Error::CantFit`] means the node was rejected by the
/// [`Controller`]. [`Error::Closed`] means there are no more
/// [`Receiver`](super::Receiver)s available.
fn try_push(&self, n: T) -> Result<(), Error<T>> {
let mut inner = self.sh.inner.lock();
if inner.rx_count == 0 {
return Err(Error::Closed);
}
inner.q.try_push(n)?;
drop(inner);
Ok(())
}
/// Send an element over channel.
///
/// If the channel has a limit, and the limit has been reached, then block
/// and wait until a [`Receiver`](super::Receiver) has make more room
/// available on the queue.
///
/// # Errors
/// [`Error::CantFit`] means the [`Controller`] rejected the node.
/// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
pub async fn send_async(&self, mut n: T) -> Result<(), Error<T>> {
// In an ideal world, there'd be a SendFuture that takes in the new node.
// However, if the queue is full the node needs to be stored somewhere
// until the task is woken up, and this presents a few annoying challenges.
//
// Instead, we'll use a Future that simply waits for there to be space
// available and when that resolves immediate all the node to the queue.
//
// For multithreaded runtimes it is possible for a TOCTOU issue here so we
// need to loop until the try_push() is successful.
loop {
// Attempt to push node onto queue.
n = match self.try_push(n) {
Ok(()) => break,
Err(e) => match e {
Error::Closed => return Err(Error::Closed),
Error::WontFit(n) => n,
Error::CantFit(n) => return Err(Error::CantFit(n))
}
};
let fut = ReserveSpaceFuture {
sh: Arc::clone(&self.sh),
waiter: self.sh.tx_wakers.waiter(),
n: &n
};
match fut.await {
Ok(()) => {
// fall through
}
Err(e) => match e {
limq::CheckErr::WontFit => {
// fall through
}
limq::CheckErr::CantFit => {
return Err(Error::CantFit(n));
}
}
}
}
// Have a new element in queue -- wake up waiting receivers
self.sh.wake_receivers();
Ok(())
}
/// Fallible sending.
///
/// # Errors
/// [`Error::CantFit`] means the [`Controller`] permanently rejected the
/// node. [`Error::WontFit`] means the [`Controller`] temporarily
/// rejected the node. [`Error::Closed`] means not
/// [`Receiver`](super::Receiver)s remain.
pub fn try_send(&self, n: T) -> Result<(), Error<T>> {
let mut inner = self.sh.inner.lock();
if inner.rx_count == 0 {
return Err(Error::Closed);
}
inner.q.try_push(n)?;
drop(inner);
self.sh.wake_receivers();
Ok(())
}
/// Forcibly add an element to the queue.
///
/// If the queue has a limit and the queue is full, then the oldest node will
/// be removed before the new element is added.
///
/// # Errors
/// [`Error::CantFit`] means the [`Controller`] rejected the node.
/// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
pub fn force_send(&self, n: T) -> Result<(), Error<T>> {
let mut inner = self.sh.inner.lock();
inner.q.force_push(n)?;
drop(inner);
// Have a new element in queue -- wake up a waiting receiver
self.sh.wake_receivers();
Ok(())
}
/// Forcibly add an element to rhe channel, allowing the caller to determine
/// how overflow is handled.
///
/// # Errors
/// [`Error::CantFit`] means the [`Controller`] rejected the node.
/// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
pub fn force_send_oc(
&self,
n: T,
overflow: Overflow
) -> Result<(), Error<T>> {
let mut inner = self.sh.inner.lock();
inner.q.force_push_oc(n, overflow)?;
drop(inner);
// Have a new element in queue -- wake up a waiting receiver
self.sh.wake_receivers();
Ok(())
}
}
impl<C, T> Drop for Sender<C, T>
where
C: Controller<Item = T>
{
fn drop(&mut self) {
let mut inner = self.sh.inner.lock();
inner.tx_count -= 1;
drop(inner);
self.sh.wake_receivers();
}
}
/// A [`Future`] that will resolve when the queue is not full.
struct ReserveSpaceFuture<'n, C, T>
where
C: Controller<Item = T>,
T: Send
{
sh: Arc<Shared<C, T>>,
waiter: Waiter,
n: &'n T
}
impl<C, T> Future for ReserveSpaceFuture<'_, C, T>
where
C: Controller<Item = T>,
T: Send
{
type Output = Result<(), limq::CheckErr>;
fn poll(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>
) -> Poll<Self::Output> {
let inner = self.sh.inner.lock();
match inner.q.would_fit(self.n) {
Ok(()) => Poll::Ready(Ok(())),
Err(e) => match e {
limq::CheckErr::WontFit => {
drop(inner); // happy borrow-checker
self.waiter.prime(ctx);
Poll::Pending
}
limq::CheckErr::CantFit => Poll::Ready(Err(limq::CheckErr::CantFit))
}
}
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Changes to tests/basic.rs.
1 2 3 4 | use std::thread; use tokio::task; | > > | > > | | | | | | | > | | | | | | | > | | | > | | | > | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
use std::thread;
use tokio::task;
use limq::OptLenLim;
use limqch::{Error, channel};
#[test]
fn try_send_full() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
tx.send_blocking(1).unwrap();
tx.send_blocking(2).unwrap();
assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));
assert_eq!(rx.recv_blocking().unwrap(), 1);
assert_eq!(rx.try_recv().unwrap(), Some(2));
assert_eq!(rx.try_recv().unwrap(), None);
}
#[tokio::test]
async fn try_on_full() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
tx.send_async(1).await.unwrap();
tx.send_async(2).await.unwrap();
assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));
assert_eq!(rx.recv_async().await.unwrap(), 1);
assert_eq!(rx.try_recv().unwrap(), Some(2));
assert_eq!(rx.try_recv().unwrap(), None);
}
#[tokio::test]
async fn send_from_spawned_task() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
task::spawn(async move {
tx.send_async(1).await.unwrap();
});
assert_eq!(rx.recv_async().await.unwrap(), 1);
}
#[tokio::test]
async fn blocking_send_from_thread() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
thread::spawn(move || {
tx.send_blocking(1).unwrap();
});
assert_eq!(rx.recv_blocking().unwrap(), 1);
}
#[tokio::test]
async fn task_to_task() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
task::spawn(async move {
tx.send_async(1).await.unwrap();
});
task::spawn_blocking(move || {
assert_eq!(rx.recv_blocking().unwrap(), 1);
})
.await
.unwrap();
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Added tests/closed.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
use limq::OptLenLim;
use limqch::{Error, channel};
// No receiver exists, so sending is pointless
#[test]
fn rx_closed_sync() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
// Drop the only receiver
drop(rx);
// Transmitter should now return Error::Closed
let Err(Error::Closed) = tx.send_blocking(11) else {
panic!("Unexpectedly not Err(Error::Closed)");
};
let Err(Error::Closed) = tx.try_send(11) else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
// No receiver exists, so sending is pointless
#[tokio::test]
async fn rx_closed_async() {
let lenlim = OptLenLim::new(Some(2));
let (tx, rx) = channel(lenlim);
// Drop the only receiver
drop(rx);
// Transmitter should now return Error::Closed
let Err(Error::Closed) = tx.send_async(11).await else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
#[test]
fn tx_closed_empty_sync() {
let lenlim = OptLenLim::<u32>::new(Some(2));
let (tx, rx) = channel(lenlim);
// Drop the only transmitter
drop(tx);
// Transmitter should now return Error::Closed, because the queue is empty
// and there are no receivers.
let Err(Error::Closed) = rx.recv_blocking() else {
panic!("Unexpectedly not Err(Error::Closed)");
};
let Err(Error::Closed) = rx.try_recv() else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
#[tokio::test]
async fn tx_closed_empty_async() {
let lenlim = OptLenLim::<u32>::new(Some(2));
let (tx, rx) = channel(lenlim);
// Drop the only transmitter
drop(tx);
// Transmitter should now return Error::Closed, because the queue is empty
// and there are no receivers.
let Err(Error::Closed) = rx.recv_async().await else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
#[test]
fn tx_closed_nonempty_sync() {
let lenlim = OptLenLim::<u32>::new(Some(2));
let (tx, rx) = channel(lenlim);
// Add two nodes
tx.try_send(1).unwrap();
tx.try_send(2).unwrap();
// Drop the only transmitter
drop(tx);
// Should succeed
let Ok(1) = rx.recv_blocking() else {
panic!("Unexpectedly not Ok(Some(1))");
};
let Ok(Some(2)) = rx.try_recv() else {
panic!("Unexpectedly not Ok(Some(2))");
};
// queue is now empty, so it should be returning Error::Closed
let Err(Error::Closed) = rx.recv_blocking() else {
panic!("Unexpectedly not Err(Error::Closed)");
};
let Err(Error::Closed) = rx.try_recv() else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
#[tokio::test]
async fn tx_closed_nonempty_async() {
let lenlim = OptLenLim::<u32>::new(Some(2));
let (tx, rx) = channel(lenlim);
// Add a node
tx.try_send(1).unwrap();
// Drop the only transmitter
drop(tx);
// Should succeed
let Ok(1) = rx.recv_async().await else {
panic!("Unexpectedly not Ok(Some(1))");
};
// queue is now empty, so it should be returning Error::Closed
let Err(Error::Closed) = rx.recv_async().await else {
panic!("Unexpectedly not Err(Error::Closed)");
};
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Added tests/ctrl.rs.
> > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
use limq::BufLim;
use limqch::channel;
#[test]
fn access_controller() {
let lenlim = BufLim::new(None, None);
let (tx, _rx) = channel(lenlim);
let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
assert_eq!(max_len, None);
let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
assert_eq!(max_size, None);
tx.ctrl().with_ctrl_mut(|c| c.set_max_len(Some(2)));
tx.ctrl().with_ctrl_mut(|c| c.set_max_size(Some(8)));
let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
assert_eq!(max_len, Some(2));
let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
assert_eq!(max_size, Some(8));
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|
Changes to www/changelog.md.
1 2 3 4 5 6 | # Change Log ⚠️ indicates a breaking change. ## [Unreleased] | | > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | # Change Log ⚠️ indicates a breaking change. ## [Unreleased] [Details](/vdiff?from=limqch-0.2.0&to=trunk) ### Added ### Changed ### Removed --- ## [0.2.0] - 2025-04-11 [Details](/vdiff?from=limqch-0.1.0&to=limqch-0.2.0) ### Changed -⚠️ Update for `limq` `0.3.0`. Creating a limq channel now requires a `Controller` implementation. ### Removed - `Spawner` was removed. --- ## [0.1.0] - 2025-04-01 Initial release. |