limqch

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From limqch-0.1.0 To limqch-0.2.0

2025-04-12
20:35
Use limq 0.3.1 to get bugfix. check-in: ef18178262 user: jan tags: trunk
2025-04-10
23:43
Release maintenance. check-in: 51ff61f471 user: jan tags: limqch-0.2.0, trunk
23:24
Docs. check-in: 8c996a0bf2 user: jan tags: trunk
23:03
Merge. check-in: 7429ab4ba0 user: jan tags: trunk
2025-04-04
01:59
Redesign for limq 0.2.0. check-in: 1024ad1186 user: jan tags: limq-0.2.0-updates
2025-04-01
01:48
Link to limq. check-in: c40e4c6151 user: jan tags: limqch-0.1.0, trunk
01:47
Move from old repo. check-in: 984b708d78 user: jan tags: trunk

Changes to .efiles.
1
2
3
4

5
6
7
8


Cargo.toml
README.md
www/index.md
www/changelog.md

src/lib.rs
src/tx.rs
src/rx.rs
tests/basic.rs






>




>
>
1
2
3
4
5
6
7
8
9
10
11
Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/tx.rs
src/rx.rs
tests/basic.rs
tests/closed.rs
tests/ctrl.rs
Changes to Cargo.toml.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[package]
name = "limqch"
version = "0.1.0"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency" ]
keywords = [ "channel", "bounded" ]
repository = "https://repos.qrnch.tech/pub/limqch"
description = "A channel built on top of limq."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "actively-developed" }

[dependencies]
limq = { version = "0.1.4" }
parking_lot = { version = "0.12.3" }
wakerizer = { version = "0.1.0" }

[dev-dependencies]
tokio = { version = "1.44.1", features = ["macros", "rt", "rt-multi-thread"] }

[package.metadata.docs.rs]


|
|






|














|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[package]
name = "limqch"
version = "0.2.0"
edition = "2024"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency" ]
keywords = [ "channel", "bounded" ]
repository = "https://repos.qrnch.tech/pub/limqch"
description = "A channel built on top of limq."
rust-version = "1.85"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "actively-developed" }

[dependencies]
limq = { version = "0.3.0" }
parking_lot = { version = "0.12.3" }
wakerizer = { version = "0.1.0" }

[dev-dependencies]
tokio = { version = "1.44.1", features = ["macros", "rt", "rt-multi-thread"] }

[package.metadata.docs.rs]
Added rustfmt.toml.




























>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
blank_lines_upper_bound = 2
comment_width = 79
edition = "2024"
format_strings = true
max_width = 79
match_block_trailing_comma = false
# merge_imports = true
newline_style = "Unix"
tab_spaces = 2
trailing_comma = "Never"
unstable_features = true
wrap_comments = true
#reorder_imports = false
#reorder_modules = false
Added src/err.rs.


































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//! Error type shared by the channel end-points.

use std::fmt;

/// Errors that [`Sender`](super::Sender) and [`Receiver`](super::Receiver)
/// methods can return.
///
/// The `WontFit`/`CantFit` variants carry the rejected node back to the
/// caller so that it is not lost.
#[derive(PartialEq, Eq)]
pub enum Error<T> {
  /// No remote channel end-points remain.
  Closed,

  /// The node will currently not fit in the queue.
  WontFit(T),

  /// The node can't fit in the queue (unless reconfigured to allow larger
  /// nodes).
  CantFit(T)
}

// `Error` satisfies `std::error::Error` through the manual `Debug` and
// `Display` impls below; no additional methods are required.
impl<T> std::error::Error for Error<T> {}

// Manual `Debug` impl so `Error<T>` is debuggable without imposing a
// `T: Debug` bound (which a derive would add).  The carried node is
// intentionally not printed.
impl<T> fmt::Debug for Error<T> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Self::Closed => write!(f, "Error::Closed"),
      Self::WontFit(_) => write!(f, "Error::WontFit"),
      Self::CantFit(_) => write!(f, "Error::CantFit")
    }
  }
}

// Human-readable descriptions; like `Debug`, deliberately independent of
// `T` so no trait bounds are imposed on the node type.
impl<T> fmt::Display for Error<T> {
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    match self {
      Self::Closed => write!(f, "Remote end-points closed"),
      Self::WontFit(_) => write!(f, "Won't fit"),
      Self::CantFit(_) => write!(f, "Can't fit")
    }
  }
}

