orphanage

Check-in Differences
Login

Check-in Differences

Many hyperlinks are disabled.
Use anonymous login to enable hyperlinks.

Difference From orphanage-0.0.1 To orphanage-0.0.2

2024-02-13
17:03
Update changelog. check-in: 64be17d2ab user: jan tags: trunk
17:00
Code normalization. check-in: 82d0861bef user: jan tags: orphanage-0.0.2, trunk
16:55
Move SimpleTcpConnector to its own submodule to clean up tokiox a little. check-in: 04fac19c77 user: jan tags: trunk
2024-02-10
13:42
Add iox module and some means to generate random files. check-in: 16ff2e5438 user: jan tags: trunk
12:00
Migrate from old repo. check-in: e089a3c4a4 user: jan tags: orphanage-0.0.1, trunk
09:12
initial empty check-in check-in: 5b398602d3 user: jan tags: trunk

Changes to .efiles.

1
2
3
4
5
6
7
8
9
10
11
12



Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/path.rs
src/fs.rs
src/strx.rs
src/buf.rs
src/sqlfuncs.rs
src/futures.rs















>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Cargo.toml
README.md
www/index.md
www/changelog.md
src/err.rs
src/lib.rs
src/path.rs
src/fs.rs
src/strx.rs
src/buf.rs
src/sqlfuncs.rs
src/futures.rs
src/iox.rs
src/tokiox.rs
src/tokiox/tcpconn.rs

Changes to Cargo.toml.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17




18


19
20
21
22



23




[package]
name = "orphanage"
version = "0.0.1"
edition = "2021"
license = "0BSD"
keywords = [ "sqlite", "fs", "path" ]
repository = "https://repos.qrnch.tech/pub/orphanage"
description = "Random collection of stuff that is still searching for a home."
rust-version = "1.64"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]





[dependencies]


rand = { version = "0.8.5" }
rusqlite = { version = "0.30.0", features = ["functions"] }
sha2 = { version = "0.10.7" }
shellexpand = { version = "3.1.0" }










|














>
>
>
>

>
>

|
|

>
>
>

>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[package]
name = "orphanage"
version = "0.0.2"
edition = "2021"
license = "0BSD"
keywords = [ "sqlite", "fs", "path" ]
repository = "https://repos.qrnch.tech/pub/orphanage"
description = "Random collection of stuff that is still searching for a home."
rust-version = "1.64"
exclude = [
  ".fossil-settings",
  ".efiles",
  ".fslckout",
  "www",
  "rustfmt.toml"
]

[features]
# Pulls in the optional tokio, async-trait and killswitch dependencies.
# src/lib.rs gates the `tokiox` module and the `async_trait` re-export on this
# feature.
tokio = ["dep:tokio", "dep:async-trait", "dep:killswitch"]
# Pulls in the optional rusqlite and sha2 dependencies.  src/lib.rs gates the
# `sqlfuncs` module on this feature.
rusqlite = ["dep:rusqlite", "dep:sha2"]

[dependencies]
async-trait = { version = "0.1.77", optional = true }
killswitch = { version = "0.4.2", optional = true }
rand = { version = "0.8.5" }
rusqlite = { version = "0.30.0", optional = true, features = ["functions"] }
sha2 = { version = "0.10.7", optional = true }
shellexpand = { version = "3.1.0" }
tokio = { version = "1.36.0", optional = true, features = [
  "macros","net", "time"
] }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"]

Changes to src/fs.rs.



1

2
3
4









5
6
7
8
9
10
11


use std::path::{Path, PathBuf};


use crate::err::Error;











