lstngrp

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From lstngrp-0.0.3 To lstngrp-0.0.4

2024-10-02
15:49
Update change log. check-in: 77a8709ec0 user: jan tags: trunk
2024-09-20
13:04
Fix docs. check-in: 328cbfb7c7 user: jan tags: lstngrp-0.0.4, trunk
12:56
Update idbag to 0.2.0. check-in: 45e048243f user: jan tags: trunk
2024-09-18
10:23
Generate links to definition on docs.rs. check-in: ea1e0ea1f0 user: jan tags: trunk
10:21
Release maintenance. check-in: 86bd36e830 user: jan tags: lstngrp-0.0.3, trunk
10:13
Exclude examples from packaging. check-in: 2b0bfea373 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
[package]
name = "lstngrp"
version = "0.0.3"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming", "asynchronous" ]
keywords = [ "network", "server", "listen", "protwrap" ]
repository = "https://repos.qrnch.tech/pub/lstngrp"
description = "Groups listeners and connections with common data/logic."


|







1
2
3
4
5
6
7
8
9
10
[package]
name = "lstngrp"
version = "0.0.4"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming", "asynchronous" ]
keywords = [ "network", "server", "listen", "protwrap" ]
repository = "https://repos.qrnch.tech/pub/lstngrp"
description = "Groups listeners and connections with common data/logic."
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42



43
44
45
46
47
48

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[dependencies]
hashbrown = { version = "0.14.5" }
idbag = { version = "0.1.2" }
killswitch = { version = "0.4.2" }
parking_lot = { version = "0.12.3" }
protwrap = { version = "0.3.0", features = [
  "tls", "tokio"
] }
tokio = { version = "1.40.0", features = [
  "macros", "net", "rt", "sync"
] }

[dev-dependencies]
tokio = { version = "1.40.0", features = [
  "io-util", "rt-multi-thread", "time"
] }




[lints.clippy]
all = { level = "deny", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }








|














>
>
>






21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[dependencies]
hashbrown = { version = "0.14.5" }
idbag = { version = "0.2.0" }
killswitch = { version = "0.4.2" }
parking_lot = { version = "0.12.3" }
protwrap = { version = "0.3.0", features = [
  "tls", "tokio"
] }
tokio = { version = "1.40.0", features = [
  "macros", "net", "rt", "sync"
] }

[dev-dependencies]
tokio = { version = "1.40.0", features = [
  "io-util", "rt-multi-thread", "time"
] }

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

[lints.clippy]
all = { level = "deny", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }

Changes to examples/per_listener_ctx.rs.

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::{str::FromStr, sync::Arc};

use parking_lot::Mutex;

use hashbrown::HashMap;

use lstngrp::{
  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
  ListenerGroup, SockAddr, Stream
};

struct ListenCtx {
  name: String
}