// Map the underlying queue's error onto the channel's error, preserving the
// rejected node.  `limq::Error` has only these two variants; the `Closed`
// case is detected by the channel itself, not by the queue.
impl<T> From<limq::Error<T>> for Error<T> {
  fn from(err: limq::Error<T>) -> Self {
    match err {
      limq::Error::WontFit(n) => Self::WontFit(n),
      limq::Error::CantFit(n) => Self::CantFit(n)
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to src/lib.rs.
1
2

3
4
5
6
7
8
9
10
11
12
13

14
15
16
17



18
19
20


21
22
23



24

25



26
27
28
29



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74



















75
76




77
78
79
80
81
82
83
84
85
86
87
88


89
90




91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

108















































109
//! Channel based on [`LimQ`].


mod rx;
mod tx;

use std::sync::Arc;

use parking_lot::{Condvar, Mutex};

use limq::LimQ;

use wakerizer::Wakers;


pub use limq::Overflow;
pub use rx::Receiver;
pub use tx::Sender;





struct Inner<T> {
  q: LimQ<T>


}

impl<T> Inner<T> {



  fn new(qlim: Option<usize>) -> Self {

    Self { q: LimQ::new(qlim) }



  }
}

struct Shared<T> {



  inner: Mutex<Inner<T>>,

  /// Used from receivers in order to wait for senders to add new elements to
  /// the queue.
  tx_wakers: Wakers,

  tx_signal: Condvar,

  /// Used from senders in order to wait for receivers to make room available
  /// in the queue.
  rx_wakers: Wakers,

  rx_signal: Condvar
}

impl<T> Shared<T> {



  fn new(qlim: Option<usize>) -> Self {
    let inner = Inner::new(qlim);
    let inner = Mutex::new(inner);
    Self {
      inner,
      tx_wakers: Wakers::new(),
      rx_wakers: Wakers::new(),
      tx_signal: Condvar::new(),
      rx_signal: Condvar::new()
    }
  }

  /// Called by the receiver end-point when a node has been takwn off the
  /// queue, in case a sender is waiting for space to become available.
  fn wake_sender(&self) {
    self.tx_wakers.wake_all();
    self.tx_signal.notify_one();
  }

  /// Called by the sender end-point when a node has been added to the queue in
  /// case a receiver is waiting for nodes to become available.
  fn wake_receiver(&self) {
    self.rx_wakers.wake_all();
    self.rx_signal.notify_one();
  }
}


/// Create a channel pair, with an optional internal queue limit.



















#[must_use]
pub fn channel<T>(qlim: Option<usize>) -> (Sender<T>, Receiver<T>) {




  let sh = Shared::new(qlim);
  let sh = Arc::new(sh);

  let tx = Sender::new(Arc::clone(&sh));
  let rx = Receiver::new(sh);

  (tx, rx)
}


/// Object that can be used to spawn sender and receiver end-points.
pub struct Spawner<T>(Arc<Shared<T>>);



impl<T> Spawner<T> {




  #[must_use]
  pub fn new(qlim: Option<usize>) -> Self {
    let sh = Shared::new(qlim);
    let sh = Arc::new(sh);
    Self(sh)
  }

  #[must_use]
  pub fn sender(&self) -> Sender<T> {
    Sender::new(Arc::clone(&self.0))
  }

  #[must_use]
  pub fn receiver(&self) -> Receiver<T> {
    Receiver::new(Arc::clone(&self.0))
  }
}

















































// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :


>











>
|
|


>
>
>
|
<
|
>
>


|
>
>
>
|
>
|
>
>
>



|
>
>
>
|














|
>
>
>
|
|











|
|





|
|





<

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

|
>
>
>
>
|








|

|
>
>

|
>
>
>
>

|
|





|




|



>

>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
//! Channel based on [`LimQ`].

mod err;
mod rx;
mod tx;

use std::sync::Arc;

use parking_lot::{Condvar, Mutex};

use limq::LimQ;

use wakerizer::Wakers;

pub use err::Error;
pub use limq::{Controller, Overflow};
pub use rx::{Receiver, RecvFuture};
pub use tx::Sender;

struct Inner<C, T>
where
  C: Controller<Item = T>
{

  q: LimQ<C, T>,
  tx_count: usize,
  rx_count: usize
}

impl<C, T> Inner<C, T>
where
  C: Controller<Item = T>
{
  fn new(lc: C) -> Self {
    Self {
      q: LimQ::new(lc),
      tx_count: 0,
      rx_count: 0
    }
  }
}

struct Shared<C, T>
where
  C: Controller<Item = T>
{
  inner: Mutex<Inner<C, T>>,

  /// Used from receivers in order to wait for senders to add new elements to
  /// the queue.
  tx_wakers: Wakers,

  tx_signal: Condvar,

  /// Used from senders in order to wait for receivers to make room available
  /// in the queue.
  rx_wakers: Wakers,

  rx_signal: Condvar
}

impl<C, T> Shared<C, T>
where
  C: Controller<Item = T>
{
  fn new(lc: C) -> Self {
    let inner = Inner::new(lc);
    let inner = Mutex::new(inner);
    Self {
      inner,
      tx_wakers: Wakers::new(),
      rx_wakers: Wakers::new(),
      tx_signal: Condvar::new(),
      rx_signal: Condvar::new()
    }
  }

  /// Called by the receiver end-point when a node has been taken off the
  /// queue, in case any senders are waiting for space to become available.
  fn wake_senders(&self) {
    self.tx_wakers.wake_all();
    self.tx_signal.notify_one();
  }

  /// Called by the sender end-point when a node has been added to the queue in
  /// case any receivers are waiting for nodes to become available.
  fn wake_receivers(&self) {
    self.rx_wakers.wake_all();
    self.rx_signal.notify_one();
  }
}


/// Create a channel pair, using the supplied queue-limit [`Controller`].
///
/// ```
/// use limqch::{channel, Error};
/// use limq::{LimQ, LengthLimit};
///
/// // Construct a channel which uses an internal queue that is limited to
/// // 2 elements.
/// let lenlim = LengthLimit::new(2);
/// let (tx, rx) = channel(lenlim);
///
/// tx.try_send(1).unwrap();
/// tx.try_send(2).unwrap();
/// let Err(Error::WontFit(n)) = tx.try_send(3) else {
///   panic!("Unexpectedly not Error::WontFit");
/// };
///
/// let n = rx.try_recv().unwrap().unwrap();
/// assert_eq!(n, 1);
/// ```
#[must_use]
pub fn channel<C, T>(lc: C) -> (Sender<C, T>, Receiver<C, T>)
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  let sh = Shared::new(lc);
  let sh = Arc::new(sh);

  let tx = Sender::new(Arc::clone(&sh));
  let rx = Receiver::new(sh);

  (tx, rx)
}

/*
/// Object that can be used to spawn sender and receiver end-points.
pub struct Spawner<C, T>(Arc<Shared<C, T>>)
where
  C: Controller<Item = T>;

impl<C, T> Spawner<C, T>
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  #[must_use]
  pub fn new(lc: C) -> Self {
    let sh = Shared::new(lc);
    let sh = Arc::new(sh);
    Self(sh)
  }

  #[must_use]
  pub fn sender(&self) -> Sender<C, T> {
    Sender::new(Arc::clone(&self.0))
  }

  #[must_use]
  pub fn receiver(&self) -> Receiver<C, T> {
    Receiver::new(Arc::clone(&self.0))
  }
}
*/


/// Proxy object to allow [`Sender`] and [`Receiver`] instances to
/// access the internal queue's [`Controller`].
#[derive(Clone)]
#[repr(transparent)]
pub struct Ctrl<C, T>(Arc<Shared<C, T>>)
where
  C: Controller<Item = T>;

impl<C, T> Ctrl<C, T>
where
  C: Controller<Item = T>
{
  /// Access the underlying [`Controller`] in a closure.
  ///
  /// This can be used to access the `Controller`.
  ///
  /// # Caveat Utilitor
  /// An internal lock is held while the closure is called.  The application
  /// must return quickly to avoid holding up the channel end-points (or,
  /// worse, holding up an async runtime).
  pub fn with_ctrl<F, R>(&self, f: F) -> R
  where
    F: FnOnce(&C) -> R
  {
    let inner = self.0.inner.lock();
    f(inner.q.controller())
  }

  /// Access the underlying [`Controller`] mutably in a closure.
  ///
  /// This can be used to reconfigure the `Controller`, if it supports
  /// reconfiguration.
  ///
  /// # Caveat Utilitor
  /// An internal lock is held while the closure is called.  The application
  /// must return quickly to avoid holding up the channel end-points (or,
  /// worse, holding up an async runtime).
  pub fn with_ctrl_mut<F, R>(&self, f: F) -> R
  where
    F: FnOnce(&mut C) -> R
  {
    let mut inner = self.0.inner.lock();
    f(inner.q.controller_mut())
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to src/rx.rs.
1
2
3
4
5
6
7
8
9
10
11
12
13
14



15
16
17














18



19
20



21
22
23


24










25
26
27
28




29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45


46
47
48
49
50
51
52
53
54







55
56



57
58
59
60
61
62

63
64
65
66
67
68
69



70










71
72
73
74



75



76
77
78
79
80



81
82
83
84
85
86
87
88


89
90
91
92
93
94
95
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::Shared;

/// Channel receiver end-point.
#[derive(Clone)]
pub struct Receiver<T> {



  sh: Arc<Shared<T>>
}















impl<T> Receiver<T> {



  #[must_use]
  pub(super) const fn new(sh: Arc<Shared<T>>) -> Self {



    Self { sh }
  }



  #[must_use]










  pub fn recv_blocking(&self) -> T {
    let mut inner = self.sh.inner.lock();

    let n = loop {




      if let Some(n) = inner.q.pop() {
        break n;
      }
      self.sh.rx_signal.wait(&mut inner);
    };

    // If there's a queue limit configured, then senders may be blocked waiting
    // for space to become available.
    if inner.q.max_len().is_some() {
      self.sh.wake_sender();
    }

    drop(inner);

    n
  }



  #[must_use]
  pub fn recv_async(&self) -> RecvFuture<T> {
    RecvFuture {
      sh: Arc::clone(&self.sh),
      waiter: self.sh.rx_wakers.waiter()
    }
  }

  #[must_use]







  pub fn try_recv(&self) -> Option<T> {
    let mut inner = self.sh.inner.lock();



    inner.q.pop().map_or_else(
      || None,
      |n| {
        // If there's a queue limit configured, then senders may be blocked
        // waiting for space to become available.
        if inner.q.max_len().is_some() {

          self.sh.wake_sender();
        }

        drop(inner);

        Some(n)
      }



    )










  }
}





pub struct RecvFuture<T> {



  sh: Arc<Shared<T>>,
  waiter: Waiter
}

impl<T> Future for RecvFuture<T> {



  type Output = T;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.inner.lock();
    if let Some(n) = inner.q.pop() {
      Poll::Ready(n)


    } else {
      drop(inner); // happy borrow-checker
      self.waiter.prime(ctx);
      Poll::Pending
    }
  }
}









|


<
|
>
>
>
|


>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>

|
>
>
>



>
>

>
>
>
>
>
>
>
>
>
>
|



>
>
>
>






|
|
<
|
<



|


>
>

|






|
>
>
>
>
>
>
>
|

>
>
>
|


<
|
<
>
|
<





>
>
>
|
>
>
>
>
>
>
>
>
>
>




>
>
>
|
>
>
>
|



|
>
>
>
|






|
>
>







1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

75

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

108

109
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::{Controller, Ctrl, Error, Shared};

/// Channel receiver end-point.

pub struct Receiver<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>
}

impl<C, T> Clone for Receiver<C, T>
where
  C: Controller<Item = T>
{
  fn clone(&self) -> Self {
    let mut inner = self.sh.inner.lock();
    inner.rx_count += 1;
    drop(inner);

    let sh = Arc::clone(&self.sh);
    Self { sh }
  }
}

impl<C, T> Receiver<C, T>
where
  C: Controller<Item = T>
{
  #[must_use]
  pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
    let mut inner = sh.inner.lock();
    inner.rx_count += 1;
    drop(inner);
    Self { sh }
  }

  /// Return a [`Ctrl`], which can be used to access the internal
  /// [`Controller`].
  #[must_use]
  pub fn ctrl(&self) -> Ctrl<C, T> {
    Ctrl(Arc::clone(&self.sh))
  }

  /// Receive a node from queue.  If the queue is empty, block and wait for a
  /// new node to arrive.
  ///
  /// # Errors
  /// [`Error::Closed`] means that the queue is empty and all the
  /// [`Sender`](super::Sender)s have been dropped.
  pub fn recv_blocking(&self) -> Result<T, Error<T>> {
    let mut inner = self.sh.inner.lock();

    let n = loop {
      if inner.q.is_empty() && inner.tx_count == 0 {
        return Err(Error::Closed);
      }

      if let Some(n) = inner.q.pop() {
        break n;
      }
      self.sh.rx_signal.wait(&mut inner);
    };

    // Wake up any async senders that are waiting for space to become
    // available.

    self.sh.wake_senders();


    drop(inner);

    Ok(n)
  }

  /// Returns a `Future` that is the `async` equivalent of
  /// [`Receiver::recv_blocking()`].
  #[must_use]
  pub fn recv_async(&self) -> RecvFuture<C, T> {
    RecvFuture {
      sh: Arc::clone(&self.sh),
      waiter: self.sh.rx_wakers.waiter()
    }
  }

  /// Attempt to retrieve a node from the queue.
  ///
  /// Returns `Ok(Some(T))` if there's a node available for immediate pickup.
  /// Returns `Ok(None)` if there are no nodes to pick up.
  ///
  /// # Errors
  /// [`Error::Closed`] means that the queue is empty and all the
  /// [`Sender`](super::Sender)s have been dropped.
  pub fn try_recv(&self) -> Result<Option<T>, Error<T>> {
    let mut inner = self.sh.inner.lock();
    if inner.q.is_empty() && inner.tx_count == 0 {
      return Err(Error::Closed);
    }
    Ok(inner.q.pop().map_or_else(
      || None,
      |n| {

        // Wake up any async senders that are waiting for space to become

        // available.
        self.sh.wake_senders();


        drop(inner);

        Some(n)
      }
    ))
  }
}

impl<C, T> Drop for Receiver<C, T>
where
  C: Controller<Item = T>
{
  fn drop(&mut self) {
    let mut inner = self.sh.inner.lock();
    inner.rx_count -= 1;
    drop(inner);

    self.sh.wake_senders();
  }
}


/// A `Future` that will resolve when there's data that can be returned
/// from the channel, or if the internal queue is empty but all the
/// [`Sender`](super::Sender) end-points have been dropped.
pub struct RecvFuture<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>,
  waiter: Waiter
}

