ethrecv

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From ethrecv-0.0.1 To trunk

2024-03-28
17:43
Working on a simulator, using unix local datagram sockets, backend. Leaf check-in: d1bbde3fa5 user: jan tags: uds-simulator
2024-03-20
11:47
Re-export pcap. Leaf check-in: 9c0f8028a4 user: jan tags: trunk
11:44
Use pcap 1.3.0. check-in: 8528c5c473 user: jan tags: trunk
2024-03-12
16:30
Update changelog. check-in: 01c7b31411 user: jan tags: trunk
16:20
reinstate a debug output to detect unexpected errors when reading from control stream. check-in: c630a1781b user: jan tags: ethrecv-0.0.1, trunk
16:17
Comment out debug outputs. check-in: 0dba4808e5 user: jan tags: trunk

Changes to Cargo.toml.

1
2
3

4
5

6
7
8
9
10
11
12
13
14
15
16
17
18
19
20




21
22
23
24
25
26
27
28

29
30
31
32
33
34
35
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

33
34
35
36
37
38
39
40


-
+


+















+
+
+
+







-
+







[package]
name = "ethrecv"
version = "0.0.1"
version = "0.0.2"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = [ "network-programming" ]
keywords = [ "ethernet", "packets", "raw", "network", "receive" ]
repository = "https://repos.qrnch.tech/pub/ethrecv"
description = "Receive ethernet packets at a high rate."
rust-version = "1.56"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "build.ps1",
  "build.sh",
  "rustfmt.toml"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[features]
idle = ["dep:atomic-time", "dep:parking_lot"]
inspect = []

[dependencies]
atomic-time = { version = "0.1.4", optional = true }
parking_lot = { version = "0.12.1", optional = true }
pcap = { version = "1.2.0" }
pcap = { version = "1.3.0" }

[target.'cfg(unix)'.dependencies]
mio = { version = "0.8.11", features = ["os-poll", "os-ext"] }
parking_lot = { version = "0.12.1" }

[target.'cfg(windows)'.dependencies]
bitflags = { version = "2.4.2" }

Changes to README.md.

1
2
3
4








1
2
3
4
5
6
7
8
9
10
11
12




+
+
+
+
+
+
+
+
# ethrecv

The _ethrecv_ crate is designed to receive ethernet packets at a high rate.

[![Crates.io][crates-badge]][crates-url]
[![0BSD licensed][0bsd-badge]][0bsd-url]