struct MyHandler {







|
|







13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::{str::FromStr, sync::Arc};

use parking_lot::Mutex;

use hashbrown::HashMap;

use lstngrp::{
  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
  Listener, ListenerSpec, SockAddr, Stream
};

struct ListenCtx {
  name: String
}

struct MyHandler {
129
130
131
132
133
134
135
136
137
138
139
140
141
142



143
144
145

146
147
148
149
150
151
152
153
154
155



156
157
158

159
160
161
162
163
164
165
166
167



168
169
170

171
172
173
174
175
176
177
  // accessible to all the callbacks.
  let handler = MyHandler {
    lmap: Arc::clone(&lmap)
  };

  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener_with_cb(id, listener, move |lid| {



    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener_with_cb(id, listener, move |lid| {



    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 2, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener_with_cb(id, listener, move |lid| {



    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {







|





|
>
>
>
|
|
|
>









|
>
>
>
|
|
|
>








|
>
>
>
|
|
|
>







129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
  // accessible to all the callbacks.
  let handler = MyHandler {
    lmap: Arc::clone(&lmap)
  };

  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener(
    ListenerSpec::new(id, listener)
      .autokill_conns()
      .bound_callback(move |lid| {
        let name = format!("Listener {id}");
        lm.lock().insert(lid, ListenCtx { name });
      })
  );

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener(
    ListenerSpec::new(id, listener)
      .autokill_conns()
      .bound_callback(move |lid| {
        let name = format!("Listener {id}");
        lm.lock().insert(lid, ListenCtx { name });
      })
  );

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 2, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp.add_listener(
    ListenerSpec::new(id, listener)
      .autokill_conns()
      .bound_callback(move |lid| {
        let name = format!("Listener {id}");
        lm.lock().insert(lid, ListenCtx { name });
      })
  );

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {
196
197
198
199
200
201
202
203
204
205
206
207
208
209
    }
  }
  */

  tokio::time::sleep(std::time::Duration::from_secs(1)).await;


  lgrp.kill_listener(id, false).await;


  lgrp.shutdown().await;
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|






208
209
210
211
212
213
214
215
216
217
218
219
220
221
    }
  }
  */

  tokio::time::sleep(std::time::Duration::from_secs(1)).await;


  lgrp.kill_listener(id).await;


  lgrp.shutdown().await;
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to examples/simple.rs.

1
2
3
4
5
6
7
8
9
10
11
12
use std::str::FromStr;

use lstngrp::{
  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
  ListenerGroup, SockAddr, Stream
};

struct MyHandler {}

#[async_trait]
impl GroupHandler for MyHandler {
  type ListenIdent = i64;



|
|







1
2
3
4
5
6
7
8
9
10
11
12
use std::str::FromStr;

use lstngrp::{
  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
  Listener, ListenerSpec, SockAddr, Stream
};

struct MyHandler {}

#[async_trait]
impl GroupHandler for MyHandler {
  type ListenIdent = i64;
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#[tokio::main]
async fn main() {
  // Create handler object
  let handler = MyHandler {};

  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener);

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener);

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Kick off a listener with id 2, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  lgrp.add_listener(id, listener);

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {







|




|








|








|







82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#[tokio::main]
async fn main() {
  // Create handler object
  let handler = MyHandler {};

  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Kick off a listener with id 2, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    }
  }
  */

  tokio::time::sleep(std::time::Duration::from_secs(1)).await;


  lgrp.kill_listener(id, false).await;


  lgrp.shutdown().await;
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|






138
139
140
141
142
143
144
145
146
147
148
149
150
151
    }
  }
  */

  tokio::time::sleep(std::time::Duration::from_secs(1)).await;


  lgrp.kill_listener(id).await;


  lgrp.shutdown().await;
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/conn.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::Arc;

use parking_lot::Mutex;

use hashbrown::HashMap;

use tokio::{sync::oneshot, task::JoinHandle};

pub use protwrap::tokio::server::{
  listener::{Acceptor, SockAddr},
  Stream
};

pub use idbag::ArcId;

use super::{ConnHandler, ConnInfo};


/// Internal per-connection data.
pub struct ConnectionData<LI, CC> {
  /// The listener identifier this connection was spawned from.













|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::Arc;

use parking_lot::Mutex;

use hashbrown::HashMap;

use tokio::{sync::oneshot, task::JoinHandle};

pub use protwrap::tokio::server::{
  listener::{Acceptor, SockAddr},
  Stream
};

pub use idbag::ArcIdUsize as ArcId;

use super::{ConnHandler, ConnInfo};


/// Internal per-connection data.
pub struct ConnectionData<LI, CC> {
  /// The listener identifier this connection was spawned from.

Changes to src/lib.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//! Abstraction over a group of listeners and connections generated by them.
//!
//! A [`ListenerGroup`] is a set of network listeners that are bound to the
//! same basic connection handler logic.
//!
//! # Usage overview
//! An application calls [`ListenerGroup::new()`] to create a new
//! `ListenerGroup` object.  To the `new()` function it must pass an object
//! that implements the [`GroupHandler`] trait. The object that implements
//! `GroupHandler` is responsible for the shared data among all listeners while the
//! implementation of the `GroupHandler`'s trait methods are responsible for
//! the shared logic.  Of special note is the [`GroupHandler::connected()`]
//! trait method, whose responsibility it is to return an object that
//! implements the [`ConnHandler`] trait.
//!
//! The `ConnHandler` implementation has two special trait methods of note.
//! The [`ConnHandler::run()`] is called once a connection has been
//! established.  The application should implement the connection management
//! logic in this method.  When the [`ConnHandler::kill()`] is called, the
//! application must perform some logic that will abort the connection and
//! return from the `ConnHandler::run()` implementation.
//!
//! Once a `ListenerGroup` object has been created, the application calls
//! [`ListenerGroup::add_listener()`] to add a listener.  A background task
//! will begin listening for incoming connections immediately.
//!
//! When an application is done with a `ListenerGroup` it can call
//! [`ListenerGroup::shutdown()`] to shut down all listeners and connections in
//! an orderly fashion (as long as the `ConnHandler::kill()` method has been
//! correctly implemented by the application).
//!
//! # Per-listener context
//! The `ListenerGroup` uses a single `GroupHandler` object for all listeners.
//! If the application needs per-listener-specific data, it can use an
//! associative container (like a `HashMap`) in its `GroupHandler` object to
//! map listeners' unique identifiers to listener-specific contexts in the
//! container.  See the `per_listener_ctx` example in lstngrp's repository for
//! an example implementation of this.

mod conn;


|



|
|














|
|


|
|




|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//! Abstraction over a group of listeners and connections generated by them.
//!
//! A [`ListenGroup`] is a set of network listeners that are bound to the
//! same basic connection handler logic.
//!
//! # Usage overview
//! An application calls [`ListenGroup::new()`] to create a new
//! `ListenGroup` object.  To the `new()` function it must pass an object
//! that implements the [`GroupHandler`] trait. The object that implements
//! `GroupHandler` is responsible for the shared data among all listeners while the
//! implementation of the `GroupHandler`'s trait methods are responsible for
//! the shared logic.  Of special note is the [`GroupHandler::connected()`]
//! trait method, whose responsibility it is to return an object that
//! implements the [`ConnHandler`] trait.
//!
//! The `ConnHandler` implementation has two special trait methods of note.
//! The [`ConnHandler::run()`] is called once a connection has been
//! established.  The application should implement the connection management
//! logic in this method.  When the [`ConnHandler::kill()`] is called, the
//! application must perform some logic that will abort the connection and
//! return from the `ConnHandler::run()` implementation.
//!
//! Once a `ListenGroup` object has been created, the application calls
//! [`ListenGroup::add_listener()`] to add a listener.  A background task
//! will begin listening for incoming connections immediately.
//!
//! When an application is done with a `ListenGroup` it can call
//! [`ListenGroup::shutdown()`] to shut down all listeners and connections in
//! an orderly fashion (as long as the `ConnHandler::kill()` method has been
//! correctly implemented by the application).
//!
//! # Per-listener context
//! The `ListenGroup` uses a single `GroupHandler` object for all listeners.
//! If the application needs per-listener-specific data, it can use an
//! associative container (like a `HashMap`) in its `GroupHandler` object to
//! map listeners' unique identifiers to listener-specific contexts in the
//! container.  See the `per_listener_ctx` example in lstngrp's repository for
//! an example implementation of this.

mod conn;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use hashbrown::HashMap;

pub use protwrap::tokio::server::{
  listener::{async_trait, Listener, SockAddr},
  Stream
};

pub use idbag::ArcId;
use idbag::IdBag;

use conn::ConnectionData;

use listener::{ListenData, ListenTaskParams};


/// Internal alias for a `GroupHandler` with a generic listener identifier and







|
|







55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use hashbrown::HashMap;

pub use protwrap::tokio::server::{
  listener::{async_trait, Listener, SockAddr},
  Stream
};

pub use idbag::ArcIdUsize as ArcId;
use idbag::IdBagUsize as IdBag;

use conn::ConnectionData;

use listener::{ListenData, ListenTaskParams};


/// Internal alias for a `GroupHandler` with a generic listener identifier and
158
159
160
161
162
163
164











































165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
  /// that is relatively fast.  When a connection termination request is made
  /// the termination call will not return to the application until the
  /// background connection task has terminated.  In order to avoid holding up
  /// the shutdown process, termination should be quick.
  fn kill(&self);
}













































/// Representation of a group of listeners.
///
/// Each listener will use the same connection handler.
///
/// Generics:
/// - `LI` - Listener identifier.  Used by the application to uniquely identify
///   a specific listener.
/// - `CI` - Connection information.  Each connection is allocated a connection
///   information context.
pub struct ListenerGroup<LI, CC>
where
  LI: Hash + Eq
{
  /// Keep track of all listeners, identified by `LI` and all their active
  /// connections, identified by an `ArcId`, with their connection context,
  /// identified by `CC`.
  lmap: Arc<Mutex<HashMap<LI, ListenData>>>,

  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,

  /// Connection handler used by all the listeners within this group.
  lhandler: Arc<LCHandler<LI, CC>>,

  idbag: Arc<IdBag>,

  shutdown: Arc<AtomicBool>
}


impl<LI, CC> ListenerGroup<LI, CC>
where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static
{
  /// Create a new listener group.
  pub fn new(
    handler: impl GroupHandler<ListenIdent = LI, ConnHandler = CC>







>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>










|



















|







158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
  /// that is relatively fast.  When a connection termination request is made
  /// the termination call will not return to the application until the
  /// background connection task has terminated.  In order to avoid holding up
  /// the shutdown process, termination should be quick.
  fn kill(&self);
}


/// Builder for listeners to be added to a [`ListenGroup`].
pub struct ListenerSpec<LI> {
  id: LI,
  listener: Listener,
  autokill: bool,
  bound_cb: Option<Box<dyn FnOnce(LI) + Send>>
}

impl<LI> ListenerSpec<LI> {
  /// Create a new listener spec.
  pub fn new(id: LI, listener: Listener) -> Self {
    Self {
      id,
      listener,
      autokill: false,
      bound_cb: None
    }
  }

  /// Automatically request that connections terminate once the listener is
  /// terminated.
  #[must_use]
  pub const fn autokill_conns(mut self) -> Self {
    self.autokill = true;
    self
  }

  /// Add a post-successful-bound callback.
  ///
  /// The callback can be useful to register a listener-specific context in the
  /// listen group handler's object since it is only called on success.  This
  /// way the application does not need to deregister the listener context for
  /// the failure case.
  #[must_use]
  pub fn bound_callback(
    mut self,
    f: impl FnOnce(LI) + Send + 'static
  ) -> Self {
    self.bound_cb = Some(Box::new(f));
    self
  }
}

/// Representation of a group of listeners.
///
/// Each listener will use the same connection handler.
///
/// Generics:
/// - `LI` - Listener identifier.  Used by the application to uniquely identify
///   a specific listener.
/// - `CI` - Connection information.  Each connection is allocated a connection
///   information context.
pub struct ListenGroup<LI, CC>
where
  LI: Hash + Eq
{
  /// Keep track of all listeners, identified by `LI` and all their active
  /// connections, identified by an `ArcId`, with their connection context,
  /// identified by `CC`.
  lmap: Arc<Mutex<HashMap<LI, ListenData>>>,

  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,

  /// Connection handler used by all the listeners within this group.
  lhandler: Arc<LCHandler<LI, CC>>,

  idbag: Arc<IdBag>,

  shutdown: Arc<AtomicBool>
}


impl<LI, CC> ListenGroup<LI, CC>
where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static
{
  /// Create a new listener group.
  pub fn new(
    handler: impl GroupHandler<ListenIdent = LI, ConnHandler = CC>
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245

246
247
248
249
250
251










252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
      cmap: Arc::new(Mutex::new(HashMap::new())),
      idbag,
      shutdown: Arc::new(AtomicBool::new(false))
    }
  }

  /// Register a new listener within this group.
  ///
  /// `id` is the unique identifier of this listener.
  /// `listener_ctx` is a listener-specific instance of a listener context that
  /// will be put in an `Arc` and passed to each connector.
  pub fn add_listener(&self, id: LI, listener: Listener) {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }

    let g = self.lmap.lock();
    if g.contains_key(&id) {
      // Listener already running
      return;
    }
    drop(g);

    let ltp = ListenTaskParams {
      lid: id,
      listener,
      lmap: Arc::clone(&self.lmap),
      lhandler: Arc::clone(&self.lhandler),
      cmap: Arc::clone(&self.cmap),
      idbag: Arc::clone(&self.idbag),
      shutdown: Arc::clone(&self.shutdown)

    };

    // Need to send the listener task's JoinHandle to the task so it can be
    // stored in the listener's map
    let (tx, rx) = oneshot::channel();











    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, |_lid| {}, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    if let Err(_e) = tx.send(jh) {
      // ToDo: error-management
    }
  }

  /// Register a listener within this group.
  ///
  /// This works much like `add_listener()`, but it will call a callback,
  /// passed through `f`, if the bind was successful.
  ///
  /// The callback can be useful to register a listener-specific context in the
  /// listen group handler's object since it is only called on success.  This
  /// way the application does not need to deregister the listener context for
  /// the failure case.
  pub fn add_listener_with_cb<F>(&self, lid: LI, listener: Listener, f: F)
  where
    F: FnOnce(LI) + Send + 'static
  {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }


    let g = self.lmap.lock();
    if g.contains_key(&lid) {
      // A listener with this listener id is already running
      return;
    }
    drop(g);

    let ltp = ListenTaskParams {
      lid,
      listener,
      lmap: Arc::clone(&self.lmap),
      lhandler: Arc::clone(&self.lhandler),
      cmap: Arc::clone(&self.cmap),
      idbag: Arc::clone(&self.idbag),
      shutdown: Arc::clone(&self.shutdown)
    };

    // Need to send the listener task's JoinHandle to the task so it can be
    // stored in the listener's map
    let (tx, rx) = oneshot::channel();

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, f, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    if let Err(_e) = tx.send(jh) {
      // ToDo: error-handling
    }
  }


  /// Check if there's a listener with the identifier `LI`.
  pub fn have_listener(&mut self, id: &LI) -> bool {
    self.lmap.lock().contains_key(id)
  }

  /// Kill the listener given a specific listener id.
  ///
  /// If `kill_conns` is `true`, the killswitch of all of the listeners active
  /// connections will be triggered as well.
  pub async fn kill_listener(&self, lid: LI, kill_conns: bool) {
    // Extraction of join handle placed in a scope so that clippy doesn't think
    // the lock is held over an await point.
    let mut jh = {
      let mut lmap = self.lmap.lock();
      let Some(ldata) = lmap.get_mut(&lid) else {
        // No such id found
        return;
      };

      // Tell listener to self-terminate and attempt to take ownership of
      // listener task's JoinHandle.
      ldata.ks.trigger();
      let jh = ldata.jh.take();
      drop(lmap);

      jh
    };


    // If the listener data contained a join handle then use it to wait for
    // the task to terminate.
    if let Some(jh) = jh.take() {
      let _ = jh.await;
    }

    if kill_conns {
      // Generate a list of connection id's to be terminated.
      let cids: Vec<ArcId> = self
        .cmap
        .lock()
        .iter()
        .filter_map(|(cid, cdata)| (cdata.lid == lid).then_some(cid.clone()))
        .collect();

      // ToDo: If all the connections' killswitches are triggered here
      // instead, they could terminate concurrently.
      for cid in cids {
        self.kill_connection(cid).await;
      }
    }
  }

  /// Terminate a connection given a connection identifier.
  ///
  /// This function will not return until the background connection task has
  /// terminated.
  pub async fn kill_connection(&self, cid: ArcId) {







<
<
<
<
|
|






|






|
|




|
>






>
>
>
>
>
>
>
>
>
>

|







<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<










|


















<





<
<
<
<
<
<
<
<
<
<
<
<
<
<
<
<







256
257
258
259
260
261
262




263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310




















































311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339

340
341
342
343
344
















345
346
347
348
349
350
351
      cmap: Arc::new(Mutex::new(HashMap::new())),
      idbag,
      shutdown: Arc::new(AtomicBool::new(false))
    }
  }

  /// Register a new listener within this group.




  pub fn add_listener(&self, spec: ListenerSpec<LI>) {
    // Do not allow new listeners if the ListenGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }

    let g = self.lmap.lock();
    if g.contains_key(&spec.id) {
      // Listener already running
      return;
    }
    drop(g);

    let ltp = ListenTaskParams {
      lid: spec.id,
      listener: spec.listener,
      lmap: Arc::clone(&self.lmap),
      lhandler: Arc::clone(&self.lhandler),
      cmap: Arc::clone(&self.cmap),
      idbag: Arc::clone(&self.idbag),
      shutdown: Arc::clone(&self.shutdown),
      autoclose_conns: spec.autokill
    };

    // Need to send the listener task's JoinHandle to the task so it can be
    // stored in the listener's map
    let (tx, rx) = oneshot::channel();

    #[allow(clippy::option_if_let_else)]
    let cb = if let Some(cb) = spec.bound_cb {
      cb
    } else {
      Box::new(|_lid| {})
    };

    // suggested by clippy, but causes build error
    /* let cb = spec.bound_cb.map_or_else(|| Box::new(|_lid| {}), |cb| cb); */

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, cb, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    if let Err(_e) = tx.send(jh) {
      // ToDo: error-management
    }
  }





















































  /// Check if there's a listener with the identifier `LI`.
  pub fn have_listener(&mut self, id: &LI) -> bool {
    self.lmap.lock().contains_key(id)
  }

  /// Kill the listener given a specific listener id.
  ///
  /// If `kill_conns` is `true`, the killswitch of all of the listeners active
  /// connections will be triggered as well.
  pub async fn kill_listener(&self, lid: LI) {
    // Extraction of join handle placed in a scope so that clippy doesn't think
    // the lock is held over an await point.
    let mut jh = {
      let mut lmap = self.lmap.lock();
      let Some(ldata) = lmap.get_mut(&lid) else {
        // No such id found
        return;
      };

      // Tell listener to self-terminate and attempt to take ownership of
      // listener task's JoinHandle.
      ldata.ks.trigger();
      let jh = ldata.jh.take();
      drop(lmap);

      jh
    };


    // If the listener data contained a join handle then use it to wait for
    // the task to terminate.
    if let Some(jh) = jh.take() {
      let _ = jh.await;
    }
















  }

  /// Terminate a connection given a connection identifier.
  ///
  /// This function will not return until the background connection task has
  /// terminated.
  pub async fn kill_connection(&self, cid: ArcId) {
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
    let lids: Vec<LI> = self
      .lmap
      .lock()
      .iter()
      .map(|(lid, _)| lid.clone())
      .collect();
    for lid in lids {
      self.kill_listener(lid, true).await;
    }
  }

  /// Get a snap-shot of the current listener group state.
  #[must_use]
  pub fn current_state(&self) -> Vec<LInfo<LI, CC>> {
    // Generate list of all listeners







|







388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
    let lids: Vec<LI> = self
      .lmap
      .lock()
      .iter()
      .map(|(lid, _)| lid.clone())
      .collect();
    for lid in lids {
      self.kill_listener(lid).await;
    }
  }

  /// Get a snap-shot of the current listener group state.
  #[must_use]
  pub fn current_state(&self) -> Vec<LInfo<LI, CC>> {
    // Generate list of all listeners

Changes to src/listener.rs.

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pub use killswitch::KillSwitch;

pub use protwrap::tokio::server::{
  listener::{async_trait, Acceptor, Listener, SockAddr},
  Stream
};

pub use idbag::{ArcId, IdBag};

use crate::{
  conn::{self, ConnectionData, Params as ConnParams},
  ConnHandler, LCHandler
};

/// Internal per-listener data.







|







18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pub use killswitch::KillSwitch;

pub use protwrap::tokio::server::{
  listener::{async_trait, Acceptor, Listener, SockAddr},
  Stream
};

pub use idbag::{ArcIdUsize as ArcId, IdBagUsize as IdBag};

use crate::{
  conn::{self, ConnectionData, Params as ConnParams},
  ConnHandler, LCHandler
};

/// Internal per-listener data.
59
60
61
62
63
64
65
66


67
68
69
70
71
72
73

  /// `IdBag` used to allocate unique connection id's.
  idbag: Arc<IdBag>,

  /// Optional post-successful-bind callback.
  cb: Option<Box<dyn FnOnce(LI) + Send>>,

  shutdown: Arc<AtomicBool>


}

#[async_trait]
impl<LI, CC> Acceptor for InternalListenerCallback<LI, CC>
where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static







|
>
>







59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

  /// `IdBag` used to allocate unique connection id's.
  idbag: Arc<IdBag>,

  /// Optional post-successful-bind callback.
  cb: Option<Box<dyn FnOnce(LI) + Send>>,

  shutdown: Arc<AtomicBool>,

  auto_kill_conns: bool
}

#[async_trait]
impl<LI, CC> Acceptor for InternalListenerCallback<LI, CC>
where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
    let jh = self.jh.take();

    // Allocate the buffer used to store listener data in the listeners map
    let ldata = ListenData {
      ks: self.ks.clone(),
      jh,
      addr,
      auto_kill_conns: false
    };

    // Add listener to listener map
    self.lmap.lock().insert(self.lid.clone(), ldata);

    // Call the callback, in case the application wants to do some
    // post-successful-bound processing.







|







89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
    let jh = self.jh.take();

    // Allocate the buffer used to store listener data in the listeners map
    let ldata = ListenData {
      ks: self.ks.clone(),
      jh,
      addr,
      auto_kill_conns: self.auto_kill_conns
    };

    // Add listener to listener map
    self.lmap.lock().insert(self.lid.clone(), ldata);

    // Call the callback, in case the application wants to do some
    // post-successful-bound processing.
156
157
158
159
160
161
162
163

164
165
166
167
168
169
170
{
  pub(crate) lid: LI,
  pub(crate) listener: Listener,
  pub(crate) lmap: Arc<Mutex<HashMap<LI, ListenData>>>,
  pub(crate) lhandler: Arc<LCHandler<LI, CC>>,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  pub(crate) idbag: Arc<IdBag>,
  pub(crate) shutdown: Arc<AtomicBool>

}

/// Internal listener.
///
/// This should be spawned on a new task.
///
/// `cb` is a callback that will be called iff the listener was successfully







|
>







158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
{
  pub(crate) lid: LI,
  pub(crate) listener: Listener,
  pub(crate) lmap: Arc<Mutex<HashMap<LI, ListenData>>>,
  pub(crate) lhandler: Arc<LCHandler<LI, CC>>,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  pub(crate) idbag: Arc<IdBag>,
  pub(crate) shutdown: Arc<AtomicBool>,
  pub(crate) autoclose_conns: bool
}

/// Internal listener.
///
/// This should be spawned on a new task.
///
/// `cb` is a callback that will be called iff the listener was successfully
190
191
192
193
194
195
196
197

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214

215
216
217
218
219
220


221
222
223

224
225
226
227
228


229


230
231




232
233
234
235
    cmap: Arc::clone(&ltp.cmap),
    lid: ltp.lid.clone(),
    ks: ks.clone(),
    lhandler: Arc::clone(&ltp.lhandler),
    idbag: ltp.idbag,
    cb: Some(Box::new(cb)),
    jh: Some(jh),
    shutdown: Arc::clone(&ltp.shutdown)

  };