impl<C, T> Future for RecvFuture<C, T>
where
  C: Controller<Item = T>
{
  type Output = Result<T, Error<T>>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.inner.lock();
    if let Some(n) = inner.q.pop() {
      Poll::Ready(Ok(n))
    } else if inner.tx_count == 0 {
      Poll::Ready(Err(Error::Closed))
    } else {
      drop(inner); // happy borrow-checker
      self.waiter.prime(ctx);
      Poll::Pending
    }
  }
}
Changes to src/tx.rs.
1
2
3
4
5
6
7
8
9
10
11
12
13
14



15
16
17














18




19



20
21







22
23
24
25
26
27
28



29
30
31
32
33


34


35
36








37
38
39

40


41


42



43
44
45





46

47


48
49
50
51
52
53
54
55
56
57
58
59


60
61
62
63
64
65
66
67
68
69
70
71
72










73
74
75

76
77


78
79



80
81
82
83
84
85



86
87
88
89


90
91
92
93
94


95

96
97
98
99
100
101
102
103
104
105
106

107

108
109
110
111
112
113




114
115
116
117
118
119
120
121


122
123
124
125




126




127
128
129
130
131
132
133















134
135
136
137
138
139




140
141

142
143
144




145
146
147
148
149
150
151
152
153
154

155
156
157
158
159
160

