bndpresbufch

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From bndpresbufch-0.1.0 To bndpresbufch-0.1.1

2024-10-05
03:33
Update top module example. check-in: 12d539cfc9 user: jan tags: trunk
02:37
Happy pedantic clippy. check-in: ab1ab20aac user: jan tags: bndpresbufch-0.1.1, trunk
02:34
Code normalization. check-in: b3e32c1bfc user: jan tags: trunk
02:29
Add managed receives. check-in: bfc22d1f39 user: jan tags: trunk
2024-09-13
00:47
Cleanup bacon.toml. check-in: 9f8f3f3a45 user: jan tags: bndpresbufch-0.1.0, trunk
00:47
From old repo. check-in: 1a9d91b7c9 user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6
7
8
9
10
11

1
2
3
4
5
6
7
8
9
10
11
12











+
Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/tx.rs
src/rx.rs
tests/basics.rs
tests/force.rs
tests/closed.rs
tests/managed.rs

Changes to Cargo.toml.

1
2
3

4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

24
25
26
27
28
29
30
31
32
33
34
35
36
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

23
24
25
26
27
28
29
30
31
32
33
34
35
36


-
+



















-
+













[package]
name = "bndpresbufch"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "concurrency", "data-structures" ]
keywords = [ "channel" ]
repository = "https://repos.qrnch.tech/pub/bndpresbufch"
description = "Bounds-preserving channel for passing buffers."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

[dependencies]
parking_lot = { version = "0.12.3" }
bndpresbufq = { version = "0.1.2" }
bndpresbufq = { version = "0.1.3" }
rustc-hash = { version = "2.0.0" }

[dev-dependencies]
tokio = { version = "1.40.0", features = [
  "macros", "rt-multi-thread"
] }

[lints.clippy]
all = { level = "deny", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }

Changes to src/lib.rs.

38
39
40
41
42
43
44
45





46
47
48
49
50
51
52
38
39
40
41
42
43
44

45
46
47
48
49
50
51
52
53
54
55
56







-
+
+
+
+
+








use parking_lot::{Condvar, Mutex, MutexGuard};

use rustc_hash::FxHashMap;

use bndpresbufq::BndPresLimBufQ;

pub use {err::Error, rx::Receiver, tx::Sender};
pub use {
  err::Error,
  rx::{MustHandle, Receiver},
  tx::Sender
};

/// Builder for a bounds-preserving buffer channel.
#[derive(Default)]
pub struct Builder {
  max_len: Option<usize>,
  max_size: Option<usize>
}

Changes to src/rx.rs.

1
2


3
4
5
6
7
8
9


































































10
11
12
13
14
15
16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84


+
+







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







use std::{
  future::Future,
  mem::ManuallyDrop,
  ops::{Deref, DerefMut},
  pin::Pin,
  sync::Arc,
  task::{Context, Poll}
};

use crate::err::Error;


#[derive(Default)]
enum DropAction {
  #[default]
  ReturnToQueue,
  Drop,
  Nothing
}

/// Wrapper around elements that must be handled by the application.
pub struct MustHandle {
  sh: Arc<super::Shared>,
  inner: ManuallyDrop<Vec<u8>>,
  drop_action: DropAction
}

impl MustHandle {
  fn new(sh: Arc<super::Shared>, inner: Vec<u8>) -> Self {
    Self {
      sh,
      inner: ManuallyDrop::new(inner),
      drop_action: DropAction::default()
    }
  }

  /// Mark the inner object has handled and then drop it.
  pub fn handled(mut self) {
    self.drop_action = DropAction::Drop;
  }

  /// Remove the inner object from the `MustHandle` and return it.
  #[must_use]
  pub fn into_inner(mut self) -> Vec<u8> {
    self.drop_action = DropAction::Nothing;
    unsafe { ManuallyDrop::take(&mut self.inner) }
  }
}

impl Deref for MustHandle {
  type Target = [u8];

  fn deref(&self) -> &[u8] {
    &self.inner
  }
}

impl DerefMut for MustHandle {
  fn deref_mut(&mut self) -> &mut [u8] {
    &mut self.inner
  }
}

impl Drop for MustHandle {
  fn drop(&mut self) {
    match self.drop_action {
      DropAction::ReturnToQueue => {
        let t = unsafe { ManuallyDrop::take(&mut self.inner) };
        let mut inner = self.sh.inner.lock();
        let _ = inner.q.try_return(t);
      }
      DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
      DropAction::Nothing => {}
    }
  }
}

/// Receiving end-point used to receive bounds-preserved buffers from a
/// [`Sender`](super::Sender) end-point.
#[repr(transparent)]
pub struct Receiver(pub(super) Arc<super::Shared>);

