Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -11,5 +11,6 @@ src/sqlfuncs.rs src/futures.rs src/iox.rs src/tokiox.rs src/tokiox/tcpconn.rs +src/serde_parsers.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,36 +1,54 @@ [package] name = "orphanage" -version = "0.0.2" +version = "0.0.3" edition = "2021" license = "0BSD" +# https://crates.io/category_slugs +categories = [ "network-programming" ] keywords = [ "sqlite", "fs", "path" ] repository = "https://repos.qrnch.tech/pub/orphanage" description = "Random collection of stuff that is still searching for a home." -rust-version = "1.64" +rust-version = "1.74" exclude = [ ".fossil-settings", ".efiles", ".fslckout", "www", + "bacon.toml", "rustfmt.toml" ] + +# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section +[badges] +maintenance = { status = "experimental" } [features] tokio = ["dep:tokio", "dep:async-trait", "dep:killswitch"] rusqlite = ["dep:rusqlite", "dep:sha2"] +serde = ["dep:serde", "dep:parse-size"] [dependencies] -async-trait = { version = "0.1.77", optional = true } +async-trait = { version = "0.1.82", optional = true } killswitch = { version = "0.4.2", optional = true } +parse-size = { version = "1.0.0", optional = true } rand = { version = "0.8.5" } -rusqlite = { version = "0.30.0", optional = true, features = ["functions"] } +rusqlite = { version = "0.32.1", optional = true, features = ["functions"] } +serde = { version = "1.0.210", optional = true, features = ["derive"] } sha2 = { version = "0.10.7", optional = true } shellexpand = { version = "3.1.0" } -tokio = { version = "1.36.0", optional = true, features = [ - "macros","net", "time" +tokio = { version = "1.40.0", optional = true, features = [ + "macros", "net", "time" ] } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] +[lints.clippy] +all = { level = "deny", priority = -1 } +pedantic = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } +cargo = { level = "warn", priority = -1 } + +multiple_crate_versions = "allow" + ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,107 @@ +# This is a configuration file for the bacon tool +# +# Bacon repository: https://github.com/Canop/bacon +# Complete help on configuration: https://dystroy.org/bacon/config/ +# You can also check bacon's own bacon.toml file +# as an example: https://github.com/Canop/bacon/blob/main/bacon.toml + +# For information about clippy lints, see: +# https://github.com/rust-lang/rust-clippy/blob/master/README.md + +#default_job = "check" +default_job = "clippy-all" + +[jobs.check] +command = ["cargo", "check", "--color", "always"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets", "--color", "always"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = [ + "cargo", "clippy", + "--all-features", + "--color", "always", +] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--color", "always", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false 
+[jobs.clippy-all] +command = [ + "cargo", "clippy", + "--all-features", + "--all-targets", + "--color", "always", +] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = [ + "cargo", "test", "--color", "always", + "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 +] +need_stdout = true + +[jobs.doc] +command = ["cargo", "doc", "--color", "always", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# *if* it makes sense for this crate. +# Don't forget the `--color always` part or the errors won't be +# properly parsed. +# If your program never stops (eg a server), you may set `background` +# to false to have the cargo run output immediately displayed instead +# of waiting for program's end. +[jobs.run] +command = [ + "cargo", "run", + "--color", "always", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--color", "always", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target Index: src/buf.rs ================================================================== --- src/buf.rs +++ src/buf.rs @@ -4,10 +4,11 @@ // MaybeUninit, which it is correct about. However, the rand crate maintainers // think that filling MaybeUninit is bad and that application should be forced // to double-initialize buffers, which is obviously wrong. But it's their // crate, so we're doing it this way instead. #[allow(clippy::uninit_vec)] +#[must_use] pub fn random(len: usize) -> Vec { let mut buf = Vec::with_capacity(len); // SAFETY: Presumably with_capacity() works as documented. 
unsafe { Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -5,31 +5,32 @@ BadFormat(String), IO(String) } impl Error { + #[allow(clippy::needless_pass_by_value)] pub fn bad_format(s: S) -> Self { - Error::BadFormat(s.to_string()) + Self::BadFormat(s.to_string()) } } impl std::error::Error for Error {} impl From for Error { fn from(err: io::Error) -> Self { - Error::IO(err.to_string()) + Self::IO(err.to_string()) } } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Error::BadFormat(s) => { - write!(f, "Bad format error; {}", s) + Self::BadFormat(s) => { + write!(f, "Bad format error; {s}") } - Error::IO(s) => { - write!(f, "I/O error; {}", s) + Self::IO(s) => { + write!(f, "I/O error; {s}") } } } } Index: src/fs.rs ================================================================== --- src/fs.rs +++ src/fs.rs @@ -4,10 +4,13 @@ }; use crate::err::Error; /// Create a random file of a specified size. +/// +/// # Errors +/// [`std::io::Error`] pub fn rndfile( fname: impl AsRef, size: u64 ) -> Result { let mut i = super::iox::RngReader::with_lim(size); @@ -15,10 +18,11 @@ std::io::copy(&mut i, &mut o) } /// Expand a string directory, make it absolute and create it (if it does not /// exist). +#[allow(clippy::missing_errors_doc)] pub fn gen_absdir(input: impl AsRef) -> Result { let pth = super::path::expabs(input)?; if !pth.exists() { std::fs::create_dir_all(&pth)?; @@ -35,10 +39,11 @@ } /// Given an input path, attempt to remove its containing (parent) directory. /// /// If successful, return a `PathBuf` of the parent directory. +#[allow(clippy::missing_errors_doc)] pub fn rm_containing( pth: impl AsRef ) -> Result { if let Some(parent) = pth.as_ref().parent() { std::fs::remove_dir(parent)?; @@ -50,13 +55,14 @@ )) } } /// Return the absolute path to an existing filesystem object. +#[allow(clippy::missing_errors_doc)] pub fn abspath

(pth: P) -> Result where P: AsRef { Ok(pth.as_ref().canonicalize()?) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/futures.rs ================================================================== --- src/futures.rs +++ src/futures.rs @@ -2,52 +2,14 @@ /// If `f` is `Some`, then `await` its inner value. Otherwise return /// `Pending`. pub async fn if_some(f: &mut Option) -> Option where - F: Future + Unpin + F: Future + Unpin + Send { match f.as_mut() { Some(fut) => Some(fut.await), None => std::future::pending().await } } -/* -/// If a timeout has been specified, sleep until then. -/// -/// Otherwise wait forever. -/// -/// Can be used to timeout select loops. -/// -/// ``` -/// let timeout_at = if do_timeout { -/// Some(Instant::now() + Duration::from_secs(8)); -/// } else { -/// None -/// }; -/// loop { -/// tokio::select!{ -/// msg = framed.next() => { -/// } -/// _ = sleep_if_some("timeout", timeout_at) => { -/// break; -/// } -/// } -/// } -/// ``` -async fn sleep_if_some(what: &str, at: Option) { - if let Some(to) = at { - if tokio::time::Instant::now() >= to { - //eprintln!("Leave {} sleep early!", what); - return; - } - //eprintln!("{} sleeping until {:?}", what, to); - tokio::time::sleep_until(to).await; - } else { - //eprintln!("{} sleeping indefinitely..", what); - let () = std::future::pending().await; - } -} -*/ - // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/iox.rs ================================================================== --- src/iox.rs +++ src/iox.rs @@ -4,17 +4,19 @@ #[derive(Default)] pub struct RngReader(Option); impl RngReader { /// Create an `RngReader` that will keep on yielding random data infinitely. + #[must_use] pub fn new() -> Self { Self::default() } /// Create an `RngReader` that will return a specified amount of random data, /// after which the reader will return eof. - pub fn with_lim(size: u64) -> Self { + #[must_use] + pub const fn with_lim(size: u64) -> Self { Self(Some(size)) } } impl std::io::Read for RngReader { @@ -23,17 +25,18 @@ if *remain == 0 { // signal eof Ok(0) } else { let n = std::cmp::min(*remain, buf.len() as u64); - rand::thread_rng().fill(&mut buf[..(n as usize)]); + let len = usize::try_from(n).unwrap(); + rand::thread_rng().fill(&mut buf[..len]); *remain -= n; - Ok(n as usize) + Ok(len) } } else { rand::thread_rng().fill(&mut buf[..]); Ok(buf.len()) } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -6,11 +6,10 @@ pub mod futures; pub mod iox; pub mod path; pub mod strx; - #[cfg(feature = "rusqlite")] #[cfg_attr(docsrs, doc(cfg(feature = "rusqlite")))] pub mod sqlfuncs; #[cfg(feature = "tokio")] @@ -18,9 +17,13 @@ pub mod tokiox; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub use async_trait::async_trait; + +#[cfg(feature = "serde")] +#[cfg_attr(docsrs, doc(cfg(feature = "serde")))] +pub mod serde_parsers; pub use err::Error; // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/path.rs ================================================================== --- src/path.rs +++ src/path.rs @@ -10,22 +10,26 @@ use crate::err::Error; /// Expand a string path and return it as a `PathBuf`. /// /// The expansion is done using [`shellexpand::full()`]. +/// +/// # Errors +/// [`Error::BadFormat`] means the path could not be expanded. 
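+///
+/// A minimal usage sketch (illustrative only; the expanded value depends on
+/// the environment the program runs in):
+///
+/// ```no_run
+/// // Tilde and environment-variable references are expanded via
+/// // shellexpand::full().
+/// let p = orphanage::path::expand("$HOME/some/dir").unwrap();
+/// assert!(p.ends_with("some/dir"));
+/// ```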
pub fn expand(input: impl AsRef<str>) -> Result<PathBuf, Error> {
  match shellexpand::full(&input) {
    Ok(value) => Ok(PathBuf::from(value.into_owned())),
-    Err(e) => Err(Error::BadFormat(format!("Unable to expand path; {}", e)))
+    Err(e) => Err(Error::BadFormat(format!("Unable to expand path; {e}")))
  }
}

/// Expand a string path to absolute path and return it as a `PathBuf`.
///
/// The expansion is performed by [`expand()`]. If the path is determined to
/// be relative it is made absolute by calling [`std::env::current_dir()`] and
/// joining it with the relative path.
+#[allow(clippy::missing_errors_doc)]
pub fn expabs(input: impl AsRef<str>) -> Result<PathBuf, Error> {
  let exppth = expand(input)?;
  abspath(exppth)
}
@@ -32,10 +36,11 @@
/// Return the full path to an input path.
///
/// If the input path is absolute this function will return a `PathBuf`
/// version of the input. Otherwise the input path will be appended to the
/// current working directory to generate the returned `PathBuf`.
+#[allow(clippy::missing_errors_doc)]
pub fn abspath

(pth: P) -> Result where P: AsRef { fn inner(pth: &Path) -> Result { ADDED src/serde_parsers.rs Index: src/serde_parsers.rs ================================================================== --- /dev/null +++ src/serde_parsers.rs @@ -0,0 +1,148 @@ +//! ``` +//! use serde::{Deserialize}; +//! +//! use orphanage::serde_parsers::*; +//! +//! #[derive(Debug, Deserialize)] +//! struct Config { +//! /// Support `count = 10000` and `count = "10K"` +//! count: Count, +//! +//! /// Support optional `count = 10000` and `count = "10K"` +//! count_opt: Option, +//! +//! /// Support `binsize = 65536` and `binsize = "64KB"` +//! binsize: BinSize, +//! +//! /// Support optional `binsize = 65536` and `binsize = "64KB"` +//! binsize_opt: Option, +//! +//! /// Support `decsize = 20000` and `decsize = "20KB"` +//! decsize: DecSize, +//! +//! /// Support optional `decsize = 20000` and `decsize = "20KB"` +//! decsize_opt: Option +//! } +//! ``` + +use serde::{de::Deserializer, Deserialize}; + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct Count(pub u64); + +impl Count { + #[must_use] + pub const fn get(&self) -> u64 { + self.0 + } +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct BinSize(pub u64); + +impl BinSize { + #[must_use] + pub const fn get(&self) -> u64 { + self.0 + } +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct DecSize(pub u64); + +impl DecSize { + #[must_use] + pub const fn get(&self) -> u64 { + self.0 + } +} + + +#[derive(Deserialize)] +#[serde(untagged)] +enum StrOrU64 { + U64(u64), + Str(String) +} + + +#[allow(clippy::missing_errors_doc)] +pub fn count<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de> +{ + match StrOrU64::deserialize(deserializer)? { + StrOrU64::U64(v) => Ok(Count(v)), + StrOrU64::Str(v) => { + let cfg = parse_size::Config::new().with_decimal(); + cfg + .parse_size(&v) + .map(Count) + .map_err(|_| serde::de::Error::custom("Can't parse count")) + } + } +} + +impl<'de> Deserialize<'de> for Count { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de> + { + count(deserializer) + } +} + + +#[allow(clippy::missing_errors_doc)] +pub fn binsize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de> +{ + match StrOrU64::deserialize(deserializer)? { + StrOrU64::U64(v) => Ok(BinSize(v)), + StrOrU64::Str(v) => { + let cfg = parse_size::Config::new().with_binary(); + cfg + .parse_size(&v) + .map(BinSize) + .map_err(|_| serde::de::Error::custom("Can't parse size")) + } + } +} + +impl<'de> Deserialize<'de> for BinSize { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de> + { + binsize(deserializer) + } +} + +#[allow(clippy::missing_errors_doc)] +pub fn decsize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de> +{ + match StrOrU64::deserialize(deserializer)? 
{ + StrOrU64::U64(v) => Ok(DecSize(v)), + StrOrU64::Str(v) => { + let cfg = parse_size::Config::new().with_decimal(); + cfg + .parse_size(&v) + .map(DecSize) + .map_err(|_| serde::de::Error::custom("Can't parse size")) + } + } +} + +impl<'de> Deserialize<'de> for DecSize { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de> + { + decsize(deserializer) + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/sqlfuncs.rs ================================================================== --- src/sqlfuncs.rs +++ src/sqlfuncs.rs @@ -1,10 +1,10 @@ use rusqlite::{functions::FunctionFlags, Connection, Error}; use sha2::{Digest, Sha256}; -use crate::strx::RndStr; +use crate::strx::{validate_objname, RndStr}; /// Add a `hashstr()` SQL function to the connection object. /// /// The SQL function `hashstr()` takes two arguments: /// 1. The algorithm that will be used to hash the input. @@ -26,10 +26,11 @@ /// row.get(0) /// } /// ).unwrap(); /// assert_eq!(&hstr[..8], "09ca7e4e"); /// ``` +#[allow(clippy::missing_errors_doc)] pub fn hashstr(conn: &Connection) -> Result<(), Error> { conn.create_scalar_function( "hashstr", 2, FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC, @@ -73,10 +74,11 @@ /// row.get(0) /// } /// ).unwrap(); /// assert_eq!(&hstr[..8], "09ca7e4e"); /// ``` +#[allow(clippy::missing_errors_doc)] pub fn hashblob(conn: &Connection) -> Result<(), Error> { conn.create_scalar_function( "hashblob", 2, FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC, @@ -118,10 +120,11 @@ /// id INTEGER PRIMARY KEY, /// salt TEXT NOT NULL DEFAULT (randomstr(8)) /// ); /// "#, []); /// ``` +#[allow(clippy::missing_errors_doc)] pub fn rndstr_alphanum(db: &Connection) -> Result<(), Error> { db.create_scalar_function( "randomstr", 1, FunctionFlags::SQLITE_UTF8, @@ -133,10 +136,11 @@ Ok(String::rnd_alphanum(len)) } ) } +#[allow(clippy::missing_errors_doc)] pub fn rndstr(db: &Connection) -> Result<(), Error> { db.create_scalar_function( "randomstr", 2, FunctionFlags::SQLITE_UTF8, @@ -149,7 +153,46 @@ Ok(String::rnd_from_alphabet(len, charset)) } ) } + +/// Add a `isobjname()` SQL function to the connection object. +/// +/// The SQL function `isobjname()` takes a signle argument: +/// 1. The input string to check whether it conforms to an object name. +/// +/// ``` +/// use rusqlite::Connection; +/// use orphanage::sqlfuncs; +/// +/// let conn = Connection::open_in_memory().unwrap(); +/// sqlfuncs::hashstr(&conn).unwrap(); +/// +/// conn.execute(r#" +/// CREATE TABLE IF NOT EXISTS stuff ( +/// id INTEGER PRIMARY KEY, +/// name TEXT UNIQUE NOT NULL, +/// CHECK (isobjname(name) == 1) +/// ); +/// "#, []); +/// ``` +/// +/// # Panics +/// The number of input parameters must be exactly 1. +#[allow(clippy::missing_errors_doc)] +pub fn isobjname(conn: &Connection) -> Result<(), Error> { + conn.create_scalar_function( + "isobjname", + 1, + FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC, + move |ctx| { + assert_eq!(ctx.len(), 1, "called with unexpected number of arguments"); + let name = ctx.get::(0)?; + validate_objname(&name) + .map_err(|e| Error::UserFunctionError(Box::new(e)))?; + Ok(1) + } + ) +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/strx.rs ================================================================== --- src/strx.rs +++ src/strx.rs @@ -1,8 +1,10 @@ -//! Extended string functionality. +//! Extended str/string functionality. 
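+//!
+//! A small illustrative sketch (not from the original docs; it assumes the
+//! `RndStr` implementation for `String` provided by this module):
+//!
+//! ```
+//! use orphanage::strx::{validate_objname, RndStr};
+//!
+//! // Eight random alphanumeric (ASCII) characters.
+//! let salt = String::rnd_alphanum(8);
+//! assert_eq!(salt.len(), 8);
+//!
+//! // Object names must start with an alphabetic character.
+//! assert!(validate_objname("table1").is_ok());
+//! assert!(validate_objname("1table").is_err());
+//! ```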
use rand::{distributions::Alphanumeric, Rng};
+
+use crate::err::Error;

pub trait RndStr {
  fn rnd_alphanum(len: usize) -> String;
  fn rnd_from_alphabet(len: usize, alpha: &[u8]) -> String;
}
@@ -39,6 +41,91 @@
      })
      .collect()
  }
}
+
+#[inline]
+#[must_use]
+pub fn is_name_leading_char(c: char) -> bool {
+  c.is_alphabetic()
+}
+
+#[inline]
+#[must_use]
+pub fn is_name_char(c: char) -> bool {
+  c.is_alphanumeric() || c == '_' || c == '-' || c == '.'
+}
+
+#[allow(clippy::missing_errors_doc)]
+pub fn validate_name<L, R>(s: &str, lead: L, rest: R) -> Result<(), Error>
+where
+  L: Fn(char) -> bool,
+  R: Fn(char) -> bool
+{
+  let mut chars = s.chars();
+  let Some(ch) = chars.next() else {
+    return Err(Error::BadFormat(
+      "Object name must not be empty".to_string()
+    ));
+  };
+  if !lead(ch) {
+    return Err(Error::BadFormat(
+      "Invalid leading object name character".to_string()
+    ));
+  }
+  if chars.any(|c| !rest(c)) {
+    return Err(Error::BadFormat(
+      "Invalid object name character".to_string()
+    ));
+  }
+  Ok(())
+}
+
+#[inline]
+#[allow(clippy::missing_errors_doc)]
+pub fn validate_objname(s: &str) -> Result<(), Error> {
+  validate_name(s, is_name_leading_char, is_name_char)
+}
+
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+
+  #[test]
+  #[should_panic(expected = "empty name")]
+  fn bad_empty_name() {
+    validate_name("", is_name_leading_char, is_name_char).expect("empty name");
+  }
+
+  #[test]
+  #[should_panic(expected = "invalid leading character")]
+  fn bad_initial_number_num() {
+    validate_name("0hello", is_name_leading_char, is_name_char)
+      .expect("invalid leading character");
+  }
+
+  #[test]
+  #[should_panic(expected = "invalid leading character")]
+  fn bad_initial_number_dash() {
+    validate_name("-hello", is_name_leading_char, is_name_char)
+      .expect("invalid leading character");
+  }
+
+  #[test]
+  #[should_panic(expected = "invalid leading character")]
+  fn bad_initial_number_underscore() {
+    validate_name("_hello", is_name_leading_char, is_name_char)
+      .expect("invalid leading character");
+  }
+
+  #[test]
+  fn good_names() {
+    validate_objname("hello").unwrap();
+    validate_objname("hell0").unwrap();
+    validate_objname("he_ll0").unwrap();
+    validate_objname("he_ll-0").unwrap();
+  }
+}
+
 // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Index: src/tokiox.rs
==================================================================
--- src/tokiox.rs
+++ src/tokiox.rs
@@ -1,103 +1,180 @@
+//! tokio extensions.
+//!
+//! # Connector
+//! The _Connector_ subsystem consists of two parts:
+//! - The [`Connector`] trait is implemented by applications/libraries that
+//!   want to run retryable connection loops.
+//! - [`run_connector()`] is a function that takes in a `Connector`
+//!   implementation, attempts to establish a connection, and calls
+//!   [`Connector::run()`] once a connection has successfully been
+//!   established.
+//!
+//! Paradoxically, the `run_connector()` function does not itself actually
+//! attempt to establish any connections -- it relies on the `Connector` trait
+//! to implement the means to establish connections.
+//!
+//! The "good path" overall flow of the connector loop is to call the
+//! `connect()` method. If it is successful, call the `run()` method, passing
+//! along the newly allocated connection. The main application logic relating
+//! to the connection should be called from this method.
+//!
+//! The primary purpose of the connector concerns the "failure path": If the
+//! `connect()` method encounters a failure it can choose to signal back to
+//! the connector loop that the error is "retryable", in which case the
+//! `retry_delay()` method is called to determine if the connector loop should
+//! retry (and implement a delay before returning instructions to do so).
+//!
+//! Likewise, the `run()` method returns its `Result<(), Self::Error>` wrapped
+//! in a `ControlFlow::Continue(_)` to indicate that the connector loop should
+//! reconnect, while `ControlFlow::Break(_)` signals that a fatal error
+//! occurred and the connector loop should terminate.
+
pub mod tcpconn;

use std::ops::ControlFlow;

use async_trait::async_trait;

-use tokio::io::{AsyncRead, AsyncWrite};
-
pub use tcpconn::SimpleTcpConnector;

-pub enum Error {
-  /// An application callback returned an error.
-  App(E),
-
-  /// The application callback chose not to retry.
-  ///
-  /// Could mean that the maximum number of retries have been eached. The
-  /// last application error is returned in E.
-  Exhausted(E)
-}
-
-pub enum ConnResult {
-  Connected(C),
-
-  /// The connection was aborted by external means.
-  Aborted,
-
-  /// An error occurred, but the connector is recommended to delay and then
-  /// try to connect again.
-  RetryableError(E),
-
-  /// An error occurred, and the error is not considered to be retryable.
-  FatalError(E)
-}
-
-/// Application callbacks for the [`run_connector()`] function.
+
+/// Application callbacks for the [`run_connector()`] function (or equivalent).
#[async_trait]
pub trait Connector {
-  type ConnType: AsyncRead + AsyncWrite + Unpin;
+  /// The connection return type.
+  type ConnType: Send;
+
+  /// The application return type.
  type Error;

-  /// Establish a connection to the listener.
-  async fn connect(&mut self) -> ConnResult;
-
-  /// Returns `true` if the connector should attempt to reconnect. Returns
-  /// `false` otherwise.
-  ///
-  /// The application can use this callback to implemement a maximum number of
-  /// retries.
-  fn retry(&mut self) -> bool;
-
-  /// Call application to delay before next reconnection attempt.
-  ///
-  /// This method should only be called if [`Connector::retry()`] return true.
-  ///
-  /// If this returns `ControlFlow::Continue(())` the connector will attempt to
-  /// reconnect. `ControlFlow::Break(_)` will cause the connector to abort.
-  async fn delay(&mut self) -> ControlFlow<(), ()>;
+  /// Establish a connection.
+  ///
+  /// If a connection was successfully established the implementation should
+  /// return `Ok(Self::ConnType)`.
+  ///
+  /// If an error is returned as `Err(ControlFlow::Continue(Self::Error))` it
+  /// signals to the connector loop that the error is non-fatal, and that the
+  /// connection should be retried. Returning an error as
+  /// `Err(ControlFlow::Break(Self::Error))` signals that there's no point in
+  /// trying to (re)connect (with the same configuration) and the
+  /// (re)connection loop is terminated.
+  async fn connect(
+    &mut self
+  ) -> Result<Self::ConnType, ControlFlow<Self::Error, Self::Error>>;
+
+  /// Give the application a chance to determine whether or not to attempt a
+  /// reconnection, and delay before doing so.
+  ///
+  /// Implementations return `ControlFlow::Continue(())` to signal to the
+  /// connector loop that it should retry the connection. Returning
+  /// `ControlFlow::Break(Self::Error)` will terminate the connector loop and
+  /// cause it to return the error.
+  async fn retry_delay(&mut self) -> ControlFlow<Self::Error>;

  /// Run the application's connection handler.
  ///
-  /// The application should return `ControlFlow::Continue(_)` to request the
-  /// connector to delay and reconnect.
+  /// The application should return `ControlFlow::Continue(_)` to request that
+  /// the connector loop delay and reconnect. Returning
+  /// `ControlFlow::Break(_)` will cause the connector loop to terminate and
+  /// return the supplied result.
  async fn run(
    &mut self,
    conn: Self::ConnType
  ) -> ControlFlow<Result<(), Self::Error>, Result<(), Self::Error>>;
}

-/// Establish a network connection with a built-in retry loop.
-pub async fn run_connector(
-  mut connector: impl Connector
-) -> Result<(), Error> {
-  loop {
-    match connector.connect().await {
-      ConnResult::Connected(conn) => {
-        // Connection was successful -- run the server application.
-        //
-        // The return value can indicate that the connector should reconnect or
-        // abort.
-        match connector.run(conn).await {
-          ControlFlow::Continue(_) => continue,
-          ControlFlow::Break(res) => break res.map_err(|e| Error::App(e))
-        }
-      }
-      ConnResult::Aborted => break Ok(()),
-      ConnResult::RetryableError(e) => {
-        if connector.retry() {
-          connector.delay().await;
-        } else {
-          // Presumably the maximum number of retries has been reached.
-          break Err(Error::Exhausted(e));
-        }
-      }
-      ConnResult::FatalError(e) => {
-        break Err(Error::App(e));
-      }
-    }
+/// Establish a network connection.
+///
+/// The `run_connector()` function will enter a loop that will attempt to
+/// establish a connection and call the `Connector::run()` implementation once
+/// successful. If the connection fails, `Connector::retry_delay()` will be
+/// called to determine whether to retry the connection.
+///
+/// # Exit conditions
+/// The (re)connection loop will keep running until an exit condition has been
+/// triggered:
+/// - [`Connector::connect()`] returns `Err(ControlFlow::Break(Self::Error))`
+/// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)`.
+/// - [`Connector::run()`] returns `ControlFlow::Break(Result<(),
+///   Self::Error>)`
+#[deprecated(since = "0.0.3", note = "use `schmoozer` crate instead")]
+#[allow(clippy::missing_errors_doc)]
+pub async fn run_connector<E>(
+  mut connector: impl Connector<Error = E> + Send
+) -> Result<(), E>
+where
+  E: Send
+{
+  loop {
+    // Call the application's connect callback to attempt to establish a
+    // connection.
+    match connector.connect().await {
+      Ok(conn) => {
+        // A connection was successfully established -- call the run()
+        // implementation.
+        match connector.run(conn).await {
+          ControlFlow::Continue(_res) => {
+            // The application has requested a reconnection.
+            // Fall through to retry_delay()
+          }
+          ControlFlow::Break(res) => {
+            // Break out of loop -- passing along the result from the
+            // application.
+            break res;
+          }
+        }
+      }
+      Err(ControlFlow::Continue(_res)) => {
+        // The connector returned a retryable error.
+        // Fall through to retry_delay()
+      }
+      Err(ControlFlow::Break(res)) => {
+        // The connector returned a fatal error
+        break Err(res);
+      }
+    }
+
+    // If this point is reached the application has requested a reconnection.
+    // Call `retry_delay()` to allow the application to determine whether to
+    // retry or not.
+
+    match connector.retry_delay().await {
+      ControlFlow::Continue(()) => {
+        // Application wants to reconnect.
+        continue;
+      }
+      ControlFlow::Break(err) => {
+        // Application does not want to reconnect
+        break Err(err);
+      }
+    }
+  }
+}
+
+
+/// If a timeout has been specified, sleep until then.
+///
+/// Otherwise wait forever.
+///
+/// Can be used to time out select loops.
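+///
+/// A rough usage sketch, adapted from the example previously kept in
+/// `src/futures.rs` (illustrative only; `framed` and `do_timeout` are assumed
+/// to exist in the caller):
+///
+/// ```ignore
+/// let timeout_at = do_timeout
+///   .then(|| tokio::time::Instant::now() + std::time::Duration::from_secs(8));
+/// loop {
+///   tokio::select! {
+///     msg = framed.next() => { /* handle msg */ }
+///     () = sleep_until_if_some(timeout_at) => break
+///   }
+/// }
+/// ```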
+pub async fn sleep_until_if_some(at: Option) { + if let Some(to) = at { + if tokio::time::Instant::now() >= to { + return; + } + tokio::time::sleep_until(to).await; + } else { + let () = std::future::pending().await; + } +} + +pub async fn sleep_for_if_some(dur: Option) { + if let Some(dur) = dur { + tokio::time::sleep(dur).await; + } else { + let () = std::future::pending().await; } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/tokiox/tcpconn.rs ================================================================== --- src/tokiox/tcpconn.rs +++ src/tokiox/tcpconn.rs @@ -1,14 +1,14 @@ -use std::{future::Future, ops::ControlFlow, time::Duration}; +use std::{future::Future, io::ErrorKind, ops::ControlFlow, time::Duration}; use tokio::net::TcpStream; use async_trait::async_trait; use killswitch::KillSwitch; -use super::{ConnResult, Connector}; +use super::Connector; pub struct SimpleTcpConnector where F: Future< Output = ControlFlow< @@ -30,10 +30,11 @@ Result<(), std::io::Error>, Result<(), std::io::Error> > > { + #[must_use] pub fn new( addr: String, ks: KillSwitch, cb: Box F + Send> ) -> Self { @@ -57,33 +58,42 @@ > + Send { type Error = std::io::Error; type ConnType = TcpStream; - async fn connect(&mut self) -> ConnResult { - match TcpStream::connect(&self.addr).await { - Ok(conn) => ConnResult::Connected(conn), - - Err(e) => { - // ToDo: Distinguish between retryable and fatal errors - - ConnResult::RetryableError(e) + async fn connect( + &mut self + ) -> Result> { + tokio::select! { + res = TcpStream::connect(&self.addr) => { + match res { + Ok(conn) => Ok(conn), + Err(e) => match e.kind() { + ErrorKind::ConnectionAborted | ErrorKind::NotConnected => { + Err(ControlFlow::Continue(e)) + } + _ => Err(ControlFlow::Break(e)) + } + } + } + () = self.ks.wait() => { + // Aborted -- use ErrorKind::Other to signal abortion + let err = std::io::Error::other(String::from("aborted")); + Err(ControlFlow::Break(err)) } } } - fn retry(&mut self) -> bool { - true - } - - async fn delay(&mut self) -> ControlFlow<(), ()> { + async fn retry_delay(&mut self) -> ControlFlow { let dur = Duration::from_secs(self.delay.try_into().unwrap()); tokio::select! { - _ = self.ks.wait() => { - ControlFlow::Break(()) + () = self.ks.wait() => { + let err = std::io::Error::other(String::from("aborted")); + ControlFlow::Break(err) } - _ = tokio::time::sleep(dur) => { + () = tokio::time::sleep(dur) => { + // double sleep duration for each iteration, but cap at 60 seconds self.delay = std::cmp::min(self.delay * 2, 60); ControlFlow::Continue(()) } } } Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,23 +1,48 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=orphanage-0.0.1&to=trunk) +[Details](/vdiff?from=orphanage-0.0.3&to=trunk) + +### Added + +### Changed + +### Removed + +--- + +## [0.0.3] - 2024-09-14 + +[Details](/vdiff?from=orphanage-0.0.2&to=orphanage-0.0.3) + +### Added + +- API for validating "object names". +- `sqlfuncs::isobjname()` + +### Changed + +- Major redesign of `Connector`/`run_connector()`. Uses more conventional + return types now and merged `Connector::retry()` and `Connector::delay()` + into `Connector::retry_delay()`. + +--- + +## [0.0.2] - 2024-02-13 + +[Details](/vdiff?from=orphanage-0.0.1&to=orphanage-0.0.2) ### Added - Add an `iox` module, containing `RngReader` which implements `std::io::Read` and returns random data. 
- Add `fs::rndfile()` for creating files with random content of a requested
  size.
- Add `tokiox` module with an abstraction over (re)connection loops.

-### Changed
-
-### Removed
-

---

## [0.0.1] - 2024-02-10

Initial release

Index: www/index.md
==================================================================
--- www/index.md
+++ www/index.md
@@ -7,5 +7,15 @@
forever, because some functions are weird and esoteric.

Warning: There's a significant chance that this crate will blow up the number
of dependencies in your project.
+
+## Feature labels in documentation
+
+The crate's documentation uses automatically generated feature labels, which
+currently requires nightly features. To build the documentation locally, use:
+
+```
+RUSTFLAGS="--cfg docsrs" RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
+```
+