161
162
163
164
165
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::{Overflow, Shared};

/// Channel transmitter end-point.
#[derive(Clone)]
pub struct Sender<T> {



  sh: Arc<Shared<T>>
}















impl<T> Sender<T> {




  pub(super) const fn new(sh: Arc<Shared<T>>) -> Self {



    Self { sh }
  }








  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has make more room
  /// available on the queue.
  ///



  /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been
  /// dropped.
  pub fn send_blocking(&self, n: T) {
    let mut inner = self.sh.inner.lock();



    // Wait until queue isn't full


    if let Some(max_len) = inner.q.max_len() {
      while inner.q.len() == max_len {








        self.sh.tx_signal.wait(&mut inner);
      }
    }




    // Ignoring error is okay here.  It was just determined that there's room


    // for the new element in the queue, so this must succeed.



    //
    // This is true as long as the mutex isn't unlocked between the check above
    // and this call





    let _ = inner.q.try_push(n);




    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();
  }

  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has make more room
  /// available on the queue.
  ///


  /// Returns `Err(())` if all [`Receiver`](super::Receiver)s have been
  /// dropped.
  pub async fn send_async(&self, mut n: T) {
    // In an ideal world, there'd be a SendFuture that takes in the new node.
    // However, if the queue is full the node needs to be stored somewhere
    // until the task is woken up, and this presents a few annoying challenges.
    //
    // Instead, we'll use a Future that simply waits for there to be space
    // available and when that resolves immediate all the node to the queue.
    //
    // For multithreaded runtimes it is possible for a TOCTOU issue here so we
    // need to loop until the try_push() is successful.
    loop {










      ReserveSpaceFuture {
        sh: Arc::clone(&self.sh),
        waiter: self.sh.tx_wakers.waiter()

      }
      .await;



      let mut inner = self.sh.inner.lock();




      // ReserveSpaceFuture should have made sure that space is available.
      n = match inner.q.try_push(n) {
        Ok(()) => break,
        Err(n) => n
      };



    }

    // Have a new element in queue -- wake up waiting receivers
    self.sh.wake_receiver();


  }

