Difference From schmoozer-0.3.0 To schmoozer-0.4.0
2024-10-18
12:00 | Add Connector::display_target() trait method. check-in: a14a7e6a1b user: jan tags: trunk
2024-10-03
16:35 | Release maintenance. check-in: b503fe0d7f user: jan tags: schmoozer-0.4.0, trunk
16:32 | Rephrase. check-in: 0a6c85b133 user: jan tags: trunk
13:53 | Docs update. check-in: b68521df75 user: jan tags: trunk
01:08 | Release maintenance. check-in: 466bd7759a user: jan tags: schmoozer-0.3.0, trunk
00:59 | Make delay_retry() return RunResult as well. check-in: bade962274 user: jan tags: trunk
Changes to Cargo.toml.
[package]
name = "schmoozer"
version = "0.4.0"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming" ]
keywords = [ "connector", "network", "tokio" ]
repository = "https://repos.qrnch.tech/pub/schmoozer"
description = "A simple abstraction over a retryable async operation, such as establishing a connection"
︙
# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "actively-developed" }

[features]
tcpconn = ["dep:killswitch", "tokio/macros", "tokio/net", "tokio/time"]
tracing = ["dep:tracing"]

[dependencies]
async-trait = { version = "0.1.83" }
killswitch = { version = "0.4.2", optional = true }
tokio = { version = "1.40.0", optional = true }
tracing = { version = "0.1.40", optional = true }

[dev-dependencies]
tokio = { version = "1.40.0", features = [
  "macros", "net", "rt-multi-thread", "time"
] }

[package.metadata.docs.rs]
︙
Changes to examples/net.rs.
︙
        | ErrorKind::NotConnected
        | ErrorKind::TimedOut => {
          println!("Retryable error: {e}");
          ConnResult::Reconnect
        }
        _ => {
          println!("Fatal error: {e}");
          ConnResult::Exit(Err(e))
        }
      }
    }
  }

  async fn retry_delay(&mut self) -> RunResult<Self::Error> {
    let dur = Duration::from_secs(self.delay.try_into().unwrap());
︙
    // reset delay duration when connection was successful
    self.delay = 1;

    //
    // Run connection logic here ..
    //

    RunResult::Exit(Ok(()))
  }
}

#[tokio::main]
async fn main() {
  let args: Vec<String> = env::args().skip(1).collect();

  let connector = TcpConnector::new(&args[0]);

  schmoozer::run(connector).await.unwrap();
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
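The error handling in this example comes down to one classification step: retryable I/O error kinds map to `ConnResult::Reconnect`, and anything else maps to `ConnResult::Exit(Err(e))`. The block below is a minimal, std-only sketch of that step and is not code from the repository. The local `ConnResult` enum is a stand-in for the crate's type, `classify` is a hypothetical helper name, and the set of retryable kinds is borrowed from src/tcpconn.rs because the first match arms of the example are elided in this diff.

```rust
use std::io::{Error, ErrorKind};

/// Stand-in for schmoozer's `ConnResult` (assumption: same three variants).
#[derive(Debug)]
pub enum ConnResult<C, E> {
  Connected(C),
  Reconnect,
  Exit(Result<(), E>)
}

/// Hypothetical helper: decide whether a failed connection attempt should be
/// retried or should terminate the connector loop with the error.
pub fn classify<C>(e: Error) -> ConnResult<C, Error> {
  match e.kind() {
    // Transient conditions: ask the connector loop to retry.
    ErrorKind::ConnectionRefused
    | ErrorKind::ConnectionAborted
    | ErrorKind::NotConnected
    | ErrorKind::TimedOut => ConnResult::Reconnect,
    // Everything else is treated as fatal and handed back to the caller.
    _ => ConnResult::Exit(Err(e))
  }
}
```

Keeping the classification in a single function makes the retry policy easy to test without opening sockets.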
Changes to src/lib.rs.
//! _schmoozer_ is intended to be used as an `async` (re)connector. It
//! consists of two primary parts:
//! - The [`Connector`] trait is implemented by applications/libraries that
//!   need to run retryable connection loops.
//! - [`run()`] is a function that takes in a `Connector` implementation, and
//!   attempts to establish a connection, delaying and retrying on failures
//!   that the callback reports as retryable, and calls the
//!   [`Connector::run()`] trait method once a connection has been
//!   successfully established.
//!
//! Perhaps paradoxically the [`run()`] function does not itself actually
//! attempt to establish any connections -- it relies on the
//! [`Connector::connect()`] trait method implementation to establish
//! connections.
//!
//! The "good path" overall flow of the connector loop is to call the
//! `connect()` trait method. If it is successful, call the trait's `run()`
//! method, passing along the newly allocated connection. The main application
//! logic relating to the connection should be implemented in this method.
//!
//! The primary purpose of the connector concerns the "retryable failure path":
//! If the `connect()` method encounters a failure it can choose to signal
//! back to the connector loop that the error is "retryable", in which case the
//! `retry_delay()` method is called to determine if the connector loop should
//! retry (and implement a delay before returning instructions to do so).
//!
//! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`]
//! to indicate whether the connector should reconnect or exit, either
//! successfully or with an error.
//!
//! # Features
//! | Feature   | Function
//! |-----------|----------
//! | `tcpconn` | Enable support for a simple TCP (re)connector.
//! | `tracing` | Make the connector loop generate tracing logs.

#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(feature = "tcpconn")]
#[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))]
pub mod tcpconn;

pub use async_trait::async_trait;

#[cfg(feature = "tcpconn")]
pub use tcpconn::SimpleTcpConnector;

/// Application callbacks for the [`run()`] function (or equivalent).
#[async_trait]
pub trait Connector {
  /// The connection type that the `Connector::connect()` implementor spawns.
  ///
  /// Once created, an instance of this type will be passed to the
  /// `Connector::run()` implementation.
  type ConnType: Send;

  /// The application error return type.
  type Error: Send;

  /// Attempt to establish a connection.
  ///
  /// If a connection was successfully established the implementation returns
  /// [`ConnResult::Connected`], which will instruct the connector loop in
  /// [`run()`] to call the [`Connector::run()`] implementation.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`ConnResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `ConnResult::Exit`.
  ///
  /// The implementation returns [`ConnResult::Reconnect`] to signal that some
  /// kind of retryable failure occurred. The connector loop in [`run()`] will
  /// call [`Connector::retry_delay()`] to check if it should attempt a
  /// reconnection, and delay before doing so.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`ConnResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;

  /// Give the application a chance to determine whether or not to attempt a
  /// reconnection, and delay before doing so.
  ///
  /// This method is called when `Connector::connect()` returns
  /// `ConnResult::Reconnect` or `Connector::run()` returns
  /// `RunResult::Reconnect`.
  ///
  /// The application should return [`RunResult::Reconnect`] to instruct the
  /// connector loop in [`run()`] to call [`Connector::connect()`] again to
  /// attempt to establish a connection.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `RunResult::Exit`.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn retry_delay(&mut self) -> RunResult<Self::Error>;

  /// Run the application's connection handler.
  ///
  /// The application should return [`RunResult::Reconnect`] to instruct the
  /// connector loop in [`run()`] to attempt a reconnect.
  ///
  /// If the implementation detects a termination condition the handler returns
  /// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
  /// to terminate and return the result passed along with `RunResult::Exit`.
  ///
  /// # Errors
  /// If a fatal error occurs that is not retryable the implementation returns
  /// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
  /// connector loop function.
  async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>;
}

/// Special-purpose result returned by [`Connector::connect()`].
pub enum ConnResult<C, E> {
  /// The connection was successful.
  ///
  /// Run the [`Connector::run()`] with the connection `C`.
  Connected(C),

  /// Connection could not be established.
  ///
  /// Call [`Connector::retry_delay()`] to check if reconnection attempts have
  /// been exhausted and, if applicable, delay before the reconnection attempt.
  Reconnect,

  /// Terminate the reconnection loop.
  Exit(Result<(), E>)
}

/// Returned by [`Connector::run()`] and [`Connector::retry_delay()`].
pub enum RunResult<E> {
  /// Attempt to reconnect.
  Reconnect,

  /// Terminate the reconnection loop.
  Exit(Result<(), E>)
}

/// Establish and process a network connection.
///
/// The `run()` function will enter a loop that will attempt to establish a
/// connection by calling the [`Connector::connect()`] implementation. If a
/// connection is successfully established the connector loop will call the
/// [`Connector::run()`] implementation.
///
/// The main purpose of the connector loop is to handle connection retry
/// requests from either the `connect()` or the `run()` trait implementations
/// (presumably because they failed in a retryable manner). If a reconnection
/// request is returned [`Connector::retry_delay()`] will be called to allow
/// the application to implement its own logic to determine whether the
/// reconnection should proceed and optionally add a delay before the
/// reconnection attempt.
///
/// # Exit conditions
/// The (re)connection loop will exit if:
/// - [`Connector::connect()`] returns [`ConnResult::Exit`]
/// - [`Connector::retry_delay()`] returns [`RunResult::Exit`]
/// - [`Connector::run()`] returns [`RunResult::Exit`]
///
/// # Errors
/// If any of the `Connector`'s callbacks return `ConnResult::Exit(Err(_))` or
/// `RunResult::Exit(Err(_))` this function will return the error back to the
/// caller.
#[allow(clippy::missing_errors_doc)]
pub async fn run<E>(
  mut connector: impl Connector<Error = E> + Send
) -> Result<(), E>
where
  E: Send + std::fmt::Debug
{
  #[cfg(feature = "tracing")]
  tracing::info!("Enter (re)connection loop");

  loop {
    // Call the application's connect callback to attempt to establish
    // connection.
    #[cfg(feature = "tracing")]
    tracing::info!("Attempt to establish connection");
    match connector.connect().await {
      ConnResult::Connected(conn) => {
        // A connection was successfully established -- call the run()
        // implementation.
        #[cfg(feature = "tracing")]
        tracing::info!(
          "Got connection -- call application connection handler"
        );
        match connector.run(conn).await {
          RunResult::Reconnect => {
            // The application has requested a reconnection.
            // Fall through to retry_delay()
            #[cfg(feature = "tracing")]
            tracing::debug!("Connector::run() requested reconnection");
          }
          RunResult::Exit(res) => {
            #[cfg(feature = "tracing")]
            tracing::info!(
              "Connector::run() requested termination: {res:?}"
            );
            break res;
          }
        }
      }
      ConnResult::Reconnect => {
        // The connector returned a retryable error.
        // Fall through to retry_delay()
        #[cfg(feature = "tracing")]
        tracing::debug!("Connector::connect() requested reconnection");
      }
      ConnResult::Exit(res) => {
        // Terminate reconnection loop
        #[cfg(feature = "tracing")]
        tracing::info!("Connector::connect() requested termination: {res:?}");
        break res;
      }
    }

    // If this point is reached the application has requested a reconnection.
    // Call `retry_delay()` to allow the application to determine whether to
    // retry or not.
    #[cfg(feature = "tracing")]
    tracing::info!("Call retry/delay callback");
    match connector.retry_delay().await {
      RunResult::Reconnect => {
        // Application wants to reconnect.
        #[cfg(feature = "tracing")]
        tracing::debug!("Connector::retry_delay() requested reconnection");
        continue;
      }
      RunResult::Exit(res) => {
        // Terminate reconnection loop
        #[cfg(feature = "tracing")]
        tracing::info!(
          "Connector::retry_delay() requested termination: {res:?}"
        );
        break res;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
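The control flow documented in src/lib.rs (connect, then run, then retry_delay, with three possible exit points) can be modelled without an async runtime. The sketch below is a simplified synchronous model for illustration only: the trait and enums are local stand-ins with the same variant names, the `Send` bounds, `async_trait` and tracing calls are dropped, and the `Flaky` connector with its five-retry limit is entirely hypothetical.

```rust
/// Local stand-ins for the crate's result types (same variant names).
pub enum ConnResult<C, E> {
  Connected(C),
  Reconnect,
  Exit(Result<(), E>)
}

pub enum RunResult<E> {
  Reconnect,
  Exit(Result<(), E>)
}

/// Synchronous stand-in for the async `Connector` trait.
pub trait Connector {
  type ConnType;
  type Error;
  fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;
  fn retry_delay(&mut self) -> RunResult<Self::Error>;
  fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>;
}

/// Model of the connector loop: any `Exit` ends the loop with its result,
/// any `Reconnect` falls through to `retry_delay()`.
pub fn run<E>(mut connector: impl Connector<Error = E>) -> Result<(), E> {
  loop {
    match connector.connect() {
      ConnResult::Connected(conn) => {
        if let RunResult::Exit(res) = connector.run(conn) {
          break res;
        }
      }
      ConnResult::Reconnect => {}
      ConnResult::Exit(res) => break res
    }
    if let RunResult::Exit(res) = connector.retry_delay() {
      break res;
    }
  }
}

/// Hypothetical connector: the first `failures` connection attempts fail in
/// a retryable way; it gives up after more than five retries.
pub struct Flaky {
  pub failures: u32,
  pub attempts: u32,
  pub retries: u32
}

impl Connector for Flaky {
  type ConnType = u32;
  type Error = String;

  fn connect(&mut self) -> ConnResult<u32, String> {
    self.attempts += 1;
    if self.attempts <= self.failures {
      ConnResult::Reconnect
    } else {
      ConnResult::Connected(self.attempts)
    }
  }

  fn retry_delay(&mut self) -> RunResult<String> {
    self.retries += 1;
    if self.retries > 5 {
      RunResult::Exit(Err("giving up".to_string()))
    } else {
      RunResult::Reconnect
    }
  }

  fn run(&mut self, _conn: u32) -> RunResult<String> {
    // The connection handler finished cleanly; end the loop with success.
    RunResult::Exit(Ok(()))
  }
}
```

With `failures: 2` the model connects on the third attempt and returns `Ok(())`; with `failures: 10` the retry limit is hit first and the error from `retry_delay()` is returned to the caller.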
Changes to src/tcpconn.rs.
︙
        match res {
          Ok(conn) => ConnResult::Connected(conn),
          Err(e) => match e.kind() {
            ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted
            | ErrorKind::NotConnected | ErrorKind::TimedOut => {
              ConnResult::Reconnect
            }
            _ => ConnResult::Exit(Err(e))
          }
        }
      }
      () = self.ks.wait() => {
        ConnResult::Exit(Ok(()))
      }
    }
  }

  async fn retry_delay(&mut self) -> RunResult<Self::Error> {
    let dur = Duration::from_secs(self.delay.try_into().unwrap());
    tokio::select! {
      () = self.ks.wait() => {
        RunResult::Exit(Ok(()))
      }
      () = tokio::time::sleep(dur) => {
        // double sleep duration for each iteration, but cap at 60 seconds
        self.delay = std::cmp::min(self.delay * 2, 60);
        RunResult::Reconnect
      }
    }
︙
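The `retry_delay()` hunk in src/tcpconn.rs sleeps for the current delay and then doubles it, capped at 60 seconds. As a rough illustration of the resulting schedule, here is a std-only sketch. The function names are hypothetical, and the `u64` type and the starting value of 1 second are assumptions (the field's real type and initial value are not visible in this diff).

```rust
/// Next delay in seconds: double the current one, but cap at 60.
pub fn next_delay(delay: u64) -> u64 {
  std::cmp::min(delay * 2, 60)
}

/// The delays slept over `n` consecutive retries, starting at `start`
/// seconds. Each retry sleeps for the current delay, then doubles it.
pub fn delay_schedule(start: u64, n: usize) -> Vec<u64> {
  let mut delay = start;
  let mut out = Vec::with_capacity(n);
  for _ in 0..n {
    out.push(delay);
    delay = next_delay(delay);
  }
  out
}
```

Starting from 1 second, eight consecutive retries sleep for 1, 2, 4, 8, 16, 32, 60 and 60 seconds.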
Changes to www/changelog.md.
# Change Log

⚠️ indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=schmoozer-0.4.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.4.0] - 2024-10-03

[Details](/vdiff?from=schmoozer-0.3.0&to=schmoozer-0.4.0)

### Added

- Add `tracing` feature, and use it to trace log the connection loop.
- ⚠️ The generic error type used for the `run()` function's `Result` now has a
  `std::fmt::Debug` bound.

### Changed

- ⚠️ Unify `Terminate` and `Fail` variants from `ConnResult` and `RunResult`
  into `ConnResult::Exit(Result<(), E>)` and
  `RunResult::Exit(Result<(), E>)`.

---

## [0.3.0] - 2024-10-03

[Details](/vdiff?from=schmoozer-0.2.0&to=schmoozer-0.3.0)

### Changed

- ⚠️ `Connector::connect()` and `Connector::run()` now return the
  special-purpose types `ConnResult` and `RunResult`, because the previous
  mix of `ControlFlow` and `Result` was too confusing.

---

## [0.2.0] - 2024-09-16
︙
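For code written against 0.3.0, the breaking change in the 0.4.0 entry amounts to a mechanical mapping of the two removed variants onto `Exit`. The sketch below illustrates that mapping under a stated assumption: the changelog names `Terminate` and `Fail` but this diff does not show their payloads, so `OldRunResult` (a unit `Terminate` and a `Fail(E)` carrying the error) is a hypothetical reconstruction, and the derives are added only to make the result easy to compare.

```rust
/// Hypothetical reconstruction of the 0.3.0 enum. The variant names come
/// from the changelog; their payloads are assumed.
#[derive(Debug, PartialEq)]
pub enum OldRunResult<E> {
  Reconnect,
  Terminate,
  Fail(E)
}

/// Same variants as the 0.4.0 `RunResult` shown in src/lib.rs.
#[derive(Debug, PartialEq)]
pub enum RunResult<E> {
  Reconnect,
  Exit(Result<(), E>)
}

impl<E> From<OldRunResult<E>> for RunResult<E> {
  fn from(old: OldRunResult<E>) -> Self {
    match old {
      OldRunResult::Reconnect => RunResult::Reconnect,
      // A clean shutdown becomes a successful exit.
      OldRunResult::Terminate => RunResult::Exit(Ok(())),
      // A fatal error becomes an exit that carries the error.
      OldRunResult::Fail(e) => RunResult::Exit(Err(e))
    }
  }
}
```

The same mapping would apply to `ConnResult`, whose `Connected` and `Reconnect` variants are unchanged.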