  // Kick off the listener.
  //
  // If successful, this will block until terminated using the killswitch.
  match ltp.listener.run(ks, ilcb).await {
    Ok(()) => {
      // Do nothing -- unbound is handled by Acceptor::unbound()
      //ltp.handler.unbound(ltp.lid.clone(), &ltp.lctx).await;
    }
    Err(e) => {
      // Listener failed -- report back to application using callback
      ltp.lhandler.failed(&ltp.listener, ltp.lid.clone(), e).await;
    }
  }

  // Deregister the listener from the listener map

  let mut g = ltp.lmap.lock();
  let Some(ldata) = g.remove(&ltp.lid) else {
    // No such id found
    return;
  };
  drop(g);



  // If the listener is configured to automatically kill client connections,
  // then do so

  if ldata.auto_kill_conns {
    // ToDo: Kill all connections
    /*
    let g = ldata.cmap.lock();
    for (_id, chandler) in g.iter() {


      chandler.kill();


    }
    */




  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







|
>

















>
|
|
|
|
|
|
>
>



>

<
|
|
|
>
>

>
>
|
<
>
>
>
>




193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232

233
234
235
236
237
238
239
240
241

242
243
244
245
246
247
248
249
    cmap: Arc::clone(&ltp.cmap),
    lid: ltp.lid.clone(),
    ks: ks.clone(),
    lhandler: Arc::clone(&ltp.lhandler),
    idbag: ltp.idbag,
    cb: Some(Box::new(cb)),
    jh: Some(jh),
    shutdown: Arc::clone(&ltp.shutdown),
    auto_kill_conns: ltp.autoclose_conns
  };

  // Kick off the listener.
  //
  // If successful, this will block until terminated using the killswitch.
  match ltp.listener.run(ks, ilcb).await {
    Ok(()) => {
      // Do nothing -- unbound is handled by Acceptor::unbound()
      //ltp.handler.unbound(ltp.lid.clone(), &ltp.lctx).await;
    }
    Err(e) => {
      // Listener failed -- report back to application using callback
      ltp.lhandler.failed(&ltp.listener, ltp.lid.clone(), e).await;
    }
  }

  // Deregister the listener from the listener map
  let ldata = {
    let mut g = ltp.lmap.lock();
    let Some(ldata) = g.remove(&ltp.lid) else {
      // No such id found
      return;
    };
    drop(g);
    ldata
  };

  // If the listener is configured to automatically kill client connections,
  // then do so
  let mut cjhs = Vec::new();
  if ldata.auto_kill_conns {

    //if ltp.autoclose_conns {
    let mut g = ltp.cmap.lock();
    for (chandler, jh) in g.iter_mut().filter_map(|(_id, cdata)| {
      (cdata.lid == ltp.lid).then_some((&cdata.chandler, cdata.jh.take()))
    }) {
      chandler.kill();
      if let Some(jh) = jh {
        cjhs.push(jh);
      }

    }
  }
  for jh in cjhs {
    let _ = jh.await;
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2


3
4
5
6
7
8


9
10





11



12
13
14
15
16
17
18
# Change Log



## [Unreleased]

[Details](/vdiff?from=lstngrp-0.0.3&to=trunk)

### Added



### Changed






### Removed




---

## [0.0.3] - 2024-09-18

[Details](/vdiff?from=lstngrp-0.0.2&to=lstngrp-0.0.3)



>
>






>
>


>
>
>
>
>

>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=lstngrp-0.0.3&to=trunk)

### Added

- Support automatically closing connections when a listener is removed.

### Changed

- ⚠️ Rather than have two different functions to add listeners, unify to a single
  function, and use a builder-type `ListenerSpec` to pass to the
  `add_listener()` method instead.
- Update `idbag` to `0.2.0`.

### Removed

- ⚠️ No longer allow `kill_conns` to be specified when killing a listener.
  Rely on the `ListenerSpec` setting instead.

---

## [0.0.3] - 2024-09-18

[Details](/vdiff?from=lstngrp-0.0.2&to=lstngrp-0.0.3)