1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
|
-
+
-
-
+
+
+
-
+
-
-
-
+
+
+
+
+
+
+
+
+
-
+
+
+
+
-
-
+
+
-
-
+
-
-
-
+
+
-
-
+
+
-
-
+
+
+
-
+
-
-
+
+
-
-
+
+
+
-
+
-
-
+
+
-
-
-
-
+
-
+
-
-
-
-
+
-
+
-
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
-
-
-
+
-
-
-
+
+
-
-
-
+
+
+
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
|
//! _schmoozer_ is intended to be used as an `async` (re)connector. It
//! consists of two parts:
//! consists of two primary parts:
//! - The [`Connector`] trait is implemented by applications/libraries that
//! need to run retryable connection loops.
//! - [`run()`] is a function that takes in a `Connector` implementation, and
//! attempts to establish a connection, delaying and retrying on failures
//! that the callback reports as retriable, and calls the
//! [`Connector::run()`] trait method once a connection has been successfully
//! been established.
//!
//! Perhaps paradoxically the [`run()`] function does not itself actually
//! attempt to establish any connections -- it relies on the `Connector` trait
//! to implement the means to establish connections.
//! attempt to establish any connections -- it relies on the
//! [`Connector::connect()`] trait method implementation to establish
//! connections.
//!
//! The "good path" overall flow of the connector loop is to call the
//! `connect()` trait method. If it is successful, call the trait's `run()`
//! `connect()` trait method. If it is successful, call the trait's `run()`
//! method, passing along the newly allocated connection. The main application
//! logic relating to the connection should implemented in this method.
//!
//! The primary purpose of the connector concerns the "failure path": If the
//! `connect()` method encounters a failure it can choose to signal to back to
//! the connector loop that the error is "retryable", in which case the
//! The primary purpose of the connector concerns the "retryable failure path":
//! If the `connect()` method encounters a failure it can choose to signal to
//! back to the connector loop that the error is "retryable", in which case the
//! `retry_delay()` method is called to determine if the connector loop should
//! retry (and implement a delay before returning instructions to do so).
//!
//! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`]
//! to indicate whether the connector should reconnect or exit, either
//! successfully or with an error.
//!
//! # Features
//! | Feature | Function
//! |-----------|----------
//! | `tcpconn` | Enable support for a simple TCP (re)connector.
//! | `tracing` | Make the connector loop generator tracing logs.
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "tcpconn")]
#[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))]
pub mod tcpconn;
pub use async_trait::async_trait;
#[cfg(feature = "tcpconn")]
pub use tcpconn::SimpleTcpConnector;
/// Application callbacks for the [`run()`] function (or equivalent).
#[async_trait]
pub trait Connector {
/// The connection type.
/// The connection type that the `Connector::connect()` implementor spawns.
///
/// Once created, an instance of this type will be passed to the
/// `Connector::run()` implementation.
type ConnType: Send;
/// The application error return type.
type Error: Send;
/// Attempt to establish a connection.
///
/// If a connection was successfully established the implementation returns
/// [`ConnResult::Connected`], which will instruct the
/// connector loop in [`run()`] to call the [`Connector::run()`]
/// [`ConnResult::Connected`], which will instruct the connector loop in
/// [`run()`] to call the [`Connector::run()`] implementation.
/// implementation.
///
/// If the implementation detects termination condition (such as a user
/// If the implementation detects a termination condition the handler returns
/// request to terminate the application), the handler returns
/// [`ConnResult::Terminate`], which will cause [`run()`]'s connection loop
/// to terminate and return `Ok(())`.
/// [`ConnResult::Exit`], which will cause [`run()`]'s connection loop
/// to terminate and return the result passed along with `ConnResult::Exit`.
///
/// The implementation returns [`ConnResult::Reconnect`] to signal that some
/// kind of retryable failure occurred. The connector loop in [`run()`] will
/// call the [`Connector::retry_delay()`] to check if it should attempt a
/// reconnection, and delay before doing so.
///
/// # Errors
/// If a fatal error occurs that is not retryable the implementation returns
/// [`ConnResult::Fail(Self::Error)`]. This will cause the connection loop
/// in [`run()`] to terminate and return `Err(E)`.
/// [`ConnResult::Exit`] with an `Err(E)`, which will be returned by the
/// connector loop function.
async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;
/// Give application a chance to determine whether or not to attempt a
/// reconnection, and delay before doing so.
///
/// This implementation is called either when `Connection::connect()` or
/// `Connector::run()` return `ConnResult::Reconnect` or
/// `RunResult::Reconnect`.
///
/// The application should return [`RunResult::Reconnect`] to instruct the
/// connector loop in [`run()`] to call [`Connector::connect()`] again to
/// attempt to establish a connection.
///
/// If a the application has encountered a, successful, exit state, this
/// implementation returns [`RunResult::Terminate`].
/// If the implementation detects a termination condition the handler returns
/// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
/// to terminate and return the result passed along with `RunResult::Exit`.
///
/// # Errors
/// If the run implementation encounters a fatal error that should terminate
/// If a fatal error occurs that is not retryable the implementation returns
/// the connection loop and return from [`run()`] with an error,
/// [`RunResult::Fail`] is returned.
/// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
/// connector loop function.
async fn retry_delay(&mut self) -> RunResult<Self::Error>;
/// Run the application's connection handler.
///
/// The application should return [`RunResult::Reconnect`] to instruct the
/// connector loop in [`run()`] to attempt a reconnect.
///
/// If a the application has received some signal to terminate, successfully,
/// this implementation returns [`RunResult::Terminate`].
/// If the implementation detects a termination condition the handler returns
/// [`RunResult::Exit`], which will cause [`run()`]'s connection loop
/// to terminate and return the result passed along with `RunResult::Exit`.
///
/// # Errors
/// If the run implementation encounters a fatal error that should terminate
/// If a fatal error occurs that is not retryable the implementation returns
/// the connection loop and return from [`run()`] with an error,
/// [`RunResult::Fail`] is returned.
/// [`RunResult::Exit`] with an `Err(E)`, which will be returned by the
/// connector loop function.
async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>;
}
/// Special-purpose result returned by [`Connector::connect()`].
pub enum ConnResult<C, E> {
/// The connection was successful.
///
/// Run the [`Connector::run()`] with the connection `C`.
Connected(C),
/// Connection could not be established.
///
/// Call [`Connector::retry_delay()`] to check if reconnection attempts has
/// been exhaused and, if applicable, delay before reconnection attempt.
Reconnect,
/// Terminate reconnection loop, indicating successful termination.
Terminate,
/// Terminate the reconnection loop with an error.
/// Terminate the reconnection loop.
Fail(E)
Exit(Result<(), E>)
}
/// Returned by [`Connector::run()`]
pub enum RunResult<E> {
/// Attempt to reconnect.
Reconnect,
/// Terminate reconnection loop, indicating successful termination.
Terminate,
/// Terminate the reconnection loop with an error.
/// Terminate the reconnection loop.
Fail(E)
Exit(Result<(), E>)
}
/// Establish and process a network connection.
///
/// The `run()` function will enter a loop that will attempt to establish a
/// connection by calling the [`Connector::connect()`] implementation. If a
/// connection is successfully established the connector loop will call the
/// [`Connector::run()`] implementation.
///
/// The main purpose of the connector loop is that is either the `connect()` or
/// the `run()` trait implementations fails in a retryable manner,
/// [`Connector::retry_delay()`] will be called to determine whether to retry
/// the connection or abort and return.
/// The main purpose of the connector loop to handle connection retry requests
/// from either the `connect()` or the `run()` trait implementations
/// (presumably because they failed in a retryable manner). If a reconnection
/// request is returned [`Connector::retry_delay()`] will be called to allow
/// the application to implement its own logic to determine whether the
/// reconnection shoulld proceed and optionally adding a delay before the
/// reconnection attempt.
///
/// # Success exit conditions
/// The (re)connection loop will exit with `Ok(())` if:
/// - [`Connector::connect()`] returns [`ConnResult::Terminate`]
/// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)`
/// - [`Connector::run()`] returns [`RunResult::Terminate`]
/// # Exit conditions
/// The (re)connection loop will exit if:
/// - [`Connector::connect()`] returns [`ConnResult::Exit`]
/// - [`Connector::retry_delay()`] returns [`RunResult::Exit`]
/// - [`Connector::run()`] returns [`RunResult::Exit`]
///
/// # Errors
/// - If the [`Connector::connect()`] implementation returns
/// [`ConnResult::Fail`], this function will return `Err(E)`, where `E` is
/// the `Connector`'s `Error` type.
/// If any of the `Connector`'s callbacks return `ConnResult::Exit(Err(_))` or
/// - If the [`Connector::run()`] implementation returns [`RunResult::Fail`],
/// this function will return `Err(E)`, where `E` is the `Connector`'s
/// `Error` type.
/// `RunResult::Exit(Err(_))` this function will return the error back to the
/// caller.
/// - If the [`Connector::retry_delay()`] implementation returns
/// `ControlFlow::Break(Self::Error)`.
#[allow(clippy::missing_errors_doc)]
pub async fn run<E>(
mut connector: impl Connector<Error = E> + Send
) -> Result<(), E>
where
E: Send
E: Send + std::fmt::Debug
{
#[cfg(feature = "tracing")]
tracing::info!("Enter (re)connection loop");
loop {
// Call the application's connect callback to attempt to establish
// connection.
#[cfg(feature = "tracing")]
tracing::info!("Attempt to establish connection");
match connector.connect().await {
ConnResult::Connected(conn) => {
// A connection was successfully established -- call the run()
// implementation.
#[cfg(feature = "tracing")]
tracing::info!(
"Got connection -- call application connection handler"
);
match connector.run(conn).await {
RunResult::Reconnect => {
// The application has requested a reconnection.
// Fall through to retry_delay()
}
RunResult::Terminate => {
break Ok(());
#[cfg(feature = "tracing")]
tracing::debug!("Connector::run() requested reconnection");
}
RunResult::Fail(e) => {
// Break out of loop -- passing along the error from the
// application.
break Err(e);
RunResult::Exit(res) => {
#[cfg(feature = "tracing")]
tracing::info!(
"Connector::connect() requested termination: {res:?}"
);
break res;
}
}
}
ConnResult::Reconnect => {
// The connector returned a retriable error
// fall through to retry()/delay()
#[cfg(feature = "tracing")]
tracing::debug!("Connector::connect() requested reconnection");
}
ConnResult::Terminate => break Ok(()),
ConnResult::Fail(e) => {
// The connector returned a fatal error
break Err(e);
ConnResult::Exit(res) => {
// Terminate reconnection loop
#[cfg(feature = "tracing")]
tracing::info!("Connector::connect() requested termination; {res:?}");
break res;
}
}
// If this point is reached the application has requested a reconnection.
// Call `retry_delay()` to allow the application to determine whether to
// retry or not.
#[cfg(feature = "tracing")]
tracing::info!("Call retry/delay callback");
match connector.retry_delay().await {
RunResult::Reconnect => {
// Application wants to reconnect.
#[cfg(feature = "tracing")]
tracing::debug!("Connector::retry_delay() requested reconnection");
continue;
}
RunResult::Terminate => break Ok(()),
RunResult::Fail(e) => {
// Application does not want to reconnect
break Err(e);
RunResult::Exit(res) => {
// Terminate reconnection loop
#[cfg(feature = "tracing")]
tracing::info!(
"Connector::retry_delay() requested termination: {res:?}"
);
break res;
}
}
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|