9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
|
-
-
-
+
+
+
-
-
+
+
-
-
+
-
-
-
+
-
-
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
+
+
+
-
-
+
+
-
-
+
-
+
+
+
+
+
+
+
+
+
+
+
-
+
-
+
-
+
+
+
+
-
-
+
+
-
+
-
+
+
-
+
-
+
-
+
+
-
+
-
+
|
//! been established.
//!
//! Perhaps paradoxically the [`run()`] function does not itself actually
//! attempt to establish any connections -- it relies on the `Connector` trait
//! to implement the means to establish connections.
//!
//! The "good path" overall flow of the connector loop is to call the
//! `connect()` method. If it is successful, call the `run()` method, passing
//! along the newly allocated connection. The main application logic relating
//! to the connection should called from this method.
//! `connect()` trait method. If it is successful, call the trait's `run()`
//! method, passing along the newly allocated connection. The main application
//! logic relating to the connection should implemented in this method.
//!
//! The primary purpose of the connector concerns the "failure path": If the
//! `connect()` method encounters a failure it can choose to signal to back to
//! the connector loop that the error is "retryable", in which case the
//! `retry_delay()` method is called to determine if the connector loop should
//! retry (and implement a delay before returning instructions to do so).
//!
//! Likewise, the `run()` method returns its `Result<(), Self::Error>` wrapped
//! in a `ControlFlow::Continue(_)` to indicate that the connector look should
//! Likewise, the [`Connector::run()`] trait method returns its [`RunResult`]
//! to indicate whether the connector should reconnect or exit, either
//! reconnect, while `ControlFlow::Break(_)` signals that a fatal error occured
//! and the connect loop should terminate.
//! successfully or with an error.
#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(feature = "tcpconn")]
#[cfg_attr(docsrs, doc(cfg(feature = "tcpconn")))]
pub mod tcpconn;
use std::ops::ControlFlow;
pub use async_trait::async_trait;
#[cfg(feature = "tcpconn")]
pub use tcpconn::SimpleTcpConnector;
/// Application callbacks for the [`run()`] function (or equivalent).
#[async_trait]
pub trait Connector {
/// The connection type.
type ConnType: Send;
/// The application error return type.
type Error: Send;
/// Establish a connection.
/// Attempt to establish a connection.
///
/// If a connection was successfully established the implementation should
/// return `Ok(Self::ConnType)`.
/// If a connection was successfully established the implementation returns
/// [`ConnResult::Connected`], which will instruct the
/// connector loop in [`run()`] to call the [`Connector::run()`]
/// implementation.
///
/// If an error is returned as `Err(ControlFlow::Continue(Self::Error))` it
/// signals to the connector loop that the error is non-fatal, and that the
/// connection should be retried. Returning an error as
/// `Err(ControlFlow::Break(Self::Error))` signals that there's no point in
/// If the implementation detects termination condition (such as a user
/// request to terminate the application), the handler returns
/// [`ConnResult::Terminate`], which will cause [`run()`]'s connection loop
/// to terminate and return `Ok(())`.
///
/// The implementation returns [`ConnResult::Reconnect`] to signal that some
/// kind of retryable failure occurred. The connector loop in [`run()`] will
/// call the [`Connector::retry_delay()`] to check if it should attempt a
/// reconnection, and delay before doing so.
///
/// # Errors
/// If a fatal error occurs that is not retryable the implementation returns
/// [`ConnResult::Fail(Self::Error)`]. This will cause the connection loop
/// trying to (re)connect (with the same configuration) and the
/// (re)connection loop is terminated.
async fn connect(
/// in [`run()`] to terminate and return `Err(E)`.
async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;
&mut self
) -> Result<Self::ConnType, ControlFlow<Self::Error, Self::Error>>;
/// Give application a chance to determine whether or not to attempt a
/// reconnection, and delay before doing so.
///
/// Implementations return `ControlFlow::Continue(())` to signal to the
/// connector loop that it should retry the connection. Returnning
/// `ControlFlow::Break(Self::Error)` will terminate the connector loop and
/// cause it to return the error.
async fn retry_delay(&mut self) -> ControlFlow<Self::Error>;
/// This implementation is called either when `Connection::connect()` or
/// `Connector::run()` return `ConnResult::Reconnect` or
/// `RunResult::Reconnect`.
///
/// The application should return [`RunResult::Reconnect`] to instruct the
/// connector loop in [`run()`] to call [`Connector::connect()`] again to
/// attempt to establish a connection.
///
/// If a the application has encountered a, successful, exit state, this
/// implementation returns [`RunResult::Terminate`].
///
/// # Errors
/// If the run implementation encounters a fatal error that should terminate
/// the connection loop and return from [`run()`] with an error,
/// [`RunResult::Fail`] is returned.
async fn retry_delay(&mut self) -> RunResult<Self::Error>;
/// Run the application's connection handler.
///
/// The application should return `ControlFlow::Continue(_)` to request that
/// the connector loop delay and reconnect. Returning
/// `ControlFlow::Break(_)` will cause the connect loop to terminate and
/// return the supplied result.
async fn run(
&mut self,
conn: Self::ConnType
) -> ControlFlow<Result<(), Self::Error>, Result<(), Self::Error>>;
}
/// The application should return [`RunResult::Reconnect`] to instruct the
/// connector loop in [`run()`] to attempt a reconnect.
///
/// If a the application has received some signal to terminate, successfully,
/// this implementation returns [`RunResult::Terminate`].
///
/// # Errors
/// If the run implementation encounters a fatal error that should terminate
/// the connection loop and return from [`run()`] with an error,
/// [`RunResult::Fail`] is returned.
async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error>;
}
/// Special-purpose result returned by [`Connector::connect()`].
pub enum ConnResult<C, E> {
/// The connection was successful.
///
/// Run the [`Connector::run()`] with the connection `C`.
Connected(C),
/// Connection could not be established.
///
/// Call [`Connector::retry_delay()`] to check if reconnection attempts has
/// been exhaused and, if applicable, delay before reconnection attempt.
Reconnect,
/// Terminate reconnection loop, indicating successful termination.
Terminate,
/// Terminate the reconnection loop with an error.
Fail(E)
}
/// Returned by [`Connector::run()`]
pub enum RunResult<E> {
/// Attempt to reconnect.
Reconnect,
/// Terminate reconnection loop, indicating successful termination.
Terminate,
/// Terminate the reconnection loop with an error.
Fail(E)
}
/// Establish and process a network connection.
///
/// The `run()` function will enter a loop that will attempt to establish a
/// connection by calling the [`Connector::connect()`] implementation. If a
/// connection is successfully established the connector loop will call the
/// connection and call the `Connector::run()` implementation once succesful.
/// If the connection fails, `Connector::retry_delay()` will be
/// called to determine whether to retry the connection or abort and return.
/// [`Connector::run()`] implementation.
///
/// The main purpose of the connector loop is that is either the `connect()` or
/// the `run()` trait implementations fails in a retryable manner,
/// [`Connector::retry_delay()`] will be called to determine whether to retry
/// the connection or abort and return.
///
/// # Exit conditions
/// The (re)connection loop will keep running until an exit condition has been
/// # Success exit conditions
/// The (re)connection loop will exit with `Ok(())` if:
/// triggered:
/// - [`Connector::connect()`] returns `Err(ControlFlow::Break(Self::Error))`
/// - [`Connector::connect()`] returns [`ConnResult::Terminate`]
/// - [`Connector::retry_delay()`] returns `ControlFlow::Break(Self::Error)`
/// - [`Connector::run()`] returns `ControlFlow::Break(_)`
/// - [`Connector::run()`] returns [`RunResult::Terminate`]
///
/// # Errors
/// - If the [`Connector::connect()`] implementation returns
/// [`ConnResult::Fail`], this function will return `Err(E)`, where `E` is
/// the `Connector`'s `Error` type.
/// - If the [`Connector::run()`] implementation returns [`RunResult::Fail`],
/// this function will return `Err(E)`, where `E` is the `Connector`'s
/// `Error` type.
/// - If the [`Connector::retry_delay()`] implementation returns
/// `ControlFlow::Break(Self::Error)`.
#[allow(clippy::missing_errors_doc)]
pub async fn run<E>(
mut connector: impl Connector<Error = E> + Send
) -> Result<(), E>
where
E: Send
{
loop {
// Call the application's connect callback to attempt to establish
// connection.
match connector.connect().await {
Ok(conn) => {
ConnResult::Connected(conn) => {
// A connection was successfully established -- call the run()
// implementation.
match connector.run(conn).await {
ControlFlow::Continue(_res) => {
RunResult::Reconnect => {
// The application has requested a reconnection.
// Fall through to retry()/delay()
// Fall through to retry_delay()
}
RunResult::Terminate => {
break Ok(());
}
ControlFlow::Break(res) => {
// Break out of loop -- passing along the result from the
RunResult::Fail(e) => {
// Break out of loop -- passing along the error from the
// application.
break res;
break Err(e);
}
}
}
Err(ControlFlow::Continue(_res)) => {
ConnResult::Reconnect => {
// The connector returned a retriable error
// fall through to retry()/delay()
}
ConnResult::Terminate => break Ok(()),
Err(ControlFlow::Break(res)) => {
ConnResult::Fail(e) => {
// The connector returned a fatal error
break Err(res);
break Err(e);
}
}
// If this point is reached the application has requested a reconnection.
// Call `retry_delay()` to allow the application to determine whether to
// retry or not.
match connector.retry_delay().await {
ControlFlow::Continue(()) => {
RunResult::Reconnect => {
// Application wants to reconnect.
continue;
}
RunResult::Terminate => break Ok(()),
ControlFlow::Break(err) => {
RunResult::Fail(e) => {
// Application does not want to reconnect
break Err(err);
break Err(e);
}
}
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
|