  /// Fallible sending.
  ///
  /// # Errors


  /// If the queue is full the node is returned.

  pub fn try_send(&self, n: T) -> Result<(), T> {
    let mut inner = self.sh.inner.lock();
    let res = inner.q.try_push(n);

    // Have a new element in queue -- wake up a waiting receiver
    if res.is_ok() {
      self.sh.wake_receiver();
    }

    drop(inner);


    res

  }

  /// Forcibly add an element to the queue.
  ///
  /// If the queue has a limit and the queue is full, then the oldest node will
  /// be removed before the new element is added.




  pub fn force_send(&self, n: T) {
    let mut inner = self.sh.inner.lock();
    inner.q.force_push(n);

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();


  }

  /// Forcibly add an element to rhe channel, allowing the caller to determine
  /// how overflow is handled.




  pub fn force_send_oc(&self, n: T, overflow: Overflow) {




    let mut inner = self.sh.inner.lock();
    inner.q.force_push_oc(n, overflow);

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receiver();















  }
}


/// A [`Future`] that will resolve when the queue is not full.
struct ReserveSpaceFuture<T> {




  sh: Arc<Shared<T>>,
  waiter: Waiter

}

impl<T> Future for ReserveSpaceFuture<T> {




  type Output = ();
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let inner = self.sh.inner.lock();
    if let Some(max_len) = inner.q.max_len() {
      if inner.q.len() < max_len {
        Poll::Ready(())
      } else {

        drop(inner); // happy borrow-checker
        self.waiter.prime(ctx);
        Poll::Pending
      }
    } else {
      Poll::Ready(())

    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :









|


<
|
>
>
>
|


>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
>
>
>
|
>
>
>


>
>
>
>
>
>
>







>
>
>
|
<
|


>
>
|
>
>
|
|
>
>
>
>
>
>
>
>
|
|
|
>

>
>
|
>
>
|
>
>
>
|
<
|
>
>
>
>
>
|
>
|
>
>

|
<
<








>
>
|
<
|










>
>
>
>
>
>
>
>
>
>
|

|
>
|
|
>
>
|
<
>
>
>
|
<
<
|
|
<
>
>
>



|
>
>





>
>
|
>
|

|
|
<
<
<

|


>
|
>






>
>
>
>
|

|
<



|
>
>




>
>
>
>
|
>
>
>
>

|
<



|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>





|
>
>
>
>
|
|
>


|
>
>
>
>
|





<
|
|
|
>
|
|
|
|
<
|
>





1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

96
97
98
99
100
101
102
103
104
105
106
107
108


109
110
111
112
113
114
115
116
117
118
119

120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

150
151
152
153


154
155

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218

219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263

264
265
266
267
268
269
270
271

272
273
274
275
276
277
278
use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use wakerizer::Waiter;

use super::{Controller, Ctrl, Error, Overflow, Shared};

/// Channel transmitter end-point.

pub struct Sender<C, T>
where
  C: Controller<Item = T>
{
  sh: Arc<Shared<C, T>>
}

impl<C, T> Clone for Sender<C, T>
where
  C: Controller<Item = T>
{
  fn clone(&self) -> Self {
    let mut inner = self.sh.inner.lock();
    inner.tx_count += 1;
    drop(inner);

    let sh = Arc::clone(&self.sh);
    Self { sh }
  }
}

impl<C, T> Sender<C, T>
where
  C: Controller<Item = T>,
  T: Send + Sync
{
  pub(super) fn new(sh: Arc<Shared<C, T>>) -> Self {
    let mut inner = sh.inner.lock();
    inner.tx_count += 1;
    drop(inner);
    Self { sh }
  }

  /// Return a [`Ctrl`], which can be used to access the internal
  /// [`Controller`].
  #[must_use]
  pub fn ctrl(&self) -> Ctrl<C, T> {
    Ctrl(Arc::clone(&self.sh))
  }

  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has made more room
  /// available on the queue.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the node was rejected by the
  /// [`Controller`]. [`Error::Closed`] means there are no more
  /// [`Receiver`](super::Receiver)s available.

  pub fn send_blocking(&self, mut n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();

    // Keep trying to `try_push()` until successful.
    loop {
      if inner.rx_count == 0 {
        return Err(Error::Closed);
      }

      n = match inner.q.try_push(n) {
        Ok(()) => break,
        Err(e) => match e {
          limq::Error::WontFit(n) => n,
          limq::Error::CantFit(n) => {
            return Err(Error::CantFit(n));
          }
        }
      };
      self.sh.tx_signal.wait(&mut inner);
    }

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receivers();

    Ok(())
  }

  /// This exists only because the compiler thinks `send_async()` is not `Send`
  /// (because it thinks `inner` is held past the `await`, even though it
  /// isn't.)
  ///

  /// # Errors
  /// [`Error::CantFit`] means the node was rejected by the
  /// [`Controller`]. [`Error::Closed`] means there are no more
  /// [`Receiver`](super::Receiver)s available.
  fn try_push(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    if inner.rx_count == 0 {
      return Err(Error::Closed);
    }

    inner.q.try_push(n)?;
    drop(inner);
    Ok(())


  }

  /// Send an element over channel.
  ///
  /// If the channel has a limit, and the limit has been reached, then block
  /// and wait until a [`Receiver`](super::Receiver) has made more room
  /// available on the queue.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain.

  pub async fn send_async(&self, mut n: T) -> Result<(), Error<T>> {
    // In an ideal world, there'd be a SendFuture that takes in the new node.
    // However, if the queue is full the node needs to be stored somewhere
    // until the task is woken up, and this presents a few annoying challenges.
    //
    // Instead, we'll use a Future that simply waits for there to be space
    // available and, when that resolves, immediately add the node to the
    // queue.
    //
    // For multithreaded runtimes it is possible for a TOCTOU issue here so we
    // need to loop until the try_push() is successful.
    loop {
      // Attempt to push node onto queue.
      n = match self.try_push(n) {
        Ok(()) => break,
        Err(e) => match e {
          Error::Closed => return Err(Error::Closed),
          Error::WontFit(n) => n,
          Error::CantFit(n) => return Err(Error::CantFit(n))
        }
      };

      let fut = ReserveSpaceFuture {
        sh: Arc::clone(&self.sh),
        waiter: self.sh.tx_wakers.waiter(),
        n: &n
      };
      match fut.await {
        Ok(()) => {
          // fall through
        }

        Err(e) => match e {
          limq::CheckErr::WontFit => {
            // fall through
          }


          limq::CheckErr::CantFit => {
            return Err(Error::CantFit(n));

          }
        }
      }
    }

    // Have a new element in queue -- wake up waiting receivers
    self.sh.wake_receivers();

    Ok(())
  }

  /// Fallible sending.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] permanently rejected the
  /// node.  [`Error::WontFit`] means the [`Controller`] temporarily
  /// rejected the node.  [`Error::Closed`] means no
  /// [`Receiver`](super::Receiver)s remain.
  pub fn try_send(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    if inner.rx_count == 0 {
      return Err(Error::Closed);



    }
    inner.q.try_push(n)?;
    drop(inner);

    self.sh.wake_receivers();

    Ok(())
  }

  /// Forcibly add an element to the queue.
  ///
  /// If the queue has a limit and the queue is full, then the oldest node will
  /// be removed before the new element is added.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain.
  pub fn force_send(&self, n: T) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    inner.q.force_push(n)?;

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receivers();

    Ok(())
  }

  /// Forcibly add an element to the channel, allowing the caller to determine
  /// how overflow is handled.
  ///
  /// # Errors
  /// [`Error::CantFit`] means the [`Controller`] rejected the node.
  /// [`Error::Closed`] means no [`Receiver`](super::Receiver)s remain.
  pub fn force_send_oc(
    &self,
    n: T,
    overflow: Overflow
  ) -> Result<(), Error<T>> {
    let mut inner = self.sh.inner.lock();
    inner.q.force_push_oc(n, overflow)?;

    drop(inner);

    // Have a new element in queue -- wake up a waiting receiver
    self.sh.wake_receivers();

    Ok(())
  }
}

impl<C, T> Drop for Sender<C, T>
where
  C: Controller<Item = T>
{
  fn drop(&mut self) {
    let mut inner = self.sh.inner.lock();
    inner.tx_count -= 1;
    drop(inner);

    self.sh.wake_receivers();
  }
}


/// A [`Future`] that will resolve when the queue is not full.
struct ReserveSpaceFuture<'n, C, T>
where
  C: Controller<Item = T>,
  T: Send
{
  // Shared channel state (queue, sender/receiver counts, waker handling).
  sh: Arc<Shared<C, T>>,
  // Registration handle primed on each pending poll so this future is
  // woken when the queue may have room again.
  waiter: Waiter,
  // The node awaiting enqueue; only borrowed to ask whether it would fit.
  n: &'n T
}

impl<C, T> Future for ReserveSpaceFuture<'_, C, T>
where
  C: Controller<Item = T>,
  T: Send
{
  type Output = Result<(), limq::CheckErr>;

  /// Resolve once the controller reports the node would fit; fail
  /// immediately if the node can never fit.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let inner = self.sh.inner.lock();

    match inner.q.would_fit(self.n) {
      // Room is available right now.
      Ok(()) => Poll::Ready(Ok(())),

      // Permanent rejection -- waiting would never help.
      Err(limq::CheckErr::CantFit) => {
        Poll::Ready(Err(limq::CheckErr::CantFit))
      }

      // Temporary rejection -- register for wakeup and park.
      Err(limq::CheckErr::WontFit) => {
        drop(inner); // happy borrow-checker
        self.waiter.prime(ctx);
        Poll::Pending
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to tests/basic.rs.
1
2
3
4


5

6
7
8

9
10
11
12
13
14
15
16
17
18
19
20

21
22
23
24
25
26
27
28
29
30
31
32

33
34
35
36
37
38
39
40
41
42
43

44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::thread;

use tokio::task;



use limqch::channel;


#[test]
fn try_send_full() {

  let (tx, rx) = channel(Some(2));
  tx.send_blocking(1);
  tx.send_blocking(2);
  assert_eq!(tx.try_send(3), Err(3));

  assert_eq!(rx.recv_blocking(), 1);
  assert_eq!(rx.try_recv(), Some(2));
  assert_eq!(rx.try_recv(), None);
}

#[tokio::test]
async fn try_on_full() {

  let (tx, rx) = channel(Some(2));
  tx.send_async(1).await;
  tx.send_async(2).await;
  assert_eq!(tx.try_send(3), Err(3));

  assert_eq!(rx.recv_async().await, 1);
  assert_eq!(rx.try_recv(), Some(2));
  assert_eq!(rx.try_recv(), None);
}

#[tokio::test]
async fn send_from_spawned_task() {

  let (tx, rx) = channel(Some(2));

  task::spawn(async move {
    tx.send_async(1).await;
  });

  assert_eq!(rx.recv_async().await, 1);
}

#[tokio::test]
async fn blocking_send_from_thread() {

  let (tx, rx) = channel(Some(2));

  thread::spawn(move || {
    tx.send_blocking(1);
  });

  assert_eq!(rx.recv_blocking(), 1);
}

#[tokio::test]
async fn task_to_task() {

  let (tx, rx) = channel(Some(2));

  task::spawn(async move {
    tx.send_async(1).await;
  });

  task::spawn_blocking(move || {
    assert_eq!(rx.recv_blocking(), 1);
  })
  .await
  .unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :




>
>
|
>



>
|
|
|
|

|
|
|




>
|
|
|
|

|
|
|




>
|


|


|




>
|


|


|




>
|


|



|






1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//! Basic send/receive behavior across sync, async, thread and task
//! boundaries, using a length-limited (`OptLenLim`) channel.

use std::thread;

use tokio::task;

use limq::OptLenLim;

use limqch::{Error, channel};


// Filling a length-2 channel makes `try_send` fail with `WontFit`,
// handing the rejected node back to the caller.
#[test]
fn try_send_full() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);
  tx.send_blocking(1).unwrap();
  tx.send_blocking(2).unwrap();
  assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));

  assert_eq!(rx.recv_blocking().unwrap(), 1);
  assert_eq!(rx.try_recv().unwrap(), Some(2));
  assert_eq!(rx.try_recv().unwrap(), None);
}

// Async variant of `try_send_full`.
#[tokio::test]
async fn try_on_full() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);
  tx.send_async(1).await.unwrap();
  tx.send_async(2).await.unwrap();
  assert_eq!(tx.try_send(3), Err(Error::WontFit(3)));

  assert_eq!(rx.recv_async().await.unwrap(), 1);
  assert_eq!(rx.try_recv().unwrap(), Some(2));
  assert_eq!(rx.try_recv().unwrap(), None);
}

