Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,11 @@ [package] name = "ethrecv" -version = "0.0.1" +version = "0.0.2" edition = "2021" license = "0BSD" +# https://crates.io/category_slugs categories = [ "network-programming" ] keywords = [ "ethernet", "packets", "raw", "network", "receive" ] repository = "https://repos.qrnch.tech/pub/ethrecv" description = "Receive ethernet packets at a high rate." rust-version = "1.56" @@ -16,18 +17,22 @@ "build.ps1", "build.sh", "rustfmt.toml" ] +# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section +[badges] +maintenance = { status = "experimental" } + [features] idle = ["dep:atomic-time", "dep:parking_lot"] inspect = [] [dependencies] atomic-time = { version = "0.1.4", optional = true } parking_lot = { version = "0.12.1", optional = true } -pcap = { version = "1.2.0" } +pcap = { version = "1.3.0" } [target.'cfg(unix)'.dependencies] mio = { version = "0.8.11", features = ["os-poll", "os-ext"] } parking_lot = { version = "0.12.1" } Index: README.md ================================================================== --- README.md +++ README.md @@ -1,4 +1,12 @@ # ethrecv The _ethrecv_ crate is designed to receive ethernet packets at a high rate. 
+[![Crates.io][crates-badge]][crates-url] +[![0BSD licensed][0bsd-badge]][0bsd-url] + +[crates-badge]: https://img.shields.io/crates/v/ethrecv.svg +[crates-url]: https://crates.io/crates/ethrecv +[0bsd-badge]: https://img.shields.io/badge/license-0BSD-blue.svg +[0bsd-url]: https://opensource.org/license/0bsd + Index: examples/simple-demo.rs ================================================================== --- examples/simple-demo.rs +++ examples/simple-demo.rs @@ -1,6 +1,6 @@ -use std::env; +use std::{env, ops::ControlFlow}; use ethrecv::{PacketHandler, RecvThread}; #[derive(Debug)] enum AppError {} @@ -9,19 +9,22 @@ struct PktProc {} impl PacketHandler for PktProc { type Error = AppError; - fn proc(&mut self, _pkt: pcap::Packet) -> Result<(), Self::Error> { + fn proc( + &mut self, + _pkt: pcap::Packet + ) -> ControlFlow> { eprintln!("packet!"); - Ok(()) + ControlFlow::Continue(()) } #[cfg(feature = "idle")] - fn idle(&mut self) -> Result<(), Self::Error> { + fn idle(&mut self) -> ControlFlow> { eprintln!("idle!"); - Ok(()) + ControlFlow::Continue(()) } #[cfg(feature = "inspect")] fn inspect(&self, info: ðrecv::RecvInfo) { eprintln!("inspect!"); Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -27,10 +27,11 @@ mod cmdsig; mod err; use std::{ + ops::ControlFlow, sync::{mpsc::Sender, Arc}, thread }; #[cfg(any(feature = "idle", feature = "inspect"))] @@ -44,10 +45,13 @@ pub use err::Error; use cmdsig::CmdSignal; + +pub use pcap; + /// Messages that can be sent back to controller from recever thread. enum Msg { #[cfg(feature = "inspect")] Inspect(std::sync::mpsc::Sender) @@ -73,18 +77,21 @@ fn init(&mut self) -> Result<(), Self::Error> { Ok(()) } /// Called to process packets. - fn proc(&mut self, pkt: pcap::Packet) -> Result<(), Self::Error>; + fn proc( + &mut self, + pkt: pcap::Packet + ) -> ControlFlow>; /// Called whenever a timeout has been reached without any new packets /// arriving. 
#[cfg(feature = "idle")] #[cfg_attr(docsrs, doc(cfg(feature = "idle")))] - fn idle(&mut self) -> Result<(), Self::Error> { - Ok(()) + fn idle(&mut self) -> ControlFlow> { + ControlFlow::Continue(()) } /// Called when the controller has requested an inspection. #[cfg(feature = "inspect")] #[cfg_attr(docsrs, doc(cfg(feature = "inspect")))] @@ -98,19 +105,22 @@ /// A builder-like object for initializing the packet receiver thread. pub struct RecvThread { devname: String, bufsize: i32, + thread_name: Option, + #[cfg(feature = "idle")] idle_dur: Option } impl RecvThread { pub fn new(devname: &str) -> Self { Self { devname: devname.to_string(), bufsize: 16 * 1024 * 1024, + thread_name: None, #[cfg(feature = "idle")] idle_dur: None } } @@ -144,10 +154,20 @@ #[cfg_attr(docsrs, doc(cfg(feature = "idle")))] pub fn idle_duration_r(&mut self, dur: Duration) -> &mut Self { self.idle_dur = Some(dur); self } + + pub fn thread_name(mut self, name: impl ToString) -> Self { + self.thread_name_r(name); + self + } + + pub fn thread_name_r(&mut self, name: impl ToString) -> &mut Self { + self.thread_name = Some(name.to_string()); + self + } } impl RecvThread { pub fn run( @@ -188,72 +208,83 @@ let (ch_tx, ch_rx) = std::sync::mpsc::channel::(); #[cfg(feature = "idle")] let cmdsig2 = cmdsig.clone(); - let jh = thread::spawn(move || { - handler.init().map_err(|e| Error::App(e))?; - - // If the "idle" feature is used, then kick off the idle monitoring - // thread. 
- #[cfg(feature = "idle")] - let idle_res = if let Some(dur) = self.idle_dur { - let r = idlemon::run(dur, cmdsig2); - Some(r) - } else { - None - }; - - #[cfg(unix)] - let ret = { - #[cfg(feature = "idle")] - let idle_sh = if let Some((idle_sh, _)) = &idle_res { - Some(Arc::clone(idle_sh)) - } else { - None - }; - - let rp = unix::RunParams { - ctl_rx, - #[cfg(feature = "idle")] - idle_sh, - ch_rx - }; - unix::run(cap, handler, rp) - }; - - #[cfg(windows)] - let ret = { - #[cfg(feature = "idle")] - let idle_sh = if let Some((idle_sh, _)) = &idle_res { - Some(Arc::clone(idle_sh)) - } else { - None - }; - - let rp = win::RunParams { - cmdreq, - #[cfg(feature = "idle")] - idle_sh, - ch_rx - }; - - win::run(cap, handler, rp) - }; - - // {unix,win}::run() calls handler.shutdown() because the handler's - // ownership was passed to it. - - // Kill idle monitoring thread before terminating the receiver thread - #[cfg(feature = "idle")] - if let Some((idle_sh, idle_jh)) = idle_res { - idle_sh.kill(); - let _ = idle_jh.join(); - } - - ret - }); + let bldr = thread::Builder::new(); + let bldr = if let Some(thread_name) = self.thread_name { + bldr.name(thread_name) + } else { + bldr + }; + + let jh = bldr + .spawn(move || { + handler.init().map_err(|e| Error::App(e))?; + + // If the "idle" feature is used, then kick off the idle monitoring + // thread. 
+ #[cfg(feature = "idle")] + let idle_res = if let Some(dur) = self.idle_dur { + let r = idlemon::run(dur, cmdsig2); + Some(r) + } else { + None + }; + + #[cfg(unix)] + let ret = { + #[cfg(feature = "idle")] + let idle_sh = if let Some((idle_sh, _)) = &idle_res { + Some(Arc::clone(idle_sh)) + } else { + None + }; + + let rp = unix::RunParams { + ctl_rx, + #[cfg(feature = "idle")] + idle_sh, + ch_rx + }; + unix::run(cap, handler, rp) + }; + + #[cfg(windows)] + let ret = { + #[cfg(feature = "idle")] + let idle_sh = if let Some((idle_sh, _)) = &idle_res { + Some(Arc::clone(idle_sh)) + } else { + None + }; + + let rp = win::RunParams { + cmdreq, + #[cfg(feature = "idle")] + idle_sh, + ch_rx + }; + + win::run(cap, handler, rp) + }; + + // {unix,win}::run() calls handler.shutdown() because the handler's + // ownership was passed to it. + + // Kill idle monitoring thread before terminating the receiver thread + #[cfg(feature = "idle")] + if let Some((idle_sh, idle_jh)) = idle_res { + idle_sh.kill(); + let _ = idle_jh.join(); + } + + ret + }) + .map_err(|e| { + Error::Internal(format!("Unable to launch receiver thread; {}", e)) + })?; Ok(Controller { jh: Some(jh), cmdsig, ch_tx Index: src/unix.rs ================================================================== --- src/unix.rs +++ src/unix.rs @@ -3,10 +3,11 @@ //! This backend uses `mio` to wait for either packets or intra-process //! commands. use std::{ io::{ErrorKind, Read}, + ops::ControlFlow, os::fd::AsRawFd, sync::mpsc::Receiver }; #[cfg(feature = "inspect")] @@ -91,12 +92,12 @@ if let Some(ref idle_sh) = rp.idle_sh { idle_sh.touch(); } //println!("Got a packet!"); - if let Err(e) = handler.proc(pkt) { - break 'outer Err(Error::App(e)); + if let ControlFlow::Break(res) = handler.proc(pkt) { + break 'outer res.map_err(|e| Error::App(e)); } } Err(pcap::Error::TimeoutExpired) => { // Timeout here means "no more packets to read". 
//eprintln!("No more packets to read"); @@ -158,12 +159,13 @@ }; handler.inspect(&info); } #[cfg(feature = "idle")] b'd' => { - // ToDo: Handle error - let _ = handler.idle(); + if let ControlFlow::Break(res) = handler.idle() { + break 'outer res.map_err(|e| Error::App(e)); + } } b'c' => { while let Ok(msg) = rp.ch_rx.try_recv() { match msg { Index: src/win.rs ================================================================== --- src/win.rs +++ src/win.rs @@ -2,14 +2,17 @@ //! //! This backend uses an event semaphore, provided by pcap, to unblock the //! blocking `pcap_next()` whenever the controller or idle thread requests some //! action to be taken. -use std::sync::{ - atomic::{AtomicU32, Ordering}, - mpsc::Receiver, - Arc +use std::{ + ops::ControlFlow, + sync::{ + atomic::{AtomicU32, Ordering}, + mpsc::Receiver, + Arc + } }; #[cfg(feature = "inspect")] use std::time::Instant; @@ -86,12 +89,12 @@ if let Some(ref idle_sh) = rp.idle_sh { idle_sh.touch(); } //println!("Got a packet!"); - if let Err(e) = handler.proc(pkt) { - break 'outer Err(Error::App(e)); + if let ControlFlow::Break(res) = handler.proc(pkt) { + break 'outer res.map_err(|e| Error::App(e)); } } Err(pcap::Error::TimeoutExpired) => { // woken up //eprintln!("No more packets to read"); @@ -129,12 +132,12 @@ handler.inspect(&info); } #[cfg(feature = "idle")] if cmd & CmdFlags::IDLE.bits() != 0 { - if let Err(e) = handler.idle() { - break 'outer Err(Error::App(e)); + if let ControlFlow::Break(res) = handler.idle() { + break 'outer res.map_err(|e| Error::App(e)); } } if cmd & CmdFlags::CHANNEL.bits() != 0 { while let Ok(msg) = rp.ch_rx.try_recv() { Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,12 +1,29 @@ # Change Log +- ⚠️ Breaking change + ## [Unreleased] [Details](/vdiff?from=ethrecv-0.0.1&to=trunk) ### Added +- `RecvThread::thread_name()` can be used to set the receiver's thread name. 
+- Re-export `pcap` from crate root. + ### Changed +- ⚠️ Return `ControlFlow` from `PacketHandler::proc()` and + `PacketHandler::idle()`, to more clearly state intention to continue or abort + poll loop. This also opens up the ability to end the loop successfully, + using `ControlFlow::Break(Ok(()))`. +- Upgrade from `pcap` `1.2.0` to `1.3.0`. + ### Removed +--- + +## [0.0.1] - 2024-03-12 + +Initial release. + Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -1,8 +1,30 @@ # ethrecv -The _ethrecv_ crate is designed to receive ethernet packets at a high rate. +The _ethrecv_ crate is designed to receive Ethernet packets at a high rate. + + +## Capabilities on Linux + +To avoid having to run applications as `root` when working with raw networking, +the administrator can give the program specific privileges by setting the +program's _capabilities_ using the `setcap(8)` command. + +For the actual communication the `cap_net_raw` capability is needed. If the +application needs to make interface changes (like setting the MTU) the `cap_net_admin` +capability is needed. These can be set using the command: + +``` +$ sudo setcap cap_net_raw,cap_net_admin=eip /path/to/program +``` + +[systemd services can be assigned these capabilities](https://www.freedesktop.org/software/systemd/man/latest/systemd.exec.html#Capabilities) by adding +the following to the `[Service]` section. + +``` +AmbientCapabilities=CAP_NET_RAW CAP_NET_ADMIN +``` ## Feature labels in documentation The crate's documentation uses automatically generated feature labels, which