lstngrp

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From lstngrp-0.0.2 To lstngrp-0.0.3

2024-09-18
10:23
Generate links to definition on docs.rs. check-in: ea1e0ea1f0 user: jan tags: trunk
10:21
Release maintenance. check-in: 86bd36e830 user: jan tags: lstngrp-0.0.3, trunk
10:13
Exclude examples from packaging. check-in: 2b0bfea373 user: jan tags: trunk
2024-09-10
03:22
Begin pedantic clippy work. check-in: 62b3c6f91b user: jan tags: trunk
2024-08-20
17:32
Release maintenance. check-in: d6addc5963 user: jan tags: lstngrp-0.0.2, trunk
17:26
Fix Windows build. check-in: e1fe5a161f user: jan tags: trunk

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40






[package]
name = "lstngrp"
version = "0.0.2"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming", "asynchronous" ]
keywords = [ "network", "server", "listen", "protwrap" ]
repository = "https://repos.qrnch.tech/pub/lstngrp"
description = "Groups listeners and connections with common data/logic."
rust-version = "1.62"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",

  "www",

  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[dependencies]
hashbrown = { version = "0.14.5" }
idbag = { version = "0.1.2" }
killswitch = { version = "0.4.2" }
parking_lot = { version = "0.12.3" }
protwrap = { version = "0.3.0", features = [
  "tls", "tokio"
] }
tokio = { version = "1.39.2", features = [
  "macros", "net", "rt", "sync"
] }

[dev-dependencies]
tokio = { version = "1.39.2", features = [
  "io-util", "rt-multi-thread", "time"
] }









|












>

>















|




|



>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
[package]
name = "lstngrp"
version = "0.0.3"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming", "asynchronous" ]
keywords = [ "network", "server", "listen", "protwrap" ]
repository = "https://repos.qrnch.tech/pub/lstngrp"
description = "Groups listeners and connections with common data/logic."
rust-version = "1.62"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "examples",
  "www",
  "bacon.toml",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[dependencies]
hashbrown = { version = "0.14.5" }
idbag = { version = "0.1.2" }
killswitch = { version = "0.4.2" }
parking_lot = { version = "0.12.3" }
protwrap = { version = "0.3.0", features = [
  "tls", "tokio"
] }
tokio = { version = "1.40.0", features = [
  "macros", "net", "rt", "sync"
] }

[dev-dependencies]
tokio = { version = "1.40.0", features = [
  "io-util", "rt-multi-thread", "time"
] }

[lints.clippy]
all = { level = "deny", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }

Changes to README.md.

1
2
3
4
5
# listengroup

`ListenGroup` is meant to collect a group of network listeners into a single
entity, and help track connections made against those listeners.

|

|


1
2
3
4
5
# lstngrp

`ListenerGroup` is meant to collect a group of network listeners into a single
entity, and help track connections made against those listeners.

Added bacon.toml.



















































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# This is a configuration file for the bacon tool
#
# Bacon repository: https://github.com/Canop/bacon
# Complete help on configuration: https://dystroy.org/bacon/config/
# You can also check bacon's own bacon.toml file
#  as an example: https://github.com/Canop/bacon/blob/main/bacon.toml

# For information about clippy lints, see:
# https://github.com/rust-lang/rust-clippy/blob/master/README.md

#default_job = "check"
default_job = "clippy-all"

[jobs.check]
command = ["cargo", "check", "--color", "always"]
need_stdout = false

[jobs.check-all]
command = ["cargo", "check", "--all-targets", "--color", "always"]
need_stdout = false

# Run clippy on the default target
[jobs.clippy]
command = [
    "cargo", "clippy",
    "--color", "always",
]
need_stdout = false

# Run clippy on all targets
# To disable some lints, you may change the job this way:
#    [jobs.clippy-all]
#    command = [
#        "cargo", "clippy",
#        "--all-targets",
#        "--color", "always",
#    	 "--",
#    	 "-A", "clippy::bool_to_int_with_if",
#    	 "-A", "clippy::collapsible_if",
#    	 "-A", "clippy::derive_partial_eq_without_eq",
#    ]
# need_stdout = false
[jobs.clippy-all]
command = [
    "cargo", "clippy",
    "--all-targets",
    "--color", "always",
]
need_stdout = false

# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = [
    "cargo", "test", "--color", "always",
    "--", "--color", "always", # see https://github.com/Canop/bacon/issues/124
]
need_stdout = true

[jobs.doc]
command = ["cargo", "doc", "--color", "always", "--no-deps"]
need_stdout = false

# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--color", "always", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change

# You can run your application and have the result displayed in bacon,
# *if* it makes sense for this crate.
# Don't forget the `--color always` part or the errors won't be
# properly parsed.
# If your program never stops (eg a server), you may set `background`
# to false to have the cargo run output immediately displayed instead
# of waiting for program's end.
[jobs.run]
command = [
    "cargo", "run",
    "--color", "always",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true

# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
#    bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--color", "always", "--example"]
need_stdout = true
allow_warnings = true

# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
# alt-m = "job:my-job"
c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target

Changes to examples/per_listener_ctx.rs.

48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  async fn failed(
    &self,
    _listener: &Listener,
    lid: Self::ListenIdent,
    err: std::io::Error
  ) {
    // An interface listener failed to bind to its interface
    println!("Failed to set up listener lid={lid}; {}", err);

    // Note: Thanks to add_listener_with_cb() being used to register listeners
    // with a callback, where the callback is responsible for registering
    // the listener-specific context, we don't need to worry about
    // deregistering the context here.
  }








|







48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
  async fn failed(
    &self,
    _listener: &Listener,
    lid: Self::ListenIdent,
    err: std::io::Error
  ) {
    // An interface listener failed to bind to its interface
    println!("Failed to set up listener lid={lid}; {err}");

    // Note: Thanks to add_listener_with_cb() being used to register listeners
    // with a callback, where the callback is responsible for registering
    // the listener-specific context, we don't need to worry about
    // deregistering the context here.
  }

81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
    println!(
      "Connected on listener id={}, assigned conn_id={}",
      lid,
      ci.id.val()
    );
    Self::ConnHandler {
      listen_id: lid,
      conn_id: ci.id.clone()
    }
  }
}


struct MyConnHandler {
  listen_id: i64,







|







81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
    println!(
      "Connected on listener id={}, assigned conn_id={}",
      lid,
      ci.id.val()
    );
    Self::ConnHandler {
      listen_id: lid,
      conn_id: ci.id
    }
  }
}


struct MyConnHandler {
  listen_id: i64,
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp
    .add_listener_with_cb(id, listener, move |lid| {
      let name = format!("Listener {}", id);
      lm.lock().insert(lid, ListenCtx { name });
    })
    .await;

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp
    .add_listener_with_cb(id, listener, move |lid| {
      let name = format!("Listener {}", id);
      lm.lock().insert(lid, ListenCtx { name });
    })
    .await;

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 3, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  let lm = Arc::clone(&lmap);
  lgrp
    .add_listener_with_cb(id, listener, move |lid| {
      let name = format!("Listener {}", id);
      lm.lock().insert(lid, ListenCtx { name });
    })
    .await;

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {







<
|
|
|
|
<









<
|
|
|
|
<








<
|
|
|
|
<







135
136
137
138
139
140
141

142
143
144
145

146
147
148
149
150
151
152
153
154

155
156
157
158

159
160
161
162
163
164
165
166

167
168
169
170

171
172
173
174
175
176
177
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);