// A value sent from a spawned async task reaches an awaiting receiver.
#[tokio::test]
async fn send_from_spawned_task() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  task::spawn(async move {
    tx.send_async(1).await.unwrap();
  });

  assert_eq!(rx.recv_async().await, 1);
}

// A plain OS thread can send with the blocking API.
// NOTE(review): `recv_blocking` here runs on the async test's runtime
// thread -- presumably acceptable for a test; confirm.
#[tokio::test]
async fn blocking_send_from_thread() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  thread::spawn(move || {
    tx.send_blocking(1).unwrap();
  });

  assert_eq!(rx.recv_blocking().unwrap(), 1);
}

// Async sender task paired with a blocking receiver on a blocking task.
#[tokio::test]
async fn task_to_task() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  task::spawn(async move {
    tx.send_async(1).await.unwrap();
  });

  task::spawn_blocking(move || {
    assert_eq!(rx.recv_blocking().unwrap(), 1);
  })
  .await
  .unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added tests/closed.rs.


























































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
//! Channel-closed behavior: sending with no receivers, and receiving
//! after the last sender is gone (with and without buffered nodes).

use limq::OptLenLim;

use limqch::{Error, channel};

// No receiver exists, so sending is pointless
#[test]
fn rx_closed_sync() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only receiver
  drop(rx);

  // Transmitter should now return Error::Closed
  let Err(Error::Closed) = tx.send_blocking(11) else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = tx.try_send(11) else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// No receiver exists, so sending is pointless
