Many hyperlinks are disabled.
Use anonymous login
to enable hyperlinks.
Difference From schmoozer-0.2.0 To schmoozer-0.3.0
2024-10-03
| ||
13:53 | Docs update. check-in: b68521df75 user: jan tags: trunk | |
01:08 | Release maintenance. check-in: 466bd7759a user: jan tags: schmoozer-0.3.0, trunk | |
00:59 | Make delay_retry() return RunResult as well. check-in: bade962274 user: jan tags: trunk | |
00:30 | Change trait methods return types. Add ConnResult and RunResult. check-in: 2924879fbf user: jan tags: trunk | |
2024-09-16
| ||
05:25 | Style fixups. check-in: de2e43c6de user: jan tags: schmoozer-0.2.0, trunk | |
05:24 | Exclude backon.toml from packaging. check-in: 42c4c94c32 user: jan tags: trunk | |
Changes to Cargo.toml.
1 2 | [package] name = "schmoozer" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "schmoozer" version = "0.3.0" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "network-programming" ] keywords = [ "connector", "network", "tokio" ] repository = "https://repos.qrnch.tech/pub/schmoozer" description = "A simple abstraction over a retryable async operation, such as establishing a connection" |
︙ | ︙ | |||
23 24 25 26 27 28 29 | [badges] maintenance = { status = "actively-developed" } [features] tcpconn = ["dep:killswitch", "tokio/macros", "tokio/net", "tokio/time"] [dependencies] | | | 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | [badges] maintenance = { status = "actively-developed" } [features] tcpconn = ["dep:killswitch", "tokio/macros", "tokio/net", "tokio/time"] [dependencies] async-trait = { version = "0.1.83" } killswitch = { version = "0.4.2", optional = true } tokio = { version = "1.40.0", optional = true } [dev-dependencies] tokio = { version = "1.40.0", features = [ "macros", "net", "rt-multi-thread", "time" ] } |
︙ | ︙ |
Changes to examples/net.rs.
|
| | | | 1 2 3 4 5 6 7 8 9 10 11 12 | use std::{env, io::ErrorKind, time::Duration}; pub use tokio::net::TcpStream; use schmoozer::{async_trait, ConnResult, Connector, RunResult}; pub struct TcpConnector { addr: String, delay: usize } impl TcpConnector { |
︙ | ︙ | |||
20 21 22 23 24 25 26 | } #[async_trait] impl Connector for TcpConnector { type Error = std::io::Error; type ConnType = TcpStream; | | < < | | | | | > < < | < | | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | } #[async_trait] impl Connector for TcpConnector { type Error = std::io::Error; type ConnType = TcpStream; async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> { println!("Connecting to {}", self.addr); let res = TcpStream::connect(&self.addr).await; match res { Ok(conn) => ConnResult::Connected(conn), Err(e) => match e.kind() { ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | ErrorKind::NotConnected | ErrorKind::TimedOut => { println!("Retryable error: {e}"); ConnResult::Reconnect } _ => { println!("Fatal error: {e}"); ConnResult::Fail(e) } } } } async fn retry_delay(&mut self) -> RunResult<Self::Error> { let dur = Duration::from_secs(self.delay.try_into().unwrap()); println!("Retrying in {dur:?} .."); tokio::time::sleep(dur).await; // for next iteration double sleep duration for each iteration, but cap at // 60 seconds self.delay = std::cmp::min(self.delay * 2, 60); RunResult::Reconnect } async fn run(&mut self, _conn: Self::ConnType) -> RunResult<Self::Error> { // reset delay duration when connection was successful self.delay = 1; // // Run connection logic here .. // RunResult::Terminate } } #[tokio::main] async fn main() { let args: Vec<String> = env::args().skip(1).collect(); let connector = TcpConnector::new(&args[0]); schmoozer::run(connector).await.unwrap(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to examples/tcpconn.rs.
1 2 | #[cfg(feature = "tcpconn")] mod inner { | | > | > > | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | #[cfg(feature = "tcpconn")] mod inner { use std::env; use schmoozer::{ tcpconn::{KillSwitch, SimpleTcpConnector, TcpStream}, RunResult }; pub async fn main() { let args: Vec<String> = env::args().skip(1).collect(); let ks = KillSwitch::new(); let connector = SimpleTcpConnector::new(&args[0], ks, Box::new(proc_connection)); schmoozer::run(connector).await.unwrap(); } async fn proc_connection( _strm: TcpStream, _ks: KillSwitch ) -> RunResult<std::io::Error> { RunResult::Reconnect } } #[cfg(feature = "tcpconn")] #[tokio::main] async fn main() { inner::main().await; |
︙ | ︙ |
Changes to src/lib.rs.
︙ | ︙ | |||
9 10 11 12 13 14 15 | //! been established. //! //! Perhaps paradoxically the [`run()`] function does not itself actually //! attempt to establish any connections -- it relies on the `Connector` trait //! to implement the means to establish connections. //! //! The "good path" overall flow of the connector loop is to call the | | | | | | < | < < | | > > | | > > > > > | > | > > > | < | | < < | > > > > | > > > > > > > | | | | > > > > > > > > > > > | | > > > > > > | > > > > > | > > | > > > | > > > > | > > | > > | > > > | > > > | | | | < | | > > > > > > > > > > | | | > > > | | | | > | | | > | | | 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 | //! been established. //! //! Perhaps paradoxically the [`run()`] function does not itself actually //! attempt to establish any connections -- it relies on the `Connector` trait //! to implement the means to establish connections. //! //! The "good path" overall flow of the connector loop is to call the //! `connect()` trait method. If it is successful, call the trait's `run()` //! method, passing along the newly allocated connection. The main application //! logic relating to the connection should implemented in this method. //! //! The primary purpose of the connector concerns the "failure path": If the //! `connect()` method encounters a failure it can choose to signal to back to //! the connector loop that the error is "retryable", in which case the //! `retry_delay()` method is called to determine if the connector loop should //! retry (and implement a delay before returning instructions to do so). //! //! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`] //! to indicate whether the connector should reconnect or exit, either //! successfully or with an error. #![cfg_attr(docsrs, feature(doc_cfg))] #[cfg(feature = "tcpconn")] #[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))] pub mod tcpconn; pub use async_trait::async_trait; #[cfg(feature = "tcpconn")] pub use tcpconn::SimpleTcpConnector; /// Application callbacks for the [`run()`] function (or equivalent). #[async_trait] pub trait Connector { /// The connection type. type ConnType: Send; /// The application error return type. type Error: Send; /// Attempt to establish a connection. /// /// If a connection was successfully established the implementation returns /// [`ConnResult::Connected`], which will instruct the /// connector loop in [`run()`] to call the [`Connector::run()`] /// implementation. /// /// If the implementation detects termination condition (such as a user /// request to terminate the application), the handler returns /// [`ConnResult::Terminate`], which will cause [`run()`]'s connection loop /// to terminate and return `Ok(())`. /// /// The implementation returns [`ConnResult::Reconnect`] to signal that some /// kind of retryable failure occurred. The connector loop in [`run()`] will /// call the [`Connector::retry_delay()`] to check if it should attempt a /// reconnection, and delay before doing so. /// /// # Errors /// If a fatal error occurs that is not retryable the implementation returns /// [`ConnResult::Fail(Self::Error)`]. This will cause the connection loop /// in [`run()`] to terminate and return `Err(E)`. async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>; /// Give application a chance to determine whether or not to attempt a /// reconnection, and delay before doing so. /// /// This implementation is called either when `Connection::connect()` or /// `Connector::run()` return `ConnResult::Reconnect` or /// `RunResult::Reconnect`. /// /// The application should return [`RunResult::Reconnect`] to instruct the /// connector loop in [`run()`] to call [`Connector::connect()`] again to /// attempt to establish a connection. /// /// If a the application has encountered a, successful, exit state, this /// implementation returns [`RunResult::Terminate`]. /// /// # Errors /// If the run implementation encounters a fatal error that should terminate /// the connection loop and return from [`run()`] with an error, /// [`RunResult::Fail`] is returned. async fn retry_delay(&mut self) -> RunResult<Self::Error>; /// Run the application's connection handler. /// /// The application should return [`RunResult::Reconnect`] to instruct the /// connector loop in [`run()`] to attempt a reconnect. /// /// If a the application has received some signal to terminate, successfully, /// this implementation returns [`RunResult::Terminate`]. /// /// # Errors /// If the run implementation encounters a fatal error that should terminate /// the connection loop and return from [`run()`] with an error, /// [`RunResult::Fail`] is returned. async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>; } /// Special-purpose result returned by [`Connector::connect()`]. pub enum ConnResult<C, E> { /// The connection was successful. /// /// Run the [`Connector::run()`] with the connection `C`. Connected(C), /// Connection could not be established. /// /// Call [`Connector::retry_delay()`] to check if reconnection attempts has /// been exhaused and, if applicable, delay before reconnection attempt. Reconnect, /// Terminate reconnection loop, indicating successful termination. Terminate, /// Terminate the reconnection loop with an error. Fail(E) } /// Returned by [`Connector::run()`] pub enum RunResult<E> { /// Attempt to reconnect. Reconnect, /// Terminate reconnection loop, indicating successful termination. Terminate, /// Terminate the reconnection loop with an error. Fail(E) } /// Establish and process a network connection. /// /// The `run()` function will enter a loop that will attempt to establish a /// connection by calling the [`Connector::connect()`] implementation. If a /// connection is successfully established the connector loop will call the /// [`Connector::run()`] implementation. /// /// The main purpose of the connector loop is that is either the `connect()` or /// the `run()` trait implementations fails in a retryable manner, /// [`Connector::retry_delay()`] will be called to determine whether to retry /// the connection or abort and return. /// /// # Success exit conditions /// The (re)connection loop will exit with `Ok(())` if: /// - [`Connector::connect()`] returns [`ConnResult::Terminate`] /// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)` /// - [`Connector::run()`] returns [`RunResult::Terminate`] /// /// # Errors /// - If the [`Connector::connect()`] implementation returns /// [`ConnResult::Fail`], this function will return `Err(E)`, where `E` is /// the `Connector`'s `Error` type. /// - If the [`Connector::run()`] implementation returns [`RunResult::Fail`], /// this function will return `Err(E)`, where `E` is the `Connector`'s /// `Error` type. /// - If the [`Connector::retry_delay()`] implementation returns /// `ControlFlow::Break(Self::Error)`. #[allow(clippy::missing_errors_doc)] pub async fn run<E>( mut connector: impl Connector<Error = E> + Send ) -> Result<(), E> where E: Send { loop { // Call the application's connect callback to attempt to establish // connection. match connector.connect().await { ConnResult::Connected(conn) => { // A connection was successfully established -- call the run() // implementation. match connector.run(conn).await { RunResult::Reconnect => { // The application has requested a reconnection. // Fall through to retry_delay() } RunResult::Terminate => { break Ok(()); } RunResult::Fail(e) => { // Break out of loop -- passing along the error from the // application. break Err(e); } } } ConnResult::Reconnect => { // The connector returned a retriable error // fall through to retry()/delay() } ConnResult::Terminate => break Ok(()), ConnResult::Fail(e) => { // The connector returned a fatal error break Err(e); } } // If this point is reached the application has requested a reconnection. // Call `retry_delay()` to allow the application to determine whether to // retry or not. match connector.retry_delay().await { RunResult::Reconnect => { // Application wants to reconnect. continue; } RunResult::Terminate => break Ok(()), RunResult::Fail(e) => { // Application does not want to reconnect break Err(e); } } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to src/tcpconn.rs.
1 2 3 | //! A simple [`Connector`] implementation that specifically attempts to connect //! to a TCP server and run an async function once successful. | | | > > > > > | < < < < < | < < < < < | < < < < < | < < | | | < < | | < | < > | < < < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | //! A simple [`Connector`] implementation that specifically attempts to connect //! to a TCP server and run an async function once successful. use std::{future::Future, io::ErrorKind, time::Duration}; pub use tokio::net::TcpStream; pub use killswitch::KillSwitch; use super::{async_trait, ConnResult, Connector, RunResult}; /// Simple TCP (re)connector. /// /// When run, the `Connector` implementation will attempt to establish a /// [`TcpStream`] connection and then call an application-specified `Future` /// (returned by a closure). pub struct SimpleTcpConnector<F> where F: Future<Output = RunResult<std::io::Error>> { addr: String, delay: usize, ks: KillSwitch, cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send> } impl<F> SimpleTcpConnector<F> where F: Future<Output = RunResult<std::io::Error>> { #[allow(clippy::needless_pass_by_value)] pub fn new( addr: impl ToString, ks: KillSwitch, cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send> ) -> Self { Self { addr: addr.to_string(), delay: 1, ks, cb } } } #[async_trait] impl<F> Connector for SimpleTcpConnector<F> where F: Future<Output = RunResult<std::io::Error>> + Send { type Error = std::io::Error; type ConnType = TcpStream; async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> { tokio::select! { res = TcpStream::connect(&self.addr) => { match res { Ok(conn) => ConnResult::Connected(conn), Err(e) => match e.kind() { ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted | ErrorKind::NotConnected | ErrorKind::TimedOut => { ConnResult::Reconnect } _ => ConnResult::Fail(e) } } } () = self.ks.wait() => { ConnResult::Terminate } } } async fn retry_delay(&mut self) -> RunResult<Self::Error> { let dur = Duration::from_secs(self.delay.try_into().unwrap()); tokio::select! { () = self.ks.wait() => { RunResult::Terminate } () = tokio::time::sleep(dur) => { // double sleep duration for each iteration, but cap at 60 seconds self.delay = std::cmp::min(self.delay * 2, 60); RunResult::Reconnect } } } async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error> { // reset delay self.delay = 1; let fut = (self.cb)(conn, self.ks.clone()); fut.await } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
Changes to www/changelog.md.
1 2 3 4 | # Change Log ## [Unreleased] | | > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | # Change Log ## [Unreleased] [Details](/vdiff?from=schmoozer-0.3.0&to=trunk) ### Added ### Changed ### Removed --- ## [0.3.0] - 2024-10-03 [Details](/vdiff?from=schmoozer-0.2.0&to=schmoozer-0.3.0) ### Changed - The `Connector::connect()`, `Connector::run()` return special-purpose return types `ConnResult` and `RunResult`, because the previous mix of `ControlFlow` and `Result` was too confusing. --- ## [0.2.0] - 2024-09-16 [Details](/vdiff?from=schmoozer-0.1.2&to=schmoozer-0.2.0) ### Changed |
︙ | ︙ |