Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "lstngrp" -version = "0.0.2" +version = "0.0.3" edition = "2021" license = "0BSD" # https://crates.io/category_slugs categories = [ "network-programming", "asynchronous" ] keywords = [ "network", "server", "listen", "protwrap" ] @@ -11,11 +11,13 @@ rust-version = "1.62" exclude = [ ".fossil-settings", ".efiles", ".fslckout", + "examples", "www", + "bacon.toml", "rustfmt.toml" ] # https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section [badges] @@ -27,14 +29,20 @@ killswitch = { version = "0.4.2" } parking_lot = { version = "0.12.3" } protwrap = { version = "0.3.0", features = [ "tls", "tokio" ] } -tokio = { version = "1.39.2", features = [ +tokio = { version = "1.40.0", features = [ "macros", "net", "rt", "sync" ] } [dev-dependencies] -tokio = { version = "1.39.2", features = [ +tokio = { version = "1.40.0", features = [ "io-util", "rt-multi-thread", "time" ] } +[lints.clippy] +all = { level = "deny", priority = -1 } +pedantic = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } +cargo = { level = "warn", priority = -1 } + Index: README.md ================================================================== --- README.md +++ README.md @@ -1,5 +1,5 @@ -# listengroup +# lstngrp -`ListenGroup` is meant to collect a group of network listeners into a single +`ListenerGroup` is meant to collect a group of network listeners into a single entity, and help track connections made against those listeners. ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,105 @@ +# This is a configuration file for the bacon tool +# +# Bacon repository: https://github.com/Canop/bacon +# Complete help on configuration: https://dystroy.org/bacon/config/ +# You can also check bacon's own bacon.toml file +# as an example: https://github.com/Canop/bacon/blob/main/bacon.toml + +# For information about clippy lints, see: +# https://github.com/rust-lang/rust-clippy/blob/master/README.md + +#default_job = "check" +default_job = "clippy-all" + +[jobs.check] +command = ["cargo", "check", "--color", "always"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets", "--color", "always"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = [ + "cargo", "clippy", + "--color", "always", +] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--color", "always", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = [ + "cargo", "clippy", + "--all-targets", + "--color", "always", +] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = [ + "cargo", "test", "--color", "always", + "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124 +] +need_stdout = true + +[jobs.doc] +command = ["cargo", "doc", "--color", "always", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous 
job +[jobs.doc-open] +command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# *if* it makes sense for this crate. +# Don't forget the `--color always` part or the errors won't be +# properly parsed. +# If your program never stops (eg a server), you may set `background` +# to false to have the cargo run output immediately displayed instead +# of waiting for program's end. +[jobs.run] +command = [ + "cargo", "run", + "--color", "always", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--color", "always", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target Index: examples/per_listener_ctx.rs ================================================================== --- examples/per_listener_ctx.rs +++ examples/per_listener_ctx.rs @@ -50,11 +50,11 @@ _listener: &Listener, lid: Self::ListenIdent, err: std::io::Error ) { // An interface listener failed to bind to its interface - println!("Failed to set up listener lid={lid}; {}", err); + println!("Failed to set up listener lid={lid}; {err}"); // Note: Thanks to add_listener_with_cb() being used to register listeners // with a callback, where the callback is responsible for registering // the listener-specific context, we don't need to worry about // deregistering the context here. @@ -83,11 +83,11 @@ lid, ci.id.val() ); Self::ConnHandler { listen_id: lid, - conn_id: ci.id.clone() + conn_id: ci.id } } } @@ -137,45 +137,39 @@ // Kick off a listener with id 1, which is a TCP localhost:8080 listener let id = 1; let listener = Listener::from_str("127.0.0.1:8080").unwrap(); let lm = Arc::clone(&lmap); - lgrp - .add_listener_with_cb(id, listener, move |lid| { - let name = format!("Listener {}", id); - lm.lock().insert(lid, ListenCtx { name }); - }) - .await; + lgrp.add_listener_with_cb(id, listener, move |lid| { + let name = format!("Listener {id}"); + lm.lock().insert(lid, ListenCtx { name }); + }); // Give listeners a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; // Kick off a listener with id 9, which is a TCP localhost:8080 listener, // which should fail because it is bound by listener with id 1. 
let id = 9; let listener = Listener::from_str("127.0.0.1:8080").unwrap(); let lm = Arc::clone(&lmap); - lgrp - .add_listener_with_cb(id, listener, move |lid| { - let name = format!("Listener {}", id); - lm.lock().insert(lid, ListenCtx { name }); - }) - .await; + lgrp.add_listener_with_cb(id, listener, move |lid| { + let name = format!("Listener {id}"); + lm.lock().insert(lid, ListenCtx { name }); + }); // Give listener a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; // Kick off a listener with id 3, which is a TCP localhost:8081 listener let id = 2; let listener = Listener::from_str("127.0.0.1:8081").unwrap(); let lm = Arc::clone(&lmap); - lgrp - .add_listener_with_cb(id, listener, move |lid| { - let name = format!("Listener {}", id); - lm.lock().insert(lid, ListenCtx { name }); - }) - .await; + lgrp.add_listener_with_cb(id, listener, move |lid| { + let name = format!("Listener {id}"); + lm.lock().insert(lid, ListenCtx { name }); + }); // Give listener a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; Index: examples/simple.rs ================================================================== --- examples/simple.rs +++ examples/simple.rs @@ -25,11 +25,11 @@ &self, _listener: &Listener, lid: Self::ListenIdent, err: std::io::Error ) { - println!("Failed to set up listener {}; {}", lid, err); + println!("Failed to set up listener {lid}; {err}"); } async fn unbound(&self, _listener: &Listener, _lid: Self::ListenIdent) { //println!("Listener {} has been unbound", lctx.listen_id); } @@ -44,11 +44,11 @@ lid, ci.id.val() ); Self::ConnHandler { listen_id: lid, - conn_id: ci.id.clone() + conn_id: ci.id } } } @@ -89,29 +89,29 @@ let lgrp = ListenerGroup::new(handler); // Kick off a listener with id 1, which is a TCP localhost:8080 listener let id = 1; let listener = Listener::from_str("127.0.0.1:8080").unwrap(); - lgrp.add_listener(id, listener).await; + lgrp.add_listener(id, listener); // Give listeners a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; // Kick off a listener with id 9, which is a TCP localhost:8080 listener, // which should fail because it is bound by listener with id 1. let id = 9; let listener = Listener::from_str("127.0.0.1:8080").unwrap(); - lgrp.add_listener(id, listener).await; + lgrp.add_listener(id, listener); // Give listener a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; // Kick off a listener with id 3, which is a TCP localhost:8081 listener let id = 2; let listener = Listener::from_str("127.0.0.1:8081").unwrap(); - lgrp.add_listener(id, listener).await; + lgrp.add_listener(id, listener); // Give listener a second to start up tokio::time::sleep(std::time::Duration::from_millis(200)).await; Index: src/conn.rs ================================================================== --- src/conn.rs +++ src/conn.rs @@ -15,11 +15,11 @@ use super::{ConnHandler, ConnInfo}; /// Internal per-connection data. -pub(crate) struct ConnectionData { +pub struct ConnectionData { /// The listener identifier this connection was spawned from. pub(crate) lid: LI, /// Join handle for connection task. 
pub(crate) jh: Option>, @@ -27,11 +27,15 @@ /// Connection handler pub(crate) chandler: Arc } -pub(crate) struct ConnParams { +pub struct Params +where + LI: Send, + CC: Send +{ pub(crate) lid: LI, pub(crate) cid: ArcId, pub(crate) lhandler: Arc>, pub(crate) sa: SockAddr, pub(crate) strm: Stream, @@ -40,23 +44,23 @@ /// Internal wrapper used to call the application's connection handler. /// /// This function is called on a newly spawned task intended to manage the /// connection. -pub(crate) async fn task( - ConnParams { +pub async fn task( + Params { lid, cid, lhandler, sa, strm, cmap - }: ConnParams, + }: Params, rx: oneshot::Receiver> ) where - LI: Clone, - CC: ConnHandler + LI: Clone + Send, + CC: ConnHandler + Send + Sync { let ci = ConnInfo { sa: Arc::new(sa), id: cid.clone() }; Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -62,14 +62,14 @@ pub use idbag::ArcId; use idbag::IdBag; use conn::ConnectionData; -use listener::{ListenTaskParams, ListenerData}; +use listener::{ListenData, ListenTaskParams}; -/// Internal alias for a GroupHandler with a generic listener identifier and +/// Internal alias for a `GroupHandler` with a generic listener identifier and /// connection handler. type LCHandler = dyn GroupHandler + Send + Sync + 'static; @@ -177,11 +177,11 @@ LI: Hash + Eq { /// Keep track of all listeners, identified by `LI` and all their active /// connections, identified by an `ArcId`, with their connection context, /// identified by `CC`. - lmap: Arc>>, + lmap: Arc>>, cmap: Arc>>>, /// Connection handler used by all the listeners within this group. lhandler: Arc>, @@ -219,11 +219,11 @@ /// Register a new listener within this group. /// /// `id` is the unique identifier of this listener. /// `listner_ctx` is a listener-specific instance of a listener context that /// will be put in an `Arc` and passed to each connector. - pub async fn add_listener(&self, id: LI, listener: Listener) { + pub fn add_listener(&self, id: LI, listener: Listener) { // Do not allow new listeners if the ListenerGroup is either in the process // of shutting down or has been shut down. if self.shutdown.load(Ordering::Relaxed) { return; } @@ -252,11 +252,13 @@ // Spawn a task to initialize and run the listener. let jh = task::spawn(listener::task(ltp, |_lid| {}, rx)); // Send JoinHandle to listener task, so it can store it in the ListenerData // structure - tx.send(jh).unwrap(); + if let Err(_e) = tx.send(jh) { + // ToDo: error-management + } } /// Register off a listener within this group. /// /// This works much like `add_listener()`, but it will call a callback, @@ -264,16 +266,12 @@ /// /// The callback can be useful to register a listener-specific context in the /// listen group handler's object since it is only called on success. This /// way the application does not need to deregister the listener context for /// the failure case. - pub async fn add_listener_with_cb( - &self, - lid: LI, - listener: Listener, - f: F - ) where + pub fn add_listener_with_cb(&self, lid: LI, listener: Listener, f: F) + where F: FnOnce(LI) + Send + 'static { // Do not allow new listeners if the ListenerGroup is either in the process // of shutting down or has been shut down. if self.shutdown.load(Ordering::Relaxed) { @@ -305,17 +303,19 @@ // Spawn a task to initialize and run the listener. 
let jh = task::spawn(listener::task(ltp, f, rx)); // Send JoinHandle to listener task, so it can store it in the ListenerData // structure - tx.send(jh).unwrap(); + if let Err(_e) = tx.send(jh) { + // ToDo: error-handling + } } /// Check if there's a listener with the identifier `LI`. - pub async fn have_listener(&mut self, id: LI) -> bool { - self.lmap.lock().contains_key(&id) + pub fn have_listener(&mut self, id: &LI) -> bool { + self.lmap.lock().contains_key(id) } /// Kill the listener given a specific listener id. /// /// If `kill_conns` is `true`, the killswitch of all of the listeners active @@ -414,10 +414,11 @@ self.kill_listener(lid, true).await; } } /// Get a snap-shot of the current listener group state. + #[must_use] pub fn current_state(&self) -> Vec> { // Generate list of all listeners let mut ln: Vec> = self .lmap .lock() Index: src/listener.rs ================================================================== --- src/listener.rs +++ src/listener.rs @@ -23,16 +23,16 @@ }; pub use idbag::{ArcId, IdBag}; use crate::{ - conn::{self, ConnParams, ConnectionData}, + conn::{self, ConnectionData, Params as ConnParams}, ConnHandler, LCHandler }; /// Internal per-listener data. -pub(crate) struct ListenerData { +pub struct ListenData { /// A string representation of the address the listener has been bound to pub(crate) addr: Option, /// The kill switch used to terminate this listener. pub(crate) ks: KillSwitch, @@ -49,17 +49,17 @@ /// /// The primary purpose of this is to act as a callback handler for /// `protwrap`'s [`Acceptor`]. struct InternalListenerCallback { jh: Option>, - lmap: Arc>>, + lmap: Arc>>, cmap: Arc>>>, lid: LI, ks: KillSwitch, lhandler: Arc>, - /// IdBag used to allocate unique connection id's. + /// `IdBag` used to allocate unique connection id's. idbag: Arc, /// Optional post-successful-bind callback. cb: Option>, @@ -85,20 +85,19 @@ // The JoinHandle is expected to exist. let jh = self.jh.take(); // Allocate the buffer used to store listener data in the listners map - let ldata = ListenerData { + let ldata = ListenData { ks: self.ks.clone(), jh, addr, auto_kill_conns: false }; // Add listener to listener map - let mut g = self.lmap.lock(); - g.insert(self.lid.clone(), ldata); + self.lmap.lock().insert(self.lid.clone(), ldata); // Call the callback, in case the application wants to do some // post-successful-bound processing. if let Some(cb) = self.cb.take() { cb(self.lid.clone()); @@ -124,11 +123,11 @@ // Allocate a unique connection id for this connection. let cid = self.idbag.alloc().into_arcid(); let cp = ConnParams { lid: self.lid.clone(), - cid: cid.clone(), + cid, lhandler: Arc::clone(&self.lhandler), sa, strm, cmap: Arc::clone(&self.cmap) }; @@ -149,17 +148,17 @@ } } /// Used to bundle data being passed to listener task. -pub(crate) struct ListenTaskParams +pub struct ListenTaskParams where LI: Send { pub(crate) lid: LI, pub(crate) listener: Listener, - pub(crate) lmap: Arc>>, + pub(crate) lmap: Arc>>, pub(crate) lhandler: Arc>, pub(crate) cmap: Arc>>>, pub(crate) idbag: Arc, pub(crate) shutdown: Arc } @@ -169,11 +168,11 @@ /// This should be spawned on a new task. /// /// `cb` is a callback that will be called iff the listener was successfully /// bound. The application can use this to perform a custom action when the /// bound was successful. -pub(crate) async fn task( +pub async fn task( ltp: ListenTaskParams, cb: F, rx: oneshot::Receiver> ) where LI: Hash + Eq + Send + Sync + Clone + 'static, @@ -200,11 +199,11 @@ // Kick off the listerner. 
// // If successful, this will block until terminated using the killswitch. match ltp.listener.run(ks, ilcb).await { - Ok(_) => { + Ok(()) => { // Do nothing -- unbound is handled by Acceptor::unbound() //ltp.handler.unbound(ltp.lid.clone(), <p.lctx).await; } Err(e) => { // Listener failed -- report back to application using callback Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,17 +1,27 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=lstngrp-0.0.2&to=trunk) +[Details](/vdiff?from=lstngrp-0.0.3&to=trunk) ### Added ### Changed ### Removed +--- + +## [0.0.3] - 2024-09-18 + +[Details](/vdiff?from=lstngrp-0.0.2&to=lstngrp-0.0.3) + +### Changed + +- Add `Send` bounds to make generated `Future`s `Send`. + --- ## [0.0.2] - 2024-08-20 [Details](/vdiff?from=lstngrp-0.0.1&to=lstngrp-0.0.2) Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -1,8 +1,8 @@ -# listengroup +# lstngrp -`ListenGroup` is meant to collect a group of network listeners into a single +`ListenerGroup` is meant to collect a group of network listeners into a single entity, and help track connections made against those listeners. ## Change log @@ -11,7 +11,7 @@ maintained [Change Log](./changelog.md). ## Project Status -_lstconn_ is in early prototyping stages. +_lstngrp_ is in early prototyping stages.
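
The listener-registration change this patch makes can be summarized with a short usage sketch. It mirrors examples/per_listener_ctx.rs as modified above; `lgrp`, `lmap`, and `ListenCtx` are the values and types from that example (assumed to be in scope inside an async context), not additional API beyond what the patch shows.

    // 0.0.2 required awaiting registration:
    //     lgrp.add_listener_with_cb(id, listener, cb).await;
    // As of 0.0.3 the call is synchronous; only the spawned listener task is async.
    let id = 1;
    let listener = Listener::from_str("127.0.0.1:8080").unwrap();
    let lm = Arc::clone(&lmap);
    lgrp.add_listener_with_cb(id, listener, move |lid| {
        // Called only after a successful bind, so nothing needs to be
        // deregistered in the bind-failure case.
        let name = format!("Listener {id}");
        lm.lock().insert(lid, ListenCtx { name });
    });
    // The listener binds on a background task; give it a moment before relying on it.
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;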