#[tokio::test]
async fn rx_closed_async() {
  let lenlim = OptLenLim::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only receiver
  drop(rx);

  // Transmitter should now return Error::Closed
  let Err(Error::Closed) = tx.send_async(11).await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[test]
fn tx_closed_empty_sync() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only transmitter
  drop(tx);

  // Receiver should now return Error::Closed, because the queue is empty
  // and there are no transmitters left.
  let Err(Error::Closed) = rx.recv_blocking() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = rx.try_recv() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

#[tokio::test]
async fn tx_closed_empty_async() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Drop the only transmitter
  drop(tx);

  // Receiver should now return Error::Closed, because the queue is empty
  // and there are no transmitters left.
  let Err(Error::Closed) = rx.recv_async().await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// Buffered nodes must still be drainable after the last sender is gone.
#[test]
fn tx_closed_nonempty_sync() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Add two nodes
  tx.try_send(1).unwrap();
  tx.try_send(2).unwrap();

  // Drop the only transmitter
  drop(tx);

  // Should succeed
  // NOTE(review): panic message says Ok(Some(1)) but the pattern is Ok(1).
  let Ok(1) = rx.recv_blocking() else {
    panic!("Unexpectedly not Ok(Some(1))");
  };
  let Ok(Some(2)) = rx.try_recv() else {
    panic!("Unexpectedly not Ok(Some(2))");
  };

  // queue is now empty, so it should be returning Error::Closed
  let Err(Error::Closed) = rx.recv_blocking() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };

  let Err(Error::Closed) = rx.try_recv() else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// Async variant of `tx_closed_nonempty_sync` with a single buffered node.
