Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "schmoozer" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "network-programming" ] keywords = [ "connector", "network", "tokio" ] @@ -25,11 +25,11 @@ [features] tcpconn = ["dep:killswitch", "tokio/macros", "tokio/net", "tokio/time"] [dependencies] -async-trait = { version = "0.1.82" } +async-trait = { version = "0.1.83" } killswitch = { version = "0.4.2", optional = true } tokio = { version = "1.40.0", optional = true } [dev-dependencies] tokio = { version = "1.40.0", features = [ Index: examples/net.rs ================================================================== --- examples/net.rs +++ examples/net.rs @@ -1,10 +1,10 @@ -use std::{env, io::ErrorKind, ops::ControlFlow, time::Duration}; +use std::{env, io::ErrorKind, time::Duration}; pub use tokio::net::TcpStream; -use schmoozer::{async_trait, Connector}; +use schmoozer::{async_trait, ConnResult, Connector, RunResult}; pub struct TcpConnector { addr: String, delay: usize } @@ -22,57 +22,53 @@ #[async_trait] impl Connector for TcpConnector { type Error = std::io::Error; type ConnType = TcpStream; - async fn connect( - &mut self - ) -> Result> { + async fn connect(&mut self) -> ConnResult { println!("Connecting to {}", self.addr); let res = TcpStream::connect(&self.addr).await; match res { - Ok(conn) => Ok(conn), + Ok(conn) => ConnResult::Connected(conn), Err(e) => match e.kind() { ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | ErrorKind::NotConnected | ErrorKind::TimedOut => { println!("Retryable error: {e}"); - Err(ControlFlow::Continue(e)) + ConnResult::Reconnect } _ => { println!("Fatal error: {e}"); - Err(ControlFlow::Break(e)) + ConnResult::Fail(e) } } } } - async fn retry_delay(&mut self) -> ControlFlow { + async fn retry_delay(&mut self) -> RunResult { let dur = 
Duration::from_secs(self.delay.try_into().unwrap()); println!("Retrying in {dur:?} .."); tokio::time::sleep(dur).await; // for next iteration double sleep duration for each iteration, but cap at // 60 seconds self.delay = std::cmp::min(self.delay * 2, 60); - ControlFlow::Continue(()) + + RunResult::Reconnect } - async fn run( - &mut self, - _conn: Self::ConnType - ) -> ControlFlow, Result<(), Self::Error>> { + async fn run(&mut self, _conn: Self::ConnType) -> RunResult { // reset delay duration when connection was successful self.delay = 1; // // Run connection logic here .. // - ControlFlow::Continue(Ok(())) + RunResult::Terminate } } #[tokio::main] async fn main() { Index: examples/tcpconn.rs ================================================================== --- examples/tcpconn.rs +++ examples/tcpconn.rs @@ -1,10 +1,13 @@ #[cfg(feature = "tcpconn")] mod inner { - use std::{env, ops::ControlFlow}; + use std::env; - use schmoozer::tcpconn::{KillSwitch, SimpleTcpConnector, TcpStream}; + use schmoozer::{ + tcpconn::{KillSwitch, SimpleTcpConnector, TcpStream}, + RunResult + }; pub async fn main() { let args: Vec = env::args().skip(1).collect(); let ks = KillSwitch::new(); @@ -15,12 +18,12 @@ } async fn proc_connection( _strm: TcpStream, _ks: KillSwitch - ) -> ControlFlow, Result<(), std::io::Error>> { - ControlFlow::Continue(Ok(())) + ) -> RunResult { + RunResult::Reconnect } } #[cfg(feature = "tcpconn")] #[tokio::main] Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -11,33 +11,30 @@ //! Perhaps paradoxically the [`run()`] function does not itself actually //! attempt to establish any connections -- it relies on the `Connector` trait //! to implement the means to establish connections. //! //! The "good path" overall flow of the connector loop is to call the -//! `connect()` method. If it is successful, call the `run()` method, passing -//! along the newly allocated connection. 
The main application logic relating -//! to the connection should called from this method. +//! `connect()` trait method. If it is successful, call the trait's `run()` +//! method, passing along the newly allocated connection. The main application +//! logic relating to the connection should be implemented in this method. //! //! The primary purpose of the connector concerns the "failure path": If the //! `connect()` method encounters a failure it can choose to signal to back to //! the connector loop that the error is "retryable", in which case the //! `retry_delay()` method is called to determine if the connector loop should //! retry (and implement a delay before returning instructions to do so). //! -//! Likewise, the `run()` method returns its `Result<(), Self::Error>` wrapped -//! in a `ControlFlow::Continue(_)` to indicate that the connector look should -//! reconnect, while `ControlFlow::Break(_)` signals that a fatal error occured -//! and the connect loop should terminate. +//! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`] +//! to indicate whether the connector should reconnect or exit, either +//! successfully or with an error. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tcpconn")] #[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))] pub mod tcpconn; -use std::ops::ControlFlow; - pub use async_trait::async_trait; #[cfg(feature = "tcpconn")] pub use tcpconn::SimpleTcpConnector; @@ -48,60 +45,129 @@ type ConnType: Send; /// The application error return type. type Error: Send; - /// Establish a connection. - /// - /// If a connection was successfully established the implementation should - /// return `Ok(Self::ConnType)`. - /// - /// If an error is returned as `Err(ControlFlow::Continue(Self::Error))` it - /// signals to the connector loop that the error is non-fatal, and that the - /// connection should be retried. 
Returning an error as - /// `Err(ControlFlow::Break(Self::Error))` signals that there's no point in - /// trying to (re)connect (with the same configuration) and the - /// (re)connection loop is terminated. - async fn connect( - &mut self - ) -> Result>; + /// Attempt to establish a connection. + /// + /// If a connection was successfully established the implementation returns + /// [`ConnResult::Connected`], which will instruct the + /// connector loop in [`run()`] to call the [`Connector::run()`] + /// implementation. + /// + /// If the implementation detects a termination condition (such as a user + /// request to terminate the application), the handler returns + /// [`ConnResult::Terminate`], which will cause [`run()`]'s connection loop + /// to terminate and return `Ok(())`. + /// + /// The implementation returns [`ConnResult::Reconnect`] to signal that some + /// kind of retryable failure occurred. The connector loop in [`run()`] will + /// call the [`Connector::retry_delay()`] to check if it should attempt a + /// reconnection, and delay before doing so. + /// + /// # Errors + /// If a fatal error occurs that is not retryable the implementation returns + /// [`ConnResult::Fail`] with the error. This will cause the connection loop + /// in [`run()`] to terminate and return `Err(E)`. + async fn connect(&mut self) -> ConnResult; /// Give application a chance to determine whether or not to attempt a /// reconnection, and delay before doing so. /// - /// Implementations return `ControlFlow::Continue(())` to signal to the - /// connector loop that it should retry the connection. Returnning - /// `ControlFlow::Break(Self::Error)` will terminate the connector loop and - /// cause it to return the error. - async fn retry_delay(&mut self) -> ControlFlow; + /// This implementation is called when either `Connector::connect()` + /// returns `ConnResult::Reconnect` or `Connector::run()` returns + /// `RunResult::Reconnect`. 
+ /// + /// The application should return [`RunResult::Reconnect`] to instruct the + /// connector loop in [`run()`] to call [`Connector::connect()`] again to + /// attempt to establish a connection. + /// + /// If the application has encountered a successful exit state, this + /// implementation returns [`RunResult::Terminate`]. + /// + /// # Errors + /// If the implementation encounters a fatal error that should terminate + /// the connection loop and return from [`run()`] with an error, + /// [`RunResult::Fail`] is returned. + async fn retry_delay(&mut self) -> RunResult; /// Run the application's connection handler. /// - /// The application should return `ControlFlow::Continue(_)` to request that - /// the connector loop delay and reconnect. Returning - /// `ControlFlow::Break(_)` will cause the connect loop to terminate and - /// return the supplied result. - async fn run( - &mut self, - conn: Self::ConnType - ) -> ControlFlow, Result<(), Self::Error>>; + /// The application should return [`RunResult::Reconnect`] to instruct the + /// connector loop in [`run()`] to attempt a reconnect. + /// + /// If the application has received some signal to terminate successfully, + /// this implementation returns [`RunResult::Terminate`]. + /// + /// # Errors + /// If the run implementation encounters a fatal error that should terminate + /// the connection loop and return from [`run()`] with an error, + /// [`RunResult::Fail`] is returned. + async fn run(&mut self, conn: Self::ConnType) -> RunResult; +} + + +/// Special-purpose result returned by [`Connector::connect()`]. +pub enum ConnResult { + /// The connection was successful. + /// + /// Run the [`Connector::run()`] with the connection `C`. + Connected(C), + + /// Connection could not be established. + /// + /// Call [`Connector::retry_delay()`] to check if reconnection attempts have + /// been exhausted and, if applicable, delay before the reconnection attempt. 
+ Reconnect, + + /// Terminate reconnection loop, indicating successful termination. + Terminate, + + /// Terminate the reconnection loop with an error. + Fail(E) +} + +/// Returned by [`Connector::retry_delay()`] and [`Connector::run()`]. +pub enum RunResult { + /// Attempt to reconnect. + Reconnect, + + /// Terminate reconnection loop, indicating successful termination. + Terminate, + + /// Terminate the reconnection loop with an error. + Fail(E) } /// Establish and process a network connection. /// /// The `run()` function will enter a loop that will attempt to establish a -/// connection and call the `Connector::run()` implementation once succesful. -/// If the connection fails, `Connector::retry_delay()` will be -/// called to determine whether to retry the connection or abort and return. +/// connection by calling the [`Connector::connect()`] implementation. If a +/// connection is successfully established the connector loop will call the +/// [`Connector::run()`] implementation. +/// +/// The main purpose of the connector loop is that if either the `connect()` or +/// the `run()` trait implementation fails in a retryable manner, +/// [`Connector::retry_delay()`] will be called to determine whether to retry +/// the connection or abort and return. /// -/// # Exit conditions -/// The (re)connection loop will keep running until an exit condition has been -/// triggered: -/// - [`Connector::connect()`] returns `Err(ControlFlow::Break(Self::Error))` +/// # Success exit conditions +/// The (re)connection loop will exit with `Ok(())` if: +/// - [`Connector::connect()`] returns [`ConnResult::Terminate`] -/// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)` +/// - [`Connector::retry_delay()`] returns [`RunResult::Terminate`] -/// - [`Connector::run()`] returns `ControlFlow::Break(_)` +/// - [`Connector::run()`] returns [`RunResult::Terminate`] +/// +/// # Errors +/// - If the [`Connector::connect()`] implementation returns +/// [`ConnResult::Fail`], this function will return `Err(E)`, where `E` is +/// the `Connector`'s `Error` type. 
+/// - If the [`Connector::run()`] implementation returns [`RunResult::Fail`], +/// this function will return `Err(E)`, where `E` is the `Connector`'s +/// `Error` type. +/// - If the [`Connector::retry_delay()`] implementation returns +/// [`RunResult::Fail`], this function will return `Err(E)`. #[allow(clippy::missing_errors_doc)] pub async fn run( mut connector: impl Connector + Send ) -> Result<(), E> where @@ -109,48 +175,53 @@ { loop { // Call the application's connect callback to attempt to establish // connection. match connector.connect().await { - Ok(conn) => { - // A connection was successfully established -- call the run() - // implementation. - match connector.run(conn).await { - ControlFlow::Continue(_res) => { - // The application has requested a reconnection. - // Fall through to retry()/delay() - } - ControlFlow::Break(res) => { - // Break out of loop -- passing along the result from the - // application. - break res; - } - } - } - Err(ControlFlow::Continue(_res)) => { - // The connector returned a retriable error - // fall through to retry()/delay() - } - Err(ControlFlow::Break(res)) => { - // The connector returned a fatal error - break Err(res); + ConnResult::Connected(conn) => { + // A connection was successfully established -- call the run() + // implementation. + match connector.run(conn).await { + RunResult::Reconnect => { + // The application has requested a reconnection. + // Fall through to retry_delay() + } + RunResult::Terminate => { + break Ok(()); + } + RunResult::Fail(e) => { + // Break out of loop -- passing along the error from the + // application. + break Err(e); + } + } + } + ConnResult::Reconnect => { + // The connector returned a retriable error + // fall through to retry_delay() + } + ConnResult::Terminate => break Ok(()), + ConnResult::Fail(e) => { + // The connector returned a fatal error + break Err(e); } } // If this point is reached the application has requested a reconnection. 
// Call `retry_delay()` to allow the application to determine whether to // retry or not. match connector.retry_delay().await { - ControlFlow::Continue(()) => { + RunResult::Reconnect => { // Application wants to reconnect. continue; } - ControlFlow::Break(err) => { + RunResult::Terminate => break Ok(()), + RunResult::Fail(e) => { // Application does not want to reconnect - break Err(err); + break Err(e); } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/tcpconn.rs ================================================================== --- src/tcpconn.rs +++ src/tcpconn.rs @@ -1,39 +1,34 @@ //! A simple [`Connector`] implementation that specifically attempts to connect //! to a TCP server and run an async function once successful. -use std::{future::Future, io::ErrorKind, ops::ControlFlow, time::Duration}; +use std::{future::Future, io::ErrorKind, time::Duration}; pub use tokio::net::TcpStream; pub use killswitch::KillSwitch; -use super::{async_trait, Connector}; +use super::{async_trait, ConnResult, Connector, RunResult}; +/// Simple TCP (re)connector. +/// +/// When run, the `Connector` implementation will attempt to establish a +/// [`TcpStream`] connection and then call an application-specified `Future` +/// (returned by a closure). 
pub struct SimpleTcpConnector where - F: Future< - Output = ControlFlow< - Result<(), std::io::Error>, - Result<(), std::io::Error> - > - > + F: Future> { addr: String, delay: usize, ks: KillSwitch, cb: Box F + Send> } impl SimpleTcpConnector where - F: Future< - Output = ControlFlow< - Result<(), std::io::Error>, - Result<(), std::io::Error> - > - > + F: Future> { #[allow(clippy::needless_pass_by_value)] pub fn new( addr: impl ToString, ks: KillSwitch, @@ -49,63 +44,50 @@ } #[async_trait] impl Connector for SimpleTcpConnector where - F: Future< - Output = ControlFlow< - Result<(), std::io::Error>, - Result<(), std::io::Error> - > - > + Send -{ - type Error = std::io::Error; - type ConnType = TcpStream; - - async fn connect( - &mut self - ) -> Result> { - tokio::select! { - res = TcpStream::connect(&self.addr) => { - match res { - Ok(conn) => Ok(conn), - Err(e) => match e.kind() { - ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | - ErrorKind::NotConnected | ErrorKind::TimedOut => { - Err(ControlFlow::Continue(e)) - } - _ => Err(ControlFlow::Break(e)) - } - } - } - () = self.ks.wait() => { - // Aborted -- use ErrorKind::Other to signal abortion - let err = std::io::Error::other(String::from("aborted")); - Err(ControlFlow::Break(err)) - } - } - } - - async fn retry_delay(&mut self) -> ControlFlow { - let dur = Duration::from_secs(self.delay.try_into().unwrap()); - tokio::select! { - () = self.ks.wait() => { - let err = std::io::Error::other(String::from("aborted")); - ControlFlow::Break(err) - } - () = tokio::time::sleep(dur) => { - // double sleep duration for each iteration, but cap at 60 seconds - self.delay = std::cmp::min(self.delay * 2, 60); - ControlFlow::Continue(()) - } - } - } - - async fn run( - &mut self, - conn: Self::ConnType - ) -> ControlFlow, Result<(), Self::Error>> { + F: Future> + Send +{ + type Error = std::io::Error; + type ConnType = TcpStream; + + async fn connect(&mut self) -> ConnResult { + tokio::select! 
{ + res = TcpStream::connect(&self.addr) => { + match res { + Ok(conn) => ConnResult::Connected(conn), + Err(e) => match e.kind() { + ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | + ErrorKind::NotConnected | ErrorKind::TimedOut => { + ConnResult::Reconnect + } + _ => ConnResult::Fail(e) + } + } + } + () = self.ks.wait() => { + ConnResult::Terminate + } + } + } + + async fn retry_delay(&mut self) -> RunResult { + let dur = Duration::from_secs(self.delay.try_into().unwrap()); + tokio::select! { + () = self.ks.wait() => { + RunResult::Terminate + } + () = tokio::time::sleep(dur) => { + // double sleep duration for each iteration, but cap at 60 seconds + self.delay = std::cmp::min(self.delay * 2, 60); + RunResult::Reconnect + } + } + } + + async fn run(&mut self, conn: Self::ConnType) -> RunResult { // reset delay self.delay = 1; let fut = (self.cb)(conn, self.ks.clone()); Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,29 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=schmoozer-0.2.0&to=trunk) +[Details](/vdiff?from=schmoozer-0.3.0&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.3.0] - 2024-10-03 + +[Details](/vdiff?from=schmoozer-0.2.0&to=schmoozer-0.3.0) + +### Changed + +- `Connector::connect()`, `Connector::retry_delay()` and `Connector::run()` + now return the special-purpose types `ConnResult` and `RunResult`, because + the previous mix of `ControlFlow` and `Result` was too confusing. + --- ## [0.2.0] - 2024-09-16 [Details](/vdiff?from=schmoozer-0.1.2&to=schmoozer-0.2.0)