Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -1,4 +1,8 @@ Cargo.toml README.md -src/lib.rs www/index.md +www/changelog.md +src/lib.rs +tests/endmsg.rs +examples/simple.rs +examples/threaded.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,20 +1,37 @@ [package] name = "procsem" -version = "0.1.0" +version = "0.2.0" edition = "2021" license = "0BSD" -keywords = [ "locking" ] +# https://crates.io/category_slugs +categories = ["asynchronous"] +keywords = ["locking"] repository = "https://repos.qrnch.tech/pub/procsem" -description = "Semaphore used to lock thread/task spanning sequence of operations" +description = "Semaphore used to lock thread/task-spanning sequence of operations" exclude = [ ".fossil-settings", ".efiles", ".fslckout", + "bacon.toml", "rustfmt.toml", + "examples", "www" ] +# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section +[badges] +maintenance = { status = "experimental" } + [dependencies] -indexmap = { version = "1.9.2" } -parking_lot = { version = "0.12.1" } +indexmap = { version = "2.6.0" } +parking_lot = { version = "0.12.3" } + +[package.metadata.docs.rs] +rustdoc-args = ["--generate-link-to-definition"] + +[lints.clippy] +all = { level = "warn", priority = -1 } +pedantic = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } +cargo = { level = "warn", priority = -1 } ADDED bacon.toml Index: bacon.toml ================================================================== --- /dev/null +++ bacon.toml @@ -0,0 +1,111 @@ +# This is a configuration file for the bacon tool +# +# Complete help on configuration: https://dystroy.org/bacon/config/ +# +# You may check the current default at +# https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml + +default_job = "clippy-all" +env.CARGO_TERM_COLOR = "always" + +[jobs.check] +command = ["cargo", "check"] +need_stdout = false + +[jobs.check-all] +command = ["cargo", "check", "--all-targets"] +need_stdout = false + +# Run clippy on the default target +[jobs.clippy] +command = ["cargo", "clippy"] +need_stdout = false + +# Run clippy on all targets +# To disable some lints, you may change the job this way: +# [jobs.clippy-all] +# command = [ +# "cargo", "clippy", +# "--all-targets", +# "--", +# "-A", "clippy::bool_to_int_with_if", +# "-A", "clippy::collapsible_if", +# "-A", "clippy::derive_partial_eq_without_eq", +# ] +# need_stdout = false +[jobs.clippy-all] +command = ["cargo", "clippy", "--all-targets"] +need_stdout = false + +# This job lets you run +# - all tests: bacon test +# - a specific test: bacon test -- config::test_default_files +# - the tests of a package: bacon test -- -- -p config +[jobs.test] +command = ["cargo", "test"] +need_stdout = true + +[jobs.nextest] +command = [ + "cargo", "nextest", "run", + "--hide-progress-bar", "--failure-output", "final" +] +need_stdout = true +analyzer = "nextest" + +[jobs.doc] +command = ["cargo", "doc", "--no-deps"] +need_stdout = false + +# If the doc compiles, then it opens in your browser and bacon switches +# to the previous job +[jobs.doc-open] +command = ["cargo", "doc", "--no-deps", "--open"] +need_stdout = false +on_success = "back" # so that we don't open the browser at each change + +# You can run your application and have the result displayed in bacon, +# if it makes sense for this crate. +[jobs.run] +command = [ + "cargo", "run", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = true + +# Run your long-running application (eg server) and have the result displayed in bacon. +# For programs that never stop (eg a server), `background` is set to false +# to have the cargo run output immediately displayed instead of waiting for +# program's end. +# 'on_change_strategy' is set to `kill_then_restart` to have your program restart +# on every change (an alternative would be to use the 'F5' key manually in bacon). +# If you often use this job, it makes sense to override the 'r' key by adding +# a binding `r = job:run-long` at the end of this file . +[jobs.run-long] +command = [ + "cargo", "run", + # put launch parameters for your program behind a `--` separator +] +need_stdout = true +allow_warnings = true +background = false +on_change_strategy = "kill_then_restart" + +# This parameterized job runs the example of your choice, as soon +# as the code compiles. +# Call it as +# bacon ex -- my-example +[jobs.ex] +command = ["cargo", "run", "--example"] +need_stdout = true +allow_warnings = true + +# You may define here keybindings that would be specific to +# a project, for example a shortcut to launch a specific job. +# Shortcuts to internal functions (scrolling, toggling, etc.) +# should go in your personal global prefs.toml file instead. +[keybindings] +# alt-m = "job:my-job" +c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target ADDED examples/simple.rs Index: examples/simple.rs ================================================================== --- /dev/null +++ examples/simple.rs @@ -0,0 +1,66 @@ +use procsem::{ProcCtx, ProcSem, StateReporter}; + +#[derive(Default)] +struct Reporter {} + +impl StateReporter for Reporter { + fn begin(&self) { + println!("==> State update: begin"); + } + + fn action(&self, s: &str) { + println!("==> State update: action({s})"); + } + + fn progress(&self, percent: Option) { + println!("==> State update: progress({percent:?})"); + } + + fn end(&self, endmsg: Option) { + println!("==> State update: end({endmsg:?})"); + } +} + + +fn main() { + // + // Create the callback receiver + // + let reporter = Reporter::default(); + + // + // Create the process semaphore, registering with it the reporter callback + // + let procsem = ProcSem::with_reporter(reporter); + + // + // Acquire process semaphore + // + let Some(pctx) = procsem.try_acquire() else { + panic!("Unable to acquire process semaphore"); + }; + + // + // Fail to process semaphore + // + assert!(procsem.try_acquire().is_none()); + + + run_process(pctx); +} + +#[allow(clippy::needless_pass_by_value)] +fn run_process(pctx: ProcCtx) { + pctx.action("action"); + + pctx.progress(None); + pctx.progress(Some(25)); + pctx.action("half-way"); + pctx.progress(Some(50)); + pctx.progress(Some(75)); + pctx.progress(Some(100)); + + pctx.end("end"); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED examples/threaded.rs Index: examples/threaded.rs ================================================================== --- /dev/null +++ examples/threaded.rs @@ -0,0 +1,74 @@ +use std::thread; + +use procsem::{ProcCtx, ProcSem, StateReporter}; + +#[derive(Default)] +struct Reporter {} + +impl StateReporter for Reporter { + fn begin(&self) { + println!("==> State update: begin"); + } + + fn action(&self, s: &str) { + println!("==> State update: action({s})"); + } + + fn progress(&self, percent: Option) { + println!("==> State update: progress({percent:?})"); + } + + fn end(&self, endmsg: Option) { + println!("==> State update: end({endmsg:?})"); + } +} + + +fn main() { + // + // Create the callback receiver + // + let reporter = Reporter::default(); + + // + // Create the process semaphore, registering with it the reporter callback + // + let procsem = ProcSem::with_reporter(reporter); + + // + // Acquire process semaphore + // + let Some(pctx) = procsem.try_acquire() else { + panic!("Unable to acquire process semaphore"); + }; + + // + // Fail to process semaphore + // + assert!(procsem.try_acquire().is_none()); + + // + // Pass ProcCtx to a different thread. + // + let jh = thread::spawn(move || { + run_process(pctx); + }); + + let _ = jh.join(); +} + +#[allow(clippy::needless_pass_by_value)] +fn run_process(pctx: ProcCtx) { + pctx.action("action"); + + pctx.progress(None); + pctx.progress(Some(25)); + pctx.action("half-way"); + pctx.progress(Some(50)); + pctx.progress(Some(75)); + pctx.progress(Some(100)); + + pctx.end("end"); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -9,10 +9,20 @@ //! This is much like a `Mutex`, but it differs in that it holds no generic //! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is //! `Send`, because it is explicitly meant to be passed around between //! threads/tasks. It supports blocking, nonblocking and async lock //! acquisition. +//! +//! # Progress reporting +//! When a `ProcSem` is created, it can optionally be handed an object that +//! implements [`StateReporter`]. If a `ProcSem` has a `StateReporter` +//! associated with it, then an acquired `ProcCtx` can use +//! [`ProcCtx::action()`], [`ProcCtx::progress()`] to pass progress information +//! to the `StateReporter` implementor. +//! +//! Once a `ProcCtx` is about to terminate, [`ProcCtx::end()`] can be used to +//! signal a final message that can be retrieved by the `ProcSem` object. use std::{ future::Future, num::NonZeroUsize, pin::Pin, @@ -28,30 +38,38 @@ use indexmap::IndexMap; /// Interface used to implement objects that are reponsible for sending process /// reports. +#[allow(unused_variables)] pub trait StateReporter: Send + Sync { - /// A new [`ProcCtx`] has been acquired. + /// Called when a [`ProcCtx`] has been acquired. fn begin(&self) {} - fn action(&self, _s: &str) {} - fn reset_action(&self, _s: &str) {} - fn progress(&self, _percent: Option) {} + /// Called when [`ProcCtx::action()`] has been called to set a current + /// action. + fn action(&self, s: &str) {} + + /// The [`ProcCtx`] has progress has been made. + fn progress(&self, percent: Option) {} - /// The [`ProcCtx`] is being dropped. - fn end(&self) {} + /// Called when the [`ProcCtx`] is being dropped. + /// + /// This method will be called while the process ownership context still + /// exists. If the `ProcCtx` is used as a resource lock, the application + /// can rely on mutual exclusive access to the resource within this callback. + fn end(&self, msg: Option) {} } /// Shared data that must be protected behind mutually exclusive access. #[derive(Default)] struct Inner { /// Flag denoting whether the semaphore is currently owned or not. /// - /// If this is `true` it means there's a `Lock` object. If this is `false` - /// there's no `Lock` object for this semaphore. + /// If this is `true` it means there's a `ProcCtx` object. If this is + /// `false` there's no `ProcCtx` object for this semaphore. owned: bool, /// Collection of async task wakers waiting to acquire this lock. wakers: IndexMap, @@ -73,31 +91,39 @@ #[derive(Default)] pub struct ProcSem(Arc); impl ProcSem { /// Create a new process semaphore object. + #[must_use] pub fn new() -> Self { Self::default() } /// Create a new process semaphore with a reporter object. - pub fn with_reporter(reporter: Box) -> Self { + pub fn with_reporter(reporter: impl StateReporter + 'static) -> Self { Self(Arc::new(Shared { - reporter: Some(reporter), + reporter: Some(Box::new(reporter)), ..Default::default() })) } /// Return a `Future` that will yield a [`ProcCtx`] when possible. + /// + /// On successful acquire the internal endmsg will be cleared. + #[must_use] pub fn acquire_async(&self) -> AcquireFuture { AcquireFuture { shared: Arc::clone(&self.0), id: None } } /// Acquire a [`ProcCtx`], blocking until successful. + /// + /// On successful acquire the internal endmsg will be cleared. + #[must_use] + #[allow(clippy::significant_drop_tightening)] pub fn acquire_blocking(&self) -> ProcCtx { let mut g = self.0.inner.lock(); loop { if g.owned { // There's a Lock for this ProcSem. @@ -104,38 +130,51 @@ // Wait until the condvar is triggered so we can check again. self.0.signal.wait(&mut g); } else { // Lock and return a Lock object g.owned = true; + g.endmsg = None; + if let Some(ref r) = self.0.reporter { + r.begin(); + } break ProcCtx(Arc::clone(&self.0)); } } } /// Attempt to acquire [`ProcCtx`], returning immediately. /// /// Returns `Some(ProcCtx)` if the lock was acquired successfully. Returns /// `None` if the lock is already owned by another lock. + /// + /// On successful acquire the internal endmsg will be cleared. + #[must_use] + #[allow(clippy::significant_drop_tightening)] pub fn try_acquire(&self) -> Option { let mut g = self.0.inner.lock(); if g.owned { // Lock already acquired None } else { g.owned = true; + g.endmsg = None; + if let Some(ref r) = self.0.reporter { + r.begin(); + } Some(ProcCtx(Arc::clone(&self.0))) } } /// Get a clone of the internal end message. + #[must_use] pub fn endmsg(&self) -> Option { - let g = self.0.inner.lock(); - g.endmsg.clone() + self.0.inner.lock().endmsg.clone() } } +/// Future used to wait to acquire a [`ProcCtx`] in an async environment. pub struct AcquireFuture { shared: Arc, id: Option } @@ -172,10 +211,16 @@ } else { // // Mark lock as owned and return a Lock object. // inner.owned = true; + inner.endmsg = None; + if let Some(ref r) = self.shared.reporter { + r.begin(); + } + + drop(inner); Poll::Ready(ProcCtx(Arc::clone(&self.shared))) } } } @@ -183,11 +228,11 @@ /// If this future has generated a `Waker`, then remove it. fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.shared.inner.lock(); // Remove this future's waker - let _ = inner.wakers.remove(&id.get()); + let _ = inner.wakers.swap_remove(&id.get()); } } } @@ -198,57 +243,70 @@ #[repr(transparent)] pub struct ProcCtx(Arc); impl ProcCtx { - /// Describe the current stage in the process. - /// - /// Setting this will clear the progress. + /// Provide a textual describe the current stage of the process. pub fn action(&self, text: &str) { if let Some(ref r) = self.0.reporter { r.action(text); } } - pub fn reset_action(&self, text: &str) { - if let Some(ref r) = self.0.reporter { - r.reset_action(text); - } - } - - /// Set the progress (in percent) of the current stage. + /// Set the progress (in percent). pub fn progress(&self, percent: Option) { if let Some(ref r) = self.0.reporter { r.progress(percent); } } /// Set an end-state message. /// - /// This message will be stored in the shared state, which means it can be - /// queried through the [`ProcSem`] + /// The purpose of the "end message" is to be able to leave a final "result" + /// message that can be read by the `ProcSem`. + /// + /// Calling this function will not cause the [`StateReporter::end()`] to be + /// called. However, if an end message has been set, a clone if it will be + /// passed to the `StateReporter::end()` callback. + /// + /// The message will be stored in the internal [`ProcSem`] storage + /// and can be retrieved using [`ProcSem::endmsg()`]. pub fn end(&self, text: &str) { let mut g = self.0.inner.lock(); g.endmsg = Some(text.to_string()); } } impl Drop for ProcCtx { + #[allow(clippy::significant_drop_tightening)] fn drop(&mut self) { if let Some(ref r) = self.0.reporter { - r.end(); + let g = self.0.inner.lock(); + let endmsg = g.endmsg.clone(); + drop(g); + r.end(endmsg); } let mut g = self.0.inner.lock(); g.owned = false; - // If there are any wakers waiting, then extract one and trigger it. - // Otherwise signal the condvar. - if let Some((_, n)) = g.wakers.pop() { - n.wake(); - } else { - self.0.signal.notify_one(); + // Wake up all waiting futures. + // + // In an ideal world only one would be woken up, but there are edge cases + // when using multithreaded executors when an await is performed in a + // select!{} in which a doomed future might be picked. In such a case the + // "wake" would be wasted on a future that will not be able to process the + // wake request. + // + // To avoid this, _all_ wakers are woken up and they'll have to reenter + // pending state if applicable. + for (_, w) in g.wakers.drain(..) { + w.wake(); } + + // Wake a single blocking thread (blocked threads do not suffer from the + // same issue). + self.0.signal.notify_one(); } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/endmsg.rs Index: tests/endmsg.rs ================================================================== --- /dev/null +++ tests/endmsg.rs @@ -0,0 +1,30 @@ +use procsem::ProcSem; + +#[test] +fn endmsg() { + let procsem = ProcSem::new(); + + let Some(pctx) = procsem.try_acquire() else { + panic!("Unable to acquire process semaphore"); + }; + + pctx.end("final"); + + drop(pctx); + + // + // Should be "final" + // + assert_eq!(procsem.endmsg(), Some(String::from("final"))); + + let Some(_pctx) = procsem.try_acquire() else { + panic!("Unable to acquire process semaphore"); + }; + + // + // Should be reset + // + assert_eq!(procsem.endmsg(), None); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED www/changelog.md Index: www/changelog.md ================================================================== --- /dev/null +++ www/changelog.md @@ -0,0 +1,35 @@ +# Change Log + +⚠️ indicates a breaking change. + +## [Unreleased] + +[Details](/vdiff?from=procsem-0.2.0&to=trunk) + +### Added + +### Changed + +### Removed + +--- + +## [0.2.0] - 2024-11-26 + +[Details](/vdiff?from=procsem-0.1.0&to=procsem-0.2.0) + +### Added + +- Actually call `StateReporter::begin()` when appropriate. + +### Changed + +- ⚠️ Pass end message to `StateReporter::end()`, if it was set previously. +- Internal endmsg is cleared on successful `ProcCtx` acquire. + +--- + +## [0.1.0] - 2023-02-03 + +Initial release. + Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -1,10 +1,10 @@ # procsem An instantiated `ProcSem` object represents a potential sequence of operations that may be active or inactive, but can only be activated once at a time. -When the ProcSem is acquired it will generate a `ProcCtx` object that +When the `ProcSem` is acquired it will yield a `ProcCtx` object that represents ownership of the sequence of operations. The `ProcSem` will remain in this "owned" state until the `ProcCtx` is dropped. The idea is that a process, which may span several threads/tasks, passes the `ProcCtx` object around for as long as a process is running to prohibit another @@ -13,5 +13,17 @@ A use-case for procsem is running an update process on an embedded system, where a new firmware version needs to be uploaded to the device, then unpacked, verified and finally installed. A `ProcSem` can be used to protect an entire update sequence from launching more than once at a time. + +## Change log + +The details of changes can always be found in the timeline, but for a +high-level view of changes between released versions there's a manually +maintained [Change Log](./changelog.md). + + +## Project Status + +_procsem_ is experimental. +