procsem

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From procsem-0.1.0 To procsem-0.2.0

2024-11-26
00:13
Release maintenance. Leaf check-in: bc8642f540 user: jan tags: trunk, procsem-0.2.0
00:07
Add endmsg tests. check-in: 67c95505b5 user: jan tags: trunk
2023-11-05
17:47
Link to definitions on docs.rs. check-in: a4d2270686 user: jan tags: trunk
2023-02-03
11:07
Move code from old repo. check-in: a952e215e3 user: jan tags: trunk, procsem-0.1.0
10:06
initial empty check-in check-in: 3451c8e4ca user: jan tags: trunk

Changes to .efiles.

1
2


3
4


Cargo.toml
README.md


src/lib.rs
www/index.md




>
>

|
>
>
1
2
3
4
5
6
7
8
Cargo.toml
README.md
www/index.md
www/changelog.md
src/lib.rs
tests/endmsg.rs
examples/simple.rs
examples/threaded.rs

Changes to Cargo.toml.

1
2
3
4
5


6
7
8
9
10
11
12

13

14
15
16




17
18
19









20
# Crate manifest as of the 0.1.0 release.
[package]
name = "procsem"
version = "0.1.0"
edition = "2021"
license = "0BSD"
keywords = [ "locking" ]
repository = "https://repos.qrnch.tech/pub/procsem"
description = "Semaphore used to lock thread/task spanning sequence of operations"
# Paths that are left out of the packaged crate.
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "rustfmt.toml",
  "www"
]

[dependencies]
# src/lib.rs keeps the waiting async task wakers in an `IndexMap`.
indexmap = { version = "1.9.2" }
# src/lib.rs imports `Mutex` and `Condvar` from this crate.
parking_lot = { version = "0.12.1" }












|


>
>
|

|




>

>



>
>
>
>

|
|
>
>
>
>
>
>
>
>
>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Crate manifest for the 0.2.0 release.
[package]
name = "procsem"
version = "0.2.0"
edition = "2021"
license = "0BSD"
# Valid slugs are listed at https://crates.io/category_slugs
categories = ["asynchronous"]
keywords = ["locking"]
repository = "https://repos.qrnch.tech/pub/procsem"
description = "Semaphore used to lock thread/task-spanning sequence of operations"
# Paths that are left out of the packaged crate.
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "bacon.toml",
  "rustfmt.toml",
  "examples",
  "www",
]

[package.metadata.docs.rs]
rustdoc-args = ["--generate-link-to-definition"]

# https://doc.rust-lang.org/cargo/reference/manifest.html#the-badges-section
[badges.maintenance]
status = "experimental"

[dependencies]
indexmap = "2.6.0"
parking_lot = "0.12.3"

# The lint groups get priority -1 so that individually configured lints take
# precedence over them.
[lints.clippy]
all = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
nursery = { level = "warn", priority = -1 }
cargo = { level = "warn", priority = -1 }

Added bacon.toml.































































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# This is a configuration file for the bacon tool
#
# Complete help on configuration: https://dystroy.org/bacon/config/
#
# You may check the current default at
#   https://github.com/Canop/bacon/blob/main/defaults/default-bacon.toml

default_job = "clippy-all"
env.CARGO_TERM_COLOR = "always"

[jobs.check]
command = ["cargo", "check"]
need_stdout = false

[jobs.check-all]
command = ["cargo", "check", "--all-targets"]
need_stdout = false

# Run clippy on the default target
[jobs.clippy]
command = ["cargo", "clippy"]
need_stdout = false

# Run clippy on all targets
# To disable some lints, you may change the job this way:
#    [jobs.clippy-all]
#    command = [
#        "cargo", "clippy",
#        "--all-targets",
#        "--",
#        "-A", "clippy::bool_to_int_with_if",
#        "-A", "clippy::collapsible_if",
#        "-A", "clippy::derive_partial_eq_without_eq",
#    ]
#    need_stdout = false
[jobs.clippy-all]
command = ["cargo", "clippy", "--all-targets"]
need_stdout = false