#[tokio::test]
async fn tx_closed_nonempty_async() {
  let lenlim = OptLenLim::<u32>::new(Some(2));
  let (tx, rx) = channel(lenlim);

  // Add a node
  tx.try_send(1).unwrap();

  // Drop the only transmitter
  drop(tx);

  // Should succeed
  // NOTE(review): panic message says Ok(Some(1)) but the pattern is Ok(1).
  let Ok(1) = rx.recv_async().await else {
    panic!("Unexpectedly not Ok(Some(1))");
  };

  // queue is now empty, so it should be returning Error::Closed
  let Err(Error::Closed) = rx.recv_async().await else {
    panic!("Unexpectedly not Err(Error::Closed)");
  };
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added tests/ctrl.rs.




















































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//! The `Controller` (here a `BufLim`) remains accessible and mutable
//! through the channel handles after construction.

use limq::BufLim;

use limqch::channel;

#[test]
fn access_controller() {
  let lenlim = BufLim::new(None, None);
  let (tx, _rx) = channel(lenlim);

  // Both limits start out unset.
  let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
  assert_eq!(max_len, None);

  let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
  assert_eq!(max_size, None);

  // Reconfigure both limits through the controller handle ...
  tx.ctrl().with_ctrl_mut(|c| c.set_max_len(Some(2)));
  tx.ctrl().with_ctrl_mut(|c| c.set_max_size(Some(8)));

  // ... and verify the new settings are observable.
  let max_len = tx.ctrl().with_ctrl(BufLim::get_max_len);
  assert_eq!(max_len, Some(2));

  let max_size = tx.ctrl().with_ctrl(BufLim::get_max_size);
  assert_eq!(max_size, Some(8));
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to www/changelog.md.
1
2
3
4
5
6
7
8
9
10
11
12
13
14















15
16
17
18
19
20
# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=limqch-0.1.0&to=trunk)

### Added

### Changed

### Removed
















---

## [0.1.0] - 2025-04-01

Initial release.







|







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>






1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=limqch-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2025-04-11

[Details](/vdiff?from=limqch-0.1.0&to=limqch-0.2.0)

### Changed

- ⚠️ Update for `limq` `0.3.0`.  Creating a limq channel now requires a
  `Controller` implementation.

### Removed

- `Spawner` was removed.

---

## [0.1.0] - 2025-04-01

Initial release.