impl Receiver {
  /// Get next buffer in queue.
31
32
33
34
35
36
37












38
39
40
41
42
43
44
45
46
47
48
















49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

















64
65
66
67
68
69
70
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183







+
+
+
+
+
+
+
+
+
+
+
+











+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+















+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







      }
      if inner.tx_count == 0 {
        break None;
      }
      self.0.signal.wait(&mut inner);
    }
  }

  /// Take an element off the queue that must be handled by the application, or
  /// it will be returned to the queue.
  ///
  /// If no elements are available on the queue, then block and wait for one to
  /// be added.  If the queue us empty and the sender has been dropped this
  /// `None` will returned.
  #[must_use]
  pub fn pop_managed(&self) -> Option<MustHandle> {
    let n = self.pop()?;
    Some(MustHandle::new(Arc::clone(&self.0), n))
  }

  /// Attempt to get next buffer in queue.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  pub fn try_pop(&self) -> Result<Option<Vec<u8>>, Error> {
    let mut inner = self.0.inner.lock();
    self.0.pop(&mut inner)
  }


  /// Attempt to get next buffer in queue, wrapped in a [`MustHandle`] wrapper.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  pub fn try_pop_managed(&self) -> Result<Option<MustHandle>, Error> {
    let mut inner = self.0.inner.lock();
    Ok(
      self
        .0
        .pop(&mut inner)?
        .map(|n| MustHandle::new(Arc::clone(&self.0), n))
    )
  }

  /// Return a [`Future`] that will return a buffer from the queue or wait for
  /// a buffer to become available.
  ///
  /// The `Future` will resolve to `Ok(Some(Vec<u8>))` on success.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  #[must_use]
  pub fn apop(&self) -> RecvFuture {
    RecvFuture {
      sh: Arc::clone(&self.0),
      waker_id: None
    }
  }


  /// Return a [`Future`] that will return a buffer from the queue or wait for
  /// a buffer to become available.
  ///
  /// The `Future` will resolve to `Ok(Some(Vec<u8>))` on success.
  ///
  /// # Errors
  /// [`Error::Closed`] will be returned if the queue is empty and all
  /// sender end-points have been dropped.
  #[must_use]
  pub fn apop_managed(&self) -> RecvManagedFuture {
    RecvManagedFuture {
      sh: Arc::clone(&self.0),
      waker_id: None
    }
  }
}

impl Drop for Receiver {
  fn drop(&mut self) {
    let mut inner = self.0.inner.lock();
    inner.rx_count -= 1;

113
114
115
116
117
118
119




















































120
121
122
123
124
125
126
127
128
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293







+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+









    }
  }
}

impl Drop for RecvFuture {
  /// When a [`RecvFuture`] is dropped, make sure to deregister its waker [if
  /// registered].
  fn drop(&mut self) {
    if let Some(id) = self.waker_id.take() {
      let mut inner = self.sh.lock_inner();
      inner.rx_wakers.remove(&id);
    }
  }
}


pub struct RecvManagedFuture {
  sh: Arc<super::Shared>,
  waker_id: Option<u32>
}

impl Future for RecvManagedFuture {
  type Output = Result<MustHandle, Error>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut inner = self.sh.lock_inner();
    match self.sh.pop(&mut inner) {
      Ok(Some(buf)) => {
        let ret = MustHandle::new(Arc::clone(&self.sh), buf);
        Poll::Ready(Ok(ret))
      }
      Ok(None) => {
        // Queue is empty -- add this future to the collection of wakers and
        // return pending
        // ToDo: exhaust-deadlock
        let id = loop {
          inner.idgen = inner.idgen.wrapping_add(1);
          if !inner.rx_wakers.contains_key(&inner.idgen) {
            break inner.idgen;
          }
        };
        inner.rx_wakers.insert(id, ctx.waker().clone());

        drop(inner);

        self.waker_id = Some(id);

        Poll::Pending
      }
      Err(e) => Poll::Ready(Err(e))
    }
  }
}

impl Drop for RecvManagedFuture {
  /// When a [`RecvFuture`] is dropped, make sure to deregister its waker [if
  /// registered].
  fn drop(&mut self) {
    if let Some(id) = self.waker_id.take() {
      let mut inner = self.sh.lock_inner();
      inner.rx_wakers.remove(&id);
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/managed.rs.














































1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
use bndpresbufch::Builder;

#[test]
fn wait() {
  // Create a queue that can hold at most 4 bytes of data
  let (tx, rx) = Builder::new().max_size(4).build();

  // Fill queue up with 4 1-byte nodes
  for idx in 0..4 {
    tx.try_push([idx].into()).unwrap();
  }

  // Pull managed node off queue
  let n = rx.pop_managed().unwrap();
  assert_eq!(&*n, vec![0]);

  // Drop node, which should put it back in the channel
  drop(n);

  // Should get [0] again
  assert_eq!(rx.pop(), Some(vec![0]));
}

#[test]
fn try_pop() {
  // Create a queue that can hold at most 4 bytes of data
  let (tx, rx) = Builder::new().max_size(4).build();

  // Fill queue up with 4 1-byte nodes
  for idx in 0..4 {
    tx.try_push([idx].into()).unwrap();
  }

  // Pull managed node off queue
  let n = rx.try_pop_managed().unwrap().unwrap();
  assert_eq!(&*n, vec![0]);

  // Drop node, which should put it back in the channel
  drop(n);

  // Should get [0] again
  assert_eq!(rx.pop(), Some(vec![0]));
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5

6
7
8
9
10
11
12











13
14
15
16
17
18
1
2
3
4

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29




-
+







+
+
+
+
+
+
+
+
+
+
+






# Change Log

## [Unreleased]

[Details](/vdiff?from=bndpresbufch-0.1.0&to=trunk)
[Details](/vdiff?from=bndpresbufch-0.1.1&to=trunk)

### Added

### Changed

### Removed

---

## [0.1.1] - 2024-10-05

[Details](/vdiff?from=bndpresbufch-0.1.0&to=bndpresbufch-0.1.1)

### Added

- Added ability to take buffers off channel that need to be explicitly handled
  or they will be returned to the channel.

---

## [0.1.0] - 2024-09-13

First release.