/// Expand a string directory, make it absolute and create it (if it does not
/// exist).
pub fn gen_absdir(input: impl AsRef<str>) -> Result<PathBuf, Error> {
  let pth = super::path::expabs(input)?;

  if !pth.exists() {
>
>
|
>



>
>
>
>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::{
  fs,
  path::{Path, PathBuf}
};

use crate::err::Error;

/// Fill a newly created file with random bytes.
///
/// The file at `fname` is created with [`fs::File::create`] and `size` bytes
/// produced by a size-limited `RngReader` are copied into it.
///
/// On success the value reported by [`std::io::copy`] (the number of bytes
/// copied) is returned.  Errors from creating the file or from the copy are
/// passed back to the caller unchanged.
pub fn rndfile(
  fname: impl AsRef<Path>,
  size: u64
) -> Result<u64, std::io::Error> {
  let mut sink = fs::File::create(fname)?;
  let mut source = super::iox::RngReader::with_lim(size);
  std::io::copy(&mut source, &mut sink)
}

/// Expand a string directory, make it absolute and create it (if it does not
/// exist).
pub fn gen_absdir(input: impl AsRef<str>) -> Result<PathBuf, Error> {
  let pth = super::path::expabs(input)?;

  if !pth.exists() {

Added src/iox.rs.















































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use rand::Rng;

/// A [`std::io::Read`] source whose contents are random bytes.
///
/// The wrapped value is the number of bytes still to be handed out; `None`
/// means that no limit applies.
#[derive(Default)]
pub struct RngReader(Option<u64>);

impl RngReader {
  /// Construct an `RngReader` without a size limit; it keeps producing random
  /// data for as long as it is read from.
  pub fn new() -> Self {
    Self(None)
  }

  /// Construct an `RngReader` that hands out `size` bytes of random data in
  /// total and reports eof once they have been consumed.
  pub fn with_lim(size: u64) -> Self {
    let remaining = Some(size);
    Self(remaining)
  }
}

impl std::io::Read for RngReader {
  /// Fill `buf` with random bytes and return how many were written.
  ///
  /// Without a limit the whole buffer is filled.  With a limit, at most the
  /// remaining number of bytes is written and the remaining count is reduced
  /// accordingly; once it has reached zero, `Ok(0)` is returned to signal
  /// eof.
  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
    let count = match self.0.as_mut() {
      // unlimited reader: always fill the entire buffer
      None => buf.len(),

      // limit exhausted: signal eof
      Some(left) if *left == 0 => return Ok(0),

      Some(left) => {
        // `take` never exceeds `buf.len()`, so converting it back to `usize`
        // below is lossless.
        let take = std::cmp::min(*left, buf.len() as u64);
        *left -= take;
        take as usize
      }
    };

    rand::thread_rng().fill(&mut buf[..count]);
    Ok(count)
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to src/lib.rs.



1
2
3
4

5





6



7
8




9
10
11


pub mod buf;
mod err;
pub mod fs;
pub mod futures;

pub mod path;





pub mod sqlfuncs;



pub mod strx;





pub use err::Error;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
>
>




>

>
>
>
>
>

>
>
>
|

>
>
>
>



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Turn on the `doc_cfg` feature only when building with `--cfg docsrs` (which
// is what `[package.metadata.docs.rs]` in Cargo.toml passes to rustdoc).
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod buf;
mod err;
pub mod fs;
pub mod futures;
pub mod iox;
pub mod path;
pub mod strx;


// Only built when the `rusqlite` feature is enabled.
#[cfg(feature = "rusqlite")]
#[cfg_attr(docsrs, doc(cfg(feature = "rusqlite")))]
pub mod sqlfuncs;

// Only built when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub mod tokiox;

// Re-export the `async_trait` attribute macro alongside `tokiox`, whose
// `Connector` trait is declared with it.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub use async_trait::async_trait;

// The crate's error type is exposed at the crate root; the `err` module
// itself stays private.
pub use err::Error;

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/tokiox.rs.















































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
pub mod tcpconn;

use std::ops::ControlFlow;

use async_trait::async_trait;

use tokio::io::{AsyncRead, AsyncWrite};

pub use tcpconn::SimpleTcpConnector;

/// Errors returned by [`run_connector()`].
///
/// `E` is the application-defined error type (see [`Connector::Error`]).
///
/// `Debug` is derived (for `E: Debug`) so that a `Result` carrying this error
/// can be used with `unwrap()`/`expect()` and `{:?}` formatting.
#[derive(Debug)]
pub enum Error<E> {
  /// An application callback returned an error.
  App(E),

  /// The application callback chose not to retry.
  ///
  /// Could mean that the maximum number of retries has been reached.  The
  /// last application error is returned in `E`.
  Exhausted(E)
}

/// Outcome of a single [`Connector::connect()`] attempt, as interpreted by
/// [`run_connector()`].
pub enum ConnResult<C, E> {
  /// The connection was established.  The connection object is handed to
  /// [`Connector::run()`].
  Connected(C),

  /// The connection was aborted by external means.
  ///
  /// [`run_connector()`] stops and returns `Ok(())` in this case.
  Aborted,

  /// An error occurred, but the connector is recommended to delay and then
  /// try to connect again.
  RetryableError(E),

  /// An error occurred, and the error is not considered to be retryable.
  ///
  /// [`run_connector()`] stops and returns the error as [`Error::App`].
  FatalError(E)
}

/// Application callbacks for the [`run_connector()`] function.
#[async_trait]
pub trait Connector {
  /// The connection object produced by [`Connector::connect()`] and consumed
  /// by [`Connector::run()`].
  type ConnType: AsyncRead + AsyncWrite + Unpin;

  /// Application-defined error type reported by the callbacks.
  type Error;

  /// Establish a connection to the listener.
  ///
  /// The returned [`ConnResult`] tells [`run_connector()`] whether to run the
  /// connection handler, retry, or give up.
  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error>;

  /// Returns `true` if the connector should attempt to reconnect.  Returns
  /// `false` otherwise.
  ///
  /// Called by [`run_connector()`] after [`Connector::connect()`] has reported
  /// a [`ConnResult::RetryableError`].
  ///
  /// The application can use this callback to implement a maximum number of
  /// retries.
  fn retry(&mut self) -> bool;

  /// Call application to delay before next reconnection attempt.
  ///
  /// This method should only be called if [`Connector::retry()`] returns
  /// `true`.
  ///
  /// Returning `ControlFlow::Continue(())` asks the connector to attempt to
  /// reconnect; `ControlFlow::Break(_)` asks it to abort.
  async fn delay(&mut self) -> ControlFlow<(), ()>;

  /// Run the application's connection handler.
  ///
  /// The application should return `ControlFlow::Continue(_)` to request the
  /// connector to reconnect; [`run_connector()`] does not inspect the value
  /// carried by `Continue`.
  ///
  /// `ControlFlow::Break(res)` ends [`run_connector()`]: `Ok(())` is returned
  /// as-is and `Err(e)` is returned as [`Error::App`].
  async fn run(
    &mut self,
    conn: Self::ConnType
  ) -> ControlFlow<Result<(), Self::Error>, Result<(), Self::Error>>;
}


/// Establish a network connection with a built-in retry loop.
pub async fn run_connector<E>(
  mut connector: impl Connector<Error = E>
) -> Result<(), Error<E>> {
  loop {
    match connector.connect().await {
      ConnResult::Connected(conn) => {
        // Connection was successful -- run the server application.
        //
        // The return value can indicate that the connector should reconnect or
        // abort.
        match connector.run(conn).await {
          ControlFlow::Continue(_) => continue,
          ControlFlow::Break(res) => break res.map_err(|e| Error::App(e))
        }
      }
      ConnResult::Aborted => break Ok(()),
      ConnResult::RetryableError(e) => {
        if connector.retry() {
          connector.delay().await;
        } else {
          // Presumably the maximum number of retries has been reached.
          break Err(Error::Exhausted(e));
        }
      }
      ConnResult::FatalError(e) => {
        break Err(Error::App(e));
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Added src/tokiox/tcpconn.rs.

















































































































































































































>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use std::{future::Future, ops::ControlFlow, time::Duration};

use tokio::net::TcpStream;

use async_trait::async_trait;

use killswitch::KillSwitch;

use super::{ConnResult, Connector};

/// A [`Connector`] that connects to a TCP address and hands each established
/// [`TcpStream`] to an application callback.
///
/// `F` is the future returned by the callback; its output is what
/// [`Connector::run()`] returns.
pub struct SimpleTcpConnector<F>
where
  F: Future<
    Output = ControlFlow<
      Result<(), std::io::Error>,
      Result<(), std::io::Error>
    >
  >
{
  // Address passed to `TcpStream::connect()` on every connection attempt.
  addr: String,
  // Current reconnection delay, in seconds.  Starts at 1, is doubled (capped
  // at 60) after each completed delay, and is reset to 1 when a connection is
  // handed to the callback.
  delay: usize,
  // Kill switch that is waited on while delaying; a clone of it is passed to
  // the callback.
  ks: KillSwitch,
  // Application callback invoked with each established connection.
  cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
}

impl<F> SimpleTcpConnector<F>
where
  F: Future<
    Output = ControlFlow<
      Result<(), std::io::Error>,
      Result<(), std::io::Error>
    >
  >
{
  /// Build a connector for `addr`.
  ///
  /// `cb` is the callback that receives each established connection together
  /// with a clone of `ks`.  The reconnection delay starts out at one second.
  pub fn new(
    addr: String,
    ks: KillSwitch,
    cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
  ) -> Self {
    // initial reconnection delay, in seconds
    let delay = 1;

    Self { addr, delay, ks, cb }
  }
}

// `Connector` implementation: connect over TCP, always retry, and back off
// between attempts with a doubling delay.
#[async_trait]
impl<F> Connector for SimpleTcpConnector<F>
where
  F: Future<
      Output = ControlFlow<
        Result<(), std::io::Error>,
        Result<(), std::io::Error>
      >
    > + Send
{
  type Error = std::io::Error;
  type ConnType = TcpStream;

  /// Attempt a TCP connection to the configured address.
  ///
  /// Every connection failure is currently reported as retryable.
  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> {
    match TcpStream::connect(&self.addr).await {
      Ok(conn) => ConnResult::Connected(conn),

      Err(e) => {
        // ToDo: Distinguish between retryable and fatal errors

        ConnResult::RetryableError(e)
      }
    }
  }

  /// Always request another attempt; this connector has no retry limit.
  fn retry(&mut self) -> bool {
    true
  }

  /// Wait for the current delay, or until the kill switch future completes,
  /// whichever comes first.
  ///
  /// Returns `ControlFlow::Break(())` if the kill switch branch finished
  /// first.  Otherwise the delay is doubled (capped at 60 seconds) for the
  /// next round and `ControlFlow::Continue(())` is returned.
  async fn delay(&mut self) -> ControlFlow<(), ()> {
    // `self.delay` is kept within 1..=60, so the conversion to the `u64` that
    // `Duration::from_secs()` expects is not expected to fail.
    let dur = Duration::from_secs(self.delay.try_into().unwrap());
    tokio::select! {
      // NOTE(review): presumably `wait()` resolves once the kill switch has
      // been triggered -- confirm against the killswitch crate docs.
      _ = self.ks.wait() => {
        ControlFlow::Break(())
      }
      _ = tokio::time::sleep(dur) => {
        // exponential back-off, capped at one minute
        self.delay = std::cmp::min(self.delay * 2, 60);
        ControlFlow::Continue(())
      }
    }
  }

  /// Hand the established connection, together with a clone of the kill
  /// switch, to the application callback and return the callback's result.
  async fn run(
    &mut self,
    conn: Self::ConnType
  ) -> ControlFlow<Result<(), Self::Error>, Result<(), Self::Error>> {
    // A connection was established, so start over from the shortest delay
    // the next time a reconnect is needed.
    self.delay = 1;

    // Create the future first and await it in a separate statement.
    let fut = (self.cb)(conn, self.ks.clone());

    fut.await
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :

Changes to www/changelog.md.

1
2
3
4
5
6
7






8
9
10
11
12
13
14
# Change Log

## [Unreleased]

[Details](/vdiff?from=orphanage-0.0.1&to=trunk)

### Added







### Changed

### Removed

---








>
>
>
>
>
>







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Change Log

## [Unreleased]

[Details](/vdiff?from=orphanage-0.0.1&to=trunk)

### Added

- Add an `iox` module, containing `RngReader` which implements `std::io::Read`
  and returns random data.
- Add `fs::rndfile()` for creating files with random content of a requested
  size.
- Add `tokiox` module with an abstraction over (re)connection loops.

### Changed

### Removed

---