  lgrp.add_listener_with_cb(id, listener, move |lid| {
    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  let lm = Arc::clone(&lmap);

  lgrp.add_listener_with_cb(id, listener, move |lid| {
    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 3, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  let lm = Arc::clone(&lmap);

  lgrp.add_listener_with_cb(id, listener, move |lid| {
    let name = format!("Listener {id}");
    lm.lock().insert(lid, ListenCtx { name });
  });


  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {

Changes to examples/simple.rs.

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

  async fn failed(
    &self,
    _listener: &Listener,
    lid: Self::ListenIdent,
    err: std::io::Error
  ) {
    println!("Failed to set up listener {}; {}", lid, err);
  }

  async fn unbound(&self, _listener: &Listener, _lid: Self::ListenIdent) {
    //println!("Listener {} has been unbound", lctx.listen_id);
  }

  async fn connected(
    &self,
    lid: Self::ListenIdent,
    ci: ConnInfo
  ) -> Self::ConnHandler {
    println!(
      "Connected on listener id={}, assigned conn_id={}",
      lid,
      ci.id.val()
    );
    Self::ConnHandler {
      listen_id: lid,
      conn_id: ci.id.clone()
    }
  }
}


struct MyConnHandler {
  listen_id: i64,







|


















|







23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

  async fn failed(
    &self,
    _listener: &Listener,
    lid: Self::ListenIdent,
    err: std::io::Error
  ) {
    println!("Failed to set up listener {lid}; {err}");
  }

  async fn unbound(&self, _listener: &Listener, _lid: Self::ListenIdent) {
    //println!("Listener {} has been unbound", lctx.listen_id);
  }

  async fn connected(
    &self,
    lid: Self::ListenIdent,
    ci: ConnInfo
  ) -> Self::ConnHandler {
    println!(
      "Connected on listener id={}, assigned conn_id={}",
      lid,
      ci.id.val()
    );
    Self::ConnHandler {
      listen_id: lid,
      conn_id: ci.id
    }
  }
}


struct MyConnHandler {
  listen_id: i64,
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener).await;

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener).await;

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Kick off a listener with id 3, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  lgrp.add_listener(id, listener).await;

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {







|








|








|







87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
  // Create a listener group that will use an instance of MyHandler to process
  // callbacks.
  let lgrp = ListenerGroup::new(handler);

  // Kick off a listener with id 1, which is a TCP localhost:8080 listener
  let id = 1;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener);

  // Give listeners a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;

  // Kick off a listener with id 9, which is a TCP localhost:8080 listener,
  // which should fail because it is bound by listener with id 1.
  let id = 9;
  let listener = Listener::from_str("127.0.0.1:8080").unwrap();
  lgrp.add_listener(id, listener);

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Kick off a listener with id 3, which is a TCP localhost:8081 listener
  let id = 2;
  let listener = Listener::from_str("127.0.0.1:8081").unwrap();
  lgrp.add_listener(id, listener);

  // Give listener a second to start up
  tokio::time::sleep(std::time::Duration::from_millis(200)).await;


  // Spawn a client that will connect to the server
  tokio::task::spawn(async {

Changes to src/conn.rs.

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32




33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

pub use idbag::ArcId;

use super::{ConnHandler, ConnInfo};


/// Internal per-connection data.
pub(crate) struct ConnectionData<LI, CC> {
  /// The listener identifier this connection was spawned from.
  pub(crate) lid: LI,

  /// Join handle for connection task.
  pub(crate) jh: Option<JoinHandle<()>>,

  /// Connection handler
  pub(crate) chandler: Arc<CC>
}


pub(crate) struct ConnParams<LI, CC> {




  pub(crate) lid: LI,
  pub(crate) cid: ArcId,
  pub(crate) lhandler: Arc<super::LCHandler<LI, CC>>,
  pub(crate) sa: SockAddr,
  pub(crate) strm: Stream,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>
}

/// Internal wrapper used to call the application's connection handler.
///
/// This function is called on a newly spawned task intended to manage the
/// connection.
pub(crate) async fn task<LI, CC>(
  ConnParams {
    lid,
    cid,
    lhandler,
    sa,
    strm,
    cmap
  }: ConnParams<LI, CC>,
  rx: oneshot::Receiver<JoinHandle<()>>
) where
  LI: Clone,
  CC: ConnHandler
{
  let ci = ConnInfo {
    sa: Arc::new(sa),
    id: cid.clone()
  };

  // Call application callback to generate the connection handler object.







|











|
>
>
>
>












|
|






|


|
|







13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

pub use idbag::ArcId;

use super::{ConnHandler, ConnInfo};


/// Internal per-connection data.
pub struct ConnectionData<LI, CC> {
  /// The listener identifier this connection was spawned from.
  pub(crate) lid: LI,

  /// Join handle for connection task.
  pub(crate) jh: Option<JoinHandle<()>>,

  /// Connection handler
  pub(crate) chandler: Arc<CC>
}


pub struct Params<LI, CC>
where
  LI: Send,
  CC: Send
{
  pub(crate) lid: LI,
  pub(crate) cid: ArcId,
  pub(crate) lhandler: Arc<super::LCHandler<LI, CC>>,
  pub(crate) sa: SockAddr,
  pub(crate) strm: Stream,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>
}

/// Internal wrapper used to call the application's connection handler.
///
/// This function is called on a newly spawned task intended to manage the
/// connection.
pub async fn task<LI, CC>(
  Params {
    lid,
    cid,
    lhandler,
    sa,
    strm,
    cmap
  }: Params<LI, CC>,
  rx: oneshot::Receiver<JoinHandle<()>>
) where
  LI: Clone + Send,
  CC: ConnHandler + Send + Sync
{
  let ci = ConnInfo {
    sa: Arc::new(sa),
    id: cid.clone()
  };

  // Call application callback to generate the connection handler object.

Changes to src/lib.rs.

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
};

pub use idbag::ArcId;
use idbag::IdBag;

use conn::ConnectionData;

use listener::{ListenTaskParams, ListenerData};


/// Internal alias for a GroupHandler with a generic listener identifier and
/// connection handler.
type LCHandler<LI, CC> =
  dyn GroupHandler<ListenIdent = LI, ConnHandler = CC> + Send + Sync + 'static;


/// Context passed to [`GroupHandler::connected()`] used to pass connection
/// (metadata) information.







|


|







60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
};

pub use idbag::ArcId;
use idbag::IdBag;

use conn::ConnectionData;

use listener::{ListenData, ListenTaskParams};


/// Internal alias for a `GroupHandler` with a generic listener identifier and
/// connection handler.
type LCHandler<LI, CC> =
  dyn GroupHandler<ListenIdent = LI, ConnHandler = CC> + Send + Sync + 'static;


/// Context passed to [`GroupHandler::connected()`] used to pass connection
/// (metadata) information.
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
pub struct ListenerGroup<LI, CC>
where
  LI: Hash + Eq
{
  /// Keep track of all listeners, identified by `LI` and all their active
  /// connections, identified by an `ArcId`, with their connection context,
  /// identified by `CC`.
  lmap: Arc<Mutex<HashMap<LI, ListenerData>>>,

  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,

  /// Connection handler used by all the listeners within this group.
  lhandler: Arc<LCHandler<LI, CC>>,

  idbag: Arc<IdBag>,







|







175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
pub struct ListenerGroup<LI, CC>
where
  LI: Hash + Eq
{
  /// Keep track of all listeners, identified by `LI` and all their active
  /// connections, identified by an `ArcId`, with their connection context,
  /// identified by `CC`.
  lmap: Arc<Mutex<HashMap<LI, ListenData>>>,

  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,

  /// Connection handler used by all the listeners within this group.
  lhandler: Arc<LCHandler<LI, CC>>,

  idbag: Arc<IdBag>,
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
  }

  /// Register a new listener within this group.
  ///
  /// `id` is the unique identifier of this listener.
  /// `listner_ctx` is a listener-specific instance of a listener context that
  /// will be put in an `Arc` and passed to each connector.
  pub async fn add_listener(&self, id: LI, listener: Listener) {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }

    let g = self.lmap.lock();







|







217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
  }

  /// Register a new listener within this group.
  ///
  /// `id` is the unique identifier of this listener.
  /// `listner_ctx` is a listener-specific instance of a listener context that
  /// will be put in an `Arc` and passed to each connector.
  pub fn add_listener(&self, id: LI, listener: Listener) {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }

    let g = self.lmap.lock();
250
251
252
253
254
255
256
257


258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
    let (tx, rx) = oneshot::channel();

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, |_lid| {}, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    tx.send(jh).unwrap();


  }

  /// Register off a listener within this group.
  ///
  /// This works much like `add_listener()`, but it will call a callback,
  /// passed through `f`, if the bind was successful.
  ///
  /// The callback can be useful to register a listener-specific context in the
  /// listen group handler's object since it is only called on success.  This
  /// way the application does not need to deregister the listener context for
  /// the failure case.
  pub async fn add_listener_with_cb<F>(
    &self,
    lid: LI,
    listener: Listener,
    f: F
  ) where
    F: FnOnce(LI) + Send + 'static
  {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }







|
>
>











|
<
<
<
<
|







250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271




272
273
274
275
276
277
278
279
    let (tx, rx) = oneshot::channel();

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, |_lid| {}, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    if let Err(_e) = tx.send(jh) {
      // ToDo: error-management
    }
  }

  /// Register off a listener within this group.
  ///
  /// This works much like `add_listener()`, but it will call a callback,
  /// passed through `f`, if the bind was successful.
  ///
  /// The callback can be useful to register a listener-specific context in the
  /// listen group handler's object since it is only called on success.  This
  /// way the application does not need to deregister the listener context for
  /// the failure case.
  pub fn add_listener_with_cb<F>(&self, lid: LI, listener: Listener, f: F)




  where
    F: FnOnce(LI) + Send + 'static
  {
    // Do not allow new listeners if the ListenerGroup is either in the process
    // of shutting down or has been shut down.
    if self.shutdown.load(Ordering::Relaxed) {
      return;
    }
303
304
305
306
307
308
309
310


311
312
313
314
315
316
317
318
319
320
321
322
323
    let (tx, rx) = oneshot::channel();

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, f, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    tx.send(jh).unwrap();


  }


  /// Check if there's a listener with the identifier `LI`.
  pub async fn have_listener(&mut self, id: LI) -> bool {
    self.lmap.lock().contains_key(&id)
  }

  /// Kill the listener given a specific listener id.
  ///
  /// If `kill_conns` is `true`, the killswitch of all of the listeners active
  /// connections will be triggered as well.
  pub async fn kill_listener(&self, lid: LI, kill_conns: bool) {







|
>
>




|
|







301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
    let (tx, rx) = oneshot::channel();

    // Spawn a task to initialize and run the listener.
    let jh = task::spawn(listener::task(ltp, f, rx));

    // Send JoinHandle to listener task, so it can store it in the ListenerData
    // structure
    if let Err(_e) = tx.send(jh) {
      // ToDo: error-handling
    }
  }


  /// Check if there's a listener with the identifier `LI`.
  pub fn have_listener(&mut self, id: &LI) -> bool {
    self.lmap.lock().contains_key(id)
  }

  /// Kill the listener given a specific listener id.
  ///
  /// If `kill_conns` is `true`, the killswitch of all of the listeners active
  /// connections will be triggered as well.
  pub async fn kill_listener(&self, lid: LI, kill_conns: bool) {
412
413
414
415
416
417
418

419
420
421
422
423
424
425
      .collect();
    for lid in lids {
      self.kill_listener(lid, true).await;
    }
  }

  /// Get a snap-shot of the current listener group state.

  pub fn current_state(&self) -> Vec<LInfo<LI, CC>> {
    // Generate list of all listeners
    let mut ln: Vec<LInfo<LI, CC>> = self
      .lmap
      .lock()
      .iter()
      .map(|(lid, ld)| LInfo {







>







412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
      .collect();
    for lid in lids {
      self.kill_listener(lid, true).await;
    }
  }

  /// Get a snap-shot of the current listener group state.
  #[must_use]
  pub fn current_state(&self) -> Vec<LInfo<LI, CC>> {
    // Generate list of all listeners
    let mut ln: Vec<LInfo<LI, CC>> = self
      .lmap
      .lock()
      .iter()
      .map(|(lid, ld)| LInfo {

Changes to src/listener.rs.

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
  listener::{async_trait, Acceptor, Listener, SockAddr},
  Stream
};

pub use idbag::{ArcId, IdBag};

use crate::{
  conn::{self, ConnParams, ConnectionData},
  ConnHandler, LCHandler
};

/// Internal per-listener data.
pub(crate) struct ListenerData {
  /// A string representation of the address the listener has been bound to
  pub(crate) addr: Option<String>,

  /// The kill switch used to terminate this listener.
  pub(crate) ks: KillSwitch,

  pub(crate) jh: Option<JoinHandle<()>>,

  /// If set to `true`, all the connections will be requested to
  /// self-terminate when the listener terminates.
  pub(crate) auto_kill_conns: bool
}


/// Internal listener context.
///
/// The primary purpose of this is to act as a callback handler for
/// `protwrap`'s [`Acceptor`].
struct InternalListenerCallback<LI, CC> {
  jh: Option<JoinHandle<()>>,
  lmap: Arc<Mutex<HashMap<LI, ListenerData>>>,
  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  lid: LI,
  ks: KillSwitch,
  lhandler: Arc<LCHandler<LI, CC>>,

  /// IdBag used to allocate unique connection id's.
  idbag: Arc<IdBag>,

  /// Optional post-successful-bind callback.
  cb: Option<Box<dyn FnOnce(LI) + Send>>,

  shutdown: Arc<AtomicBool>
}







|




|




















|





|







21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
  listener::{async_trait, Acceptor, Listener, SockAddr},
  Stream
};

pub use idbag::{ArcId, IdBag};

use crate::{
  conn::{self, ConnectionData, Params as ConnParams},
  ConnHandler, LCHandler
};

/// Internal per-listener data.
pub struct ListenData {
  /// A string representation of the address the listener has been bound to
  pub(crate) addr: Option<String>,

  /// The kill switch used to terminate this listener.
  pub(crate) ks: KillSwitch,

  pub(crate) jh: Option<JoinHandle<()>>,

  /// If set to `true`, all the connections will be requested to
  /// self-terminate when the listener terminates.
  pub(crate) auto_kill_conns: bool
}


/// Internal listener context.
///
/// The primary purpose of this is to act as a callback handler for
/// `protwrap`'s [`Acceptor`].
struct InternalListenerCallback<LI, CC> {
  jh: Option<JoinHandle<()>>,
  lmap: Arc<Mutex<HashMap<LI, ListenData>>>,
  cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  lid: LI,
  ks: KillSwitch,
  lhandler: Arc<LCHandler<LI, CC>>,

  /// `IdBag` used to allocate unique connection id's.
  idbag: Arc<IdBag>,

  /// Optional post-successful-bind callback.
  cb: Option<Box<dyn FnOnce(LI) + Send>>,

  shutdown: Arc<AtomicBool>
}
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
    // Forward bound() to the listen group application callback Handler
    self.lhandler.bound(listener, self.lid.clone(), sa).await;

    // The JoinHandle is expected to exist.
    let jh = self.jh.take();

    // Allocate the buffer used to store listener data in the listners map
    let ldata = ListenerData {
      ks: self.ks.clone(),
      jh,
      addr,
      auto_kill_conns: false
    };

    // Add listener to listener map
    let mut g = self.lmap.lock();
    g.insert(self.lid.clone(), ldata);

    // Call the callback, in case the application wants to do some
    // post-successful-bound processing.
    if let Some(cb) = self.cb.take() {
      cb(self.lid.clone());
    }
  }







|







<
|







83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

98
99
100
101
102
103
104
105
    // Forward bound() to the listen group application callback Handler
    self.lhandler.bound(listener, self.lid.clone(), sa).await;

    // The JoinHandle is expected to exist.
    let jh = self.jh.take();

    // Allocate the buffer used to store listener data in the listners map
    let ldata = ListenData {
      ks: self.ks.clone(),
      jh,
      addr,
      auto_kill_conns: false
    };

    // Add listener to listener map

    self.lmap.lock().insert(self.lid.clone(), ldata);

    // Call the callback, in case the application wants to do some
    // post-successful-bound processing.
    if let Some(cb) = self.cb.take() {
      cb(self.lid.clone());
    }
  }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136


    // Allocate a unique connection id for this connection.
    let cid = self.idbag.alloc().into_arcid();

    let cp = ConnParams {
      lid: self.lid.clone(),
      cid: cid.clone(),
      lhandler: Arc::clone(&self.lhandler),
      sa,
      strm,
      cmap: Arc::clone(&self.cmap)
    };

    // Used to make sure the connection task doesn't progress boyond







|







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135


    // Allocate a unique connection id for this connection.
    let cid = self.idbag.alloc().into_arcid();

    let cp = ConnParams {
      lid: self.lid.clone(),
      cid,
      lhandler: Arc::clone(&self.lhandler),
      sa,
      strm,
      cmap: Arc::clone(&self.cmap)
    };

    // Used to make sure the connection task doesn't progress boyond
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    // Store the join handle in the internal listener context.
    //self.ctaskmap.lock().insert(cid.clone(), jh);
  }
}


/// Used to bundle data being passed to listener task.
pub(crate) struct ListenTaskParams<LI, CC>
where
  LI: Send
{
  pub(crate) lid: LI,
  pub(crate) listener: Listener,
  pub(crate) lmap: Arc<Mutex<HashMap<LI, ListenerData>>>,
  pub(crate) lhandler: Arc<LCHandler<LI, CC>>,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  pub(crate) idbag: Arc<IdBag>,
  pub(crate) shutdown: Arc<AtomicBool>
}

/// Internal listener.
///
/// This should be spawned on a new task.
///
/// `cb` is a callback that will be called iff the listener was successfully
/// bound.  The application can use this to perform a custom action when the
/// bound was successful.
pub(crate) async fn task<LI, CC, F>(
  ltp: ListenTaskParams<LI, CC>,
  cb: F,
  rx: oneshot::Receiver<JoinHandle<()>>
) where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static,
  F: FnOnce(LI) + Send + 'static







|





|













|







146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
    // Store the join handle in the internal listener context.
    //self.ctaskmap.lock().insert(cid.clone(), jh);
  }
}


/// Used to bundle data being passed to listener task.
pub struct ListenTaskParams<LI, CC>
where
  LI: Send
{
  pub(crate) lid: LI,
  pub(crate) listener: Listener,
  pub(crate) lmap: Arc<Mutex<HashMap<LI, ListenData>>>,
  pub(crate) lhandler: Arc<LCHandler<LI, CC>>,
  pub(crate) cmap: Arc<Mutex<HashMap<ArcId, ConnectionData<LI, CC>>>>,
  pub(crate) idbag: Arc<IdBag>,
  pub(crate) shutdown: Arc<AtomicBool>
}

/// Internal listener.
///
/// This should be spawned on a new task.
///
/// `cb` is a callback that will be called iff the listener was successfully
/// bound.  The application can use this to perform a custom action when the
/// bound was successful.
pub async fn task<LI, CC, F>(
  ltp: ListenTaskParams<LI, CC>,
  cb: F,
  rx: oneshot::Receiver<JoinHandle<()>>
) where
  LI: Hash + Eq + Send + Sync + Clone + 'static,
  CC: ConnHandler + Send + Sync + 'static,
  F: FnOnce(LI) + Send + 'static
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
    shutdown: Arc::clone(&ltp.shutdown)
  };

  // Kick off the listerner.
  //
  // If successful, this will block until terminated using the killswitch.
  match ltp.listener.run(ks, ilcb).await {
    Ok(_) => {
      // Do nothing -- unbound is handled by Acceptor::unbound()
      //ltp.handler.unbound(ltp.lid.clone(), &ltp.lctx).await;
    }
    Err(e) => {
      // Listener failed -- report back to application using callback
      ltp.lhandler.failed(&ltp.listener, ltp.lid.clone(), e).await;
    }







|







197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
    shutdown: Arc::clone(&ltp.shutdown)
  };

  // Kick off the listerner.
  //
  // If successful, this will block until terminated using the killswitch.
  match ltp.listener.run(ks, ilcb).await {
    Ok(()) => {
      // Do nothing -- unbound is handled by Acceptor::unbound()
      //ltp.handler.unbound(ltp.lid.clone(), &ltp.lctx).await;
    }
    Err(e) => {
      // Listener failed -- report back to application using callback
      ltp.lhandler.failed(&ltp.listener, ltp.lid.clone(), e).await;
    }

Changes to www/changelog.md.

1
2
3
4
5
6
7
8
9
10
11
12










13
14
15
16
17
18
19
# Change Log

## [Unreleased]

[Details](/vdiff?from=lstngrp-0.0.2&to=trunk)

### Added

### Changed

### Removed











---

## [0.0.2] - 2024-08-20

[Details](/vdiff?from=lstngrp-0.0.1&to=lstngrp-0.0.2)

### Changed




|







>
>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Change Log

## [Unreleased]

[Details](/vdiff?from=lstngrp-0.0.3&to=trunk)

### Added

### Changed

### Removed

---

## [0.0.3] - 2024-09-18

[Details](/vdiff?from=lstngrp-0.0.2&to=lstngrp-0.0.3)

### Changed

- Add `Send` bounds to make generated `Future`s `Send`.

---

## [0.0.2] - 2024-08-20

[Details](/vdiff?from=lstngrp-0.0.1&to=lstngrp-0.0.2)

### Changed

Changes to www/index.md.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# listengroup

`ListenGroup` is meant to collect a group of network listeners into a single
entity, and help track connections made against those listeners.


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).


## Project Status

_lstconn_ is in early prototyping stages.

|

|












|

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# lstngrp

`ListenerGroup` is meant to collect a group of network listeners into a single
entity, and help track connections made against those listeners.


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).


## Project Status

_lstngrp_ is in early prototyping stages.