Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -1,5 +1,16 @@ Cargo.toml README.md www/index.md www/changelog.md +src/err.rs src/lib.rs +src/limitors.rs +src/limitors/nullctrl.rs +src/limitors/lenlim.rs +src/limitors/optlenlim.rs +src/limitors/buflim.rs +src/limitors/byteslim.rs +tests/lenlim.rs +tests/overflow-eject.rs +tests/buflim.rs +examples/stats.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,35 +1,41 @@ [package] name = "limq" -version = "0.1.3" -edition = "2021" +version = "0.4.0" +edition = "2024" license = "0BSD" # https://crates.io/category_slugs categories = [ "data-structures" ] keywords = [ "queue", "bounded" ] repository = "https://repos.qrnch.tech/pub/limq" -description = "Queue with optional maximum number of elements constraint" -rust-version = "1.56" +description = "Queue with a controller for monitoring queue elements" +rust-version = "1.85" exclude = [ ".fossil-settings", ".efiles", ".fslckout", + "examples", "www", "bacon.toml", "rustfmt.toml" ] # https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section [badges] maintenance = { status = "passively-maintained" } +[features] +bytes = ["dep:bytes"] + [dependencies] +bytes = { version = "1.10.1", optional = true } [package.metadata.docs.rs] -rustdoc-args = ["--generate-link-to-definition"] +all-features = true +rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] [lints.clippy] -all = { level = "deny", priority = -1 } +all = { level = "warn", priority = -1 } pedantic = { level = "warn", priority = -1 } nursery = { level = "warn", priority = -1 } cargo = { level = "warn", priority = -1 } Index: README.md ================================================================== --- README.md +++ README.md @@ -1,4 +1,5 @@ # Limited Queue -_LimQ_ is a queue that can be configured to hold a maximum number of elements. +_LimQ_ is a queue that holds a `Controller` implementation that can be used to +limit/monitor the queue elements. Index: bacon.toml ================================================================== --- bacon.toml +++ bacon.toml @@ -18,10 +18,11 @@ # Run clippy on the default target [jobs.clippy] command = [ "cargo", "clippy", + "--all-features", "--color", "always", ] need_stdout = false # Run clippy on all targets @@ -39,10 +40,11 @@ # need_stdout = false [jobs.clippy-all] command = [ "cargo", "clippy", "--all-targets", + "--all-features", "--color", "always", ] need_stdout = false # This job lets you run ADDED examples/stats.rs Index: examples/stats.rs ================================================================== --- /dev/null +++ examples/stats.rs @@ -0,0 +1,204 @@ +//! Example [`Controller`] implementation that demonstrates how it can +//! be used to store queue statistics by inspecting added/removed elements. + +use std::collections::{HashMap, VecDeque}; + +use limq::{CheckErr, Controller, LimQ}; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +enum Color { + Red, + Blue, + Green +} + +/// Queue element. +struct Node { + clr: Color, + buf: Vec +} + +impl Node { + pub const fn new(clr: Color, buf: Vec) -> Self { + Self { clr, buf } + } +} + + +struct StatsCtrl { + /// Maximum queue length + q_max_len: usize, + + /// Queue length high water mark. + q_len_hwm: usize, + + /// Maximum queue size. + q_max_size: usize, + + /// Current queue size. + q_size: usize, + + /// Queue length high water mark. + q_size_hwm: usize, + + /// Per-color queue count. + clr_cnt_map: HashMap, + + /// Per-color queue high water mark. + clr_hwm_map: HashMap +} + +impl Default for StatsCtrl { + fn default() -> Self { + let mut clr_cnt_map = HashMap::new(); + clr_cnt_map.insert(Color::Red, 0); + clr_cnt_map.insert(Color::Green, 0); + clr_cnt_map.insert(Color::Blue, 0); + + let mut clr_hwm_map = HashMap::new(); + clr_hwm_map.insert(Color::Red, 0); + clr_hwm_map.insert(Color::Green, 0); + clr_hwm_map.insert(Color::Blue, 0); + Self { + q_max_len: 256, + q_len_hwm: 0, + q_max_size: 64 * 1024, + q_size: 0, + q_size_hwm: 0, + clr_cnt_map, + clr_hwm_map + } + } +} + +impl StatsCtrl { + /// Return the current maximum length setting. + #[must_use] + pub const fn get_max_len(&self) -> usize { + self.q_max_len + } + + /// Update the maximum queue length. + /// + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_len(&mut self, max_len: usize) { + assert_ne!(max_len, 0, "zero-length limit"); + self.q_max_len = max_len; + } + + /// Return the current maximum length setting. + #[must_use] + pub const fn get_max_size(&self) -> usize { + self.q_max_size + } + + /// Update the maximum queue length. + /// + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_size(&mut self, max_size: usize) { + assert_ne!(max_size, 0, "zero-size limit"); + self.q_max_size = max_size; + } +} + +impl Controller for StatsCtrl { + type Item = Node; + + fn size_hint(&self) -> Option { + Some(self.q_max_len) + } + + fn is_full(&self, q: &VecDeque) -> bool { + q.len() >= self.q_max_len + } + + fn is_overflow(&self, q: &VecDeque) -> bool { + q.len() > self.q_max_len + } + + fn check( + &self, + q: &VecDeque, + _item: &Self::Item + ) -> Result<(), CheckErr> { + (q.len() < self.q_max_len) + .then_some(()) + .ok_or(CheckErr::WontFit) + } + + fn reg(&mut self, q: &VecDeque, n: &Self::Item) { + let count = self.clr_cnt_map.get_mut(&n.clr).unwrap(); + *count += 1; + + let hwm = self.clr_hwm_map.get_mut(&n.clr).unwrap(); + *hwm = std::cmp::max(*count, *hwm); + + self.q_len_hwm = std::cmp::max(self.q_len_hwm, q.len()); + + // Record total queue buffer size change + self.q_size += n.buf.len(); + + self.q_size_hwm = std::cmp::max(self.q_size_hwm, self.q_size); + } + + fn dereg(&mut self, n: &Self::Item) { + let count = self.clr_cnt_map.get_mut(&n.clr).unwrap(); + *count -= 1; + + // Record total queue buffer size change + self.q_size -= n.buf.len(); + } +} + + +fn main() { + let ctrl = StatsCtrl::default(); + let mut q = LimQ::new(ctrl); + + // Increase limits from default + let ctrl = q.controller_mut(); + ctrl.set_max_len(ctrl.get_max_len() * 2); + ctrl.set_max_size(ctrl.get_max_size() * 2); + + // Add/remove some elements so the statistics are more interesting + q.try_push(Node::new(Color::Red, b"hello".to_vec())) + .unwrap(); + q.try_push(Node::new(Color::Red, b"world".to_vec())) + .unwrap(); + let _ = q.pop(); + + // Dump statistics + dump_stats(&q); +} + +fn dump_stats(q: &LimQ) { + let ctrl = q.controller(); + + println!("Length: {} high-water-mark: {}", q.len(), ctrl.q_len_hwm); + println!( + " Size: {} high-water-mark: {}", + ctrl.q_size, ctrl.q_size_hwm + ); + + println!(); + + println!( + " Red count: {} high-water-mark: {}", + ctrl.clr_cnt_map.get(&Color::Red).unwrap(), + ctrl.clr_hwm_map.get(&Color::Red).unwrap() + ); + println!( + "Green count: {} high-water-mark: {}", + ctrl.clr_cnt_map.get(&Color::Green).unwrap(), + ctrl.clr_hwm_map.get(&Color::Green).unwrap() + ); + println!( + " Blue count: {} high-water-mark: {}", + ctrl.clr_cnt_map.get(&Color::Blue).unwrap(), + ctrl.clr_hwm_map.get(&Color::Blue).unwrap() + ); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/err.rs Index: src/err.rs ================================================================== --- /dev/null +++ src/err.rs @@ -0,0 +1,39 @@ +use std::fmt; + +/// _limq_ errors. +#[derive(PartialEq, Eq)] +pub enum Error { + /// Returned to inform caller that the node `T` will not fit at this moment. + /// + /// The assumption being that the operation can be retried later, after + /// nodes have been taken off the queue, and it might succeed. + WontFit(T), + + /// Returned to inform caller that the node `T` cannot fit. + /// + /// The asseumption is that, unless the constraints for the queue are + /// reconfigured, it will never be possible to add this node to the queue. + CantFit(T) +} + +impl std::error::Error for Error {} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::WontFit(_n) => write!(f, "Error::WontFit"), + Self::CantFit(_n) => write!(f, "Error::CantFit") + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::WontFit(_n) => write!(f, "Won't fit"), + Self::CantFit(_n) => write!(f, "Can't fit") + } + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,92 +1,286 @@ //! `LimQ` is a queue (implemented as a wrapper around [`VecDeque`]) that -//! supports an optional maximum number of elements constraint. +//! allows an internal [`Controller`] implementor to control when the +//! queue is full/overflown. +//! +//! `Controller` objects are responsible for: +//! - Determining if: +//! - the queue is "full". +//! - the queue has "overflown". +//! - a new queue element will fit in the queue or not. +//! - Registering a new element being added to the queue. +//! - Deregistering an element being removoed from the queue. +//! +//! It is up to the implementor to interpret what "full" and "overflown" means. +//! Typical use-cases are to ensure that the length of the queue is limited, or +//! a queue of `Vec` elements could keep an internal counter to ensure that +//! the total buffer size does not exceed a limit). //! //! Rather than supplying the traditional `push()` method to add elements to //! the queue, `LimQ` implements [`LimQ::try_push()`] and -//! [`LimQ::force_push()`]. If no queue limit has been enabled, both of these -//! act exactly like a traditional `push()` would. When a limit has been set, -//! and reached, `try_push()` will fail, returning the input element. -//! `force_push()` will forcibly add the new element while dropping the next -//! element in line to be pulled off the queue. +//! [`LimQ::force_push()`]/[`LimQ::force_push_oc()`]. If no queue limit has +//! been enabled, both of these act exactly like a traditional `push()` would. +//! When a limit has been set, and reached, `try_push()` will fail, returning +//! the input element. `force_push()` will forcibly add the new element while +//! dropping the next element in line to be pulled off the queue. //! +//! # Provided length limit controller //! ``` -//! use limq::LimQ; +//! use limq::{LimQ, LengthLimit, Error}; +//! +//! // Construct a controller object that will limit the length solely based on +//! // the length of the queue. +//! let limlen = LengthLimit::new(2); //! //! // Construct a queue with a maximum 2 element length limit -//! let mut q: LimQ = LimQ::new(Some(2)); +//! let mut q: LimQ<_, u32> = LimQ::new(limlen); //! //! // Add elements to fill up to queue //! q.try_push(1).unwrap(); //! q.force_push(2); //! //! // Fail to add a new node -//! assert_eq!(q.try_push(3), Err(3)); +//! assert_eq!(q.try_push(3), Err(Error::WontFit(3))); //! //! // Forcibly add node; expelling the oldest node //! q.force_push(4); //! //! // Remaining nodes should be 2 and 4 //! assert_eq!(q.pop(), Some(2)); //! assert_eq!(q.pop(), Some(4)); //! assert_eq!(q.pop(), None); //! ``` +//! +//! # Provided buffer queue limit controller +//! _limq_ comes with a limit controller [`BufLim`] that assumes the queue item +//! type is `Vec`, and which can limit the queue according to both the +//! number of buffers and the total size of all the buffers in the queue. +//! +//! ``` +//! use limq::{LimQ, BufLim, Error}; +//! +//! // Create a buffer queue that is limited to 4 buffers with a total size of +//! // 8 bytes +//! let lim = BufLim::new(Some(4), Some(8)); +//! let mut q = LimQ::new(lim); +//! +//! // Add a buffer +//! q.try_push(Vec::from("1234")).unwrap(); +//! assert_eq!(q.len(), 1); +//! assert_eq!(q.controller().size(), 4); +//! +//! // Fill up the queue (there still room for two more buffers, but it's full +//! // with regards to total buffer size). +//! q.try_push(Vec::from("5678")).unwrap(); +//! assert_eq!(q.len(), 2); +//! assert_eq!(q.controller().size(), 8); +//! +//! // Attempting to add more data will fail, because the total size limit has +//! // been reached. +//! let Err(Error::WontFit(buf)) = q.try_push(Vec::from("ab")) else { +//! panic!("Unexpectedly not Err(Error::WontFit())"); +//! }; +//! assert_eq!(buf, b"ab"); +//! assert_eq!(q.len(), 2); +//! assert_eq!(q.controller().size(), 8); +//! ``` +//! +//! # Features +//! | Feature | Function +//! |----------|---------- +//! | `bytes` | Enable `BytesLim`. + +#![cfg_attr(docsrs, feature(doc_cfg))] + +mod err; +mod limitors; use std::collections::VecDeque; -/// A queue with an optional number of maximum elements. -pub struct LimQ { - q: VecDeque, - max_len: Option -} - -impl LimQ { - /// Create a new queue with an optional limit to the number of elements. - /// - /// Passning `None` to `max_len` will disable the limit. - /// - /// # Panics - /// A zero-limit will cause a panic. - #[must_use] - pub fn new(max_len: Option) -> Self { - assert!( - !matches!(max_len, Some(0)), - "A zero-length limit is forbidden" - ); - - let q = max_len.map_or_else(VecDeque::new, VecDeque::with_capacity); - - Self { q, max_len } +pub use limitors::{ + buflim::BufLim, lenlim::LengthLimit, nullctrl::NullCtrl, + optlenlim::OptLenLim, BufQStats +}; + +#[cfg(feature = "bytes")] +#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))] +pub use limitors::byteslim::BytesLim; + +pub use err::Error; + +/// Errors that can be returned by [`Controller::check()`]. +#[derive(PartialEq, Eq)] +pub enum CheckErr { + /// Returned to inform caller that a node will not fit at this moment. + /// + /// The assumption being that the operation can be retried later and it + /// might succeed. + WontFit, + + /// Returned to inform caller that the node cannot fit. + /// + /// The asseumption is that, unless the constraints for the queue are + /// reconfigured, it will never be possible to add this node to the queue. + CantFit +} + +/// Implemented for objects that are used to monitor/control queue limits. +pub trait Controller { + /// Queue element type. + type Item; + + /// Return a queue size hint. + /// + /// This is used by the [`LimQ`] factory functions to select a capacity for + /// the internal queue. + /// + /// If `None` is returned the factory will not select a specific capacity. + /// + /// Default implementation returns `None`. + fn size_hint(&self) -> Option { + None + } + + /// Check if queue is full/overflown. + /// + /// Implementor return `true` if queue is _either_ full or has overflown. + /// + /// If an implementation does not impose any upper limits, this must return + /// `false`. + fn is_full(&self, q: &VecDeque) -> bool; + + /// Check if queue is overflown. + /// + /// Returns `true` if there's _more_ data than is maximum allowed. Returns + /// `false` if the queue is exactly full or has less data. + /// + /// If an implementation does not impose any upper limits, this must return + /// `false`. + fn is_overflow(&self, q: &VecDeque) -> bool; + + /// Check whether a new item would fit in queue. + /// + /// # Errors + /// [`CheckErr::CantFit`] means the item will never, sans potential + /// `Controller` reconfiguration, be able to fit in the queue. + /// [`CheckErr::WontFit`] means the item will _currently_ not fit, but may be + /// able to do so if the operation is retried later. + fn check( + &self, + q: &VecDeque, + item: &Self::Item + ) -> Result<(), CheckErr>; + + /// Register item as included in queue. + /// + /// At the point this function is called, the item isn't actually in the + /// queue, but it is just about it be. The implementation should perform all + /// its internal book keeping assuming it has been added. + fn reg(&mut self, q: &VecDeque, item: &Self::Item); + + /// Deregister an item. + fn dereg(&mut self, item: &Self::Item); +} + +/// Control how forced push handles overflow. +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub enum Overflow { + /// Remove the oldest (front) nodes to make room for the new node. + /// + /// This effectively means that older nodes are less important than newer + /// nodes. + #[default] + RemoveFront, + + /// Remove the newest (back) nodes to make room for the new node. + /// + /// This effectivelt means that new nodes must be added to the queue, but + /// the older nodes are more important than newish ones. + /// + /// This is a weird pattern. Applications should probably be using + /// [`LimQ::try_push()`] and drop the element if it fails instead. + RemoveBack, + + /// Ignore the configured limit (if there is one) + IgnoreLimit +} + + +/// A queue with an controller that can limit the number of elements in the +/// queue based on implementor-specific criteria. +pub struct LimQ +where + C: Controller +{ + /// Inner queue implementation. + q: VecDeque, + + /// Overflow checker controller object. + oc: C +} + +impl LimQ +where + C: Controller +{ + /// Create a new queue with an overflow check object. + /// + /// The [`Controller::size_hint()`] will be called to request a default + /// queue capacity. + #[must_use] + pub fn new(oc: C) -> Self { + let q = oc + .size_hint() + .map_or_else(VecDeque::new, VecDeque::with_capacity); + Self { q, oc } } /// Clear all nodes from queue. #[inline] pub fn clear(&mut self) { - self.q.clear(); - } - - /// Change the limit of the queue. - /// - /// Does not drain overflow elements if the new limit is lower than the - /// current number of elements in queue. This can be done manually by - /// calling [`LimQ::purge_overflow()`]. - /// - /// # Panics - /// A zero-limit will cause a panic. - pub fn set_max_len(&mut self, max_len: Option) { - assert!( - !matches!(max_len, Some(0)), - "A zero-length limit is forbidden" - ); - self.max_len = max_len; - } - - /// Return the maximum number of elements in queue. - #[inline] - #[must_use] - pub const fn max_len(&self) -> Option { - self.max_len + // Don't immediate clear the queue using self.q.clear() because the + // internal controller should be given the opportunity to deregister each + // node. + // + // Self::pop() will automatically call the controller for reach element. + while let Some(_n) = self.pop() {} + } + + /// Return a reference to the internal overflow checker controller. + /// + /// ``` + /// use limq::{LimQ, OptLenLim}; + /// + /// let lenlim = OptLenLim::::new(Some(128)); + /// + /// // The LimitLenghth is moved into the `LimQ` object + /// let mut q: LimQ<_, _> = LimQ::new(lenlim); + /// + /// // In order to access the inner LengthLimit (to get the maxmimum + /// // queue length), use `LimQ::controller()`: + /// assert_eq!(q.controller().get_max_len(), Some(128)); + /// ``` + pub const fn controller(&self) -> &C { + &self.oc + } + + /// Return a mutable reference to the internal overflow checker controller. + /// + /// ``` + /// use limq::{LimQ, LengthLimit}; + /// + /// let lenlim = LengthLimit::::new(128); + /// + /// // The LimitLenghth is moved into the `LimQ` object + /// let mut q: LimQ<_, _> = LimQ::new(lenlim); + /// + /// // In order to access the inner LengthLimit (to reconfigure the maxmimum + /// // queue length), use `LimQ::controller_mut()`: + /// q.controller_mut().set_max_len(256); + /// ``` + pub const fn controller_mut(&mut self) -> &mut C { + &mut self.oc } /// Return the current number of elements in the queue. #[inline] #[must_use] @@ -104,14 +298,14 @@ /// Returns a boolean indicating whether the queue is full. /// /// If no limit has been configured, this will always return `false`. /// /// ``` - /// use limq::LimQ; + /// use limq::{LimQ, LengthLimit}; /// /// // Construct a queue with a maximum 1 element length limit - /// let mut q: LimQ = LimQ::new(Some(1)); + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(1)); /// /// assert!(!q.is_full()); /// /// // Fill queue up /// q.try_push(1).unwrap(); @@ -119,81 +313,125 @@ /// assert!(q.is_full()); /// ``` #[inline] #[must_use] pub fn is_full(&self) -> bool { - self - .max_len - .map_or(false, |max_len| self.q.len() >= max_len) + self.oc.is_full(&self.q) + } + + /// Returns a boolean indicating whether the queue has overflown. + /// + /// This behaves much like [`LimQ::is_full()`], but it differs in that + /// `LimQ::is_overflow()` does not return `true` if the queue is exactly + /// full -- it must have exceeded its maximum capacity. + /// + /// If the controller deos not enforce limits, this will always return + /// `false`. + /// + /// ``` + /// use limq::{LimQ, LengthLimit, Overflow}; + /// + /// // Construct a queue with a maximum 1 element length limit + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(1)); + /// + /// assert!(!q.is_overflow()); + /// + /// // Fill queue up + /// q.try_push(1).unwrap(); + /// + /// assert!(!q.is_overflow()); + /// + /// // Overflow queue + /// q.force_push_oc(2, Overflow::IgnoreLimit).unwrap(); + /// + /// assert!(q.is_overflow()); + /// ``` + #[inline] + #[must_use] + pub fn is_overflow(&self) -> bool { + self.oc.is_overflow(&self.q) } - /// Drop elements that overflow the queue limit (if a limit has been set). + /// Drop elements that overflow the queue limit (if controller constrains + /// the size of the queue). + /// + /// This function will remove nodes at the _front_ of the queue, i.e. the + /// _oldest_ ones, that would be next in line to be taken off the list by + /// [`LimQ::pop()`]. + /// + /// Applications shouldn't need to call this function unless they force the + /// queue to overflow or set the maximum length such that the queue is + /// overflowed. + /// + /// The returned `Iterator` will yield the purged elements. /// /// ``` - /// use limq::LimQ; + /// use limq::{LimQ, LengthLimit}; /// /// // Construct a queue with a maximum 2 element length limit - /// let mut q: LimQ = LimQ::new(Some(2)); + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(2)); /// /// // Fill queue up /// q.try_push(1).unwrap(); /// q.try_push(2).unwrap(); /// assert_eq!(q.len(), 2); /// /// // Lower limit to one element - /// q.set_max_len(Some(1)); + /// q.controller_mut().set_max_len(1); /// /// // Length will be unchanged /// assert_eq!(q.len(), 2); /// /// // .. until purged /// q.purge_overflow(); /// assert_eq!(q.len(), 1); /// ``` #[inline] - pub fn purge_overflow(&mut self) { - if let Some(max_len) = self.max_len { - while self.q.len() > max_len { - let _ = self.q.pop_front(); + pub fn purge_overflow(&mut self) -> impl Iterator { + let mut overflow = Vec::new(); + while self.oc.is_overflow(&self.q) { + if let Some(n) = self.pop() { + overflow.push(n); } } + overflow.into_iter() } /// Push a node onto queue, fail and return the node if the queue is full. /// /// For a queue with no configured limit, this is equivalent to /// [`VecDeque::push_back()`]. /// /// ``` - /// use limq::LimQ; + /// use limq::{LimQ, LengthLimit, Error}; /// /// // Construct a queue with a maximum 1 element length limit - /// let mut q: LimQ = LimQ::new(Some(1)); + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(1)); /// /// // Fill queue up /// q.try_push(42).unwrap(); /// /// // No room for new nodes -- try_push should return Err() containing /// // the node - /// assert_eq!(q.try_push(11), Err(11)); + /// assert_eq!(q.try_push(11), Err(Error::WontFit(11))); /// ``` /// /// # Errors /// The node will be returned if the queue is full. #[inline] - pub fn try_push(&mut self, n: T) -> Result<(), T> { - if let Some(max_len) = self.max_len { - if self.q.len() < max_len { + pub fn try_push(&mut self, n: T) -> Result<(), Error> { + // Call the overflow checker to determine if there's room for this node + match self.oc.check(&self.q, &n) { + Ok(()) => { + self.oc.reg(&self.q, &n); self.q.push_back(n); Ok(()) - } else { - Err(n) } - } else { - // No limit -- just push - self.q.push_back(n); - Ok(()) + Err(e) => match e { + CheckErr::WontFit => Err(Error::WontFit(n)), + CheckErr::CantFit => Err(Error::CantFit(n)) + } } } /// Forcibly push a node onto queue. /// @@ -201,15 +439,17 @@ /// room becomes available for the new node. /// /// For a queue with no configured limit, this is equivalent to /// [`VecDeque::push_back()`]. /// + /// The returned `Iterator` will yield the purged elements. + /// /// ``` - /// use limq::LimQ; + /// use limq::{LimQ, LengthLimit}; /// /// // Construct a queue with a maximum 2 element length limit - /// let mut q: LimQ = LimQ::new(Some(2)); + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(2)); /// /// // Fill queue up /// q.force_push(1); /// q.force_push(2); /// @@ -218,33 +458,120 @@ /// /// // The remaining nodes should be (in order) '2' and '3' /// assert_eq!(q.pop(), Some(2)); /// assert_eq!(q.pop(), Some(3)); /// ``` - #[inline] - pub fn force_push(&mut self, n: T) { - // Remove node(s) before pushing new node on queue to avoid reallocation in - // case - if let Some(max_len) = self.max_len { - // Make sure there's room for the new node - while self.q.len() > (max_len - 1) { - let _ = self.q.pop_front(); - } - } - self.q.push_back(n); - } - - /// Attempt to push node onto queue so it will be the next in line. - /// - /// Meant to be used to return an element to the queue that was just taken - /// off it, but is currently not available to be processed. - /// - /// ``` - /// use limq::LimQ; - /// - /// // Construct a queue with a maximum 2 element length limit - /// let mut q: LimQ = LimQ::new(Some(2)); + /// + /// # Errors + /// [`Error::CantFit`] means the node can't fit in the queue due to + /// constraints imposed by the [`Controller`]. + #[inline] + pub fn force_push( + &mut self, + n: T + ) -> Result, Error> { + self.force_push_oc(n, Overflow::default()) + } + + /// Forcibly push a node onto queue, with the ability to control how + /// overflows are handled. + /// + /// For a controller that does not impose a queue limit, this is equivalent + /// to [`VecDeque::push_back()`]. + /// + /// The returned `Iterator` will yield purged elements. + /// + /// ``` + /// use limq::{LimQ, LengthLimit, Overflow}; + /// + /// // Construct a queue with a maximum 3 element length limit + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(3)); + /// + /// // Fill queue up + /// q.try_push(1).unwrap(); + /// q.try_push(2).unwrap(); + /// q.try_push(3).unwrap(); + /// + /// // Force push, ejecting the oldest element (containing '1') + /// q.force_push_oc(4, Overflow::RemoveFront); // 2, 3, 4 + /// + /// // Next element should be `2` + /// assert_eq!(q.pop(), Some(2)); // 3, 4 + /// + /// // Fill queue up again + /// q.try_push(5).unwrap(); // 3, 4, 5 + /// + /// // Force push, ejecting the newest element (containing '5') + /// q.force_push_oc(6, Overflow::RemoveBack); // 3, 4, 6 + /// + /// // The remaining nodes should be (in order) and '3', `4` and `6` + /// assert_eq!(q.pop(), Some(3)); + /// assert_eq!(q.pop(), Some(4)); + /// assert_eq!(q.pop(), Some(6)); + /// ``` + /// + /// # Errors + /// [`Error::CantFit`] means the node can't fit in the queue due to + /// constraints imposed by the [`Controller`]. + pub fn force_push_oc( + &mut self, + n: T, + overflow: Overflow + ) -> Result, Error> { + let mut purged = Vec::new(); + + if overflow != Overflow::IgnoreLimit { + loop { + let overflown = match self.oc.check(&self.q, &n) { + Ok(()) => false, + Err(e) => match e { + CheckErr::WontFit => true, + CheckErr::CantFit => return Err(Error::CantFit(n)) + } + }; + if overflown { + match overflow { + Overflow::RemoveFront => { + if let Some(e) = self.q.pop_front() { + self.oc.dereg(&e); + purged.push(e); + } + } + Overflow::RemoveBack => { + if let Some(e) = self.q.pop_back() { + self.oc.dereg(&e); + purged.push(e); + } + } + Overflow::IgnoreLimit => { + // Can't be reached thanks to IgnoreLimit check above + unreachable!() + } + } + } else { + // Break out of purge loop + break; + } + } + } + self.oc.reg(&self.q, &n); + self.q.push_back(n); + + Ok(purged.into_iter()) + } + + /// Attempt to push node onto queue so it will be the next in line to be + /// taken off it. + /// + /// Can be used to return an element to the queue that was just taken off it, + /// but is currently not available to be processed. + /// + /// ``` + /// use limq::{LimQ, LengthLimit}; + /// + /// // Construct a queue with a maximum 2 element length limit + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(2)); /// /// // Fill queue up /// q.force_push(1); /// q.force_push(2); /// @@ -263,69 +590,65 @@ /// ``` /// /// # Errors /// If the queue is full, return the node back to caller. #[inline] - pub fn try_return(&mut self, n: T) -> Result<(), T> { - if let Some(max_len) = self.max_len { - if self.q.len() > (max_len - 1) { - return Err(n); - } - } - self.q.push_front(n); - Ok(()) - } - - /// Take node off queue. - /// - /// Returns `None` if the queue is empty. - #[inline] - pub fn pop(&mut self) -> Option { - self.q.pop_front() - } -} - - -#[cfg(test)] -mod tests { - use super::LimQ; - - #[test] - #[should_panic(expected = "A zero-length limit is forbidden")] - fn zero_len() { - let _q: LimQ<()> = LimQ::new(Some(0)); - } - - #[test] - #[should_panic(expected = "A zero-length limit is forbidden")] - fn set_zero_len() { - let mut q: LimQ<()> = LimQ::new(Some(11)); - q.set_max_len(Some(0)); - } - - #[test] - fn try_exceed() { - let mut q: LimQ = LimQ::new(Some(1)); - q.try_push(42).unwrap(); - assert_eq!(q.len(), 1); - - let Err(e) = q.try_push(11) else { - panic!("Unexpectedly not failure"); - }; - assert_eq!(e, 11); - - assert_eq!(q.pop(), Some(42)); - assert_eq!(q.len(), 0); - assert_eq!(q.pop(), None); - } - - #[test] - fn force_on_full() { - let mut q: LimQ = LimQ::new(Some(1)); - q.force_push(42); - q.force_push(11); - - assert_eq!(q.pop(), Some(11)); + pub fn try_return(&mut self, n: T) -> Result<(), Error> { + match self.oc.check(&self.q, &n) { + Ok(()) => { + self.oc.reg(&self.q, &n); + self.q.push_front(n); + Ok(()) + } + Err(e) => match e { + CheckErr::WontFit => Err(Error::WontFit(n)), + CheckErr::CantFit => Err(Error::CantFit(n)) + } + } + } + + /// Take next node off queue. + /// + /// Returns `None` if the queue is empty. + /// + /// ``` + /// use limq::{LimQ, LengthLimit, Overflow}; + /// + /// let mut q: LimQ<_, u32> = LimQ::new(LengthLimit::new(2)); + /// + /// q.try_push(1).unwrap(); + /// q.try_push(2).unwrap(); + /// + /// assert_eq!(q.pop(), Some(1)); + /// assert_eq!(q.pop(), Some(2)); + /// assert_eq!(q.pop(), None); + /// ``` + #[inline] + pub fn pop(&mut self) -> Option { + if let Some(n) = self.q.pop_front() { + self.oc.dereg(&n); + Some(n) + } else { + None + } + } + + /// Check if a node would fit. + /// + /// If applications are looking to add a node to the queue, it should avoid + /// calling this function before calling one of the push functions. The push + /// functions will check for space automatically. + /// + /// This function is intended for corner cases where an application may need + /// to reserve space without adding a ndoe. + /// + /// # Errors + /// [`CheckErr::CantFit`] means the node will never (sans reconfiguration of + /// the [`Controller`]'s implementation) fit in the queue. + /// [`CheckErr::WontFit`] means the node won't currently fit, but may do so + /// later, if queue nodes are taken off it. + pub fn would_fit(&self, n: &T) -> Result<(), CheckErr> { + self.oc.check(&self.q, n) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors.rs Index: src/limitors.rs ================================================================== --- /dev/null +++ src/limitors.rs @@ -0,0 +1,26 @@ +pub mod buflim; +pub mod lenlim; +pub mod nullctrl; +pub mod optlenlim; + +#[cfg(feature = "bytes")] +pub mod byteslim; + +use super::{CheckErr, Controller}; + +/// Statistics for controllers that contain queues of buffers. +pub struct BufQStats { + /// Current queue length. + pub len: usize, + + /// Queue length high-water mark. + pub len_hwm: usize, + + /// Current queue total buffer size. + pub size: usize, + + /// Queue buffers size high-water mark. + pub size_hwm: usize +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors/buflim.rs Index: src/limitors/buflim.rs ================================================================== --- /dev/null +++ src/limitors/buflim.rs @@ -0,0 +1,154 @@ +use std::{cmp, collections::VecDeque}; + +use super::{BufQStats, CheckErr, Controller}; + + +/// A [`Controller`] for queue of `Vec` that can optionally limit the +/// queue length as well as total buffer size. +pub struct BufLim { + cur_len: usize, + max_len: Option, + len_hwm: usize, + cur_size: usize, + max_size: Option, + size_hwm: usize +} + +impl BufLim { + /// Create a new object for limiting a buffers queue to a configured length + /// and size. + /// + /// # Panics + /// `max_len` must not be `0`. `max_size` must not be `0`. + #[must_use] + pub fn new(max_len: Option, max_size: Option) -> Self { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + assert!(!matches!(max_size, Some(0)), "zero-size limit"); + Self { + cur_len: 0, + max_len, + len_hwm: 0, + cur_size: 0, + max_size, + size_hwm: 0 + } + } + + /// Return the current total size of all buffers in queue. + #[must_use] + pub const fn size(&self) -> usize { + self.cur_size + } + + /// Return the current maximum queue length configuration. + #[must_use] + pub const fn get_max_len(&self) -> Option { + self.max_len + } + + /// Return the current maximum total buffer size configuration. + #[must_use] + pub const fn get_max_size(&self) -> Option { + self.max_size + } + + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_len(&mut self, max_len: Option) { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + self.max_len = max_len; + } + + /// # Panics + /// `max_size` must not be `0`. + pub fn set_max_size(&mut self, max_size: Option) { + assert!(!matches!(max_size, Some(0)), "zero-size limit"); + self.max_size = max_size; + } + + /// Return queue statistics. + #[must_use] + pub const fn stats(&self) -> BufQStats { + BufQStats { + len: self.cur_len, + len_hwm: self.len_hwm, + size: self.cur_size, + size_hwm: self.size_hwm + } + } + + /// Set length and size high-water marks to the current length/size values. + pub const fn reset_hwm(&mut self) { + self.len_hwm = self.cur_len; + self.size_hwm = self.cur_size; + } + + /// Set length and size high-water marks to `0`. + pub const fn clear_hwm(&mut self) { + self.len_hwm = 0; + self.size_hwm = 0; + } +} + +impl Controller for BufLim { + type Item = Vec; + + fn size_hint(&self) -> Option { + self.max_len + } + + fn is_full(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() >= max_len) + || self + .max_size + .is_some_and(|max_size| self.cur_size >= max_size) + } + + fn is_overflow(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() > max_len) + || self + .max_size + .is_some_and(|max_size| self.cur_size > max_size) + } + + fn check( + &self, + q: &VecDeque, + n: &Self::Item + ) -> Result<(), CheckErr> { + if let Some(max_len) = self.max_len { + if q.len() + 1 > max_len { + return Err(CheckErr::WontFit); + } + } + if let Some(max_size) = self.max_size { + if n.len() > max_size { + return Err(CheckErr::CantFit); + } + if self.cur_size + n.len() > max_size { + return Err(CheckErr::WontFit); + } + } + Ok(()) + } + + fn reg(&mut self, q: &VecDeque, n: &Self::Item) { + self.cur_len += 1; + + self.len_hwm = cmp::max(self.len_hwm, q.len()); + + // At this point it has been determined that the new buffer will not + // overflow the queue. Register the buffer size and return true to tell + // caller to add the buffer. + self.cur_size += n.len(); + + self.size_hwm = cmp::max(self.size_hwm, self.cur_size); + } + + fn dereg(&mut self, n: &Self::Item) { + self.cur_len -= 1; + self.cur_size -= n.len(); + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors/byteslim.rs Index: src/limitors/byteslim.rs ================================================================== --- /dev/null +++ src/limitors/byteslim.rs @@ -0,0 +1,156 @@ +use std::{cmp, collections::VecDeque}; + +use bytes::Bytes; + +use super::{BufQStats, CheckErr, Controller}; + +/// A [`Controller`] for queue of `Bytes` that can optionally limit the +/// queue length as well as total buffer size. +pub struct BytesLim { + cur_len: usize, + max_len: Option, + len_hwm: usize, + cur_size: usize, + max_size: Option, + size_hwm: usize +} + +impl BytesLim { + /// Create a new object for limiting a buffers queue to a configured length + /// and size. + /// + /// # Panics + /// `max_len` must not be `0`. `max_size` must not be `0`. + #[must_use] + pub fn new(max_len: Option, max_size: Option) -> Self { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + assert!(!matches!(max_size, Some(0)), "zero-size limit"); + Self { + cur_len: 0, + max_len, + len_hwm: 0, + cur_size: 0, + max_size, + size_hwm: 0 + } + } + + /// Return the current total size of all buffers in queue. + #[must_use] + pub const fn size(&self) -> usize { + self.cur_size + } + + /// Return the current maximum queue length configuration. + #[must_use] + pub const fn get_max_len(&self) -> Option { + self.max_len + } + + /// Return the current maximum total buffer size configuration. + #[must_use] + pub const fn get_max_size(&self) -> Option { + self.max_size + } + + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_len(&mut self, max_len: Option) { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + self.max_len = max_len; + } + + /// # Panics + /// `max_size` must not be `0`. + pub fn set_max_size(&mut self, max_size: Option) { + assert!(!matches!(max_size, Some(0)), "zero-size limit"); + self.max_size = max_size; + } + + /// Return queue statistics. + #[must_use] + pub const fn stats(&self) -> BufQStats { + BufQStats { + len: self.cur_len, + len_hwm: self.len_hwm, + size: self.cur_size, + size_hwm: self.size_hwm + } + } + + /// Set length and size high-water marks to the current length/size values. + pub const fn reset_hwm(&mut self) { + self.len_hwm = self.cur_len; + self.size_hwm = self.cur_size; + } + + /// Set length and size high-water marks to `0`. + pub const fn clear_hwm(&mut self) { + self.len_hwm = 0; + self.size_hwm = 0; + } +} + +impl Controller for BytesLim { + type Item = Bytes; + + fn size_hint(&self) -> Option { + self.max_len + } + + fn is_full(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() >= max_len) + || self + .max_size + .is_some_and(|max_size| self.cur_size >= max_size) + } + + fn is_overflow(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() > max_len) + || self + .max_size + .is_some_and(|max_size| self.cur_size > max_size) + } + + fn check( + &self, + q: &VecDeque, + n: &Self::Item + ) -> Result<(), CheckErr> { + if let Some(max_len) = self.max_len { + if q.len() + 1 > max_len { + return Err(CheckErr::WontFit); + } + } + if let Some(max_size) = self.max_size { + if n.len() > max_size { + return Err(CheckErr::CantFit); + } + if self.cur_size + n.len() > max_size { + return Err(CheckErr::WontFit); + } + } + Ok(()) + } + + fn reg(&mut self, q: &VecDeque, n: &Self::Item) { + self.cur_len += 1; + + self.len_hwm = cmp::max(self.len_hwm, q.len()); + + // At this point it has been determined that the new buffer will not + // overflow the queue. Register the buffer size and return true to tell + // caller to add the buffer. + self.cur_size += n.len(); + + self.size_hwm = cmp::max(self.size_hwm, self.cur_size); + } + + fn dereg(&mut self, n: &Self::Item) { + self.cur_len -= 1; + + self.cur_size -= n.len(); + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors/lenlim.rs Index: src/limitors/lenlim.rs ================================================================== --- /dev/null +++ src/limitors/lenlim.rs @@ -0,0 +1,105 @@ +use std::{cmp, collections::VecDeque, marker::PhantomData}; + +use super::{CheckErr, Controller}; + +/// [`LimQ`](crate::LimQ) controller that imposes a queue length limit. +pub struct LengthLimit { + /// Keep track of queue length. + /// + /// Normally one would simple call `len()` on the queue, but the queue is + /// not available in [`LengthLimit::reset_len_hwm()`]. + q_len: usize, + + /// Maximum queue length. + max_len: usize, + + /// Queue length high-water mark. + len_hwm: usize, + + _phantom: PhantomData +} + +impl LengthLimit { + /// Create a new object for limiting queue to a configured length. + /// + /// # Panics + /// `max_len` must not be `0`. + #[must_use] + pub fn new(max_len: usize) -> Self { + assert_ne!(max_len, 0, "zero-length limit"); + Self { + q_len: 0, + max_len, + len_hwm: 0, + _phantom: PhantomData + } + } + + /// Return the current maximum length setting. + #[must_use] + pub const fn get_max_len(&self) -> usize { + self.max_len + } + + /// Update the maximum queue length. + /// + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_len(&mut self, max_len: usize) { + assert_ne!(max_len, 0, "zero-length limit"); + self.max_len = max_len; + } + + /// Retreive the current queue length high-water mark. + #[must_use] + pub const fn get_len_hwm(&self) -> usize { + self.len_hwm + } + + /// Set the queue length high-water mark to the current queue length. + pub const fn reset_len_hwm(&mut self) { + self.len_hwm = self.q_len; + } + + /// Set the queue length high-water mark to `0`. + pub const fn clear_len_hwm(&mut self) { + self.len_hwm = 0; + } +} + +impl Controller for LengthLimit { + type Item = T; + + fn size_hint(&self) -> Option { + Some(self.max_len) + } + + fn is_full(&self, q: &VecDeque) -> bool { + q.len() >= self.max_len + } + + fn is_overflow(&self, q: &VecDeque) -> bool { + q.len() > self.max_len + } + + fn check( + &self, + q: &VecDeque, + _n: &Self::Item + ) -> Result<(), CheckErr> { + (q.len() < self.max_len) + .then_some(()) + .ok_or(CheckErr::WontFit) + } + + fn reg(&mut self, _q: &VecDeque, _n: &Self::Item) { + self.q_len += 1; + self.len_hwm = cmp::max(self.len_hwm, self.q_len); + } + + fn dereg(&mut self, _n: &Self::Item) { + self.q_len -= 1; + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors/nullctrl.rs Index: src/limitors/nullctrl.rs ================================================================== --- /dev/null +++ src/limitors/nullctrl.rs @@ -0,0 +1,38 @@ +use std::{collections::VecDeque, marker::PhantomData}; + +use super::{CheckErr, Controller}; + +/// [`LimQ`](crate::LimQ) controller that does not impose any limit. +pub struct NullCtrl { + _phantom: PhantomData +} + +impl Controller for NullCtrl { + type Item = T; + + fn size_hint(&self) -> Option { + None + } + + fn is_full(&self, _q: &VecDeque) -> bool { + false + } + + fn is_overflow(&self, _q: &VecDeque) -> bool { + false + } + + fn check( + &self, + _q: &VecDeque, + _item: &Self::Item + ) -> Result<(), CheckErr> { + Ok(()) + } + + fn reg(&mut self, _q: &VecDeque, _n: &Self::Item) {} + + fn dereg(&mut self, _n: &Self::Item) {} +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/limitors/optlenlim.rs Index: src/limitors/optlenlim.rs ================================================================== --- /dev/null +++ src/limitors/optlenlim.rs @@ -0,0 +1,99 @@ +use std::{cmp, collections::VecDeque, marker::PhantomData}; + +use super::{CheckErr, Controller}; + +/// [`LimQ`](crate::LimQ) controller that imposes an _optional_ queue length +/// limit. +pub struct OptLenLim { + q_len: usize, + max_len: Option, + len_hwm: usize, + _phantom: PhantomData +} + +impl OptLenLim { + /// Create a new object for limiting queue to a configured length. + /// + /// # Panics + /// `max_len` must not be `0`. + #[must_use] + pub fn new(max_len: Option) -> Self { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + Self { + q_len: 0, + max_len, + len_hwm: 0, + _phantom: PhantomData + } + } + + /// Return the current maximum length setting. + #[must_use] + pub const fn get_max_len(&self) -> Option { + self.max_len + } + + /// Update the maximum queue length. + /// + /// # Panics + /// `max_len` must not be `0`. + pub fn set_max_len(&mut self, max_len: Option) { + assert!(!matches!(max_len, Some(0)), "zero-length limit"); + self.max_len = max_len; + } + + /// Retreive the current queue length high-water mark. + #[must_use] + pub const fn get_len_hwm(&self) -> usize { + self.len_hwm + } + + /// Set the queue length high-water mark to the current queue length. + pub const fn reset_len_hwm(&mut self) { + self.len_hwm = self.q_len; + } + + /// Set the queue length high-water mark to `0`. + pub const fn clear_len_hwm(&mut self) { + self.len_hwm = 0; + } +} + +impl Controller for OptLenLim { + type Item = T; + + fn size_hint(&self) -> Option { + self.max_len + } + + fn is_full(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() >= max_len) + } + + fn is_overflow(&self, q: &VecDeque) -> bool { + self.max_len.is_some_and(|max_len| q.len() > max_len) + } + + fn check( + &self, + q: &VecDeque, + _n: &Self::Item + ) -> Result<(), CheckErr> { + self + .max_len + .is_some_and(|max_len| q.len() < max_len) + .then_some(()) + .ok_or(CheckErr::WontFit) + } + + fn reg(&mut self, _q: &VecDeque, _n: &Self::Item) { + self.q_len += 1; + self.len_hwm = cmp::max(self.len_hwm, self.q_len); + } + + fn dereg(&mut self, _n: &Self::Item) { + self.q_len -= 1; + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/buflim.rs Index: tests/buflim.rs ================================================================== --- /dev/null +++ tests/buflim.rs @@ -0,0 +1,102 @@ +use limq::{BufLim, Error, LimQ}; + +#[test] +#[should_panic(expected = "zero-length limit")] +fn zero_len() { + let _lenlim = BufLim::new(Some(0), Some(1)); +} + +#[test] +#[should_panic(expected = "zero-size limit")] +fn zero_size() { + let _lenlim = BufLim::new(Some(1), Some(0)); +} + +#[test] +fn fill() { + // Create a buffer queue that is limited to 2 buffers with a total size of 8 + // bytes + let lim = BufLim::new(Some(2), Some(8)); + let mut q = LimQ::new(lim); + + q.try_push(Vec::from("1234")).unwrap(); + assert_eq!(q.len(), 1); + assert_eq!(q.controller().size(), 4); + + q.try_push(Vec::from("5678")).unwrap(); + assert_eq!(q.len(), 2); + assert_eq!(q.controller().size(), 8); +} + +#[test] +fn clean() { + // Create a buffer queue that is limited to 2 buffers with a total size of 8 + // bytes + let lim = BufLim::new(Some(2), Some(8)); + let mut q = LimQ::new(lim); + + q.try_push(Vec::from("1234")).unwrap(); + assert_eq!(q.len(), 1); + assert_eq!(q.controller().size(), 4); + + q.try_push(Vec::from("5678")).unwrap(); + assert_eq!(q.len(), 2); + assert_eq!(q.controller().size(), 8); + + // Clearing queue must clear the controller, in case it has its own + // counter(s) + q.clear(); + assert_eq!(q.len(), 0); + assert_eq!(q.controller().size(), 0); +} + +#[test] +fn try_exceed_len() { + // Create a buffer queue that is limited to 2 buffers with a total size of 8 + // bytes + let lim = BufLim::new(Some(2), Some(8)); + let mut q = LimQ::new(lim); + + q.try_push(Vec::from("1234")).unwrap(); + assert_eq!(q.len(), 1); + assert_eq!(q.controller().size(), 4); + + q.try_push(Vec::from("56")).unwrap(); + assert_eq!(q.len(), 2); + assert_eq!(q.controller().size(), 6); + + let Err(Error::WontFit(buf)) = q.try_push(Vec::from("78")) else { + panic!("Unexpectedly not Err()"); + }; + assert_eq!(&buf, b"78"); +} + +#[test] +fn try_exceed_size() { + // Create a buffer queue that is limited to 2 buffers with a total size of 8 + // bytes + let lim = BufLim::new(Some(2), Some(8)); + let mut q = LimQ::new(lim); + + q.try_push(Vec::from("1234")).unwrap(); + assert_eq!(q.len(), 1); + assert_eq!(q.controller().size(), 4); + + let Err(Error::WontFit(buf)) = q.try_push(Vec::from("56789")) else { + panic!("Unexpectedly not Err()"); + }; + assert_eq!(&buf, b"56789"); +} + +#[test] +fn cant_fit() { + let lim = BufLim::new(Some(2), Some(8)); + let mut q = LimQ::new(lim); + + let Err(Error::CantFit(buf)) = q.try_push(Vec::from("123456789")) else { + panic!("Unexpectedly not Err(Error::CantFit())"); + }; + assert_eq!(&buf, b"123456789"); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/lenlim.rs Index: tests/lenlim.rs ================================================================== --- /dev/null +++ tests/lenlim.rs @@ -0,0 +1,88 @@ +use limq::{Error, LengthLimit, LimQ}; + +#[test] +#[should_panic(expected = "zero-length limit")] +fn zero_len() { + let _lenlim = LengthLimit::<()>::new(0); +} + +#[test] +#[should_panic(expected = "zero-length limit")] +fn set_zero_len() { + let lenlim = LengthLimit::<()>::new(11); + let mut q: LimQ<_, _> = LimQ::new(lenlim); + q.controller_mut().set_max_len(0); +} + +#[test] +fn try_exceed() { + let lenlim = LengthLimit::new(1); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + q.try_push(42).unwrap(); + assert_eq!(q.len(), 1); + + let Err(Error::WontFit(e)) = q.try_push(11) else { + panic!("Unexpectedly not failure"); + }; + assert_eq!(e, 11); + + assert_eq!(q.pop(), Some(42)); + assert_eq!(q.len(), 0); + assert_eq!(q.pop(), None); +} + +#[test] +fn force_on_full() { + let lenlim = LengthLimit::new(1); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + let _ = q.force_push(42).unwrap(); + let _ = q.force_push(11).unwrap(); + + assert_eq!(q.pop(), Some(11)); +} + +#[test] +fn try_return_success() { + let lenlim = LengthLimit::new(2); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + q.try_push(1).unwrap(); + q.try_push(2).unwrap(); + + let n = q.pop().unwrap(); + assert_eq!(n, 1); + + q.try_return(n).unwrap(); + + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); +} + +#[test] +fn try_return_fail() { + let lenlim = LengthLimit::new(2); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + q.try_push(1).unwrap(); + q.try_push(2).unwrap(); + + let n = q.pop().unwrap(); + assert_eq!(n, 1); + + // fill up queue again + q.try_push(3).unwrap(); + + let Err(Error::WontFit(n)) = q.try_return(n) else { + panic!("Unexpectedly successful"); + }; + assert_eq!(n, 1); + + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), None); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/overflow-eject.rs Index: tests/overflow-eject.rs ================================================================== --- /dev/null +++ tests/overflow-eject.rs @@ -0,0 +1,90 @@ +use limq::{BufLim, LengthLimit, LimQ, Overflow}; + +#[test] +fn force_eject_front() { + let lenlim = LengthLimit::new(2); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + // --front-- + let _ = q.force_push_oc(71, Overflow::RemoveFront).unwrap(); + let _ = q.force_push_oc(42, Overflow::RemoveFront).unwrap(); + + // 71 will be removed + let it = q.force_push_oc(11, Overflow::RemoveFront).unwrap(); + // --back-- + + let purged: Vec = it.collect(); + assert_eq!(purged, [71]); + + assert_eq!(q.pop(), Some(42)); +} + +#[test] +fn force_eject_back() { + let lenlim = LengthLimit::new(2); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + // --front-- + let _ = q.force_push_oc(71, Overflow::RemoveBack).unwrap(); + let _ = q.force_push_oc(42, Overflow::RemoveBack).unwrap(); + + // 42 will be removed + let _ = q.force_push_oc(11, Overflow::RemoveBack).unwrap(); + // --back-- + + assert_eq!(q.pop(), Some(71)); +} + +#[test] +fn force_ignore_limit() { + let lenlim = LengthLimit::new(2); + let mut q: LimQ<_, u32> = LimQ::new(lenlim); + + // --front-- + let _ = q.force_push_oc(71, Overflow::IgnoreLimit).unwrap(); + let _ = q.force_push_oc(42, Overflow::IgnoreLimit).unwrap(); + let _ = q.force_push_oc(11, Overflow::IgnoreLimit).unwrap(); + // --back-- + + assert_eq!(q.len(), 3); +} + +/// Illustrate a bug in 0.3.0. +/// +/// A force push that needs to purge old elements only purges a single element. +#[test] +fn bug_check_0_3_0_eject_multiple() { + // Limit buffer queue to 2 buffers and a total of 4 bytes + let lenlim = BufLim::new(Some(2), Some(4)); + let mut q = LimQ::new(lenlim); + + // Fill up queue + let _ = q + .force_push_oc(b"12".to_vec(), Overflow::IgnoreLimit) + .unwrap(); + let _ = q + .force_push_oc(b"34".to_vec(), Overflow::IgnoreLimit) + .unwrap(); + + // Overflow it with two elements + let _ = q + .force_push_oc(b"56".to_vec(), Overflow::IgnoreLimit) + .unwrap(); + let _ = q + .force_push_oc(b"78".to_vec(), Overflow::IgnoreLimit) + .unwrap(); + + assert_eq!(q.len(), 4); + + // Force push, but time enforce limits + let it = q + .force_push_oc(b"9a".to_vec(), Overflow::RemoveFront) + .unwrap(); + let arr: Vec> = it.collect(); + assert_eq!(&arr, &[b"12", b"34", b"56"]); + + // 0.3.1 bugfix makes this 2, as it should be. + assert_eq!(q.len(), 2); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,16 +1,110 @@ # Change Log +⚠️ indicates a breaking change. + ## [Unreleased] -[Details](/vdiff?from=limq-0.1.3&to=trunk) +[Details](/vdiff?from=limq-0.4.0&to=trunk) + +### Added + +### Changed + +### Removed + +--- + +## [0.4.0] - 2025-04-12 + +[Details](/vdiff?from=limq-0.3.1&to=limq-0.4.0) + +### Changed + +- ⚠️ Methods that can purge elements due to `Controller` constraints now return + an `Iterator` that will yield the purged elements. + +--- + +## [0.3.1] - 2025-04-12 + +[Details](/vdiff?from=limq-0.3.0&to=limq-0.3.1) + +### Added + +- Add test to demonstrate a `force_push_oc()` bug in 0.3.0. + +### Changed + +- Bugfix: `force_push_oc()` will eject multiple elements as needed. + +--- + +## [0.3.0] - 2025-04-11 + +[Details](/vdiff?from=limq-0.2.0&to=limq-0.3.0) + +### Added + +- For `LengthLimit` and `OptLenLim` `clear_len_hwm()` takes the place of the + old `reset_len_hwm()` behavior. +- `BytesLim` now tracks length/size high-water marks. +- Added `LimQ::is_overflow()` which simply forwards the call to + `Controller::is_overflow()`. + +### Changed + +- For `LengthLimit` and `OptLenLim` `reset_len_hwm()` changed to reset to + current queue length. +- ⚠️ `NullLimit` renamed to `NullCtrl`. Arguably not a breaking change + because `NullLimit`/`NullCtrl` should be a test/debug tool. +- ⚠️ Renamed `BufLimStats` to `BufQStats`. + +--- + +## [0.2.0] - 2025-04-08 + +[Details](/vdiff?from=limq-0.1.4&to=limq-0.2.0) ### Added + +- `Controller` is a trait that allows application-defined logic to + determine when a queue is full and to allow queue elements to be monitored + as they are being added/removed to/from queue. +- Predefined `Controller` implementations: + - `NullLimit` imposes no limits. (Exists for testing purposes). + - `LengthLimit` imposes a limit. + - `OptLenLim` can be used to mimic the `0.1.x` version behavior. + - `BufLim` is used for queues carrying `Vec` items, where both a limit + on the number of buffers and the total size of all buffers can be enforced. + - `BytesLim` (requires feature `bytes`) serves the same role as `BufLim`, but + it uses `bytes::Bytes` rather than `Vec`. +- An `err` module with an `Error` type has been added to return specific error + states. ### Changed +- ⚠️ Major redesign for modularity. + - The queue length is no longer enforced by `LimQ`, instead it takes in a + `Controller` implementation that can use application-defined logic + to termine when the queue is full. + ### Removed + +- ⚠️ `LimQ::set_max_len` and `LimQ::max_len` removed due to shifting the burden + of limit management to `Controller`. + +--- + +## [0.1.4] - 2025-04-01 + +[Details](/vdiff?from=limq-0.1.3&to=limq-0.1.4) + +### Added + +- Add `LimQ::force_push_oc()` to allow caller to control wether overflows + should remove front, back or no node. --- ## [0.1.3] - 2024-10-05 Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -1,11 +1,21 @@ # Limited Queue -_LimQ_ is a queue that can be configured to hold a maximum number of elements. +_LimQ_ is a queue that's implemented as a wrapper around std's `VecDeque`. It +offers `try_push()` and `force_push()` as alternatives to the traditional +`push()` operation, and takes in a `Controller` implementation that can be +used to limit/monitor queue elements. + + +## Feature labels in documentation + +The crate's documentation uses automatically generated feature labels, which +currently requires nightly featuers. To build the documentation locally use: -It's implemented as a wrapper around std's `VecDeque`, and offers `try_push()` -and `force_push()` as alternatives to the traditional `push()` operation. +``` +RUSTFLAGS="--cfg docsrs" RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features +``` ## Change log The details of changes can always be found in the timeline, but for a