[crates-badge]: https://img.shields.io/crates/v/ethrecv.svg
[crates-url]: https://crates.io/crates/ethrecv
[0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg
[0bsd-url]: https://opensource.org/license/0bsd

Changes to examples/simple-demo.rs.

1

2
3
4
5
6
7
8
9
10
11
12
13


14


15
16

17
18
19
20

21
22

23
24
25
26
27
28
29

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

16
17
18

19
20
21
22

23
24

25
26
27
28
29
30
31
32
-
+












+
+
-
+
+

-
+



-
+

-
+







use std::env;
use std::{env, ops::ControlFlow};

use ethrecv::{PacketHandler, RecvThread};

#[derive(Debug)]
enum AppError {}

#[derive(Default)]
struct PktProc {}

impl PacketHandler for PktProc {
  type Error = AppError;

  fn proc(
    &mut self,
  fn proc(&mut self, _pkt: pcap::Packet) -> Result<(), Self::Error> {
    _pkt: pcap::Packet
  ) -> ControlFlow<Result<(), Self::Error>> {
    eprintln!("packet!");
    Ok(())
    ControlFlow::Continue(())
  }

  #[cfg(feature = "idle")]
  fn idle(&mut self) -> Result<(), Self::Error> {
  fn idle(&mut self) -> ControlFlow<Result<(), Self::Error>> {
    eprintln!("idle!");
    Ok(())
    ControlFlow::Continue(())
  }

  #[cfg(feature = "inspect")]
  fn inspect(&self, info: &ethrecv::RecvInfo) {
    eprintln!("inspect!");
    eprintln!("overflow drop: {}", info.overflow_dropped);
    eprintln!("if drop: {}", info.if_dropped);

Changes to src/lib.rs.

25
26
27
28
29
30
31

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48



49
50
51
52
53
54
55
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59







+

















+
+
+







#[cfg(feature = "idle")]
mod idlemon;

mod cmdsig;
mod err;

use std::{
  ops::ControlFlow,
  sync::{mpsc::Sender, Arc},
  thread
};

#[cfg(any(feature = "idle", feature = "inspect"))]
use std::time::Duration;

#[cfg(unix)]
use parking_lot::Mutex;

#[cfg(windows)]
use std::sync::atomic::AtomicU32;

pub use err::Error;

use cmdsig::CmdSignal;


pub use pcap;


/// Messages that can be sent back to controller from recever thread.
enum Msg {
  #[cfg(feature = "inspect")]
  Inspect(std::sync::mpsc::Sender<RecvInfo>)
}

71
72
73
74
75
76
77



78

79
80
81
82
83
84
85


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102


103
104
105
106
107
108
109
110
111

112
113
114
115
116
117
118
75
76
77
78
79
80
81
82
83
84

85
86
87
88
89
90


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128







+
+
+
-
+





-
-
+
+

















+
+









+







  /// Called on the receiver thread before the processing reading and
  /// processing loop has been entered.
  fn init(&mut self) -> Result<(), Self::Error> {
    Ok(())
  }

  /// Called to process packets.
  fn proc(
    &mut self,
    pkt: pcap::Packet
  fn proc(&mut self, pkt: pcap::Packet) -> Result<(), Self::Error>;
  ) -> ControlFlow<Result<(), Self::Error>>;

  /// Called whenever a timeout has been reached without any new packets
  /// arriving.
  #[cfg(feature = "idle")]
  #[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
  fn idle(&mut self) -> Result<(), Self::Error> {
    Ok(())
  fn idle(&mut self) -> ControlFlow<Result<(), Self::Error>> {
    ControlFlow::Continue(())
  }

  /// Called when the controller has requested an inspection.
  #[cfg(feature = "inspect")]
  #[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
  #[allow(unused_variables)]
  fn inspect(&self, info: &RecvInfo) {}

  /// Called on the receiver thread once the main loop been been terminated.
  fn shutdown(&mut self) {}
}

/// A builder-like object for initializing the packet receiver thread.
pub struct RecvThread {
  devname: String,
  bufsize: i32,

  thread_name: Option<String>,

  #[cfg(feature = "idle")]
  idle_dur: Option<Duration>
}

impl RecvThread {
  pub fn new(devname: &str) -> Self {
    Self {
      devname: devname.to_string(),
      bufsize: 16 * 1024 * 1024,
      thread_name: None,
      #[cfg(feature = "idle")]
      idle_dur: None
    }
  }

  pub fn bufsize<E>(mut self, bufsize: usize) -> Result<Self, Error<E>> {
    self.bufsize_r(bufsize)?;
142
143
144
145
146
147
148










149
150
151
152
153
154
155
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175







+
+
+
+
+
+
+
+
+
+








  #[cfg(feature = "idle")]
  #[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
  pub fn idle_duration_r(&mut self, dur: Duration) -> &mut Self {
    self.idle_dur = Some(dur);
    self
  }

  pub fn thread_name(mut self, name: impl ToString) -> Self {
    self.thread_name_r(name);
    self
  }

  pub fn thread_name_r(&mut self, name: impl ToString) -> &mut Self {
    self.thread_name = Some(name.to_string());
    self
  }
}


impl RecvThread {
  pub fn run<E>(
    self,
    mut handler: impl PacketHandler<Error = E> + Send + 'static
186
187
188
189
190
191
192








193
194


195
196
197
198
199
200
201
202
203
204









205
206
207
208
209
210
211
212
213








214
215
216
217
218
219
220
221
222








223
224
225
226
227
228
229
230
231








232
233
234
235
236
237
238






239
240
241


242
243
244


245
246
247
248
249
250
251






252
253
254





255
256
257
258
259
260
261
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220


221
222
223









224
225
226
227
228
229
230
231
232
233








234
235
236
237
238
239
240
241
242








243
244
245
246
247
248
249
250
251








252
253
254
255
256
257
258
259
260






261
262
263
264
265
266
267


268
269
270


271
272
273






274
275
276
277
278
279
280


281
282
283
284
285
286
287
288
289
290
291
292







+
+
+
+
+
+
+
+
-
-
+
+

-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+

-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+

-
-
-
-
-
-
+
+
+
+
+
+

-
-
+
+

-
-
+
+

-
-
-
-
-
-
+
+
+
+
+
+

-
-
+
+
+
+
+








    // Channel used to send requests to receiver thread.
    let (ch_tx, ch_rx) = std::sync::mpsc::channel::<Msg>();

    #[cfg(feature = "idle")]
    let cmdsig2 = cmdsig.clone();

    let bldr = thread::Builder::new();
    let bldr = if let Some(thread_name) = self.thread_name {
      bldr.name(thread_name)
    } else {
      bldr
    };

    let jh = bldr
    let jh = thread::spawn(move || {
      handler.init().map_err(|e| Error::App(e))?;
      .spawn(move || {
        handler.init().map_err(|e| Error::App(e))?;

      // If the "idle" feature is used, then kick off the idle monitoring
      // thread.
      #[cfg(feature = "idle")]
      let idle_res = if let Some(dur) = self.idle_dur {
        let r = idlemon::run(dur, cmdsig2);
        Some(r)
      } else {
        None
      };
        // If the "idle" feature is used, then kick off the idle monitoring
        // thread.
        #[cfg(feature = "idle")]
        let idle_res = if let Some(dur) = self.idle_dur {
          let r = idlemon::run(dur, cmdsig2);
          Some(r)
        } else {
          None
        };

      #[cfg(unix)]
      let ret = {
        #[cfg(feature = "idle")]
        let idle_sh = if let Some((idle_sh, _)) = &idle_res {
          Some(Arc::clone(idle_sh))
        } else {
          None
        };
        #[cfg(unix)]
        let ret = {
          #[cfg(feature = "idle")]
          let idle_sh = if let Some((idle_sh, _)) = &idle_res {
            Some(Arc::clone(idle_sh))
          } else {
            None
          };

        let rp = unix::RunParams {
          ctl_rx,
          #[cfg(feature = "idle")]
          idle_sh,
          ch_rx
        };
        unix::run(cap, handler, rp)
      };
          let rp = unix::RunParams {
            ctl_rx,
            #[cfg(feature = "idle")]
            idle_sh,
            ch_rx
          };
          unix::run(cap, handler, rp)
        };

      #[cfg(windows)]
      let ret = {
        #[cfg(feature = "idle")]
        let idle_sh = if let Some((idle_sh, _)) = &idle_res {
          Some(Arc::clone(idle_sh))
        } else {
          None
        };
        #[cfg(windows)]
        let ret = {
          #[cfg(feature = "idle")]
          let idle_sh = if let Some((idle_sh, _)) = &idle_res {
            Some(Arc::clone(idle_sh))
          } else {
            None
          };

        let rp = win::RunParams {
          cmdreq,
          #[cfg(feature = "idle")]
          idle_sh,
          ch_rx
        };
          let rp = win::RunParams {
            cmdreq,
            #[cfg(feature = "idle")]
            idle_sh,
            ch_rx
          };

        win::run(cap, handler, rp)
      };
          win::run(cap, handler, rp)
        };

      // {unix,win}::run() calls handler.shutdown() because the handler's
      // ownership was passed to it.
        // {unix,win}::run() calls handler.shutdown() because the handler's
        // ownership was passed to it.

      // Kill idle monitoring thread before terminating the receiver thread
      #[cfg(feature = "idle")]
      if let Some((idle_sh, idle_jh)) = idle_res {
        idle_sh.kill();
        let _ = idle_jh.join();
      }
        // Kill idle monitoring thread before terminating the receiver thread
        #[cfg(feature = "idle")]
        if let Some((idle_sh, idle_jh)) = idle_res {
          idle_sh.kill();
          let _ = idle_jh.join();
        }

      ret
    });
        ret
      })
      .map_err(|e| {
        Error::Internal(format!("Unable to launch receiver thread; {}", e))
      })?;

    Ok(Controller {
      jh: Some(jh),
      cmdsig,
      ch_tx
    })
  }

Changes to src/unix.rs.

1
2
3
4
5
6
7

8
9
10
11
12
13
14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15







+







//! Unix packet receiver backend implementation.
//!
//! This backend uses `mio` to wait for either packets or intra-process
//! commands.

use std::{
  io::{ErrorKind, Read},
  ops::ControlFlow,
  os::fd::AsRawFd,
  sync::mpsc::Receiver
};

#[cfg(feature = "inspect")]
use std::time::Instant;

89
90
91
92
93
94
95
96
97


98
99
100
101
102
103
104
90
91
92
93
94
95
96


97
98
99
100
101
102
103
104
105







-
-
+
+








              #[cfg(feature = "idle")]
              if let Some(ref idle_sh) = rp.idle_sh {
                idle_sh.touch();
              }

              //println!("Got a packet!");
              if let Err(e) = handler.proc(pkt) {
                break 'outer Err(Error::App(e));
              if let ControlFlow::Break(res) = handler.proc(pkt) {
                break 'outer res.map_err(|e| Error::App(e));
              }
            }
            Err(pcap::Error::TimeoutExpired) => {
              // Timeout here means "no more packets to read".
              //eprintln!("No more packets to read");
              break;
            }
156
157
158
159
160
161
162
163
164



165
166
167
168
169
170
171
157
158
159
160
161
162
163


164
165
166
167
168
169
170
171
172
173







-
-
+
+
+







                  raw_pkt_bytes,
                  runtime: Instant::now() - start_time
                };
                handler.inspect(&info);
              }
              #[cfg(feature = "idle")]
              b'd' => {
                // ToDo: Handle error
                let _ = handler.idle();
                if let ControlFlow::Break(res) = handler.idle() {
                  break 'outer res.map_err(|e| Error::App(e));
                }
              }

              b'c' => {
                while let Ok(msg) = rp.ch_rx.try_recv() {
                  match msg {
                    #[cfg(feature = "inspect")]
                    Msg::Inspect(tx) => {

Changes to src/win.rs.

1
2
3
4
5
6
7
8
9
10







11
12
13
14
15
16
17
1
2
3
4
5
6




7
8
9
10
11
12
13
14
15
16
17
18
19
20






-
-
-
-
+
+
+
+
+
+
+







//! Windows packet receiver backend implementation.
//!
//! This backend uses an event semaphore, provided by pcap, to unblock the
//! blocking `pcap_next()` whenever the controller or idle thread requests some
//! action to be taken.

use std::sync::{
  atomic::{AtomicU32, Ordering},
  mpsc::Receiver,
  Arc
use std::{
  ops::ControlFlow,
  sync::{
    atomic::{AtomicU32, Ordering},
    mpsc::Receiver,
    Arc
  }
};

#[cfg(feature = "inspect")]
use std::time::Instant;

use bitflags::bitflags;

84
85
86
87
88
89
90
91
92


93
94
95
96
97
98
99
87
88
89
90
91
92
93


94
95
96
97
98
99
100
101
102







-
-
+
+








        #[cfg(feature = "idle")]
        if let Some(ref idle_sh) = rp.idle_sh {
          idle_sh.touch();
        }

        //println!("Got a packet!");
        if let Err(e) = handler.proc(pkt) {
          break 'outer Err(Error::App(e));
        if let ControlFlow::Break(res) = handler.proc(pkt) {
          break 'outer res.map_err(|e| Error::App(e));
        }
      }
      Err(pcap::Error::TimeoutExpired) => {
        // woken up
        //eprintln!("No more packets to read");
        //break;
      }
127
128
129
130
131
132
133
134
135


136
137
138
139
140
141
142
130
131
132
133
134
135
136


137
138
139
140
141
142
143
144
145







-
-
+
+







        runtime: Instant::now() - start_time
      };
      handler.inspect(&info);
    }

    #[cfg(feature = "idle")]
    if cmd & CmdFlags::IDLE.bits() != 0 {
      if let Err(e) = handler.idle() {
        break 'outer Err(Error::App(e));
      if let ControlFlow::Break(res) = handler.idle() {
        break 'outer res.map_err(|e| Error::App(e));
      }
    }

    if cmd & CmdFlags::CHANNEL.bits() != 0 {
      while let Ok(msg) = rp.ch_rx.try_recv() {
        match msg {
          #[cfg(feature = "inspect")]

Changes to www/changelog.md.

1
2


3
4
5
6
7
8



9
10






11
12






1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29


+
+






+
+
+


+
+
+
+
+
+


+
+
+
+
+
+
# Change Log

- ⚠️  Breaking change

## [Unreleased]

[Details](/vdiff?from=ethrecv-0.0.1&to=trunk)

### Added

- `RecvThread::thread_name()` can be used to set the receiver's thread name.
- Re-export `pcap` from crate root.

### Changed

- ⚠️ Return `ControlFlow` from `PacketHandler::proc()` and
  `PacketHandler::idle()`, to more clearly state intention to continue or abort
  poll loop.  This also opens up the ability to end the loop successfully,
  using `ControlFlow::Break(Ok(()))`.
- Upgrade from `pcap` `1.2.0` to `1.3.0`.

### Removed

---

## [0.0.1] - 2024-03-12

Initial release.

Changes to www/index.md.

1
2
3























4
5
6
7
8
9
10
1
2

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







# ethrecv

The _ethrecv_ crate is designed to receive ethernet packets at a high rate.
The _ethrecv_ crate is designed to receive Ethernet packets at a high rate.


## Capabilities on Linux

To avoid having to run applications as `root` when using with raw networking,
the administrator can give the program specific privileges by set the
program's _capabilities_ using the `setcap(8)` command.

For the actual communication the `cap_net_raw` capability is needed. If the
application needs to make interface changes (like set MTU) the `cap_net_admin`
capability is needed.  These can be set using the command:

```
$ sudo setcap cap_net_raw,cap_net_admin=eip <binfile>
```

[systemd services can be assigned these capabilities](https://www.freedesktop.org/software/systemd/man/latest/systemd.exec.html#Capabilities) by adding
the following to the `[Service]` section.

```
AmbientCapabilities=CAP_NET_RAW CAP_NET_ADMIN
```


## Feature labels in documentation

The crate's documentation uses automatically generated feature labels, which
currently requires nightly featuers.  To build the documentation locally use: