limqch

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From limqch-0.1.0 To limqch-0.2.0

2025-04-12
20:35
Use limq 0.3.1 to get bugfix. check-in: ef18178262 user: jan tags: trunk
2025-04-10
23:43
Release maintenance. check-in: 51ff61f471 user: jan tags: limqch-0.2.0, trunk
23:24
Docs. check-in: 8c996a0bf2 user: jan tags: trunk
23:03
Merge. check-in: 7429ab4ba0 user: jan tags: trunk
2025-04-04
01:59
Redesign for limq 0.2.0. check-in: 1024ad1186 user: jan tags: limq-0.2.0-updates
2025-04-01
01:48
Link to limq. check-in: c40e4c6151 user: jan tags: limqch-0.1.0, trunk
01:47
Move from old repo. check-in: 984b708d78 user: jan tags: trunk

Changes to .efiles.
1
2
3
4

5
6
7
8


1
2
3
4
5
6
7
8
9
10
11




+




+
+
Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/tx.rs
src/rx.rs
tests/basic.rs
tests/closed.rs
tests/ctrl.rs
Changes to Cargo.toml.
1
2
3
4


5
6
7
8
9
10
11

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

27
28
29
30
31
32
33
1
2


3
4
5
6
7
8
9
10

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

26
27
28
29
30
31
32
33


-
-
+
+






-
+














-
+







[package]
name = "limqch"
version = "0.1.0"
edition = "2021"
version = "0.2.0"
edition = "2024"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency" ]
keywords = [ "channel", "bounded" ]
repository = "https://repos.qrnch.tech/pub/limqch"
description = "A channel built on top of limq."
rust-version = "1.56"
rust-version = "1.85"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "actively-developed" }

[dependencies]
limq = { version = "0.1.4" }
limq = { version = "0.3.0" }
parking_lot = { version = "0.12.3" }
wakerizer = { version = "0.1.0" }

[dev-dependencies]
tokio = { version = "1.44.1", features = ["macros", "rt", "rt-multi-thread"] }

[package.metadata.docs.rs]
Added rustfmt.toml.














1
2
3
4
5
6
7
8
9
10
11
12
13
14
+
+
+
+
+
+
+
+
+
+
+
+
+
+
blank_lines_upper_bound = 2
comment_width = 79
edition = "2024"
format_strings = true
max_width = 79
match_block_trailing_comma = false
# merge_imports = true
newline_style = "Unix"
tab_spaces = 2
trailing_comma = "Never"
unstable_features = true
wrap_comments = true
#reorder_imports = false
#reorder_modules = false
Added src/err.rs.

















































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use std::fmt;

/// Errors that [`Sender`](super::Sender) and [`Receiver`](super::Receiver)
/// methods can return.
#[derive(PartialEq, Eq)]
pub enum Error<T> {
  /// No remote channel end-points remain.
  Closed,

  /// The node will currently not fit in the queue.
  WontFit(T),

  /// The node can't fit in the queue (unless reconfigured to allow larger
  /// nodes).
  CantFit(T)
}

impl<T> std::error::Error for Error<T> {}

impl<T> fmt::Debug for Error<T> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Self::Closed => write!(f, "Error::Closed"),
      Self::WontFit(_) => write!(f, "Error::WontFit"),
      Self::CantFit(_) => write!(f, "Error::CantFit")
    }
  }
}

impl<T> fmt::Display for Error<T> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Self::Closed => write!(f, "Remote end-points closed"),
      Self::WontFit(_) => write!(f, "Won't fit"),
      Self::CantFit(_) => write!(f, "Can't fit")
    }
  }
}

impl<T> From<limq::Error<T>> for Error<T> {
  fn from(err: limq::Error<T>) -> Self {
    match err {
      limq::Error::WontFit(n) => Self::WontFit(n),
      limq::Error::CantFit(n) => Self::CantFit(n)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to src/lib.rs.
1
2

3
4
5
6
7
8
9
10
11
12
13

14
15


16
17



18

19
20



21
22
23
24
25










26
27
28
29
30





31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47






48
49
50
51
52
53
54
55
56
57
58
59
60


61
62
63
64
65
66
67


68
69
70
71
72
73
74



















75
76
77






78
79
80
81
82
83
84
85
86

87
88



89
90





91
92
93


94
95
96
97
98
99

100
101
102
103
104

105
106
107

108















































109
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15


16
17
18
19
20
21
22

23


24
25
26
27
28



29
30
31
32
33
34
35
36
37
38
39
40
41


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77


78
79
80
81
82
83
84


85
86
87
88
89
90
91

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112


113
114
115
116
117
118
119
120
121
122
123
124
125
126

127
128

129
130
131
132

133
134
135
136
137
138


139
140
141
142
143
144
145

146
147
148
149
150

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204


+











+
-
-
+
+


+
+
+
-
+
-
-
+
+
+


-
-
-
+
+
+
+
+
+
+
+
+
+



-
-
+
+
+
+
+














-
-
-
+
+
+
+
+
+











-
-
+
+





-
-
+
+





-

+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

-
-
+
+
+
+
+
+








-
+

-
+
+
+

-
+
+
+
+
+

-
-
+
+





-
+




-
+



+

+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

//! Channel based on [`LimQ`].

mod err;
mod rx;
mod tx;

use std::sync::Arc;

use parking_lot::{Condvar, Mutex};

use limq::LimQ;

use wakerizer::Wakers;

pub use err::Error;
pub use limq::Overflow;
pub use rx::Receiver;
pub use limq::{Controller, Overflow};
pub use rx::{Receiver, RecvFuture};
pub use tx::Sender;

struct Inner<C, T>
where
  C: Controller<Item = T>

{
struct Inner<T> {
  q: LimQ<T>
  q: LimQ<C, T>,
  tx_count: usize,
  rx_count: usize
}

impl<T> Inner<T> {
  fn new(qlim: Option<usize>) -> Self {
    Self { q: LimQ::new(qlim) }
impl<C, T> Inner<C, T>
where
  C: Controller<Item = T>
{
  fn new(lc: C) -> Self {
    Self {
      q: LimQ::new(lc),
      tx_count: 0,
      rx_count: 0
    }
  }
}

struct Shared<T> {
  inner: Mutex<Inner<T>>,
struct Shared<C, T>
where
  C: Controller<Item = T>
{
  inner: Mutex<Inner<C, T>>,

  /// Used from receivers in order to wait for senders to add new elements to
  /// the queue.
  tx_wakers: Wakers,

  tx_signal: Condvar,

  /// Used from senders in order to wait for receivers to make room available
  /// in the queue.
  rx_wakers: Wakers,

  rx_signal: Condvar
}

impl<T> Shared<T> {
  fn new(qlim: Option<usize>) -> Self {
    let inner = Inner::new(qlim);
impl<C, T> Shared<C, T>
where
  C: Controller<Item = T>
{
  fn new(lc: C) -> Self {
    let inner = Inner::new(lc);
    let inner = Mutex::new(inner);
    Self {
      inner,
      tx_wakers: Wakers::new(),
      rx_wakers: Wakers::new(),
      tx_signal: Condvar::new(),
      rx_signal: Condvar::new()
    }
  }

  /// Called by the receiver end-point when a node has been takwn off the
  /// queue, in case a sender is waiting for space to become available.
  fn wake_sender(&self) {
  /// queue, in case any sendera are waiting for space to become available.
  fn wake_senders(&self) {
    self.tx_wakers.wake_all();
    self.tx_signal.notify_one();
  }

  /// Called by the sender end-point when a node has been added to the queue in
  /// case a receiver is waiting for nodes to become available.
  fn wake_receiver(&self) {
  /// case any receivers are waiting for nodes to become available.
  fn wake_receivers(&self) {
    self.rx_wakers.wake_all();
    self.rx_signal.notify_one();
  }
}


/// Create a channel pair, with an optional internal queue limit.
///
/// ```
/// use limqch::{channel, Error};
/// use limq::{LimQ, LengthLimit};
///
/// // Construct a channel which uses an internal queue that is limited to
/// // 2 elements.
/// let lenlim = LengthLimit::new(2);
/// let (tx, rx) = channel(lenlim);
///
/// tx.try_send(1).unwrap();
/// tx.try_send(2).unwrap();
/// let Err(Error::WontFit(n)) = tx.try_send(3) else {
///   panic!("Unexpectedly not Error::WontFit");
/// };
///
/// let n = rx.try_recv().unwrap().unwrap();
/// assert_eq!(n, 1);
/// ```
#[must_use]
pub fn channel<T>(qlim: Option<usize>) -> (Sender<T>, Receiver<T>) {
  let sh = Shared::new(qlim);
pub fn channel<C, T>(lc: C) -> (Sender<C, T>, Receiver<C, T>)
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  let sh = Shared::new(lc);
  let sh = Arc::new(sh);

  let tx = Sender::new(Arc::clone(&sh));
  let rx = Receiver::new(sh);

  (tx, rx)
}


/*
/// Object that can be used to spawn sender and receiver end-points.
pub struct Spawner<T>(Arc<Shared<T>>);
pub struct Spawner<C, T>(Arc<Shared<C, T>>)
where
  C: Controller<Item = T>;

impl<T> Spawner<T> {
impl<C, T> Spawner<C, T>
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  #[must_use]
  pub fn new(qlim: Option<usize>) -> Self {
    let sh = Shared::new(qlim);
  pub fn new(lc: C) -> Self {
    let sh = Shared::new(lc);
    let sh = Arc::new(sh);
    Self(sh)
  }

  #[must_use]
  pub fn sender(&self) -> Sender<T> {
  pub fn sender(&self) -> Sender<C, T> {
    Sender::new(Arc::clone(&self.0))
  }

  #[must_use]
  pub fn receiver(&self) -> Receiver<T> {
  pub fn receiver(&self) -> Receiver<C, T> {
    Receiver::new(Arc::clone(&self.0))
  }
}
*/


/// Proxy object to allow [`Sender`] and [`Receiver`] instances to
/// access the internal queue's [`Controller`].
#[derive(Clone)]
#[repr(transparent)]
pub struct Ctrl<C, T>(Arc<Shared<C, T>>)
where
  C: Controller<Item = T>;

impl<C, T> Ctrl<C, T>
where
  C: Controller<Item = T>
{
  /// Access the underlying [`Controller`] in a closure.
  ///
  /// This can be used to access the `Controller`.
  ///
  /// # Caveat Utilitor
  /// An internal lock is held while the closure is called.  The application
  /// must return quickly to avoid holding up the channel end-points (or,
  /// worse, holding up an an async runtime).
  pub fn with_ctrl<F, R>(&self, f: F) -> R
  where
    F: FnOnce(&C) -> R
  {
    let inner = self.0.inner.lock();
    f(inner.q.controller())
  }

  /// Access the underlying [`Controller`] mutably in a closure.
  ///
  /// This can be used to reconfigure the `Controller`, if it support
  /// reconfiguration.
  ///
  /// # Caveat Utilitor
  /// An internal lock is held while the closure is called.  The application
  /// must return quickly to avoid holding up the channel end-points (or,
  /// worse, holding up an an async runtime).
  pub fn with_ctrl_mut<F, R>(&self, f: F) -> R
  where
    F: FnOnce(&mut C) -> R
  {
    let mut inner = self.0.inner.lock();
    f(inner.q.controller_mut())
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to src/rx.rs.
1
2
3
4
5
6
7
8
9
10

11
12
13
14
15





16
17














18




19
20




21
22
23


24










25

26
27
28




29
30
31
32
33
34
35
36


37
38

39
40
41
42
43

44
45


46
47

48
49
50
51
52
53
54
55









56



57

58
59
60
61

62
63


64
65
66
67
68
69



70











71
72
73
74



75
76





77
78
79
80
81





82
83
84
85
86
87
88



89
90
91
92
93
94
95
1
2
3
4
5
6
7
8
9

10
11
12



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

34
35
36
37
38

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

59
60
61
62
63
64
65
66
67
68
69
70
71
72


73
74


75

76
77
78

79
80
81
82
83
84

85
86
87
88
89
90
91


92
93
94
95
96
97
98
99
100
101
102
103
104

105
106
107


108


109
110

111
112
113
114
115
116
117
118

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136


137
138
139
140
141
142
143
144


145
146
147
148
149
150
151
152
153
154
155

156
157
158
159
160
161
162
163
164
165









-
+


-
-
-
+
+
+
+
+


+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+

-
+
+
+
+



+
+

+
+
+
+
+
+
+
+
+
+
-
+



+
+
+
+






-
-
+
+
-
-
+
-



-
+


+
+

-
+






-
-
+
+
+
+
+
+
+
+
+

+
+
+
-
+


-
-
+
-
-
+
+
-





+
+
+
-
+
+
+
+
+
+
+
+
+
+
+




+
+
+
-
-
+
+
+
+
+



-
-
+
+
+
+
+






-
+
+
+







use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::Shared;
use super::{Controller, Ctrl, Error, Shared};

/// Channel receiver end-point.
#[derive(Clone)]
pub struct Receiver<T> {
  sh: Arc<Shared<T>>
pub struct Receiver<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>
}

impl<C, T> Clone for Receiver<C, T>
where
  C: Controller<Item = T>
{
  fn clone(&self) -> Self {
    let mut inner = self.sh.inner.lock();
    inner.rx_count += 1;
    drop(inner);

    let sh = Arc::clone(&self.sh);
    Self { sh }
  }
}

impl<T> Receiver<T> {
impl<C, T> Receiver<C, T>
where
  C: Controller<Item = T>
{
  #[must_use]
  pub(super) const fn new(sh: Arc<Shared<T>>) -> Self {
  pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
    let mut inner = sh.inner.lock();
    inner.rx_count += 1;
    drop(inner);
    Self { sh }
  }

  /// Return a [`Ctrl`], which can be used to access the internal
  /// [`Controller`].
  #[must_use]
  pub fn ctrl(&self) -> Ctrl<C, T> {
    Ctrl(Arc::clone(&self.sh))
  }

  /// Receive a node from queue.  If the queue is empty, block and wait for a
  /// new node to arrive.
  ///
  /// # Errors
  /// [`Error::Closed`] means that the queue is empty and all the
  /// [`Sender`](super::Sender)s have been dropped.
  pub fn recv_blocking(&self) -> T {
  pub fn recv_blocking(&self) -> Result<T, Error<T>> {
    let mut inner = self.sh.inner.lock();

    let n = loop {
      if inner.q.is_empty() && inner.tx_count == 0 {
        return Err(Error::Closed);
      }

      if let Some(n) = inner.q.pop() {
        break n;
      }
      self.sh.rx_signal.wait(&mut inner);
    };

    // If there's a queue limit configured, then senders may be blocked waiting
    // for space to become available.
    // Wake up any async senders that are waiting for space to become
    // available.
    if inner.q.max_len().is_some() {
      self.sh.wake_sender();
    self.sh.wake_senders();
    }

    drop(inner);

    n
    Ok(n)
  }

  /// Returns a `Future` that is the `async` equivalent of
  /// [`Receiver::recv_blocking()`].
  #[must_use]
  pub fn recv_async(&self) -> RecvFuture<T> {
  pub fn recv_async(&self) -> RecvFuture<C, T> {
    RecvFuture {
      sh: Arc::clone(&self.sh),
      waiter: self.sh.rx_wakers.waiter()
    }
  }

  #[must_use]
  pub fn try_recv(&self) -> Option<T> {
  /// Attempt to retreive a node from the queue.
  ///
  /// Returns `Ok(Some(T))` if there's a node available for immediate pickup.
  /// Returns `Ok(None)` is there are no nodes to pick up.
  ///
  /// # Errors
  /// [`Error::Closed`] means that the queue is empty and all the
  /// [`Sender`](super::Sender)s have been dropped.
  pub fn try_recv(&self) -> Result<Option<T>, Error<T>> {
    let mut inner = self.sh.inner.lock();
    if inner.q.is_empty() && inner.tx_count == 0 {
      return Err(Error::Closed);
    }
    inner.q.pop().map_or_else(
    Ok(inner.q.pop().map_or_else(
      || None,
      |n| {
        // If there's a queue limit configured, then senders may be blocked
        // waiting for space to become available.
        // Wake up any async senders that are waiting for space to become
        if inner.q.max_len().is_some() {
          self.sh.wake_sender();
        // available.
        self.sh.wake_senders();
        }

        drop(inner);

        Some(n)
      }
    ))
  }
}
    )

impl<C, T> Drop for Receiver<C, T>
where
  C: Controller<Item = T>
{
  fn drop(&mut self) {
    let mut inner = self.sh.inner.lock();
    inner.rx_count -= 1;
    drop(inner);

    self.sh.wake_senders();
  }
}


/// A `Future` that will will resolve when there's data that can be returned
/// from the channel, of it the internal queue is empty but all the
/// [`Sender`](super::Sender) end-points have been dropped.
pub struct RecvFuture<T> {
  sh: Arc<Shared<T>>,
pub struct RecvFuture<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>,
  waiter: Waiter
}

impl<T> Future for RecvFuture<T> {
  type Output = T;
impl<C, T> Future for RecvFuture<C, T>
where
  C: Controller<Item = T>
{
  type Output = Result<T, Error<T>>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.inner.lock();
    if let Some(n) = inner.q.pop() {
      Poll::Ready(n)
      Poll::Ready(Ok(n))
    } else if inner.tx_count == 0 {
      Poll::Ready(Err(Error::Closed))
    } else {
      drop(inner); // happy borrow-checker
      self.waiter.prime(ctx);
      Poll::Pending
    }
  }
}
Changes to src/tx.rs.
1
2
3
4
5
6
7
8
9
10

11
12
13
14
15





16
17














18
19









20
21







22
23
24
25
26
27
28



29

30
31

32
33


34
35
36
37
38
39

















40


41
42
43








44
45
46
47











48
49

50
51
52
53
54
55
56
57
58
59


60

61
62

63
64
65
66
67
68
69
70
71
72










73

74
75
76
77
78







79
80




81
82
83
84


85



86
87
88
89



90
91
92
93
94


95
96



97
98
99


100
101
102
103
104

105
106

107


108
109
110
111
112
113




114

115
116

117
118
119
120
121



122
123
124
125




126





127
128

129
130
131
132
133
















134
135
136
137
138
139
140
141








142
143
144
145






146
147
148
149
150
151
152
153
154
155
156
157
158








159
160


161
162
163
164
165
1
2
3
4
5
6
7
8
9

10
11
12



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

62


63
64
65
66
67






68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87



88
89
90
91
92
93
94
95




96
97
98
99
100
101
102
103
104
105
106
107

108


109
110
111
112
113
114
115
116
117
118

119


120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

141
142




143
144
145
146
147
148
149


150
151
152
153




154
155

156
157
158
159
160
161

162
163
164
165
166
167
168
169
170
171


172
173
174
175


176
177



178

179
180
181
182

183
184
185
186
187
188
189
190
191
192
193
194

195
196

197

198
199
200

201
202
203
204
205
206
207
208
209
210
211

212
213
214
215
216
217

218

219
220
221

222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242



243
244
245
246
247
248
249
250
251
252


253
254
255
256
257
258
259
260
261
262
263








264
265
266
267
268
269
270
271


272
273
274
275
276
277
278









-
+


-
-
-
+
+
+
+
+


+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
+
+
+
+
+
+
+
+
+


+
+
+
+
+
+
+







+
+
+
-
+
-
-
+


+
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+

+
+
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+

-
+
-
-








+
+
-
+
-
-
+










+
+
+
+
+
+
+
+
+
+
-
+

-
-
-
-
+
+
+
+
+
+
+
-
-
+
+
+
+
-
-
-
-
+
+
-
+
+
+



-
+
+
+





+
+
-
-
+
+
+

-
-
+
+
-
-
-

-
+


+
-
+
+






+
+
+
+
-
+

-
+
-



-
+
+
+




+
+
+
+
-
+
+
+
+
+

-
+
-



-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+





-
-
-
+
+
+
+
+
+
+
+


-
-
+
+
+
+
+
+





-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
+
+





use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::{Overflow, Shared};
use super::{Controller, Ctrl, Error, Overflow, Shared};

/// Channel transmitter end-point.
#[derive(Clone)]
pub struct Sender<T> {
  sh: Arc<Shared<T>>
pub struct Sender<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>
}

impl<C, T> Clone for Sender<C, T>
where
  C: Controller<Item = T>
{
  fn clone(&self) -> Self {
    let mut inner = self.sh.inner.lock();
    inner.tx_count += 1;
    drop(inner);

    let sh = Arc::clone(&self.sh);
    Self { sh }
  }
}

impl<T> Sender<T> {
  pub(super) const fn new(sh: Arc<Shared<T>>) -> Self {
impl<C, T> Sender<C, T>
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
    let mut inner = sh.inner.lock();
    inner.tx_count += 1;
    drop(inner);
    Self { sh }
  }

  /// Return a [`Ctrl`], which can be used to access the internal
  /// [`Controller`].
  #[must_use]
  pub fn ctrl(&self) -> Ctrl<C, T> {
    Ctrl(Arc::clone(&self.sh))
  }

  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has make more room
  /// available on the queue.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the node was rejected by the
  /// [`Controller`]. [`Error::Closed`] means there are no more
  /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been
  /// [`Receiver`](super::Receiver)s available.
  /// dropped.
  pub fn send_blocking(&self, n: T) {
  pub fn send_blocking(&self, mut n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();

    // Keep trying to `try_push()` until successful.
    loop {
    // Wait until queue isn't full
    if let Some(max_len) = inner.q.max_len() {
      while inner.q.len() == max_len {
        self.sh.tx_signal.wait(&mut inner);
      }
    }
      if inner.rx_count == 0 {
        return Err(Error::Closed);
      }

      n = match inner.q.try_push(n) {
        Ok(()) => break,
        Err(e) => match e {
          limq::Error::WontFit(n) => n,
          limq::Error::CantFit(n) => {
            return Err(Error::CantFit(n));
          }
        }
      };
      self.sh.tx_signal.wait(&mut inner);
    }

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receivers();
    // Ignoring error is okay here.  It was just determined that there's room
    // for the new element in the queue, so this must succeed.
    //

    Ok(())
  }

  /// This exists only because the compiler thinks `send_async()` is not `Send`
  /// (because it thinks `inner` is held past the `await`, even though it
  /// isn't.
  ///
    // This is true as long as the mutex isn't unlocked between the check above
    // and this call
    let _ = inner.q.try_push(n);

  /// # Errors
  /// [`Error::CantFit`] means the node was rejected by the
  /// [`Controller`]. [`Error::Closed`] means there are no more
  /// [`Receiver`](super::Receiver)s available.
  fn try_push(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    if inner.rx_count == 0 {
      return Err(Error::Closed);
    }

    inner.q.try_push(n)?;
    drop(inner);

    Ok(())
    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();
  }

  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has make more room
  /// available on the queue.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been
  /// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
  /// dropped.
  pub async fn send_async(&self, mut n: T) {
  pub async fn send_async(&self, mut n: T) -> Result<(), Error<T>> {
    // In an ideal world, there'd be a SendFuture that takes in the new node.
    // However, if the queue is full the node needs to be stored somewhere
    // until the task is woken up, and this presents a few annoying challenges.
    //
    // Instead, we'll use a Future that simply waits for there to be space
    // available and when that resolves immediate all the node to the queue.
    //
    // For multithreaded runtimes it is possible for a TOCTOU issue here so we
    // need to loop until the try_push() is successful.
    loop {
      // Attempt to push node onto queue.
      n = match self.try_push(n) {
        Ok(()) => break,
        Err(e) => match e {
          Error::Closed => return Err(Error::Closed),
          Error::WontFit(n) => n,
          Error::CantFit(n) => return Err(Error::CantFit(n))
        }
      };

      ReserveSpaceFuture {
      let fut = ReserveSpaceFuture {
        sh: Arc::clone(&self.sh),
        waiter: self.sh.tx_wakers.waiter()
      }
      .await;

        waiter: self.sh.tx_wakers.waiter(),
        n: &n
      };
      match fut.await {
        Ok(()) => {
          // fall through
        }
      let mut inner = self.sh.inner.lock();

        Err(e) => match e {
          limq::CheckErr::WontFit => {
            // fall through
          }
      // ReserveSpaceFuture should have made sure that space is available.
      n = match inner.q.try_push(n) {
        Ok(()) => break,
        Err(n) => n
          limq::CheckErr::CantFit => {
            return Err(Error::CantFit(n));
      };
          }
        }
      }
    }

    // Have a new element in queue -- wake up waiting receivers
    self.sh.wake_receiver();
    self.sh.wake_receivers();

    Ok(())
  }

  /// Fallible sending.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] permanently rejected the
  /// node.  [`Error::WontFit`] means the [`Controller`] temporarily
  /// If the queue is full the node is returned.
  pub fn try_send(&self, n: T) -> Result<(), T> {
  /// rejected the node.  [`Error::Closed`] means not
  /// [`Receiver`](super::Receiver)s remain.
  pub fn try_send(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    let res = inner.q.try_push(n);

    if inner.rx_count == 0 {
      return Err(Error::Closed);
    // Have a new element in queue -- wake up a waiting receiver
    if res.is_ok() {
      self.sh.wake_receiver();
    }

    inner.q.try_push(n)?;
    drop(inner);

    self.sh.wake_receivers();
    res

    Ok(())
  }

  /// Forcibly add an element to the queue.
  ///
  /// If the queue has a limit and the queue is full, then the oldest node will
  /// be removed before the new element is added.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
  pub fn force_send(&self, n: T) {
  pub fn force_send(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    inner.q.force_push(n);
    inner.q.force_push(n)?;

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();
    self.sh.wake_receivers();

    Ok(())
  }

  /// Forcibly add an element to rhe channel, allowing the caller to determine
  /// how overflow is handled.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// [`Error::Closed`] means not [`Receiver`](super::Receiver)s remain.
  pub fn force_send_oc(&self, n: T, overflow: Overflow) {
  pub fn force_send_oc(
    &self,
    n: T,
    overflow: Overflow
  ) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    inner.q.force_push_oc(n, overflow);
    inner.q.force_push_oc(n, overflow)?;

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();
    self.sh.wake_receivers();

    Ok(())
  }
}

impl<C, T> Drop for Sender<C, T>
where
  C: Controller<Item = T>
{
  fn drop(&mut self) {
    let mut inner = self.sh.inner.lock();
    inner.tx_count -= 1;
    drop(inner);

    self.sh.wake_receivers();
  }
}


/// A [`Future`] that will resolve when the queue is not full.
struct ReserveSpaceFuture<T> {
  sh: Arc<Shared<T>>,
  waiter: Waiter
struct ReserveSpaceFuture<'n, C, T>
where
  C: Controller<Item = T>,
  T: Send
{
  sh: Arc<Shared<C, T>>,
  waiter: Waiter,
  n: &'n T
}

impl<T> Future for ReserveSpaceFuture<T> {
  type Output = ();
impl<C, T> Future for ReserveSpaceFuture<'_, C, T>
where
  C: Controller<Item = T>,
  T: Send
{
  type Output = Result<(), limq::CheckErr>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let inner = self.sh.inner.lock();
    if let Some(max_len) = inner.q.max_len() {
      if inner.q.len() < max_len {
        Poll::Ready(())
      } else {
        drop(inner); // happy borrow-checker
        self.waiter.prime(ctx);
        Poll::Pending
      }
    match inner.q.would_fit(self.n) {
      Ok(()) => Poll::Ready(Ok(())),
      Err(e) => match e {
        limq::CheckErr::WontFit => {
          drop(inner); // happy borrow-checker
          self.waiter.prime(ctx);
          Poll::Pending
        }
    } else {
      Poll::Ready(())
        limq::CheckErr::CantFit => Poll::Ready(Err(limq::CheckErr::CantFit))
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to tests/basic.rs.
1
2
3
4


5


6
7
8

9
10
11
12




13
14
15
16



17
18
19
20

21
22
23
24




25
26
27
28



29
30
31
32

33

34
35
36

37
38
39

40
41
42
43

44

45
46
47

48
49
50

51
52
53
54

55

56
57
58

59
60
61
62

63
64
65
66
67
68
1
2
3
4
5
6

7
8
9
10
11
12




13
14
15
16
17



18
19
20
21
22
23
24
25




26
27
28
29
30



31
32
33
34
35
36
37
38

39
40
41

42
43
44

45
46
47
48
49
50

51
52
53

54
55
56

57
58
59
60
61
62

63
64
65

66
67
68
69

70
71
72
73
74
75
76




+
+
-
+
+



+
-
-
-
-
+
+
+
+

-
-
-
+
+
+




+
-
-
-
-
+
+
+
+

-
-
-
+
+
+




+
-
+


-
+


-
+




+
-
+


-
+


-
+




+
-
+


-
+



-
+






use std::thread;

use tokio::task;

use limq::OptLenLim;

use limqch::channel;
use limqch::{Error, channel};


#[test]
fn try_send_full() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(Some(2));
  tx.send_blocking(1);
  tx.send_blocking(2);
  assert_eq!(tx.try_send(3), Err(3));
  let (tx, rx) = channel(lenlim);
  tx.send_blocking(1).unwrap();
  tx.send_blocking(2).unwrap();
  assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));

  assert_eq!(rx.recv_blocking(), 1);
  assert_eq!(rx.try_recv(), Some(2));
  assert_eq!(rx.try_recv(), None);
  assert_eq!(rx.recv_blocking().unwrap(), 1);
  assert_eq!(rx.try_recv().unwrap(), Some(2));
  assert_eq!(rx.try_recv().unwrap(), None);
}

#[tokio::test]
async fn try_on_full() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(Some(2));
  tx.send_async(1).await;
  tx.send_async(2).await;
  assert_eq!(tx.try_send(3), Err(3));
  let (tx, rx) = channel(lenlim);
  tx.send_async(1).await.unwrap();
  tx.send_async(2).await.unwrap();
  assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));

  assert_eq!(rx.recv_async().await, 1);
  assert_eq!(rx.try_recv(), Some(2));
  assert_eq!(rx.try_recv(), None);
  assert_eq!(rx.recv_async().await.unwrap(), 1);
  assert_eq!(rx.try_recv().unwrap(), Some(2));
  assert_eq!(rx.try_recv().unwrap(), None);
}

#[tokio::test]
async fn send_from_spawned_task() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(Some(2));
  let (tx, rx) = channel(lenlim);

  task::spawn(async move {
    tx.send_async(1).await;
    tx.send_async(1).await.unwrap();
  });

  assert_eq!(rx.recv_async().await, 1);
  assert_eq!(rx.recv_async().await.unwrap(), 1);
}

#[tokio::test]
async fn blocking_send_from_thread() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(Some(2));
  let (tx, rx) = channel(lenlim);

  thread::spawn(move || {
    tx.send_blocking(1);
    tx.send_blocking(1).unwrap();
  });

  assert_eq!(rx.recv_blocking(), 1);
  assert_eq!(rx.recv_blocking().unwrap(), 1);
}

#[tokio::test]
async fn task_to_task() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(Some(2));
  let (tx, rx) = channel(lenlim);

  task::spawn(async move {
    tx.send_async(1).await;
    tx.send_async(1).await.unwrap();
  });

  task::spawn_blocking(move || {
    assert_eq!(rx.recv_blocking(), 1);
    assert_eq!(rx.recv_blocking().unwrap(), 1);
  })
  .await
  .unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added tests/closed.rs.





























































































































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use limq::OptLenLim;

use limqch::{Error, channel};

// No receiver exists, so sending is pointless
#[test]
fn rx_closed_sync() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only receiver
  drop(rx);

  // Transmitter should now return Error::Closed
  let Err(Error::Closed) = tx.send_blocking(11) else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = tx.try_send(11) else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// No receiver exists, so sending is pointless
#[tokio::test]
async fn rx_closed_async() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only receiver
  drop(rx);

  // Transmitter should now return Error::Closed
  let Err(Error::Closed) = tx.send_async(11).await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[test]
fn tx_closed_empty_sync() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only transmitter
  drop(tx);

  // Transmitter should now return Error::Closed, because the queue is empty
  // and there are no receivers.
  let Err(Error::Closed) = rx.recv_blocking() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = rx.try_recv() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[tokio::test]
async fn tx_closed_empty_async() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only transmitter
  drop(tx);

  // Transmitter should now return Error::Closed, because the queue is empty
  // and there are no receivers.
  let Err(Error::Closed) = rx.recv_async().await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[test]
fn tx_closed_nonempty_sync() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Add two nodes
  tx.try_send(1).unwrap();
  tx.try_send(2).unwrap();

  // Drop the only transmitter
  drop(tx);

  // Should succeed
  let Ok(1) = rx.recv_blocking() else {
    panic!("Unexpectedly not Ok(Some(1))");
  };
  let Ok(Some(2)) = rx.try_recv() else {
    panic!("Unexpectedly not Ok(Some(2))");
  };

  // queue is now empty, so it should be returning Error::Closed
  let Err(Error::Closed) = rx.recv_blocking() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = rx.try_recv() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[tokio::test]
async fn tx_closed_nonempty_async() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Add a node
  tx.try_send(1).unwrap();

  // Drop the only transmitter
  drop(tx);

  // Should succeed
  let Ok(1) = rx.recv_async().await else {
    panic!("Unexpectedly not Ok(Some(1))");
  };

  // queue is now empty, so it should be returning Error::Closed
  let Err(Error::Closed) = rx.recv_async().await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added tests/ctrl.rs.


























1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use limq::BufLim;

use limqch::channel;

#[test]
fn access_controller() {
  let lenlim = BufLim::new(None, None);
  let (tx, _rx) = channel(lenlim);

  let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
  assert_eq!(max_len, None);

  let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
  assert_eq!(max_size, None);

  tx.ctrl().with_ctrl_mut(|c| c.set_max_len(Some(2)));
  tx.ctrl().with_ctrl_mut(|c| c.set_max_size(Some(8)));

  let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
  assert_eq!(max_len, Some(2));

  let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
  assert_eq!(max_size, Some(8));
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to www/changelog.md.
1
2
3
4
5
6
7

8
9
10
11
12
13
14















15
16
17
18
19
20
1
2
3
4
5
6

7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35






-
+







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+






# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=limqch-0.1.0&to=trunk)
[Details](/vdiff?from=limqch-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2025-04-11

[Details](/vdiff?from=limqch-0.1.0&to=limqch-0.2.0)

### Changed

-⚠️ Update for `limq` `0.3.0`.  Creating a limq channel now requires a
  `Controller` implementation.

### Removed

- `Spawner` was removed.

---

## [0.1.0] - 2025-04-01

Initial release.