1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
-
+
+
+
+
+
+
+
+
+
+
+
-
+
+
-
+
+
+
+
-
-
-
+
+
+
+
+
-
-
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
-
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
+
+
+
+
-
-
-
-
-
+
+
+
+
+
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
|
//! Representation of a temporary file that can be persisted.
//!
//! This is intended to be used in an application that needs to receive a file
//! to persistent storage, but doesn't know its final target file name ntil it
//! to persistent storage, but doesn't know its final target file name until it
//! has received the entire file (such as if the file's name should be its
//! content's hash).
//!
//! # Minimum size
//! In some cases an application may not want to store small files in its
//! filesystem based data store. For this purpose, the `TmpFile` can be set up
//! to have a minimum file size. If a `TmpFile` does not reach this size
//! before being persisted, a buffer of the file's contents will be returned
//! instead of a file name of the persisted file.
//!
//! The [`TmpFile::with_minsize()`] factory method can be used to use this
//! feature.
#![cfg_attr(docsrs, feature(doc_cfg))]
use std::{
fs,
io::Write,
path::{Path, PathBuf}
path::{Path, PathBuf},
time::{Duration, Instant}
};
/// Used to inspect content as it is being fed to the temporary file.
pub trait TmpProc {
type Output;
/// Called when a buffer is about to be written.
/// Called when a buffer has been written to the `TmpFile` storage.
fn update(&mut self, buf: &[u8]);
/// Called when the application has chosen to persist the file.
///
/// The role of this method is to:
/// - Return its application-specific data of the associated type `Output`.
/// - If `src` is `Some()` it means that the `TmpFile` is backed by a file,
/// The implementation should return the persistent location. The persistent
/// target location must reside within the same mount-point as the temporary
/// file's location.
/// and the implementation of this method should return, as the second
/// tuple member, `Some(PathBuf)`, pointing out the target file that the
/// temporary file should be persisted to. If `src` is `None` the
/// temporary buffer is not stored in the file system and thus `None`
/// should be returned instead.
fn finalize(
&mut self,
src: &Path
) -> Result<(Self::Output, PathBuf), std::io::Error>;
src: Option<&Path>
) -> Result<(Self::Output, Option<PathBuf>), std::io::Error>;
}
/// A [`TmpProc`] implementation which does nothing.
pub struct NullProc<'a>(&'a Path);
impl TmpProc for NullProc<'_> {
type Output = ();
#[allow(unused_variables)]
fn update(&mut self, buf: &[u8]) {}
#[allow(unused_variables)]
fn finalize(
&mut self,
src: &Path
) -> Result<(Self::Output, PathBuf), std::io::Error> {
Ok(((), self.0.to_path_buf()))
src: Option<&Path>
) -> Result<(Self::Output, Option<PathBuf>), std::io::Error> {
Ok(((), Some(self.0.to_path_buf())))
}
}
/// Temporary file contents container returned after successful persist.
#[derive(Debug)]
pub enum Output {
/// The temporary file's contents have been persisted to a file.
File(PathBuf),
/// The temporary file's contents weren't large enough to be written to disk
/// and are returned in this buffer.
///
/// This variant can only occur if a minimum size threshold has been set.
Buf(Vec<u8>)
}
impl Output {
pub fn try_into_fname(self) -> Result<PathBuf, Output> {
match self {
Self::File(fname) => Ok(fname),
r => Err(r)
}
}
pub fn unwrap_fname(self) -> PathBuf {
let Output::File(fname) = self else {
panic!("Not a file name");
};
fname
}
pub fn try_into_buf(self) -> Result<Vec<u8>, Output> {
match self {
Self::Buf(buf) => Ok(buf),
r => Err(r)
}
}
pub fn unwrap_buf(self) -> Vec<u8> {
let Output::Buf(buf) = self else {
panic!("Not a buffer");
};
buf
}
}
/// The final results of successfully persisting a [`TmpFile`].
#[non_exhaustive]
pub struct Persisted<T> {
/// `TmpFile` output.
///
/// If a minimum size has was set, this will be `Output::Buf()` if the size
/// is less than or equal to the minimum size. Otherwise it will be
/// `Output::File()` containing the file name of the persisted file.
///
/// If the persisted `TmpFile` did not have a minimum file size set, the
/// output can safely be unwrapped using [`Output::unwrap_fname()`].
pub output: Output,
/// The size of the content written to the [`TmpFile`].
pub size: u64,
/// The application-defined content processor output.
pub procres: T,
/// The amount of time that passed between initially requesting the
/// [`TmpFile`] writer and when it was finalized.
pub duration: Duration
}
struct MemBuf {
buf: Vec<u8>,
idx: usize
}
/// Temporary file.
/// Temporary file contents generator.
pub struct TmpFile<T> {
tmpfile: PathBuf,
f: Option<Box<dyn Write + Send>>,
tp: Box<dyn TmpProc<Output = T> + Send>,
size: u64,
start_time: Instant,
membuf: Option<MemBuf>,
#[cfg(feature = "defer-persist")]
sctx: Option<swctx::SetCtx<(T, PathBuf), (), std::io::Error>>
sctx: Option<swctx::SetCtx<Persisted<T>, (), std::io::Error>>
}
impl<T> TmpFile<T> {
fn inner_persist(&mut self) -> Result<(T, PathBuf), std::io::Error> {
// Force close file
fn inner_persist(&mut self) -> Result<Persisted<T>, std::io::Error> {
// Force close file, if open
if let Some(f) = self.f.take() {
drop(f);
}
let (output, t) = if let Some(ref mut membuf) = self.membuf {
let mut buf = std::mem::take(&mut membuf.buf);
buf.truncate(membuf.idx);
// Contents it stored in a memory buffer, so don't pass a path and do not
// expect a path in return.
let (t, _) = self.tp.finalize(None)?;
(Output::Buf(buf), t)
} else {
// Tell the content processor to finalize and return the file name of the
// persistent file.
let (t, outfile) = self.tp.finalize(&self.tmpfile)?;
// Tell the content processor to finalize and pass in the source
// temporary file, which should instruct finalize() to return the
// persisten location of the file.
let (t, outfile) = self.tp.finalize(Some(&self.tmpfile))?;
// ToDo: Either document this panic or return an error instead.
let outfile = outfile.expect("An output file was not specified.");
// Hard link temporary file to persistent file, unless the file exists
// already.
if !outfile.exists() {
fs::hard_link(&self.tmpfile, &outfile)?;
}
// Hard link temporary file to persistent file, unless the file exists
// already.
if !outfile.exists() {
fs::hard_link(&self.tmpfile, &outfile)?;
}
Ok((t, outfile))
}
}
(Output::File(outfile), t)
};
Ok(Persisted {
output,
size: self.size,
procres: t,
duration: Instant::now() - self.start_time
})
}
}
impl<T> TmpFile<T> {
/// Create a new [`TmpFile`].
pub fn new<P>(
fname: P,
tp: Box<dyn TmpProc<Output = T> + Send>
) -> Result<Self, std::io::Error>
where
P: AsRef<Path>
{
let tmpfile = fname.as_ref().to_path_buf();
let f = fs::File::create(&tmpfile)?;
let f = Box::new(f);
Ok(Self {
tmpfile,
f: Some(f),
tp,
size: 0,
start_time: Instant::now(),
membuf: None,
#[cfg(feature = "defer-persist")]
sctx: None
})
}
/// Create a new [`TmpFile`] that will not write to file unless the size
/// exceeds a specified size.
pub fn with_minsize<P>(
fname: P,
tp: Box<dyn TmpProc<Output = T> + Send>,
minsize: usize
) -> Result<Self, std::io::Error>
where
P: AsRef<Path>
{
let tmpfile = fname.as_ref().to_path_buf();
let f = fs::File::create(&tmpfile)?;
let f = Box::new(f);
let membuf = MemBuf {
buf: vec![0u8; minsize],
idx: 0
};
let membuf = Some(membuf);
Ok(Self {
tmpfile,
f: Some(f),
tp,
size: 0,
start_time: Instant::now(),
membuf,
#[cfg(feature = "defer-persist")]
sctx: None
})
}
/// Persist the hitherto temporary file.
///
/// The location of the persisted file will be determined by the [`TmpProc`]
/// object that was passed into [`TmpFile::new()`].
#[cfg_attr(
feature = "defer-persist",
doc = r#"
# Panic
If the `TmpFile` has previously registered to receive the finalization
results via a channel using [`TmpFile::defer_persist()`] this method will
cause a panic.
"#
)]
pub fn persist(mut self) -> Result<(T, PathBuf), std::io::Error> {
pub fn persist(mut self) -> Result<Persisted<T>, std::io::Error> {
#[cfg(feature = "defer-persist")]
if self.sctx.is_some() {
panic!(
"Con not persist TmpFile that has been configured for deferred \
persist"
);
}
|
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
|
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
-
+
+
-
+
+
+
+
+
-
-
+
+
-
+
|
/// # Panic
/// This method must only be called once per `TmpFile` object. Calling it
/// a second time will cause a panic.
#[cfg(feature = "defer-persist")]
#[cfg_attr(docsrs, doc(cfg(feature = "defer-persist")))]
pub fn defer_persist(
&mut self
) -> swctx::WaitCtx<(T, PathBuf), (), std::io::Error> {
) -> swctx::WaitCtx<Persisted<T>, (), std::io::Error> {
if self.sctx.is_some() {
panic!("TmpFile already configured for deferred persist");
}
let (sctx, wctx) = swctx::mkpair();
self.sctx = Some(sctx);
wctx
}
/// Cancel a deferred persist request.
#[cfg(feature = "defer-persist")]
#[cfg_attr(docsrs, doc(cfg(feature = "defer-persist")))]
pub fn cancel(mut self) {
// Take out the SetCtx so the Drop handler doesn't attempt to
// finalize/persist.
let _ = self.sctx.take();
}
}
impl<T> Write for TmpFile<T> {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
// If there's a memory buffer, then append to it. Unless this write would
// overflow the membuf, in which case switch to using a file.
if let Some(ref mut membuf) = self.membuf {
if membuf.idx + buf.len() > membuf.buf.len() {
// Current write index + size of input buffer would exceed maximum
// buffer size.
// Open temporary file and transfer the _existing_ memory buffer to it
let f = fs::File::create(&self.tmpfile)?;
let mut f = Box::new(f);
if membuf.idx > 0 {
f.write_all(&membuf.buf[..membuf.idx])?;
}
// Store file handle in context
self.f = Some(f);
// Clear memory buffer
self.membuf = None;
} else {
// There's still room. Append to memory buffer.
membuf.buf[membuf.idx..(membuf.idx + buf.len())].copy_from_slice(buf);
// Move ahead write-pointer
membuf.idx += buf.len();
// Update total written size
self.size += buf.len() as u64;
// Update TmpProc
self.tp.update(buf);
self.tp.update(buf);
return Ok(buf.len());
}
}
let Some(ref mut f) = self.f else {
panic!("No file");
panic!("No file?");
};
f.write(buf)
let n = f.write(buf)?;
self.tp.update(&buf[..n]);
self.size += n as u64;
Ok(n)
}
fn flush(&mut self) -> Result<(), std::io::Error> {
let Some(ref mut f) = self.f else {
panic!("No file");
if let Some(ref mut f) = self.f {
f.flush()?;
};
f.flush()
Ok(())
}
}
impl<T> Drop for TmpFile<T> {
fn drop(&mut self) {
// Close file if it hasn't been already
if let Some(f) = self.f.take() {
|