Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "schmoozer" -version = "0.3.0" +version = "0.4.0" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "network-programming" ] keywords = [ "connector", "network", "tokio" ] @@ -23,15 +23,17 @@ [badges] maintenance = { status = "actively-developed" } [features] tcpconn = ["dep:killswitch", "tokio/macros", "tokio/net", "tokio/time"] +tracing = ["dep:tracing"] [dependencies] async-trait = { version = "0.1.83" } killswitch = { version = "0.4.2", optional = true } tokio = { version = "1.40.0", optional = true } +tracing = { version = "0.1.40", optional = true } [dev-dependencies] tokio = { version = "1.40.0", features = [ "macros", "net", "rt-multi-thread", "time" ] } Index: examples/net.rs ================================================================== --- examples/net.rs +++ examples/net.rs @@ -37,11 +37,11 @@ println!("Retryable error: {e}"); ConnResult::Reconnect } _ => { println!("Fatal error: {e}"); - ConnResult::Fail(e) + ConnResult::Exit(Err(e)) } } } } @@ -64,11 +64,11 @@ // // Run connection logic here .. // - RunResult::Terminate + RunResult::Exit(Ok(())) } } #[tokio::main] async fn main() { Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,33 +1,40 @@ //! _schmoozer_ is intended to be used as an `async` (re)connector. It -//! consists of two parts: +//! consists of two primary parts: //! - The [`Connector`] trait is implemented by applications/libraries that //! need to run retryable connection loops. //! - [`run()`] is a function that takes in a `Connector` implementation, and //! attempts to establish a connection, delaying and retrying on failures //! that the callback reports as retriable, and calls the //! [`Connector::run()`] trait method once a connection has been successfully //! been established. //! //! Perhaps paradoxically the [`run()`] function does not itself actually -//! attempt to establish any connections -- it relies on the `Connector` trait -//! to implement the means to establish connections. +//! attempt to establish any connections -- it relies on the +//! [`Connector::connect()`] trait method implementation to establish +//! connections. //! //! The "good path" overall flow of the connector loop is to call the -//! `connect()` trait method. If it is successful, call the trait's `run()` +//! `connect()` trait method. If it is successful, call the trait's `run()` //! method, passing along the newly allocated connection. The main application //! logic relating to the connection should implemented in this method. //! -//! The primary purpose of the connector concerns the "failure path": If the -//! `connect()` method encounters a failure it can choose to signal to back to -//! the connector loop that the error is "retryable", in which case the +//! The primary purpose of the connector concerns the "retryable failure path": +//! If the `connect()` method encounters a failure it can choose to signal to +//! back to the connector loop that the error is "retryable", in which case the //! `retry_delay()` method is called to determine if the connector loop should //! retry (and implement a delay before returning instructions to do so). //! //! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`] //! to indicate whether the connector should reconnect or exit, either //! successfully or with an error. +//! +//! # Features +//! | Feature | Function +//! |-----------|---------- +//! | `tcpconn` | Enable support for a simple TCP (re)connector. +//! | `tracing` | Make the connector loop generator tracing logs. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tcpconn")] #[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))] @@ -39,37 +46,38 @@ pub use tcpconn::SimpleTcpConnector; /// Application callbacks for the [`run()`] function (or equivalent). #[async_trait] pub trait Connector { - /// The connection type. + /// The connection type that the `Connector::connect()` implementor spawns. + /// + /// Once created, an instance of this type will be passed to the + /// `Connector::run()` implementation. type ConnType: Send; /// The application error return type. type Error: Send; /// Attempt to establish a connection. /// /// If a connection was successfully established the implementation returns - /// [`ConnResult::Connected`], which will instruct the - /// connector loop in [`run()`] to call the [`Connector::run()`] - /// implementation. + /// [`ConnResult::Connected`], which will instruct the connector loop in + /// [`run()`] to call the [`Connector::run()`] implementation. /// - /// If the implementation detects termination condition (such as a user - /// request to terminate the application), the handler returns - /// [`ConnResult::Terminate`], which will cause [`run()`]'s connection loop - /// to terminate and return `Ok(())`. + /// If the implementation detects a termination condition the handler returns + /// [`ConnResult::Exit`], which will cause [`run()`]'s connection loop + /// to terminate and return the result passed along with `ConnResult::Exit`. /// /// The implementation returns [`ConnResult::Reconnect`] to signal that some /// kind of retryable failure occurred. The connector loop in [`run()`] will /// call the [`Connector::retry_delay()`] to check if it should attempt a /// reconnection, and delay before doing so. /// /// # Errors /// If a fatal error occurs that is not retryable the implementation returns - /// [`ConnResult::Fail(Self::Error)`]. This will cause the connection loop - /// in [`run()`] to terminate and return `Err(E)`. + /// [`ConnResult::Exit`] with an `Err(E)`, which will be returned by the + /// connector loop function. async fn connect(&mut self) -> ConnResult; /// Give application a chance to determine whether or not to attempt a /// reconnection, and delay before doing so. /// @@ -79,31 +87,33 @@ /// /// The application should return [`RunResult::Reconnect`] to instruct the /// connector loop in [`run()`] to call [`Connector::connect()`] again to /// attempt to establish a connection. /// - /// If a the application has encountered a, successful, exit state, this - /// implementation returns [`RunResult::Terminate`]. + /// If the implementation detects a termination condition the handler returns + /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop + /// to terminate and return the result passed along with `RunResult::Exit`. /// /// # Errors - /// If the run implementation encounters a fatal error that should terminate - /// the connection loop and return from [`run()`] with an error, - /// [`RunResult::Fail`] is returned. + /// If a fatal error occurs that is not retryable the implementation returns + /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the + /// connector loop function. async fn retry_delay(&mut self) -> RunResult; /// Run the application's connection handler. /// /// The application should return [`RunResult::Reconnect`] to instruct the /// connector loop in [`run()`] to attempt a reconnect. /// - /// If a the application has received some signal to terminate, successfully, - /// this implementation returns [`RunResult::Terminate`]. + /// If the implementation detects a termination condition the handler returns + /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop + /// to terminate and return the result passed along with `RunResult::Exit`. /// /// # Errors - /// If the run implementation encounters a fatal error that should terminate - /// the connection loop and return from [`run()`] with an error, - /// [`RunResult::Fail`] is returned. + /// If a fatal error occurs that is not retryable the implementation returns + /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the + /// connector loop function. async fn run(&mut self, conn: Self::ConnType) -> RunResult; } /// Special-purpose result returned by [`Connector::connect()`]. @@ -117,27 +127,21 @@ /// /// Call [`Connector::retry_delay()`] to check if reconnection attempts has /// been exhaused and, if applicable, delay before reconnection attempt. Reconnect, - /// Terminate reconnection loop, indicating successful termination. - Terminate, - - /// Terminate the reconnection loop with an error. - Fail(E) + /// Terminate the reconnection loop. + Exit(Result<(), E>) } /// Returned by [`Connector::run()`] pub enum RunResult { /// Attempt to reconnect. Reconnect, - /// Terminate reconnection loop, indicating successful termination. - Terminate, - - /// Terminate the reconnection loop with an error. - Fail(E) + /// Terminate the reconnection loop. + Exit(Result<(), E>) } /// Establish and process a network connection. /// @@ -144,84 +148,101 @@ /// The `run()` function will enter a loop that will attempt to establish a /// connection by calling the [`Connector::connect()`] implementation. If a /// connection is successfully established the connector loop will call the /// [`Connector::run()`] implementation. /// -/// The main purpose of the connector loop is that is either the `connect()` or -/// the `run()` trait implementations fails in a retryable manner, -/// [`Connector::retry_delay()`] will be called to determine whether to retry -/// the connection or abort and return. +/// The main purpose of the connector loop to handle connection retry requests +/// from either the `connect()` or the `run()` trait implementations +/// (presumably because they failed in a retryable manner). If a reconnection +/// request is returned [`Connector::retry_delay()`] will be called to allow +/// the application to implement its own logic to determine whether the +/// reconnection shoulld proceed and optionally adding a delay before the +/// reconnection attempt. /// -/// # Success exit conditions -/// The (re)connection loop will exit with `Ok(())` if: -/// - [`Connector::connect()`] returns [`ConnResult::Terminate`] -/// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)` -/// - [`Connector::run()`] returns [`RunResult::Terminate`] +/// # Exit conditions +/// The (re)connection loop will exit if: +/// - [`Connector::connect()`] returns [`ConnResult::Exit`] +/// - [`Connector::retry_delay()`] returns [`RunResult::Exit`] +/// - [`Connector::run()`] returns [`RunResult::Exit`] /// /// # Errors -/// - If the [`Connector::connect()`] implementation returns -/// [`ConnResult::Fail`], this function will return `Err(E)`, where `E` is -/// the `Connector`'s `Error` type. -/// - If the [`Connector::run()`] implementation returns [`RunResult::Fail`], -/// this function will return `Err(E)`, where `E` is the `Connector`'s -/// `Error` type. -/// - If the [`Connector::retry_delay()`] implementation returns -/// `ControlFlow::Break(Self::Error)`. +/// If any of the `Connector`'s callbacks return `ConnResult::Exit(Err(_))` or +/// `RunResult::Exit(Err(_))` this function will return the error back to the +/// caller. #[allow(clippy::missing_errors_doc)] pub async fn run( mut connector: impl Connector + Send ) -> Result<(), E> where - E: Send + E: Send + std::fmt::Debug { + #[cfg(feature = "tracing")] + tracing::info!("Enter (re)connection loop"); loop { // Call the application's connect callback to attempt to establish // connection. + #[cfg(feature = "tracing")] + tracing::info!("Attempt to establish connection"); match connector.connect().await { ConnResult::Connected(conn) => { // A connection was successfully established -- call the run() // implementation. + #[cfg(feature = "tracing")] + tracing::info!( + "Got connection -- call application connection handler" + ); match connector.run(conn).await { RunResult::Reconnect => { // The application has requested a reconnection. // Fall through to retry_delay() - } - RunResult::Terminate => { - break Ok(()); - } - RunResult::Fail(e) => { - // Break out of loop -- passing along the error from the - // application. - break Err(e); + #[cfg(feature = "tracing")] + tracing::debug!("Connector::run() requested reconnection"); + } + RunResult::Exit(res) => { + #[cfg(feature = "tracing")] + tracing::info!( + "Connector::connect() requested termination: {res:?}" + ); + break res; } } } ConnResult::Reconnect => { // The connector returned a retriable error // fall through to retry()/delay() + #[cfg(feature = "tracing")] + tracing::debug!("Connector::connect() requested reconnection"); } - ConnResult::Terminate => break Ok(()), - ConnResult::Fail(e) => { - // The connector returned a fatal error - break Err(e); + ConnResult::Exit(res) => { + // Terminate reconnection loop + #[cfg(feature = "tracing")] + tracing::info!("Connector::connect() requested termination; {res:?}"); + break res; } } // If this point is reached the application has requested a reconnection. // Call `retry_delay()` to allow the application to determine whether to // retry or not. + #[cfg(feature = "tracing")] + tracing::info!("Call retry/delay callback"); match connector.retry_delay().await { RunResult::Reconnect => { // Application wants to reconnect. + #[cfg(feature = "tracing")] + tracing::debug!("Connector::retry_delay() requested reconnection"); continue; } - RunResult::Terminate => break Ok(()), - RunResult::Fail(e) => { - // Application does not want to reconnect - break Err(e); + RunResult::Exit(res) => { + // Terminate reconnection loop + #[cfg(feature = "tracing")] + tracing::info!( + "Connector::retry_delay() requested termination: {res:?}" + ); + break res; } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/tcpconn.rs ================================================================== --- src/tcpconn.rs +++ src/tcpconn.rs @@ -59,25 +59,25 @@ Err(e) => match e.kind() { ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | ErrorKind::NotConnected | ErrorKind::TimedOut => { ConnResult::Reconnect } - _ => ConnResult::Fail(e) + _ => ConnResult::Exit(Err(e)) } } } () = self.ks.wait() => { - ConnResult::Terminate + ConnResult::Exit(Ok(())) } } } async fn retry_delay(&mut self) -> RunResult { let dur = Duration::from_secs(self.delay.try_into().unwrap()); tokio::select! { () = self.ks.wait() => { - RunResult::Terminate + RunResult::Exit(Ok(())) } () = tokio::time::sleep(dur) => { // double sleep duration for each iteration, but cap at 60 seconds self.delay = std::cmp::min(self.delay * 2, 60); RunResult::Reconnect Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,26 +1,45 @@ # Change Log +⚠️ indicates a breaking change. + ## [Unreleased] -[Details](/vdiff?from=schmoozer-0.3.0&to=trunk) +[Details](/vdiff?from=schmoozer-0.4.0&to=trunk) ### Added ### Changed ### Removed --- + +## [0.4.0] - 2024-10-03 + +[Details](/vdiff?from=schmoozer-0.3.0&to=schmoozer-0.4.0) + +### Added + +- Add `tracing` feature, and use it to trace log the connection loop. +- ⚠️ The generic error type used for the `run()` function's `Result` now has a + `std::fmt::Debug` bound. + +### Changed + +- ⚠️ Unify `Terminate` and `Fail` variants from `ConnResult` and `RunResult` into + `ConnResult::Exit(Result<(), E>)` and `RunResult::Exit(Result<(), E>)`. + +--- ## [0.3.0] - 2024-10-03 [Details](/vdiff?from=schmoozer-0.2.0&to=schmoozer-0.3.0) ### Changed -- The `Connector::connect()`, `Connector::run()` return +- ⚠️ The `Connector::connect()`, `Connector::run()` return special-purpose return types `ConnResult` and `RunResult`, because the previous mix of `ControlFlow` and `Result` was too confusing. ---