# This job lets you run
# - all tests: bacon test
# - a specific test: bacon test -- config::test_default_files
# - the tests of a package: bacon test -- -- -p config
[jobs.test]
command = ["cargo", "test"]
need_stdout = true

[jobs.nextest]
command = [
    "cargo", "nextest", "run",
    "--hide-progress-bar", "--failure-output", "final"
]
need_stdout = true
analyzer = "nextest"

[jobs.doc]
command = ["cargo", "doc", "--no-deps"]
need_stdout = false

# If the doc compiles, then it opens in your browser and bacon switches
# to the previous job
[jobs.doc-open]
command = ["cargo", "doc", "--no-deps", "--open"]
need_stdout = false
on_success = "back" # so that we don't open the browser at each change

# You can run your application and have the result displayed in bacon,
# if it makes sense for this crate.
[jobs.run]
command = [
    "cargo", "run",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = true

# Run your long-running application (e.g. a server) and have the result
# displayed in bacon.
# For programs that never stop (e.g. a server), `background` is set to false
# to have the cargo run output immediately displayed instead of waiting for
# the program's end.
# `on_change_strategy` is set to `kill_then_restart` to have your program
# restart on every change (an alternative would be to use the 'F5' key
# manually in bacon).
# If you often use this job, it makes sense to override the 'r' key by adding
# a binding `r = job:run-long` at the end of this file.
[jobs.run-long]
command = [
    "cargo", "run",
    # put launch parameters for your program behind a `--` separator
]
need_stdout = true
allow_warnings = true
background = false
on_change_strategy = "kill_then_restart"

# This parameterized job runs the example of your choice, as soon
# as the code compiles.
# Call it as
#    bacon ex -- my-example
[jobs.ex]
command = ["cargo", "run", "--example"]
need_stdout = true
allow_warnings = true

# You may define here keybindings that would be specific to
# a project, for example a shortcut to launch a specific job.
# Shortcuts to internal functions (scrolling, toggling, etc.)
# should go in your personal global prefs.toml file instead.
[keybindings]
# alt-m = "job:my-job"
c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target

Added examples/simple.rs.





































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
use procsem::{ProcCtx, ProcSem, StateReporter};

/// Reporter that prints every state update it receives to standard output.
#[derive(Default)]
struct Reporter;

impl StateReporter for Reporter {
  /// Print a line announcing the start of a process.
  fn begin(&self) {
    println!("==> State update: begin");
  }

  /// Print the text describing the current action.
  fn action(&self, s: &str) {
    println!("==> State update: action({s})");
  }

  /// Print the reported progress value using its `Debug` representation.
  fn progress(&self, percent: Option<u8>) {
    println!("==> State update: progress({percent:?})");
  }

  /// Print the end message, if any, using its `Debug` representation.
  fn end(&self, endmsg: Option<String>) {
    println!("==> State update: end({endmsg:?})");
  }
}


/// Acquire the process semaphore, show that it cannot be acquired a second
/// time, then run the process on the current thread.
fn main() {
  //
  // Create the process semaphore and register a printing reporter with it
  //
  let procsem = ProcSem::with_reporter(Reporter::default());

  //
  // Acquire the process semaphore
  //
  let pctx = procsem
    .try_acquire()
    .expect("Unable to acquire process semaphore");

  //
  // A second attempt to acquire must fail while `pctx` exists
  //
  assert!(procsem.try_acquire().is_none());

  run_process(pctx);
}

/// Drive a process from start to finish, reporting an action and progress
/// values along the way and leaving an end message behind.
///
/// The context is taken by value, so it is dropped when this function
/// returns.
#[allow(clippy::needless_pass_by_value)]
fn run_process(pctx: ProcCtx) {
  pctx.action("action");
  for pct in [None, Some(25)] {
    pctx.progress(pct);
  }

  pctx.action("half-way");
  for pct in [50, 75, 100] {
    pctx.progress(Some(pct));
  }

  pctx.end("end");
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added examples/threaded.rs.





















































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::thread;

use procsem::{ProcCtx, ProcSem, StateReporter};

/// Reporter that prints every state update it receives to standard output.
#[derive(Default)]
struct Reporter;

impl StateReporter for Reporter {
  /// Print a line announcing the start of a process.
  fn begin(&self) {
    println!("==> State update: begin");
  }

  /// Print the text describing the current action.
  fn action(&self, s: &str) {
    println!("==> State update: action({s})");
  }

  /// Print the reported progress value using its `Debug` representation.
  fn progress(&self, percent: Option<u8>) {
    println!("==> State update: progress({percent:?})");
  }

  /// Print the end message, if any, using its `Debug` representation.
  fn end(&self, endmsg: Option<String>) {
    println!("==> State update: end({endmsg:?})");
  }
}


/// Acquire the process semaphore, show that it cannot be acquired a second
/// time, then run the process on a separate thread.
fn main() {
  //
  // Create the process semaphore and register a printing reporter with it
  //
  let procsem = ProcSem::with_reporter(Reporter::default());

  //
  // Acquire the process semaphore
  //
  let pctx = procsem
    .try_acquire()
    .expect("Unable to acquire process semaphore");

  //
  // A second attempt to acquire must fail while `pctx` exists
  //
  assert!(procsem.try_acquire().is_none());

  //
  // Move the ProcCtx into a worker thread and wait for it to finish
  //
  let worker = thread::spawn(move || run_process(pctx));
  let _ = worker.join();
}

/// Drive a process from start to finish, reporting an action and progress
/// values along the way and leaving an end message behind.
///
/// The context is taken by value, so it is dropped when this function
/// returns.
#[allow(clippy::needless_pass_by_value)]
fn run_process(pctx: ProcCtx) {
  pctx.action("action");
  for pct in [None, Some(25)] {
    pctx.progress(pct);
  }

  pctx.action("half-way");
  for pct in [50, 75, 100] {
    pctx.progress(Some(pct));
  }

  pctx.end("end");
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.

1
2
3
4
5
6
7
8
9
10
11
12
13










14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

33
34
35
36


37
38

39
40
41




42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//! A simple process semaphore.
//!
//! (Note: The word _process_ should be read as a sequence of operations,
//! rather than an operating system process).
//!
//! The [`ProcSem`] is intended to allow mutual exclusion of a chain of
//! operations that may span over several threads/tasks.
//!
//! This is much like a `Mutex`, but it differs in that it holds no generic
//! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is
//! `Send`, because it is explicitly meant to be passed around between
//! threads/tasks.  It supports blocking, nonblocking and async lock
//! acquisition.











use std::{
  future::Future,
  num::NonZeroUsize,
  pin::Pin,
  sync::{
    atomic::{AtomicUsize, Ordering},
    Arc
  },
  task::{Context, Poll, Waker}
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;


/// Interface used to implement objects that are responsible for sending
/// process reports.
///
/// Every method has an empty default body.
pub trait StateReporter: Send + Sync {
  /// A new [`ProcCtx`] has been acquired.
  // NOTE(review): none of the acquire paths visible in this revision invoke
  // this hook -- confirm against the full source.
  fn begin(&self) {}

  /// Receives the text passed to [`ProcCtx::action()`].
  fn action(&self, _s: &str) {}

  /// Receives the text passed to [`ProcCtx::reset_action()`].
  fn reset_action(&self, _s: &str) {}

  /// Receives the value passed to [`ProcCtx::progress()`].
  fn progress(&self, _percent: Option<u8>) {}

  /// The [`ProcCtx`] is being dropped.
  fn end(&self) {}
}


/// Shared data that must be protected behind mutually exclusive access.
#[derive(Default)]
struct Inner {
  /// Flag denoting whether the semaphore is currently owned or not.
  ///
  /// If this is `true` it means there's a `Lock` object.  If this is `false`
  /// there's no `Lock` object for this semaphore.
  owned: bool,

  /// Collection of async task wakers waiting to acquire this lock.
  wakers: IndexMap<usize, Waker>,

  /// The last final message.
  endmsg: Option<String>













>
>
>
>
>
>
>
>
>
>



















>

|


>
>
|
|
>
|

|
>
>
>
>
|








|
|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//! A simple process semaphore.
//!
//! (Note: The word _process_ should be read as a sequence of operations,
//! rather than an operating system process).
//!
//! The [`ProcSem`] is intended to allow mutual exclusion of a chain of
//! operations that may span over several threads/tasks.
//!
//! This is much like a `Mutex`, but it differs in that it holds no generic
//! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is
//! `Send`, because it is explicitly meant to be passed around between
//! threads/tasks.  It supports blocking, nonblocking and async lock
//! acquisition.
//!
//! # Progress reporting
//! When a `ProcSem` is created, it can optionally be handed an object that
//! implements [`StateReporter`].  If a `ProcSem` has a `StateReporter`
//! associated with it, then an acquired `ProcCtx` can use
//! [`ProcCtx::action()`] and [`ProcCtx::progress()`] to pass progress
//! information to the `StateReporter` implementor.
//!
//! Once a `ProcCtx` is about to terminate, [`ProcCtx::end()`] can be used to
//! signal a final message that can be retrieved by the `ProcSem` object.

use std::{
  future::Future,
  num::NonZeroUsize,
  pin::Pin,
  sync::{
    atomic::{AtomicUsize, Ordering},
    Arc
  },
  task::{Context, Poll, Waker}
};

use parking_lot::{Condvar, Mutex};

use indexmap::IndexMap;


/// Interface used to implement objects that are responsible for sending
/// process reports.
///
/// Every method has an empty default body, so an implementor only needs to
/// override the callbacks it is interested in.
// The lint is silenced because the empty default bodies do not use their
// parameters.
#[allow(unused_variables)]
pub trait StateReporter: Send + Sync {
  /// Called when a [`ProcCtx`] has been acquired.
  fn begin(&self) {}

  /// Called when [`ProcCtx::action()`] has been called to set a current
  /// action.
  fn action(&self, s: &str) {}

  /// Called when [`ProcCtx::progress()`] has been called to report progress.
  fn progress(&self, percent: Option<u8>) {}

  /// Called when the [`ProcCtx`] is being dropped.
  ///
  /// `msg` is a clone of the end message set using [`ProcCtx::end()`], if
  /// one has been set.
  ///
  /// This method will be called while the process ownership context still
  /// exists.  If the `ProcCtx` is used as a resource lock, the application
  /// can rely on mutual exclusive access to the resource within this callback.
  fn end(&self, msg: Option<String>) {}
}


/// Shared data that must be protected behind mutually exclusive access.
#[derive(Default)]
struct Inner {
  /// Flag denoting whether the semaphore is currently owned or not.
  ///
  /// If this is `true` it means there's a `ProcCtx` object.  If this is
  /// `false` there's no `ProcCtx` object for this semaphore.
  owned: bool,

  /// Collection of async task wakers waiting to acquire this lock.
  wakers: IndexMap<usize, Waker>,

  /// The last final message.
  endmsg: Option<String>
71
72
73
74
75
76
77

78
79
80
81
82
83
84
85
86
87
88
89
90



91
92
93
94
95
96
97
98




99
100
101
102
103
104
105
106
107
108




109
110
111
112
113
114
115
116
117




118
119
120
121
122
123
124




125
126
127
128
129

130
131
132
133
134
135
136

137
138
139
140
141
142
143
/// Representation of a process that may be active or inactive.
///
/// Wraps the reference-counted shared state that is also handed to every
/// [`ProcCtx`] and [`AcquireFuture`] created from this semaphore.
#[repr(transparent)]
#[derive(Default)]
pub struct ProcSem(Arc<Shared>);

impl ProcSem {
  /// Create a new process semaphore object.
  pub fn new() -> Self {
    Self::default()
  }

  /// Create a new process semaphore with a reporter object.
  ///
  /// All other shared state is default-initialized.
  pub fn with_reporter(reporter: Box<dyn StateReporter>) -> Self {
    Self(Arc::new(Shared {
      reporter: Some(reporter),
      ..Default::default()
    }))
  }

  /// Return a `Future` that will yield a [`ProcCtx`] when possible.
  pub fn acquire_async(&self) -> AcquireFuture {
    AcquireFuture {
      shared: Arc::clone(&self.0),
      // No waker has been registered for this future yet
      id: None
    }
  }

  /// Acquire a [`ProcCtx`], blocking until successful.
  pub fn acquire_blocking(&self) -> ProcCtx {
    let mut g = self.0.inner.lock();
    loop {
      if g.owned {
        // There's already a ProcCtx for this ProcSem.
        // Wait until the condvar is triggered so we can check again.
        self.0.signal.wait(&mut g);
      } else {
        // Mark the semaphore as owned and return a ProcCtx object
        g.owned = true;
        break ProcCtx(Arc::clone(&self.0));
      }
    }
  }

  /// Attempt to acquire [`ProcCtx`], returning immediately.
  ///
  /// Returns `Some(ProcCtx)` if the lock was acquired successfully.  Returns
  /// `None` if the semaphore is already owned by another `ProcCtx`.
  pub fn try_acquire(&self) -> Option<ProcCtx> {
    let mut g = self.0.inner.lock();
    if g.owned {
      // Lock already acquired
      None
    } else {
      g.owned = true;
      Some(ProcCtx(Arc::clone(&self.0)))
    }
  }

  /// Get a clone of the internal end message.
  ///
  /// Returns `None` if no end message is currently stored.
  pub fn endmsg(&self) -> Option<String> {
    let g = self.0.inner.lock();
    g.endmsg.clone()
  }
}



/// Future returned by [`ProcSem::acquire_async()`].
pub struct AcquireFuture {
  /// State shared with the [`ProcSem`] this future was created from.
  shared: Arc<Shared>,

  /// Key of this future's entry in the shared waker collection, if it has
  /// one.  It starts out as `None` and is used on drop to remove the entry.
  // NOTE(review): presumably assigned by `poll()` when a waker is registered;
  // confirm against the full `Future` implementation.
  id: Option<NonZeroUsize>
}

impl Future for AcquireFuture {







>





|

|





>
>
>








>
>
>
>










>
>
>
>









>
>
>
>







>
>
>
>





>

|
<




>







89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170

171
172
173
174
175
176
177
178
179
180
181
182
/// Representation of a process that may be active or inactive.
///
/// Wraps the reference-counted shared state that is also handed to every
/// [`ProcCtx`] and [`AcquireFuture`] created from this semaphore.
#[repr(transparent)]
#[derive(Default)]
pub struct ProcSem(Arc<Shared>);

impl ProcSem {
  /// Create a new process semaphore object.
  #[must_use]
  pub fn new() -> Self {
    Self::default()
  }

  /// Create a new process semaphore with a reporter object.
  ///
  /// The reporter is boxed and stored in the shared state; all other shared
  /// state is default-initialized.
  pub fn with_reporter(reporter: impl StateReporter + 'static) -> Self {
    Self(Arc::new(Shared {
      reporter: Some(Box::new(reporter)),
      ..Default::default()
    }))
  }

  /// Return a `Future` that will yield a [`ProcCtx`] when possible.
  ///
  /// On successful acquire the internal endmsg will be cleared.
  #[must_use]
  pub fn acquire_async(&self) -> AcquireFuture {
    AcquireFuture {
      shared: Arc::clone(&self.0),
      // No waker has been registered for this future yet
      id: None
    }
  }

  /// Acquire a [`ProcCtx`], blocking until successful.
  ///
  /// On successful acquire the internal endmsg will be cleared and
  /// [`StateReporter::begin()`] is called on the registered reporter, if any.
  /// The callback runs after the internal lock has been released.
  #[must_use]
  pub fn acquire_blocking(&self) -> ProcCtx {
    let mut g = self.0.inner.lock();

    // As long as there's a ProcCtx for this ProcSem, wait for the condvar to
    // be triggered and then check again.
    while g.owned {
      self.0.signal.wait(&mut g);
    }

    // Take ownership and reset the end message left by the previous owner.
    g.owned = true;
    g.endmsg = None;

    // Ownership is established at this point, so the internal lock is no
    // longer needed.  Release it before running application code so that a
    // slow reporter does not stall other callers and a reporter that
    // queries the semaphore does not deadlock.
    drop(g);

    self.report_begin();
    ProcCtx(Arc::clone(&self.0))
  }

  /// Attempt to acquire [`ProcCtx`], returning immediately.
  ///
  /// Returns `Some(ProcCtx)` if the lock was acquired successfully.  Returns
  /// `None` if the semaphore is already owned by another `ProcCtx`.
  ///
  /// On successful acquire the internal endmsg will be cleared and
  /// [`StateReporter::begin()`] is called on the registered reporter, if any.
  /// The callback runs after the internal lock has been released.
  #[must_use]
  pub fn try_acquire(&self) -> Option<ProcCtx> {
    let mut g = self.0.inner.lock();
    if g.owned {
      // Lock already acquired
      return None;
    }

    // Take ownership and reset the end message left by the previous owner.
    g.owned = true;
    g.endmsg = None;

    // See acquire_blocking() for why the lock is released before the
    // reporter callback is run.
    drop(g);

    self.report_begin();
    Some(ProcCtx(Arc::clone(&self.0)))
  }

  /// Get a clone of the internal end message.
  ///
  /// Returns `None` if no end message is currently stored.
  #[must_use]
  pub fn endmsg(&self) -> Option<String> {
    self.0.inner.lock().endmsg.clone()
  }

  /// Tell the registered reporter, if any, that a process has begun.
  ///
  /// Must be called without the internal lock held.
  fn report_begin(&self) {
    if let Some(ref r) = self.0.reporter {
      r.begin();
    }
  }
}


/// Future used to wait to acquire a [`ProcCtx`] in an async environment.
///
/// Returned by [`ProcSem::acquire_async()`].
pub struct AcquireFuture {
  /// State shared with the [`ProcSem`] this future was created from.
  shared: Arc<Shared>,

  /// Key of this future's entry in the shared waker collection, if it has
  /// one.  It starts out as `None` and is used on drop to remove the entry.
  // NOTE(review): presumably assigned by `poll()` when a waker is registered;
  // confirm against the full `Future` implementation.
  id: Option<NonZeroUsize>
}

impl Future for AcquireFuture {
170
171
172
173
174
175
176






177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226







227
228
229
230
231
232
233
234
235

236
237



238
239
240
241
242
243
244
245








246
247

248


249
250
251
252
253
254

      Poll::Pending
    } else {
      //
      // Mark lock as owned and return a Lock object.
      //
      inner.owned = true;






      Poll::Ready(ProcCtx(Arc::clone(&self.shared)))
    }
  }
}

impl Drop for AcquireFuture {
  /// If this future has generated a `Waker`, then remove it.
  ///
  /// A future without an `id` has no entry in the waker collection, so
  /// nothing is locked or removed in that case.
  fn drop(&mut self) {
    if let Some(id) = self.id {
      let mut inner = self.shared.inner.lock();
      // Remove this future's waker; whether an entry was actually present is
      // of no interest, so the result is discarded.
      let _ = inner.wakers.remove(&id.get());
    }
  }
}


/// Object that denotes ownership of a [`ProcSem`] semaphore.
///
/// When the process is complete this object must be dropped so the semaphore
/// can be acquired again.
#[repr(transparent)]
pub struct ProcCtx(Arc<Shared>);


impl ProcCtx {
  /// Describe the current stage in the process.
  ///
  /// The text is forwarded to [`StateReporter::action()`] if a reporter is
  /// registered; otherwise this does nothing.
  ///
  /// Setting this will clear the progress.
  // NOTE(review): nothing in this method clears any progress itself; that
  // would be up to the `StateReporter` implementation -- confirm.
  pub fn action(&self, text: &str) {
    if let Some(ref r) = self.0.reporter {
      r.action(text);
    }
  }

  /// Forward `text` to [`StateReporter::reset_action()`] if a reporter is
  /// registered; otherwise this does nothing.
  pub fn reset_action(&self, text: &str) {
    if let Some(ref r) = self.0.reporter {
      r.reset_action(text);
    }
  }

  /// Set the progress (in percent) of the current stage.
  ///
  /// The value is forwarded to [`StateReporter::progress()`] if a reporter is
  /// registered; otherwise this does nothing.
  pub fn progress(&self, percent: Option<u8>) {
    if let Some(ref r) = self.0.reporter {
      r.progress(percent);
    }
  }

  /// Set an end-state message.
  ///
  /// This message will be stored in the shared state, which means it can be
  /// queried through the [`ProcSem`].  It replaces any previously stored
  /// message.
  pub fn end(&self, text: &str) {
    let mut g = self.0.inner.lock();
    g.endmsg = Some(text.to_string());
  }
}

impl Drop for ProcCtx {
  /// Notify the reporter, release ownership of the semaphore and wake up one
  /// waiter.
  fn drop(&mut self) {
    // The reporter, if any, is told about the end of the process before the
    // semaphore is marked as no longer owned.
    if let Some(ref r) = self.0.reporter {
      r.end();
    }

    let mut g = self.0.inner.lock();
    g.owned = false;

    // If there are any wakers waiting, then extract one and trigger it.
    // Otherwise signal the condvar.
    if let Some((_, n)) = g.wakers.pop() {
      n.wake();
    } else {
      self.0.signal.notify_one();
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :







>
>
>
>
>
>











|














|
<
<






<
<
<
<
<
<
|








>
>
>
>
>
>
>
|
|







>


>
>
>
|





|
|
>
>
>
>
>
>
>
>
|
|
>
|
>
>
|
<




209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248


249
250
251
252
253
254






255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308

309
310
311
312

      Poll::Pending
    } else {
      //
      // Mark lock as owned and return a Lock object.
      //
      inner.owned = true;
      inner.endmsg = None;
      if let Some(ref r) = self.shared.reporter {
        r.begin();
      }

      drop(inner);
      Poll::Ready(ProcCtx(Arc::clone(&self.shared)))
    }
  }
}

impl Drop for AcquireFuture {
  /// Remove the `Waker` registered on behalf of this future, if there is one.
  ///
  /// A future without an `id` has no entry in the waker collection, so
  /// nothing is locked or removed in that case.
  fn drop(&mut self) {
    let Some(id) = self.id else {
      return;
    };

    // The removed waker (if any) is discarded while the lock is still held.
    let mut guard = self.shared.inner.lock();
    drop(guard.wakers.swap_remove(&id.get()));
  }
}


/// Object that denotes ownership of a [`ProcSem`] semaphore.
///
/// When the process is complete this object must be dropped so the semaphore
/// can be acquired again.
#[repr(transparent)]
pub struct ProcCtx(Arc<Shared>);


impl ProcCtx {
  /// Provide a textual describe the current stage of the process.


  pub fn action(&self, text: &str) {
    if let Some(ref r) = self.0.reporter {
      r.action(text);
    }
  }







  /// Set the progress (in percent).
  pub fn progress(&self, percent: Option<u8>) {
    if let Some(ref r) = self.0.reporter {
      r.progress(percent);
    }
  }

  /// Set an end-state message.
  ///
  /// The purpose of the "end message" is to be able to leave a final "result"
  /// message that can be read by the `ProcSem`.
  ///
  /// Calling this function will not cause the [`StateReporter::end()`] to be
  /// called.  However, if an end message has been set, a clone if it will be
  /// passed to the `StateReporter::end()` callback.
  ///
  /// The message will be stored in the internal [`ProcSem`] storage
  /// and can be retrieved using [`ProcSem::endmsg()`].
  pub fn end(&self, text: &str) {
    let mut g = self.0.inner.lock();
    g.endmsg = Some(text.to_string());
  }
}

impl Drop for ProcCtx {
  #[allow(clippy::significant_drop_tightening)]
  fn drop(&mut self) {
    if let Some(ref r) = self.0.reporter {
      let g = self.0.inner.lock();
      let endmsg = g.endmsg.clone();
      drop(g);
      r.end(endmsg);
    }

    let mut g = self.0.inner.lock();
    g.owned = false;

    // Wake up all waiting futures.
    //
    // In an ideal world only one would be woken up, but there are edge cases
    // when using multithreaded executors when an await is performed in a
    // select!{} in which a doomed future might be picked.  In such a case the
    // "wake" would be wasted on a future that will not be able to process the
    // wake request.
    //
    // To avoid this, _all_ wakers are woken up and they'll have to reenter
    // pending state if applicable.
    for (_, w) in g.wakers.drain(..) {
      w.wake();
    }

    // Wake a single blocking thread (blocked threads do not suffer from the
    // same issue).
    self.0.signal.notify_one();

  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added tests/endmsg.rs.





























































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use procsem::ProcSem;

/// An end message must remain readable after its `ProcCtx` has been dropped,
/// and must be cleared by the next successful acquire.
#[test]
fn endmsg() {
  let procsem = ProcSem::new();

  let pctx = procsem
    .try_acquire()
    .expect("Unable to acquire process semaphore");
  pctx.end("final");
  drop(pctx);

  //
  // Should be "final"
  //
  assert_eq!(procsem.endmsg().as_deref(), Some("final"));

  // Keep the new context alive until the end of the test.
  let _pctx = procsem
    .try_acquire()
    .expect("Unable to acquire process semaphore");

  //
  // Should be reset
  //
  assert_eq!(procsem.endmsg(), None);
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added www/changelog.md.







































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# Change Log

⚠️  indicates a breaking change.

## [Unreleased]

[Details](/vdiff?from=procsem-0.2.0&to=trunk)

### Added

### Changed

### Removed

---

## [0.2.0] - 2024-11-26

[Details](/vdiff?from=procsem-0.1.0&to=procsem-0.2.0)

### Added

- Actually call `StateReporter::begin()` when appropriate.

### Changed

- ⚠️ Pass end message to `StateReporter::end()`, if it was set previously.
- Internal endmsg is cleared on successful `ProcCtx` acquire.

---

## [0.1.0] - 2023-02-03

Initial release.

Changes to www/index.md.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17












# procsem

An instantiated `ProcSem` object represents a potential sequence of operations
that may be active or inactive, but can only be activated once at a time.
When the ProcSem is acquired it will generate a `ProcCtx` object that
represents ownership of the sequence of operations.  The `ProcSem` will remain
in this "owned" state until the `ProcCtx` is dropped.

The idea is that a process, which may span several threads/tasks, passes the
`ProcCtx` object around for as long as the process is running, to prevent
another parallel instance from being launched.

A use-case for procsem is running an update process on an embedded system,
where a new firmware version needs to be uploaded to the device, then unpacked,
verified and finally installed.  A `ProcSem` can be used to protect an entire
update sequence from launching more than once at a time.

















|












>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# procsem

An instantiated `ProcSem` object represents a potential sequence of operations
that may be active or inactive, but can only be activated once at a time.
When the `ProcSem` is acquired it will yield a `ProcCtx` object that
represents ownership of the sequence of operations.  The `ProcSem` will remain
in this "owned" state until the `ProcCtx` is dropped.

The idea is that a process, which may span several threads/tasks, passes the
`ProcCtx` object around for as long as the process is running, to prevent
another parallel instance from being launched.

A use-case for procsem is running an update process on an embedded system,
where a new firmware version needs to be uploaded to the device, then unpacked,
verified and finally installed.  A `ProcSem` can be used to protect an entire
update sequence from launching more than once at a time.


## Change log

The details of changes can always be found in the timeline, but for a
high-level view of changes between released versions there's a manually
maintained [Change Log](./changelog.md).


## Project Status

_procsem_ is experimental.