Index: Cargo.toml
==================================================================
--- Cargo.toml
+++ Cargo.toml
@@ -1,8 +1,8 @@
 [package]
 name = "lstngrp"
-version = "0.0.3"
+version = "0.0.4"
 edition = "2021"
 license = "0BSD"
 # https://crates.io/category_slugs
 categories = [ "network-programming", "asynchronous" ]
 keywords = [ "network", "server", "listen", "protwrap" ]
@@ -23,11 +23,11 @@
 [badges]
 maintenance = { status = "experimental" }
 
 [dependencies]
 hashbrown = { version = "0.14.5" }
-idbag = { version = "0.1.2" }
+idbag = { version = "0.2.0" }
 killswitch = { version = "0.4.2" }
 parking_lot = { version = "0.12.3" }
 protwrap = { version = "0.3.0", features = [ "tls", "tokio" ] }
@@ -38,11 +38,14 @@
 
 [dev-dependencies]
 tokio = { version = "1.40.0", features = [ "io-util", "rt-multi-thread", "time" ] }
 
+[package.metadata.docs.rs]
+rustdoc-args = ["--generate-link-to-definition"]
+
 [lints.clippy]
 all = { level = "deny", priority = -1 }
 pedantic = { level = "warn", priority = -1 }
 nursery = { level = "warn", priority = -1 }
 cargo = { level = "warn", priority = -1 }

Index: examples/per_listener_ctx.rs
==================================================================
--- examples/per_listener_ctx.rs
+++ examples/per_listener_ctx.rs
@@ -15,12 +15,12 @@
 use parking_lot::Mutex;
 
 use hashbrown::HashMap;
 
 use lstngrp::{
-  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
-  ListenerGroup, SockAddr, Stream
+  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
+  Listener, ListenerSpec, SockAddr, Stream
 };
 
 struct ListenCtx {
   name: String
 }
@@ -131,45 +131,57 @@
     lmap: Arc::clone(&lmap)
   };
 
   // Create a listener group that will use an instance of MyHandler to process
   // callbacks.
-  let lgrp = ListenerGroup::new(handler);
+  let lgrp = ListenGroup::new(handler);
 
   // Kick off a listener with id 1, which is a TCP localhost:8080 listener
   let id = 1;
   let listener = Listener::from_str("127.0.0.1:8080").unwrap();
   let lm = Arc::clone(&lmap);
-  lgrp.add_listener_with_cb(id, listener, move |lid| {
-    let name = format!("Listener {id}");
-    lm.lock().insert(lid, ListenCtx { name });
-  });
+  lgrp.add_listener(
+    ListenerSpec::new(id, listener)
+      .autokill_conns()
+      .bound_callback(move |lid| {
+        let name = format!("Listener {id}");
+        lm.lock().insert(lid, ListenCtx { name });
+      })
+  );
 
   // Give listeners a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
   // Kick off a listener with id 9, which is a TCP localhost:8080 listener.
   // This should fail because the address is already bound by listener 1.
   let id = 9;
   let listener = Listener::from_str("127.0.0.1:8080").unwrap();
   let lm = Arc::clone(&lmap);
-  lgrp.add_listener_with_cb(id, listener, move |lid| {
-    let name = format!("Listener {id}");
-    lm.lock().insert(lid, ListenCtx { name });
-  });
+  lgrp.add_listener(
+    ListenerSpec::new(id, listener)
+      .autokill_conns()
+      .bound_callback(move |lid| {
+        let name = format!("Listener {id}");
+        lm.lock().insert(lid, ListenCtx { name });
+      })
+  );
 
   // Give listener a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
   // Kick off a listener with id 2, which is a TCP localhost:8081 listener
   let id = 2;
   let listener = Listener::from_str("127.0.0.1:8081").unwrap();
   let lm = Arc::clone(&lmap);
-  lgrp.add_listener_with_cb(id, listener, move |lid| {
-    let name = format!("Listener {id}");
-    lm.lock().insert(lid, ListenCtx { name });
-  });
+  lgrp.add_listener(
+    ListenerSpec::new(id, listener)
+      .autokill_conns()
+      .bound_callback(move |lid| {
+        let name = format!("Listener {id}");
+        lm.lock().insert(lid, ListenCtx { name });
+      })
+  );
 
   // Give listener a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
@@ -198,12 +210,12 @@
   */
 
   tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 
-  lgrp.kill_listener(id, false).await;
+  lgrp.kill_listener(id).await;
 
   lgrp.shutdown().await;
 }
 
 // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Index: examples/simple.rs
==================================================================
--- examples/simple.rs
+++ examples/simple.rs
@@ -1,10 +1,10 @@
 use std::str::FromStr;
 
 use lstngrp::{
-  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, Listener,
-  ListenerGroup, SockAddr, Stream
+  async_trait, ArcId, ConnHandler, ConnInfo, GroupHandler, ListenGroup,
+  Listener, ListenerSpec, SockAddr, Stream
 };
 
 struct MyHandler {}
 
 #[async_trait]
@@ -84,34 +84,34 @@
   // Create handler object
   let handler = MyHandler {};
 
   // Create a listener group that will use an instance of MyHandler to process
   // callbacks.
-  let lgrp = ListenerGroup::new(handler);
+  let lgrp = ListenGroup::new(handler);
 
   // Kick off a listener with id 1, which is a TCP localhost:8080 listener
   let id = 1;
   let listener = Listener::from_str("127.0.0.1:8080").unwrap();
-  lgrp.add_listener(id, listener);
+  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
 
   // Give listeners a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
   // Kick off a listener with id 9, which is a TCP localhost:8080 listener.
   // This should fail because the address is already bound by listener 1.
   let id = 9;
   let listener = Listener::from_str("127.0.0.1:8080").unwrap();
-  lgrp.add_listener(id, listener);
+  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
 
   // Give listener a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
   // Kick off a listener with id 2, which is a TCP localhost:8081 listener
   let id = 2;
   let listener = Listener::from_str("127.0.0.1:8081").unwrap();
-  lgrp.add_listener(id, listener);
+  lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
 
   // Give listener a second to start up
   tokio::time::sleep(std::time::Duration::from_millis(200)).await;
 
@@ -140,12 +140,12 @@
   */
 
   tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 
-  lgrp.kill_listener(id, false).await;
+  lgrp.kill_listener(id).await;
 
   lgrp.shutdown().await;
 }
 
 // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Index: src/conn.rs
==================================================================
--- src/conn.rs
+++ src/conn.rs
@@ -9,11 +9,11 @@
 
 pub use protwrap::tokio::server::{
   listener::{Acceptor, SockAddr},
   Stream
 };
 
-pub use idbag::ArcId;
+pub use idbag::ArcIdUsize as ArcId;
 
 use super::{ConnHandler, ConnInfo};
 
 /// Internal per-connection data.

Index: src/lib.rs
==================================================================
--- src/lib.rs
+++ src/lib.rs
@@ -1,13 +1,13 @@
 //! Abstraction over a group of listeners and connections generated by them.
 //!
-//! A [`ListenerGroup`] is a set of network listeners that are bound to the
+//! A [`ListenGroup`] is a set of network listeners that are bound to the
 //! same basic connection handler logic.
 //!
 //! # Usage overview
-//! An application calls [`ListenerGroup::new()`] to create a new
-//! `ListenerGroup` object. To the `new()` function it must pass an object
+//! An application calls [`ListenGroup::new()`] to create a new
+//! `ListenGroup` object. To the `new()` function it must pass an object
 //! that implements the [`GroupHandler`] trait. The object that implements
 //! `GroupHandler` is responsible for the shared data among all listeners while
 //! the implementation of the `GroupHandler`'s trait methods is responsible for
 //! the shared logic. Of special note is the [`GroupHandler::connected()`]
 //! trait method, whose responsibility it is to return an object that
@@ -18,21 +18,33 @@
 //! established. The application should implement the connection management
 //! logic in this method. When [`ConnHandler::kill()`] is called, the
 //! application must perform some logic that will abort the connection and
 //! return from the `ConnHandler::run()` implementation.
 //!
-//! Once a `ListenerGroup` object has been created, the application calls
-//! [`ListenerGroup::add_listener()`] to add a listener. A background task
+//! Once a `ListenGroup` object has been created, the application calls
+//! [`ListenGroup::add_listener()`] to add a listener. A background task
 //! will begin listening for incoming connections immediately.
 //!
-//! When an application is done with a `ListenerGroup` it can call
-//! [`ListenerGroup::shutdown()`] to shut down all listeners and connections in
+//! When an application is done with a `ListenGroup` it can call
+//! [`ListenGroup::shutdown()`] to shut down all listeners and connections in
 //! an orderly fashion (as long as the `ConnHandler::kill()` method has been
 //! correctly implemented by the application).
 //!
 //! # Per-listener context
-//! The `ListenerGroup` uses a single `GroupHandler` object for all listeners.
+//! The `ListenGroup` uses a single `GroupHandler` object for all listeners.
 //! If the application needs per-listener-specific data, it can use an
 //! associative container (like a `HashMap`) in its `GroupHandler` object to
 //! map listeners' unique identifiers to listener-specific contexts in the
 //! container. See the `per_listener_ctx` example in lstngrp's repository for
 //! an example implementation of this.
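+//!
+//! # Example sketch
+//! A compressed sketch of the overall flow, assuming `handler` implements
+//! [`GroupHandler`] (see `examples/simple.rs` for a full, runnable program):
+//! ```ignore
+//! let lgrp = ListenGroup::new(handler);
+//! let listener = Listener::from_str("127.0.0.1:8080").unwrap();
+//! lgrp.add_listener(ListenerSpec::new(1, listener).autokill_conns());
+//! // ... serve for a while ...
+//! lgrp.kill_listener(1).await;
+//! lgrp.shutdown().await;
+//! ```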
@@ -57,12 +57,12 @@
 
 pub use protwrap::tokio::server::{
   listener::{async_trait, Listener, SockAddr},
   Stream
 };
 
-pub use idbag::ArcId;
-use idbag::IdBag;
+pub use idbag::ArcIdUsize as ArcId;
+use idbag::IdBagUsize as IdBag;
 
 use conn::ConnectionData;
 use listener::{ListenData, ListenTaskParams};
 
@@ -160,10 +160,53 @@
   /// background connection task has terminated. In order to avoid holding up
   /// the shutdown process, termination should be quick.
   fn kill(&self);
 }
 
+
+/// Builder for listeners to be added to a [`ListenGroup`].
+pub struct ListenerSpec<LI> {
+  id: LI,
+  listener: Listener,
+  autokill: bool,
+  bound_cb: Option<Box<dyn FnOnce(LI) + Send + 'static>>
+}
+
+impl<LI> ListenerSpec<LI> {
+  /// Create a new listener spec.
+  pub fn new(id: LI, listener: Listener) -> Self {
+    Self {
+      id,
+      listener,
+      autokill: false,
+      bound_cb: None
+    }
+  }
+
+  /// Automatically request that connections terminate once the listener is
+  /// terminated.
+  #[must_use]
+  pub const fn autokill_conns(mut self) -> Self {
+    self.autokill = true;
+    self
+  }
+
+  /// Add a post-successful-bind callback.
+  ///
+  /// The callback can be useful for registering a listener-specific context in
+  /// the listen group handler's object, since it is only called on success.
+  /// This way the application does not need to deregister the listener context
+  /// in the failure case.
+  #[must_use]
+  pub fn bound_callback(
+    mut self,
+    f: impl FnOnce(LI) + Send + 'static
+  ) -> Self {
+    self.bound_cb = Some(Box::new(f));
+    self
+  }
+}
 
 /// Representation of a group of listeners.
 ///
 /// Each listener will use the same connection handler.
 ///
@@ -170,11 +213,11 @@
 /// Generics:
 /// - `LI` - Listener identifier. Used by the application to uniquely identify
 ///   a specific listener.
 /// - `CC` - Connection information. Each connection is allocated a connection
 ///   information context.
-pub struct ListenerGroup<LI, CC>
+pub struct ListenGroup<LI, CC>
 where
   LI: Hash + Eq
 {
   /// Keep track of all listeners, identified by `LI` and all their active
   /// connections, identified by an `ArcId`, with their connection context,
@@ -190,11 +233,11 @@
   shutdown: Arc<AtomicBool>
 }
 
-impl<LI, CC> ListenerGroup<LI, CC>
+impl<LI, CC> ListenGroup<LI, CC>
 where
   LI: Hash + Eq + Send + Sync + Clone + 'static,
   CC: ConnHandler + Send + Sync + 'static
 {
   /// Create a new listener group.
@@ -215,103 +258,58 @@
       shutdown: Arc::new(AtomicBool::new(false))
     }
   }
 
   /// Register a new listener within this group.
-  ///
-  /// `id` is the unique identifier of this listener.
-  /// `listner_ctx` is a listener-specific instance of a listener context that
-  /// will be put in an `Arc` and passed to each connector.
-  pub fn add_listener(&self, id: LI, listener: Listener) {
-    // Do not allow new listeners if the ListenerGroup is either in the process
+  pub fn add_listener(&self, spec: ListenerSpec<LI>) {
+    // Do not allow new listeners if the ListenGroup is either in the process
     // of shutting down or has been shut down.
     if self.shutdown.load(Ordering::Relaxed) {
       return;
     }
 
     let g = self.lmap.lock();
-    if g.contains_key(&id) {
+    if g.contains_key(&spec.id) {
       // Listener already running
       return;
     }
     drop(g);
 
     let ltp = ListenTaskParams {
-      lid: id,
-      listener,
+      lid: spec.id,
+      listener: spec.listener,
       lmap: Arc::clone(&self.lmap),
      lhandler: Arc::clone(&self.lhandler),
       cmap: Arc::clone(&self.cmap),
       idbag: Arc::clone(&self.idbag),
-      shutdown: Arc::clone(&self.shutdown)
+      shutdown: Arc::clone(&self.shutdown),
+      autoclose_conns: spec.autokill
     };
 
     // Need to send the listener task's JoinHandle to the task so it can be
     // stored in the listener's map
     let (tx, rx) = oneshot::channel();
 
+    #[allow(clippy::option_if_let_else)]
+    let cb = if let Some(cb) = spec.bound_cb {
+      cb
+    } else {
+      Box::new(|_lid| {})
+    };
+
+    // Suggested by clippy, but causes a build error -- likely because the
+    // `Box::new(|_lid| {})` in the closure is not coerced to the boxed
+    // trait-object type in that position:
+    /* let cb = spec.bound_cb.map_or_else(|| Box::new(|_lid| {}), |cb| cb); */
+
     // Spawn a task to initialize and run the listener.
-    let jh = task::spawn(listener::task(ltp, |_lid| {}, rx));
+    let jh = task::spawn(listener::task(ltp, cb, rx));
 
     // Send JoinHandle to listener task, so it can store it in the ListenerData
     // structure
     if let Err(_e) = tx.send(jh) {
       // ToDo: error-management
     }
   }
 
-  /// Register off a listener within this group.
-  ///
-  /// This works much like `add_listener()`, but it will call a callback,
-  /// passed through `f`, if the bind was successful.
-  ///
-  /// The callback can be useful to register a listener-specific context in the
-  /// listen group handler's object since it is only called on success. This
-  /// way the application does not need to deregister the listener context for
-  /// the failure case.
-  pub fn add_listener_with_cb<F>(&self, lid: LI, listener: Listener, f: F)
-  where
-    F: FnOnce(LI) + Send + 'static
-  {
-    // Do not allow new listeners if the ListenerGroup is either in the process
-    // of shutting down or has been shut down.
-    if self.shutdown.load(Ordering::Relaxed) {
-      return;
-    }
-
-
-    let g = self.lmap.lock();
-    if g.contains_key(&lid) {
-      // A listener with this listener id is already running
-      return;
-    }
-    drop(g);
-
-    let ltp = ListenTaskParams {
-      lid,
-      listener,
-      lmap: Arc::clone(&self.lmap),
-      lhandler: Arc::clone(&self.lhandler),
-      cmap: Arc::clone(&self.cmap),
-      idbag: Arc::clone(&self.idbag),
-      shutdown: Arc::clone(&self.shutdown)
-    };
-
-    // Need to send the listener task's JoinHandle to the task so it can be
-    // stored in the listener's map
-    let (tx, rx) = oneshot::channel();
-
-    // Spawn a task to initialize and run the listener.
-    let jh = task::spawn(listener::task(ltp, f, rx));
-
-    // Send JoinHandle to listener task, so it can store it in the ListenerData
-    // structure
-    if let Err(_e) = tx.send(jh) {
-      // ToDo: error-handling
-    }
-  }
-
   /// Check if there's a listener with the identifier `LI`.
   pub fn have_listener(&mut self, id: &LI) -> bool {
     self.lmap.lock().contains_key(id)
   }
 
@@ -318,11 +316,12 @@
   /// Kill the listener given a specific listener id.
-  ///
-  /// If `kill_conns` is `true`, the killswitch of all of the listeners active
-  /// connections will be triggered as well.
-  pub async fn kill_listener(&self, lid: LI, kill_conns: bool) {
+  ///
+  /// If the listener was added with [`ListenerSpec::autokill_conns()`], the
+  /// killswitches of all of the listener's active connections will be
+  /// triggered as well.
+  pub async fn kill_listener(&self, lid: LI) {
     // Extraction of join handle placed in a scope so that clippy doesn't think
     // the lock is held over an await point.
     let mut jh = {
       let mut lmap = self.lmap.lock();
       let Some(ldata) = lmap.get_mut(&lid) else {
@@ -337,32 +335,15 @@
       drop(lmap);
       jh
     };
-
     // If the listener data contained a join handle then use it to wait for
     // the task to terminate.
    if let Some(jh) = jh.take() {
       let _ = jh.await;
     }
-
-    if kill_conns {
-      // Generate a list of connection id's to be terminated.
-      let cids: Vec<ArcId> = self
-        .cmap
-        .lock()
-        .iter()
-        .filter_map(|(cid, cdata)| (cdata.lid == lid).then_some(cid.clone()))
-        .collect();
-
-      // ToDo: If all the connnections' killswitches are triggered here
-      // instead, they could terminate concurrently.
-      for cid in cids {
-        self.kill_connection(cid).await;
-      }
-    }
   }
 
   /// Terminate a connection given a connection identifier.
   ///
   /// This function will not return until the background connection task has
@@ -409,11 +390,11 @@
       .lock()
       .iter()
       .map(|(lid, _)| lid.clone())
       .collect();
     for lid in lids {
-      self.kill_listener(lid, true).await;
+      self.kill_listener(lid).await;
     }
   }
 
   /// Get a snapshot of the current listener group state.
   #[must_use]

Index: src/listener.rs
==================================================================
--- src/listener.rs
+++ src/listener.rs
@@ -20,11 +20,11 @@
 
 pub use protwrap::tokio::server::{
   listener::{async_trait, Acceptor, Listener, SockAddr},
   Stream
 };
 
-pub use idbag::{ArcId, IdBag};
+pub use idbag::{ArcIdUsize as ArcId, IdBagUsize as IdBag};
 
 use crate::{
   conn::{self, ConnectionData, Params as ConnParams},
   ConnHandler, LCHandler
 };
@@ -61,11 +61,13 @@
   idbag: Arc<IdBag>,
 
   /// Optional post-successful-bind callback.
   cb: Option<Box<dyn FnOnce(LI) + Send + 'static>>,
 
-  shutdown: Arc<AtomicBool>
+  shutdown: Arc<AtomicBool>,
+
+  auto_kill_conns: bool
 }
 
 #[async_trait]
 impl<LI, CC> Acceptor for InternalListenerCallback<LI, CC>
 where
@@ -89,11 +91,11 @@
     // Allocate the buffer used to store listener data in the listeners map
     let ldata = ListenData {
       ks: self.ks.clone(),
       jh,
       addr,
-      auto_kill_conns: false
+      auto_kill_conns: self.auto_kill_conns
     };
 
     // Add listener to listener map
     self.lmap.lock().insert(self.lid.clone(), ldata);
 
@@ -158,11 +160,12 @@
   pub(crate) listener: Listener,
   pub(crate) lmap: Arc<Mutex<HashMap<LI, ListenData>>>,
   pub(crate) lhandler: Arc<LCHandler<LI, CC>>,
   pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<CC>>>>,
   pub(crate) idbag: Arc<IdBag>,
-  pub(crate) shutdown: Arc<AtomicBool>
+  pub(crate) shutdown: Arc<AtomicBool>,
+  pub(crate) autoclose_conns: bool
 }
 
 /// Internal listener.
 ///
 /// This should be spawned on a new task.
@@ -192,11 +195,12 @@
     ks: ks.clone(),
     lhandler: Arc::clone(&ltp.lhandler),
     idbag: ltp.idbag,
     cb: Some(Box::new(cb)),
     jh: Some(jh),
-    shutdown: Arc::clone(&ltp.shutdown)
+    shutdown: Arc::clone(&ltp.shutdown),
+    auto_kill_conns: ltp.autoclose_conns
   };
 
   // Kick off the listener.
   //
   // If successful, this will block until terminated using the killswitch.
@@ -210,26 +214,36 @@
       ltp.lhandler.failed(&ltp.listener, ltp.lid.clone(), e).await;
     }
   }
 
   // Deregister the listener from the listener map
-  let mut g = ltp.lmap.lock();
-  let Some(ldata) = g.remove(&ltp.lid) else {
-    // No such id found
-    return;
+  let ldata = {
+    let mut g = ltp.lmap.lock();
+    let Some(ldata) = g.remove(&ltp.lid) else {
+      // No such id found
+      return;
+    };
+    drop(g);
+    ldata
   };
-  drop(g);
 
   // If the listener is configured to automatically kill client connections,
   // then do so
+  let mut cjhs = Vec::new();
   if ldata.auto_kill_conns {
-    // ToDo: Kill all connections
-    /*
-    let g = ldata.cmap.lock();
-    for (_id, chandler) in g.iter() {
+    let mut g = ltp.cmap.lock();
+    for (chandler, jh) in g.iter_mut().filter_map(|(_id, cdata)| {
+      (cdata.lid == ltp.lid).then_some((&cdata.chandler, cdata.jh.take()))
+    }) {
       chandler.kill();
+      if let Some(jh) = jh {
+        cjhs.push(jh);
+      }
     }
-    */
+  }
+  // Wait for the killed connection tasks to terminate.
+  for jh in cjhs {
+    let _ = jh.await;
   }
 }
 
 // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Index: www/changelog.md
==================================================================
--- www/changelog.md
+++ www/changelog.md
@@ -1,16 +1,41 @@
 # Change Log
 
+⚠️ indicates a breaking change.
+
 ## [Unreleased]
 
 [Details](/vdiff?from=lstngrp-0.0.3&to=trunk)
 
 ### Added
 
+- Support automatically closing connections when a listener is removed.
+
 ### Changed
 
+- ⚠️ Rather than have two different functions for adding listeners, unify
+  them into a single `add_listener()` method that takes a builder-style
+  `ListenerSpec`. A migration sketch follows the lists below.
+- Update `idbag` to `0.2.0`.
+
 ### Removed
+
+- ⚠️ No longer allow `kill_conns` to be specified when killing a listener.
+  Rely on the `ListenerSpec` setting instead.
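+
+A minimal migration sketch for the unified `add_listener()` (illustrative;
+`lgrp`, `id`, and `listener` are as in `examples/simple.rs`):
+
+```rust
+// lstngrp 0.0.3:
+//   lgrp.add_listener(id, listener);
+//   lgrp.kill_listener(id, true).await;
+
+// lstngrp 0.0.4:
+lgrp.add_listener(ListenerSpec::new(id, listener).autokill_conns());
+lgrp.kill_listener(id).await;
+```
 
 ---
 
 ## [0.0.3] - 2024-09-18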