Difference From procsem-0.1.0 To procsem-0.2.0
2024-11-26
00:13 | Release maintenance. Leaf check-in: bc8642f540 user: jan tags: trunk, procsem-0.2.0
00:07 | Add endmsg tests. check-in: 67c95505b5 user: jan tags: trunk
2023-11-05
17:47 | Link to definitions on docs.rs. check-in: a4d2270686 user: jan tags: trunk
2023-02-03
11:07 | Move code from old repo. check-in: a952e215e3 user: jan tags: trunk, procsem-0.1.0
10:06 | Initial empty check-in. check-in: 3451c8e4ca user: jan tags: trunk
Changes to .efiles.
Old:
Cargo.toml
README.md
src/lib.rs

New:
Cargo.toml
README.md
www/index.md
www/changelog.md
src/lib.rs
tests/endmsg.rs
examples/simple.rs
examples/threaded.rs
Changes to Cargo.toml.
Old (only the leading lines appear in this view):
[package]
name = "procsem"

New:
[package]
name = "procsem"
version = "0.2.0"
edition = "2021"
license = "0BSD"
# https://crates.io/category_slugs
categories = ["asynchronous"]
keywords = ["locking"]
repository = "https://repos.qrnch.tech/pub/procsem"
description = "Semaphore used to lock thread/task-spanning sequence of operations"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "bacon.toml",
  "rustfmt.toml",
  "examples",
  "www"
]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges]
maintenance = { status = "experimental" }

[dependencies]
indexmap = { version = "2.6.0" }
parking_lot = { version = "0.12.3" }

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

[lints.clippy]
all = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }
Added bacon.toml.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | # This is a configuration file for the bacon tool # # Complete help on configuration: https://dystroy.org/bacon/config/ # # You may check the current default at # https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml default_job = "clippy-all" env.CARGO_TERM_COLOR = "always" [jobs.check] command = ["cargo", "check"] need_stdout = false [jobs.check-all] command = ["cargo", "check", "--all-targets"] need_stdout = false # Run clippy on the default target [jobs.clippy] command = ["cargo", "clippy"] need_stdout = false # Run clippy on all targets # To disable some lints, you may change the job this way: # [jobs.clippy-all] # command = [ # "cargo", "clippy", # "--all-targets", # "--", # "-A", "clippy::bool_to_int_with_if", # "-A", "clippy::collapsible_if", # "-A", "clippy::derive_partial_eq_without_eq", # ] # need_stdout = false [jobs.clippy-all] command = ["cargo", "clippy", "--all-targets"] need_stdout = false # This job lets you run # - all tests: bacon test # - a specific test: bacon test -- config::test_default_files # - the tests of a package: bacon test -- -- -p config [jobs.test] command = ["cargo", "test"] need_stdout = true [jobs.nextest] command = [ "cargo", "nextest", "run", "--hide-progress-bar", "--failure-output", "final" ] need_stdout = true analyzer = "nextest" [jobs.doc] command = ["cargo", "doc", "--no-deps"] need_stdout = false # If the doc compiles, then it opens in your browser and 
bacon switches # to the previous job [jobs.doc-open] command = ["cargo", "doc", "--no-deps", "--open"] need_stdout = false on_success = "back" # so that we don't open the browser at each change # You can run your application and have the result displayed in bacon, # if it makes sense for this crate. [jobs.run] command = [ "cargo", "run", # put launch parameters for your program behind a `--` separator ] need_stdout = true allow_warnings = true background = true # Run your long-running application (eg server) and have the result displayed in bacon. # For programs that never stop (eg a server), `background` is set to false # to have the cargo run output immediately displayed instead of waiting for # program's end. # 'on_change_strategy' is set to `kill_then_restart` to have your program restart # on every change (an alternative would be to use the 'F5' key manually in bacon). # If you often use this job, it makes sense to override the 'r' key by adding # a binding `r = job:run-long` at the end of this file . [jobs.run-long] command = [ "cargo", "run", # put launch parameters for your program behind a `--` separator ] need_stdout = true allow_warnings = true background = false on_change_strategy = "kill_then_restart" # This parameterized job runs the example of your choice, as soon # as the code compiles. # Call it as # bacon ex -- my-example [jobs.ex] command = ["cargo", "run", "--example"] need_stdout = true allow_warnings = true # You may define here keybindings that would be specific to # a project, for example a shortcut to launch a specific job. # Shortcuts to internal functions (scrolling, toggling, etc.) # should go in your personal global prefs.toml file instead. [keybindings] # alt-m = "job:my-job" c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target |
Added examples/simple.rs.
use procsem::{ProcCtx, ProcSem, StateReporter};

#[derive(Default)]
struct Reporter {}

impl StateReporter for Reporter {
  fn begin(&self) {
    println!("==> State update: begin");
  }

  fn action(&self, s: &str) {
    println!("==> State update: action({s})");
  }

  fn progress(&self, percent: Option<u8>) {
    println!("==> State update: progress({percent:?})");
  }

  fn end(&self, endmsg: Option<String>) {
    println!("==> State update: end({endmsg:?})");
  }
}


fn main() {
  //
  // Create the callback receiver
  //
  let reporter = Reporter::default();

  //
  // Create the process semaphore, registering with it the reporter callback
  //
  let procsem = ProcSem::with_reporter(reporter);

  //
  // Acquire process semaphore
  //
  let Some(pctx) = procsem.try_acquire() else {
    panic!("Unable to acquire process semaphore");
  };

  //
  // A second acquire must fail while the first ProcCtx is still alive
  //
  assert!(procsem.try_acquire().is_none());

  run_process(pctx);
}

#[allow(clippy::needless_pass_by_value)]
fn run_process(pctx: ProcCtx) {
  pctx.action("action");
  pctx.progress(None);
  pctx.progress(Some(25));
  pctx.action("half-way");
  pctx.progress(Some(50));
  pctx.progress(Some(75));
  pctx.progress(Some(100));
  pctx.end("end");
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added examples/threaded.rs.
use std::thread;

use procsem::{ProcCtx, ProcSem, StateReporter};

#[derive(Default)]
struct Reporter {}

impl StateReporter for Reporter {
  fn begin(&self) {
    println!("==> State update: begin");
  }

  fn action(&self, s: &str) {
    println!("==> State update: action({s})");
  }

  fn progress(&self, percent: Option<u8>) {
    println!("==> State update: progress({percent:?})");
  }

  fn end(&self, endmsg: Option<String>) {
    println!("==> State update: end({endmsg:?})");
  }
}


fn main() {
  //
  // Create the callback receiver
  //
  let reporter = Reporter::default();

  //
  // Create the process semaphore, registering with it the reporter callback
  //
  let procsem = ProcSem::with_reporter(reporter);

  //
  // Acquire process semaphore
  //
  let Some(pctx) = procsem.try_acquire() else {
    panic!("Unable to acquire process semaphore");
  };

  //
  // A second acquire must fail while the first ProcCtx is still alive
  //
  assert!(procsem.try_acquire().is_none());

  //
  // Pass ProcCtx to a different thread.
  //
  let jh = thread::spawn(move || {
    run_process(pctx);
  });

  let _ = jh.join();
}

#[allow(clippy::needless_pass_by_value)]
fn run_process(pctx: ProcCtx) {
  pctx.action("action");
  pctx.progress(None);
  pctx.progress(Some(25));
  pctx.action("half-way");
  pctx.progress(Some(50));
  pctx.progress(Some(75));
  pctx.progress(Some(100));
  pctx.end("end");
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Changes to src/lib.rs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | //! A simple process semaphore. //! //! (Note: The word _process_ should be read as a sequence of operations, //! rather than an operating system process). //! //! The [`ProcSem`] is intended to allow mutual exclusion of a chain of //! operations that may span over several threads/tasks. //! //! This is much like a `Mutex`, but it differs in that it holds no generic //! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is //! `Send`, because it is explicitly meant to be passed around between //! threads/tasks. It supports blocking, nonblocking and async lock //! acquisition. use std::{ future::Future, num::NonZeroUsize, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc }, task::{Context, Poll, Waker} }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; /// Interface used to implement objects that are reponsible for sending process /// reports. pub trait StateReporter: Send + Sync { | > > > > > > > > > > > | > > | | > | | > > > > | | | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | //! A simple process semaphore. //! //! (Note: The word _process_ should be read as a sequence of operations, //! rather than an operating system process). //! //! The [`ProcSem`] is intended to allow mutual exclusion of a chain of //! operations that may span over several threads/tasks. //! //! This is much like a `Mutex`, but it differs in that it holds no generic //! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is //! `Send`, because it is explicitly meant to be passed around between //! threads/tasks. It supports blocking, nonblocking and async lock //! acquisition. //! //! # Progress reporting //! 
When a `ProcSem` is created, it can optionally be handed an object that //! implements [`StateReporter`]. If a `ProcSem` has a `StateReporter` //! associated with it, then an acquired `ProcCtx` can use //! [`ProcCtx::action()`], [`ProcCtx::progress()`] to pass progress information //! to the `StateReporter` implementor. //! //! Once a `ProcCtx` is about to terminate, [`ProcCtx::end()`] can be used to //! signal a final message that can be retrieved by the `ProcSem` object. use std::{ future::Future, num::NonZeroUsize, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, Arc }, task::{Context, Poll, Waker} }; use parking_lot::{Condvar, Mutex}; use indexmap::IndexMap; /// Interface used to implement objects that are reponsible for sending process /// reports. #[allow(unused_variables)] pub trait StateReporter: Send + Sync { /// Called when a [`ProcCtx`] has been acquired. fn begin(&self) {} /// Called when [`ProcCtx::action()`] has been called to set a current /// action. fn action(&self, s: &str) {} /// The [`ProcCtx`] has progress has been made. fn progress(&self, percent: Option<u8>) {} /// Called when the [`ProcCtx`] is being dropped. /// /// This method will be called while the process ownership context still /// exists. If the `ProcCtx` is used as a resource lock, the application /// can rely on mutual exclusive access to the resource within this callback. fn end(&self, msg: Option<String>) {} } /// Shared data that must be protected behind mutually exclusive access. #[derive(Default)] struct Inner { /// Flag denoting whether the semaphore is currently owned or not. /// /// If this is `true` it means there's a `ProcCtx` object. If this is /// `false` there's no `ProcCtx` object for this semaphore. owned: bool, /// Collection of async task wakers waiting to acquire this lock. wakers: IndexMap<usize, Waker>, /// The last final message. endmsg: Option<String> |
︙ | ︙ | |||
71 72 73 74 75 76 77 78 79 80 81 82 | /// Representation of a process that may be active or inactive. #[repr(transparent)] #[derive(Default)] pub struct ProcSem(Arc<Shared>); impl ProcSem { /// Create a new process semaphore object. pub fn new() -> Self { Self::default() } /// Create a new process semaphore with a reporter object. | > | | > > > > > > > > > > > > > > > > > > > > | < > | 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | /// Representation of a process that may be active or inactive. #[repr(transparent)] #[derive(Default)] pub struct ProcSem(Arc<Shared>); impl ProcSem { /// Create a new process semaphore object. #[must_use] pub fn new() -> Self { Self::default() } /// Create a new process semaphore with a reporter object. pub fn with_reporter(reporter: impl StateReporter + 'static) -> Self { Self(Arc::new(Shared { reporter: Some(Box::new(reporter)), ..Default::default() })) } /// Return a `Future` that will yield a [`ProcCtx`] when possible. /// /// On successful acquire the internal endmsg will be cleared. #[must_use] pub fn acquire_async(&self) -> AcquireFuture { AcquireFuture { shared: Arc::clone(&self.0), id: None } } /// Acquire a [`ProcCtx`], blocking until successful. /// /// On successful acquire the internal endmsg will be cleared. #[must_use] #[allow(clippy::significant_drop_tightening)] pub fn acquire_blocking(&self) -> ProcCtx { let mut g = self.0.inner.lock(); loop { if g.owned { // There's a Lock for this ProcSem. // Wait until the condvar is triggered so we can check again. 
self.0.signal.wait(&mut g); } else { // Lock and return a Lock object g.owned = true; g.endmsg = None; if let Some(ref r) = self.0.reporter { r.begin(); } break ProcCtx(Arc::clone(&self.0)); } } } /// Attempt to acquire [`ProcCtx`], returning immediately. /// /// Returns `Some(ProcCtx)` if the lock was acquired successfully. Returns /// `None` if the lock is already owned by another lock. /// /// On successful acquire the internal endmsg will be cleared. #[must_use] #[allow(clippy::significant_drop_tightening)] pub fn try_acquire(&self) -> Option<ProcCtx> { let mut g = self.0.inner.lock(); if g.owned { // Lock already acquired None } else { g.owned = true; g.endmsg = None; if let Some(ref r) = self.0.reporter { r.begin(); } Some(ProcCtx(Arc::clone(&self.0))) } } /// Get a clone of the internal end message. #[must_use] pub fn endmsg(&self) -> Option<String> { self.0.inner.lock().endmsg.clone() } } /// Future used to wait to acquire a [`ProcCtx`] in an async environment. pub struct AcquireFuture { shared: Arc<Shared>, id: Option<NonZeroUsize> } impl Future for AcquireFuture { |
︙ | ︙ | |||
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | Poll::Pending } else { // // Mark lock as owned and return a Lock object. // inner.owned = true; Poll::Ready(ProcCtx(Arc::clone(&self.shared))) } } } impl Drop for AcquireFuture { /// If this future has generated a `Waker`, then remove it. fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.shared.inner.lock(); // Remove this future's waker | > > > > > > | | < < < < < < < < | > > > > > > > | | > > > > | | | > > > > > > > > | | > | > > | < | 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 | Poll::Pending } else { // // Mark lock as owned and return a Lock object. // inner.owned = true; inner.endmsg = None; if let Some(ref r) = self.shared.reporter { r.begin(); } drop(inner); Poll::Ready(ProcCtx(Arc::clone(&self.shared))) } } } impl Drop for AcquireFuture { /// If this future has generated a `Waker`, then remove it. fn drop(&mut self) { if let Some(id) = self.id { let mut inner = self.shared.inner.lock(); // Remove this future's waker let _ = inner.wakers.swap_remove(&id.get()); } } } /// Object that denotes ownership of a [`ProcSem`] semaphore. /// /// When the process is complete this object must be dropped so the semaphore /// can be acquired again. #[repr(transparent)] pub struct ProcCtx(Arc<Shared>); impl ProcCtx { /// Provide a textual describe the current stage of the process. pub fn action(&self, text: &str) { if let Some(ref r) = self.0.reporter { r.action(text); } } /// Set the progress (in percent). 
pub fn progress(&self, percent: Option<u8>) { if let Some(ref r) = self.0.reporter { r.progress(percent); } } /// Set an end-state message. /// /// The purpose of the "end message" is to be able to leave a final "result" /// message that can be read by the `ProcSem`. /// /// Calling this function will not cause the [`StateReporter::end()`] to be /// called. However, if an end message has been set, a clone if it will be /// passed to the `StateReporter::end()` callback. /// /// The message will be stored in the internal [`ProcSem`] storage /// and can be retrieved using [`ProcSem::endmsg()`]. pub fn end(&self, text: &str) { let mut g = self.0.inner.lock(); g.endmsg = Some(text.to_string()); } } impl Drop for ProcCtx { #[allow(clippy::significant_drop_tightening)] fn drop(&mut self) { if let Some(ref r) = self.0.reporter { let g = self.0.inner.lock(); let endmsg = g.endmsg.clone(); drop(g); r.end(endmsg); } let mut g = self.0.inner.lock(); g.owned = false; // Wake up all waiting futures. // // In an ideal world only one would be woken up, but there are edge cases // when using multithreaded executors when an await is performed in a // select!{} in which a doomed future might be picked. In such a case the // "wake" would be wasted on a future that will not be able to process the // wake request. // // To avoid this, _all_ wakers are woken up and they'll have to reenter // pending state if applicable. for (_, w) in g.wakers.drain(..) { w.wake(); } // Wake a single blocking thread (blocked threads do not suffer from the // same issue). self.0.signal.notify_one(); } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : |
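Note on blocking acquisition: the diff above shows `acquire_blocking()` waiting on a condition variable and `Drop for ProcCtx` calling `notify_one()`. The following is a minimal standard-library sketch of that pattern, not the crate's code. It assumes `std::sync::Mutex`/`Condvar` in place of `parking_lot`, and the names `BlockingSem`, `BlockingCtx` and `max_concurrency` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Shared state: an "owned" flag plus a condvar to wake one blocked waiter.
#[derive(Default)]
struct Shared {
  owned: Mutex<bool>,
  signal: Condvar,
}

/// Hypothetical stand-in for `ProcSem` (blocking acquisition only).
#[derive(Clone, Default)]
struct BlockingSem(Arc<Shared>);

/// Hypothetical stand-in for `ProcCtx`; releases ownership when dropped.
struct BlockingCtx(Arc<Shared>);

impl BlockingSem {
  /// Block until the semaphore is free, then mark it as owned.
  fn acquire_blocking(&self) -> BlockingCtx {
    let mut owned = self.0.owned.lock().unwrap();
    while *owned {
      // Releases the mutex while waiting and re-locks it on wake-up.
      owned = self.0.signal.wait(owned).unwrap();
    }
    *owned = true;
    BlockingCtx(Arc::clone(&self.0))
  }
}

impl Drop for BlockingCtx {
  fn drop(&mut self) {
    *self.0.owned.lock().unwrap() = false;
    // Wake a single blocked thread so it can re-check the flag.
    self.0.signal.notify_one();
  }
}

/// Start `workers` threads and return the highest number that were ever
/// inside the protected section at the same time.
fn max_concurrency(workers: usize) -> usize {
  let sem = BlockingSem::default();
  let active = Arc::new(AtomicUsize::new(0));
  let peak = Arc::new(AtomicUsize::new(0));
  let handles: Vec<_> = (0..workers)
    .map(|_| {
      let (sem, active, peak) =
        (sem.clone(), Arc::clone(&active), Arc::clone(&peak));
      thread::spawn(move || {
        let _ctx = sem.acquire_blocking();
        let now = active.fetch_add(1, Ordering::SeqCst) + 1;
        peak.fetch_max(now, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(5));
        active.fetch_sub(1, Ordering::SeqCst);
        // `_ctx` is dropped last, releasing the semaphore.
      })
    })
    .collect();
  for h in handles {
    h.join().unwrap();
  }
  peak.load(Ordering::SeqCst)
}

fn main() {
  // Only one worker at a time should hold the context.
  assert_eq!(max_concurrency(4), 1);
}
```

The sketch leaves out the async wakers and the reporter callbacks that the real implementation handles in the same places.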
Added tests/endmsg.rs.
use procsem::ProcSem;

#[test]
fn endmsg() {
  let procsem = ProcSem::new();

  let Some(pctx) = procsem.try_acquire() else {
    panic!("Unable to acquire process semaphore");
  };

  pctx.end("final");

  drop(pctx);

  //
  // Should be "final"
  //
  assert_eq!(procsem.endmsg(), Some(String::from("final")));

  let Some(_pctx) = procsem.try_acquire() else {
    panic!("Unable to acquire process semaphore");
  };

  //
  // Should be reset
  //
  assert_eq!(procsem.endmsg(), None);
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
Added www/changelog.md.
# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=procsem-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2024-11-26

[Details](/vdiff?from=procsem-0.1.0&to=procsem-0.2.0)

### Added

- Actually call `StateReporter::begin()` when appropriate.

### Changed

- ⚠️ Pass end message to `StateReporter::end()`, if it was set previously.
- Internal endmsg is cleared on successful `ProcCtx` acquire.

---

## [0.1.0] - 2023-02-03

Initial release.
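Note on the 0.2.0 entries: the three behaviours listed (begin reported on acquire, the end message handed to the end callback, the stored end message cleared on the next acquire) can be modelled with the standard library alone. This is a rough sketch under stated assumptions rather than the crate's implementation: `SketchSem`, `SketchCtx` and the `log` vector, which stands in for a `StateReporter`, are hypothetical names.

```rust
use std::sync::{Arc, Mutex};

/// State shared between the semaphore and its context.
#[derive(Default)]
struct State {
  owned: bool,
  endmsg: Option<String>,
  /// Stand-in for reporter callbacks: records each call as a string.
  log: Vec<String>,
}

/// Hypothetical stand-in for `ProcSem`.
#[derive(Clone, Default)]
struct SketchSem(Arc<Mutex<State>>);

/// Hypothetical stand-in for `ProcCtx`.
struct SketchCtx(Arc<Mutex<State>>);

impl SketchSem {
  fn try_acquire(&self) -> Option<SketchCtx> {
    let mut st = self.0.lock().unwrap();
    if st.owned {
      return None;
    }
    st.owned = true;
    st.endmsg = None; // cleared on successful acquire
    st.log.push("begin".into()); // begin is reported on acquire
    Some(SketchCtx(Arc::clone(&self.0)))
  }

  /// Clone of the stored end message.
  fn endmsg(&self) -> Option<String> {
    self.0.lock().unwrap().endmsg.clone()
  }

  /// Clone of the recorded callback log.
  fn log(&self) -> Vec<String> {
    self.0.lock().unwrap().log.clone()
  }
}

impl SketchCtx {
  /// Store a final message; does not itself report "end".
  fn end(&self, text: &str) {
    self.0.lock().unwrap().endmsg = Some(text.to_string());
  }
}

impl Drop for SketchCtx {
  fn drop(&mut self) {
    let mut st = self.0.lock().unwrap();
    // The end message, if any, is passed along when "end" is reported,
    // and ownership is released only afterwards.
    let msg = st.endmsg.clone();
    st.log.push(format!("end({msg:?})"));
    st.owned = false;
  }
}

fn main() {
  let sem = SketchSem::default();

  let ctx = sem.try_acquire().expect("semaphore should be free");
  ctx.end("final");
  drop(ctx);
  assert_eq!(sem.endmsg(), Some("final".to_string()));

  let _ctx2 = sem.try_acquire().expect("semaphore should be free");
  assert_eq!(sem.endmsg(), None);
  assert_eq!(sem.log(), vec!["begin", "end(Some(\"final\"))", "begin"]);
}
```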
Changes to www/index.md.
Old:
# procsem

An instantiated `ProcSem` object represents a potential sequence of operations
that may be active or inactive, but can only be activated once at a time.

New:
# procsem

An instantiated `ProcSem` object represents a potential sequence of operations
that may be active or inactive, but can only be activated once at a time.

When the `ProcSem` is acquired it will yield a `ProcCtx` object that represents
ownership of the sequence of operations. The `ProcSem` will remain in this
"owned" state until the `ProcCtx` is dropped.

The idea is that a process, which may span several threads/tasks, passes the
`ProcCtx` object around for as long as the process is running, to prevent
another parallel instance from being launched.

A use-case for procsem is running an update process on an embedded system,
where a new firmware version needs to be uploaded to the device, then unpacked,
verified and finally installed. A `ProcSem` can be used to protect an entire
update sequence from launching more than once at a time.


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).


## Project Status

_procsem_ is experimental.
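Note on the ownership idea described in www/index.md: an ownership token that is `Send` and releases on drop can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's API: `MiniSem`, `MiniCtx` and `run_update` are hypothetical names, and the update steps are reduced to a comment.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `ProcSem`: a flag recording whether the
/// sequence of operations is currently owned.
#[derive(Clone, Default)]
struct MiniSem(Arc<Mutex<bool>>);

/// Hypothetical stand-in for `ProcCtx`: an ownership token that releases
/// the semaphore when dropped and can be moved between threads.
struct MiniCtx(Arc<Mutex<bool>>);

impl MiniSem {
  /// Return a context if nobody owns the semaphore, otherwise `None`.
  fn try_acquire(&self) -> Option<MiniCtx> {
    let mut owned = self.0.lock().unwrap();
    if *owned {
      None
    } else {
      *owned = true;
      Some(MiniCtx(Arc::clone(&self.0)))
    }
  }
}

impl Drop for MiniCtx {
  fn drop(&mut self) {
    *self.0.lock().unwrap() = false;
  }
}

/// Start an update-like sequence on another thread. Returns `true` if a
/// second launch attempt was refused while the first one held the context,
/// and `false` if the sequence could not be started at all.
fn run_update(sem: &MiniSem) -> bool {
  let Some(ctx) = sem.try_acquire() else {
    return false;
  };

  // A parallel launch is refused for as long as `ctx` exists.
  let refused = sem.try_acquire().is_none();

  // Hand the context to the thread that does the actual work.
  let jh = thread::spawn(move || {
    let _ctx = ctx;
    // upload, unpack, verify, install ...
  });
  jh.join().unwrap();

  refused
}

fn main() {
  let sem = MiniSem::default();
  assert!(run_update(&sem));
  // The context was dropped on the worker thread, so the semaphore is free.
  assert!(sem.try_acquire().is_some());
}
```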