Index: Cargo.toml
==================================================================
--- Cargo.toml
+++ Cargo.toml
@@ -1,8 +1,8 @@
[package]
name = "lstngrp"
-version = "0.0.3"
+version = "0.0.4"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming", "asynchronous" ]
keywords = [ "network", "server", "listen", "protwrap" ]
@@ -23,11 +23,11 @@
[badges]
maintenance = { status = "experimental" }
[dependencies]
hashbrown = { version = "0.14.5" }
-idbag = { version = "0.1.2" }
+idbag = { version = "0.2.0" }
killswitch = { version = "0.4.2" }
parking_lot = { version = "0.12.3" }
protwrap = { version = "0.3.0", features = [
"tls", "tokio"
] }
@@ -38,11 +38,14 @@
[dev-dependencies]
tokio = { version = "1.40.0", features = [
"io-util", "rt-multi-thread", "time"
] }
+[package.metadata.docs.rs]
+rustdoc-args = ["--generate-link-to-definition"]
+
[lints.clippy]
all = { level = "deny", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }
Index: examples/per_listener_ctx.rs
==================================================================
--- examples/per_listener_ctx.rs
+++ examples/per_listener_ctx.rs
@@ -15,12 +15,12 @@
use parking_lot::Mutex;
use hashbrown::HashMap;
use lstngrp::{
- async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
- ListenerGroup, SockAddr, Stream
+ async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
+ Listener, ListenerSpec, SockAddr, Stream
};
struct ListenCtx {
name: String
}
@@ -131,45 +131,57 @@
lmap: Arc::clone(&lmap)
};
// Create a listener group that will use an instance of MyHandler to process
// callbacks.
- let lgrp = ListenerGroup::new(handler);
+ let lgrp = ListenGroup::new(handler);
// Kick off a listener with id 1, which is a TCP localhost:8080 listener
let id = 1;
let listener = Listener::from_str("127.0.0.1:8080").unwrap();
let lm = Arc::clone(&lmap);
- lgrp.add_listener_with_cb(id, listener, move |lid| {
- let name = format!("Listener {id}");
- lm.lock().insert(lid, ListenCtx { name });
- });
+ lgrp.add_listener(
+ ListenerSpec::new(id, listener)
+ .autokill_conns()
+ .bound_callback(move |lid| {
+ let name = format!("Listener {id}");
+ lm.lock().insert(lid, ListenCtx { name });
+ })
+ );
// Give listeners a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// Kick off a listener with id 9, which is a TCP localhost:8080 listener,
// which should fail because it is bound by listener with id 1.
let id = 9;
let listener = Listener::from_str("127.0.0.1:8080").unwrap();
let lm = Arc::clone(&lmap);
- lgrp.add_listener_with_cb(id, listener, move |lid| {
- let name = format!("Listener {id}");
- lm.lock().insert(lid, ListenCtx { name });
- });
+ lgrp.add_listener(
+ ListenerSpec::new(id, listener)
+ .autokill_conns()
+ .bound_callback(move |lid| {
+ let name = format!("Listener {id}");
+ lm.lock().insert(lid, ListenCtx { name });
+ })
+ );
// Give listener a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// Kick off a listener with id 3, which is a TCP localhost:8081 listener
let id = 2;
let listener = Listener::from_str("127.0.0.1:8081").unwrap();
let lm = Arc::clone(&lmap);
- lgrp.add_listener_with_cb(id, listener, move |lid| {
- let name = format!("Listener {id}");
- lm.lock().insert(lid, ListenCtx { name });
- });
+ lgrp.add_listener(
+ ListenerSpec::new(id, listener)
+ .autokill_conns()
+ .bound_callback(move |lid| {
+ let name = format!("Listener {id}");
+ lm.lock().insert(lid, ListenCtx { name });
+ })
+ );
// Give listener a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
@@ -198,12 +210,12 @@
*/
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
- lgrp.kill_listener(id, false).await;
+ lgrp.kill_listener(id).await;
lgrp.shutdown().await;
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Index: examples/simple.rs
==================================================================
--- examples/simple.rs
+++ examples/simple.rs
@@ -1,10 +1,10 @@
use std::str::FromStr;
use lstngrp::{
- async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
- ListenerGroup, SockAddr, Stream
+ async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
+ Listener, ListenerSpec, SockAddr, Stream
};
struct MyHandler {}
#[async_trait]
@@ -84,34 +84,34 @@
// Create handler object
let handler = MyHandler {};
// Create a listener group that will use an instance of MyHandler to process
// callbacks.
- let lgrp = ListenerGroup::new(handler);
+ let lgrp = ListenGroup::new(handler);
// Kick off a listener with id 1, which is a TCP localhost:8080 listener
let id = 1;
let listener = Listener::from_str("127.0.0.1:8080").unwrap();
- lgrp.add_listener(id, listener);
+ lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
// Give listeners a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// Kick off a listener with id 9, which is a TCP localhost:8080 listener,
// which should fail because it is bound by listener with id 1.
let id = 9;
let listener = Listener::from_str("127.0.0.1:8080").unwrap();
- lgrp.add_listener(id, listener);
+ lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
// Give listener a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
// Kick off a listener with id 3, which is a TCP localhost:8081 listener
let id = 2;
let listener = Listener::from_str("127.0.0.1:8081").unwrap();
- lgrp.add_listener(id, listener);
+ lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
// Give listener a second to start up
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
@@ -140,12 +140,12 @@
*/
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
- lgrp.kill_listener(id, false).await;
+ lgrp.kill_listener(id).await;
lgrp.shutdown().await;
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Index: src/conn.rs
==================================================================
--- src/conn.rs
+++ src/conn.rs
@@ -9,11 +9,11 @@
pub use protwrap::tokio::server::{
listener::{Acceptor, SockAddr},
Stream
};
-pub use idbag::ArcId;
+pub use idbag::ArcIdUsize as ArcId;
use super::{ConnHandler, ConnInfo};
/// Internal per-connection data.
Index: src/lib.rs
==================================================================
--- src/lib.rs
+++ src/lib.rs
@@ -1,13 +1,13 @@
//! Abstraction over a group of listeners and connections generated by them.
//!
-//! A [`ListenerGroup`] is a set of network listeners that are bound to the
+//! A [`ListenGroup`] is a set of network listeners that are bound to the
//! same basic connection handler logic.
//!
//! # Usage overview
-//! An application calls [`ListenerGroup::new()`] to create a new
-//! `ListenerGroup` object. To the `new()` function it must pass an object
+//! An application calls [`ListenGroup::new()`] to create a new
+//! `ListenGroup` object. To the `new()` function it must pass an object
//! that implements the [`GroupHandler`] trait. The object that implements
//! `GroupHandler` is responsible the shared data among all listeners while the
//! implementation of the `GroupHandler`'s trait methods are responsible for
//! the shared logic. Of special note is the [`GroupHandler::connected()`]
//! trait method, whose responsibility it is to return an object that
@@ -18,21 +18,21 @@
//! established. The application should implement the connection management
//! logic in this method. When the [`ConnHandler::kill()`] is called, the
//! application must perform some logic that will abort the connection and
//! return from the `ConnHandler::run()` implementation.
//!
-//! Once a `ListenerGroup` object has been created, the application calls
-//! [`ListenerGroup::add_listener()`] to add a listener. A background task
+//! Once a `ListenGroup` object has been created, the application calls
+//! [`ListenGroup::add_listener()`] to add a listener. A background task
//! will begin listening for incoming connections immediately.
//!
-//! When an application is done with a `ListenerGroup` it can call
-//! [`ListenerGroup::shutdown()`] to shut down all listeners and connections in
+//! When an application is done with a `ListenGroup` it can call
+//! [`ListenGroup::shutdown()`] to shut down all listeners and connections in
//! an orderly fashion (as long as the `ConnHandler::kill()` method has been
//! correctly implemented by the application).
//!
//! # Per-listener context
-//! The `ListenerGroup` uses a single `GroupHandler` object for all listeners.
+//! The `ListenGroup` uses a single `GroupHandler` object for all listeners.
//! If the application needs per-listener-specific data, it can use an
//! associative container (like a `HashMap`) in its `GroupHandler` object to
//! map listeners' unique identifiers to listener-specific contexts in the
//! container. See the `per_listener_ctx` example in lstngrp's repository for
//! an example implementation of this.
@@ -57,12 +57,12 @@
pub use protwrap::tokio::server::{
listener::{async_trait, Listener, SockAddr},
Stream
};
-pub use idbag::ArcId;
-use idbag::IdBag;
+pub use idbag::ArcIdUsize as ArcId;
+use idbag::IdBagUsize as IdBag;
use conn::ConnectionData;
use listener::{ListenData, ListenTaskParams};
@@ -160,10 +160,53 @@
/// background connection task has terminated. In order to avoid holding up
/// the shutdown process, termination should be quick.
fn kill(&self);
}
+
+/// Builder for listeners to be added to a [`ListenGroup`].
+pub struct ListenerSpec
{
+ id: LI,
+ listener: Listener,
+ autokill: bool,
+ bound_cb: Option>
+}
+
+impl ListenerSpec {
+ /// Create a new listener spec.
+ pub fn new(id: LI, listener: Listener) -> Self {
+ Self {
+ id,
+ listener,
+ autokill: false,
+ bound_cb: None
+ }
+ }
+
+ /// Automatically request that connections terminate once the listener is
+ /// terminated.
+ #[must_use]
+ pub const fn autokill_conns(mut self) -> Self {
+ self.autokill = true;
+ self
+ }
+
+ /// Add a post-successful-bound callback.
+ ///
+ /// The callback can be useful to register a listener-specific context in the
+ /// listen group handler's object since it is only called on success. This
+ /// way the application does not need to deregister the listener context for
+ /// the failure case.
+ #[must_use]
+ pub fn bound_callback(
+ mut self,
+ f: impl FnOnce(LI) + Send + 'static
+ ) -> Self {
+ self.bound_cb = Some(Box::new(f));
+ self
+ }
+}
/// Representation of a group of listeners.
///
/// Each listener will use the same connection handler.
///
@@ -170,11 +213,11 @@
/// Generics:
/// - `LI` - Listener identifier. Used by the application to uniquely identify
/// a specific listener.
/// - `CI` - Connection information. Each connection is allicated a connection
/// information context.
-pub struct ListenerGroup
+pub struct ListenGroup
where
LI: Hash + Eq
{
/// Keep track of all listeners, identified by `LI` and all their active
/// connections, identified by an `ArcId`, with their connection context,
@@ -190,11 +233,11 @@
shutdown: Arc
}
-impl ListenerGroup
+impl ListenGroup
where
LI: Hash + Eq + Send + Sync + Clone + 'static,
CC: ConnHandler + Send + Sync + 'static
{
/// Create a new listener group.
@@ -215,103 +258,58 @@
shutdown: Arc::new(AtomicBool::new(false))
}
}
/// Register a new listener within this group.
- ///
- /// `id` is the unique identifier of this listener.
- /// `listner_ctx` is a listener-specific instance of a listener context that
- /// will be put in an `Arc` and passed to each connector.
- pub fn add_listener(&self, id: LI, listener: Listener) {
- // Do not allow new listeners if the ListenerGroup is either in the process
+ pub fn add_listener(&self, spec: ListenerSpec) {
+ // Do not allow new listeners if the ListenGroup is either in the process
// of shutting down or has been shut down.
if self.shutdown.load(Ordering::Relaxed) {
return;
}
let g = self.lmap.lock();
- if g.contains_key(&id) {
+ if g.contains_key(&spec.id) {
// Listener already running
return;
}
drop(g);
let ltp = ListenTaskParams {
- lid: id,
- listener,
+ lid: spec.id,
+ listener: spec.listener,
lmap: Arc::clone(&self.lmap),
lhandler: Arc::clone(&self.lhandler),
cmap: Arc::clone(&self.cmap),
idbag: Arc::clone(&self.idbag),
- shutdown: Arc::clone(&self.shutdown)
+ shutdown: Arc::clone(&self.shutdown),
+ autoclose_conns: spec.autokill
};
// Need to send the listener task's JoinHandle to the task so it can be
// stored in the listener's map
let (tx, rx) = oneshot::channel();
+ #[allow(clippy::option_if_let_else)]
+ let cb = if let Some(cb) = spec.bound_cb {
+ cb
+ } else {
+ Box::new(|_lid| {})
+ };
+
+ // suggested by clippy, but causes build error
+ /* let cb = spec.bound_cb.map_or_else(|| Box::new(|_lid| {}), |cb| cb); */
+
// Spawn a task to initialize and run the listener.
- let jh = task::spawn(listener::task(ltp, |_lid| {}, rx));
+ let jh = task::spawn(listener::task(ltp, cb, rx));
// Send JoinHandle to listener task, so it can store it in the ListenerData
// structure
if let Err(_e) = tx.send(jh) {
// ToDo: error-management
}
}
-
- /// Register off a listener within this group.
- ///
- /// This works much like `add_listener()`, but it will call a callback,
- /// passed through `f`, if the bind was successful.
- ///
- /// The callback can be useful to register a listener-specific context in the
- /// listen group handler's object since it is only called on success. This
- /// way the application does not need to deregister the listener context for
- /// the failure case.
- pub fn add_listener_with_cb(&self, lid: LI, listener: Listener, f: F)
- where
- F: FnOnce(LI) + Send + 'static
- {
- // Do not allow new listeners if the ListenerGroup is either in the process
- // of shutting down or has been shut down.
- if self.shutdown.load(Ordering::Relaxed) {
- return;
- }
-
-
- let g = self.lmap.lock();
- if g.contains_key(&lid) {
- // A listener with this listener id is already running
- return;
- }
- drop(g);
-
- let ltp = ListenTaskParams {
- lid,
- listener,
- lmap: Arc::clone(&self.lmap),
- lhandler: Arc::clone(&self.lhandler),
- cmap: Arc::clone(&self.cmap),
- idbag: Arc::clone(&self.idbag),
- shutdown: Arc::clone(&self.shutdown)
- };
-
- // Need to send the listener task's JoinHandle to the task so it can be
- // stored in the listener's map
- let (tx, rx) = oneshot::channel();
-
- // Spawn a task to initialize and run the listener.
- let jh = task::spawn(listener::task(ltp, f, rx));
-
- // Send JoinHandle to listener task, so it can store it in the ListenerData
- // structure
- if let Err(_e) = tx.send(jh) {
- // ToDo: error-handling
- }
- }
-
/// Check if there's a listener with the identifier `LI`.
pub fn have_listener(&mut self, id: &LI) -> bool {
self.lmap.lock().contains_key(id)
}
@@ -318,11 +316,11 @@
/// Kill the listener given a specific listener id.
///
/// If `kill_conns` is `true`, the killswitch of all of the listeners active
/// connections will be triggered as well.
- pub async fn kill_listener(&self, lid: LI, kill_conns: bool) {
+ pub async fn kill_listener(&self, lid: LI) {
// Extraction of join handle placed in a scope so that clippy doesn't think
// the lock is held over an await point.
let mut jh = {
let mut lmap = self.lmap.lock();
let Some(ldata) = lmap.get_mut(&lid) else {
@@ -337,32 +335,15 @@
drop(lmap);
jh
};
-
// If the listener data contained a join handle then use it to wait for
// the task to terminate.
if let Some(jh) = jh.take() {
let _ = jh.await;
}
-
- if kill_conns {
- // Generate a list of connection id's to be terminated.
- let cids: Vec = self
- .cmap
- .lock()
- .iter()
- .filter_map(|(cid, cdata)| (cdata.lid == lid).then_some(cid.clone()))
- .collect();
-
- // ToDo: If all the connnections' killswitches are triggered here
- // instead, they could terminate concurrently.
- for cid in cids {
- self.kill_connection(cid).await;
- }
- }
}
/// Terminate a connection given a connection identifier.
///
/// This function will not return until the background connection task has
@@ -409,11 +390,11 @@
.lock()
.iter()
.map(|(lid, _)| lid.clone())
.collect();
for lid in lids {
- self.kill_listener(lid, true).await;
+ self.kill_listener(lid).await;
}
}
/// Get a snap-shot of the current listener group state.
#[must_use]
Index: src/listener.rs
==================================================================
--- src/listener.rs
+++ src/listener.rs
@@ -20,11 +20,11 @@
pub use protwrap::tokio::server::{
listener::{async_trait, Acceptor, Listener, SockAddr},
Stream
};
-pub use idbag::{ArcId, IdBag};
+pub use idbag::{ArcIdUsize as ArcId, IdBagUsize as IdBag};
use crate::{
conn::{self, ConnectionData, Params as ConnParams},
ConnHandler, LCHandler
};
@@ -61,11 +61,13 @@
idbag: Arc,
/// Optional post-successful-bind callback.
cb: Option>,
- shutdown: Arc
+ shutdown: Arc,
+
+ auto_kill_conns: bool
}
#[async_trait]
impl Acceptor for InternalListenerCallback
where
@@ -89,11 +91,11 @@
// Allocate the buffer used to store listener data in the listners map
let ldata = ListenData {
ks: self.ks.clone(),
jh,
addr,
- auto_kill_conns: false
+ auto_kill_conns: self.auto_kill_conns
};
// Add listener to listener map
self.lmap.lock().insert(self.lid.clone(), ldata);
@@ -158,11 +160,12 @@
pub(crate) listener: Listener,
pub(crate) lmap: Arc>>,
pub(crate) lhandler: Arc>,
pub(crate) cmap: Arc>>>,
pub(crate) idbag: Arc,
- pub(crate) shutdown: Arc
+ pub(crate) shutdown: Arc,
+ pub(crate) autoclose_conns: bool
}
/// Internal listener.
///
/// This should be spawned on a new task.
@@ -192,11 +195,12 @@
ks: ks.clone(),
lhandler: Arc::clone(<p.lhandler),
idbag: ltp.idbag,
cb: Some(Box::new(cb)),
jh: Some(jh),
- shutdown: Arc::clone(<p.shutdown)
+ shutdown: Arc::clone(<p.shutdown),
+ auto_kill_conns: ltp.autoclose_conns
};
// Kick off the listerner.
//
// If successful, this will block until terminated using the killswitch.
@@ -210,26 +214,36 @@
ltp.lhandler.failed(<p.listener, ltp.lid.clone(), e).await;
}
}
// Deregister the listener from the listener map
- let mut g = ltp.lmap.lock();
- let Some(ldata) = g.remove(<p.lid) else {
- // No such id found
- return;
+ let ldata = {
+ let mut g = ltp.lmap.lock();
+ let Some(ldata) = g.remove(<p.lid) else {
+ // No such id found
+ return;
+ };
+ drop(g);
+ ldata
};
- drop(g);
// If the listener is configured to automatically kill client connections,
// then do so
+ let mut cjhs = Vec::new();
if ldata.auto_kill_conns {
- // ToDo: Kill all connections
- /*
- let g = ldata.cmap.lock();
- for (_id, chandler) in g.iter() {
+ //if ltp.autoclose_conns {
+ let mut g = ltp.cmap.lock();
+ for (chandler, jh) in g.iter_mut().filter_map(|(_id, cdata)| {
+ (cdata.lid == ltp.lid).then_some((&cdata.chandler, cdata.jh.take()))
+ }) {
chandler.kill();
+ if let Some(jh) = jh {
+ cjhs.push(jh);
+ }
}
- */
+ }
+ for jh in cjhs {
+ let _ = jh.await;
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Index: www/changelog.md
==================================================================
--- www/changelog.md
+++ www/changelog.md
@@ -1,16 +1,28 @@
# Change Log
+⚠️ indicates a breaking change.
+
## [Unreleased]
[Details](/vdiff?from=lstngrp-0.0.3&to=trunk)
### Added
+- Support automatically closing connections when a listener is removed.
+
### Changed
+- ⚠️ Rather than have two different functions to add listeners, unify to a single
+ function, and use a builder-type `ListenerSpec` to pass to the
+ `add_listener()` method instead.
+- Update `idbag` to `0.2.0`.
+
### Removed
+
+- ⚠️ No longer allow `kill_conns` to be specified when killing a listener.
+ Rely on the `ListenerSpec` setting instead.
---
## [0.0.3] - 2024-09-18