Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -4,25 +4,30 @@ www/design-notes.md www/changelog.md src/err.rs src/lib.rs src/lumberjack.rs -src/nosvc.rs -src/systemd.rs -src/winsvc.rs -src/signals.rs -src/signals/unix.rs -src/signals/win.rs -src/rttype.rs -src/rttype/sync.rs -src/rttype/tokio.rs -src/rttype/rocket.rs +src/rt.rs +src/rt/nosvc.rs +src/rt/systemd.rs +src/rt/winsvc.rs +src/rt/rttype.rs +src/rt/rttype/sync.rs +src/rt/rttype/tokio.rs +src/rt/rttype/rocket.rs +src/rt/signals.rs +src/rt/signals/unix.rs +src/rt/signals/win.rs src/installer.rs src/installer/winsvc.rs src/installer/launchd.rs src/installer/systemd.rs src/argp.rs +tests/err/mod.rs +tests/apps/mod.rs +tests/apperr.rs +tests/initrunshutdown.rs examples/hellosvc.rs examples/hellosvc-tokio.rs examples/hellosvc-rocket.rs examples/err/mod.rs examples/procres/mod.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "qsu" -version = "0.0.2" +version = "0.0.3" edition = "2021" license = "0BSD" categories = [ "asynchronous" ] keywords = [ "service", "systemd", "winsvc" ] repository = "https://repos.qrnch.tech/pub/qsu" @@ -17,21 +17,22 @@ "Rocket.toml", "rustfmt.toml" ] [features] +default = ["rt"] clap = ["dep:clap", "dep:itertools"] -full = ["clap", "installer", "rocket", "systemd"] +full = ["clap", "installer", "rocket", "rt", "systemd", "tokio"] installer = ["dep:sidoc"] systemd = ["dep:sd-notify"] -#tokio = ["dep:tokio"] -#rocket = ["dep:rocket", "dep:tokio"] -rocket = ["dep:rocket"] +rocket = ["rt", "dep:rocket", "tokio"] +rt = [] +tokio = ["rt", "tokio/macros", "tokio/rt-multi-thread", "tokio/signal"] wait-for-debugger = ["dep:dbgtools-win"] [dependencies] -async-trait = { version = "0.1.73" } +async-trait = { version = "0.1.74" } chrono = { version = "0.4.24" } clap = { version = "4.4.6", optional = true, features = [ "derive", "env", 
"string", "wrap_help" ] } env_logger = { version = "0.10.0" } @@ -40,15 +41,13 @@ killswitch = { version = "0.4.2" } log = { version = "0.4.20" } parking_lot = { version = "0.12.1" } rocket = { version = "0.5.0-rc.3", optional = true } sidoc = { version = "0.1.0", optional = true } -tokio = { version = "1.33.0", features = [ - "macros", "rt-multi-thread", "signal", "sync" -] } +tokio = { version = "1.33.0", features = ["sync"] } time = { version = "0.3.20", features = ["macros"] } -tracing = { version = "0.1.37" } +tracing = { version = "0.1.40" } [dependencies.tracing-subscriber] version = "0.3.17" default-features = false features = ["env-filter", "time", "fmt", "ansi"] @@ -79,15 +78,15 @@ all-features = true rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] [[example]] name = "hellosvc" -required-features = ["clap", "installer"] +required-features = ["clap", "installer", "rt"] [[example]] name = "hellosvc-tokio" -required-features = ["clap", "installer"] +required-features = ["clap", "installer", "rt", "tokio"] [[example]] name = "hellosvc-rocket" -required-features = ["clap", "installer", "rocket"] +required-features = ["clap", "installer", "rt", "rocket"] Index: examples/err/mod.rs ================================================================== --- examples/err/mod.rs +++ examples/err/mod.rs @@ -30,7 +30,23 @@ impl From for Error { fn from(err: qsu::Error) -> Self { Error::Qsu(err.to_string()) } } + +/* +/// Convenience converter used to pass an application-defined errors from the +/// qsu inner runtime back out from the qsu runtime. 
+impl From for qsu::Error { + fn from(err: Error) -> qsu::Error { + qsu::Error::app(err) + } +} +*/ + +impl From for qsu::AppErr { + fn from(err: Error) -> qsu::AppErr { + qsu::AppErr::new(err) + } +} // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: examples/hellosvc-rocket.rs ================================================================== --- examples/hellosvc-rocket.rs +++ examples/hellosvc-rocket.rs @@ -4,12 +4,15 @@ mod argp; mod err; mod procres; use qsu::{ - argp::ArgParser, RocketServiceHandler, StartState, StopState, SvcEvt, - SvcEvtReader, SvcType + argp::ArgParser, + rt::{ + RocketServiceHandler, SrvAppRt, StartState, StopState, SvcEvt, + SvcEvtReader + } }; use rocket::{Build, Ignite, Rocket}; use err::Error; @@ -20,27 +23,30 @@ #[qsu::async_trait] impl RocketServiceHandler for MyService { async fn init( &mut self, - _ss: StartState - ) -> Result>, qsu::Error> { + ss: StartState + ) -> Result>, qsu::AppErr> { tracing::trace!("Running init()"); let mut rockets = vec![]; + + ss.report(Some("Building a rocket!".into())); let rocket = rocket::build().mount("/", routes![index]); + ss.report(Some("Pushing a rocket".into())); rockets.push(rocket); Ok(rockets) } async fn run( &mut self, rockets: Vec>, mut set: SvcEvtReader - ) -> Result<(), qsu::Error> { + ) -> Result<(), qsu::AppErr> { for rocket in rockets { tokio::task::spawn(async { rocket.launch().await.unwrap(); }); } @@ -48,21 +54,24 @@ loop { tokio::select! 
{ evt = set.arecv() => { match evt { Some(SvcEvt::Shutdown) => { - tracing::info!("The service subsystem requested that the application shut down"); + tracing::info!("The service subsystem requested that the + application shut down"); break; } Some(SvcEvt::Terminate) => { tracing::info!( - "The service subsystem requested that the application terminate" + "The service subsystem requested that the application + terminate" ); break; } Some(SvcEvt::ReloadConf) => { - tracing::info!("The service subsystem requested that application reload configuration"); + tracing::info!("The service subsystem requested that application + reload configuration"); } _ => { } } } } @@ -69,11 +78,12 @@ } Ok(()) } - async fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::Error> { + async fn shutdown(&mut self, ss: StopState) -> Result<(), qsu::AppErr> { + ss.report(Some(format!("Entered {}", "shutdown").into())); tracing::trace!("Running shutdown()"); Ok(()) } } @@ -93,11 +103,11 @@ // Parse, and process, command line arguments. 
let mut argsproc = argp::AppArgsProc {}; let ap = ArgParser::new(&svcname, &mut argsproc); ap.proc(|| { let handler = Box::new(MyService {}); - SvcType::Rocket(handler) + SrvAppRt::Rocket(handler) })?; Ok(()) } Index: examples/hellosvc-tokio.rs ================================================================== --- examples/hellosvc-tokio.rs +++ examples/hellosvc-tokio.rs @@ -5,12 +5,14 @@ mod procres; use std::time::{Duration, Instant}; use qsu::{ - argp::ArgParser, StartState, StopState, SvcEvt, SvcEvtReader, SvcType, - TokioServiceHandler + argp::ArgParser, + rt::{ + SrvAppRt, StartState, StopState, SvcEvt, SvcEvtReader, TokioServiceHandler + } }; use err::Error; use procres::ProcRes; @@ -17,16 +19,17 @@ struct MyService {} #[qsu::async_trait] impl TokioServiceHandler for MyService { - async fn init(&mut self, _ss: StartState) -> Result<(), qsu::Error> { + async fn init(&mut self, ss: StartState) -> Result<(), qsu::AppErr> { + ss.report(Some("Entered init".into())); tracing::trace!("Running init()"); Ok(()) } - async fn run(&mut self, mut set: SvcEvtReader) -> Result<(), qsu::Error> { + async fn run(&mut self, mut set: SvcEvtReader) -> Result<(), qsu::AppErr> { const SECS: u64 = 30; let mut last_dump = Instant::now() - Duration::from_secs(SECS); loop { if Instant::now() - last_dump > Duration::from_secs(SECS) { log::error!("error"); @@ -69,11 +72,12 @@ } Ok(()) } - async fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::Error> { + async fn shutdown(&mut self, ss: StopState) -> Result<(), qsu::AppErr> { + ss.report(Some(("Entered shutdown".to_string()).into())); tracing::trace!("Running shutdown()"); Ok(()) } } @@ -93,12 +97,12 @@ // Parse, and process, command line arguments. 
let mut argsproc = argp::AppArgsProc {}; let ap = ArgParser::new(&svcname, &mut argsproc); ap.proc(|| { let handler = Box::new(MyService {}); - SvcType::Tokio(None, handler) + SrvAppRt::Tokio(None, handler) })?; Ok(()) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: examples/hellosvc.rs ================================================================== --- examples/hellosvc.rs +++ examples/hellosvc.rs @@ -8,27 +8,30 @@ thread, time::{Duration, Instant} }; use qsu::{ - argp::ArgParser, ServiceHandler, StartState, StopState, SvcEvt, - SvcEvtReader, SvcType + argp::ArgParser, + rt::{ + ServiceHandler, SrvAppRt, StartState, StopState, SvcEvt, SvcEvtReader + } }; use err::Error; use procres::ProcRes; struct MyService {} impl ServiceHandler for MyService { - fn init(&mut self, _ss: StartState) -> Result<(), qsu::Error> { + fn init(&mut self, ss: StartState) -> Result<(), qsu::AppErr> { + ss.report(Some("Entered init".into())); tracing::trace!("Running init()"); Ok(()) } - fn run(&mut self, mut set: SvcEvtReader) -> Result<(), qsu::Error> { + fn run(&mut self, mut set: SvcEvtReader) -> Result<(), qsu::AppErr> { const SECS: u64 = 30; let mut last_dump = Instant::now() - Duration::from_secs(SECS); loop { if Instant::now() - last_dump > Duration::from_secs(SECS) { log::error!("error"); @@ -72,11 +75,12 @@ } Ok(()) } - fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::Error> { + fn shutdown(&mut self, ss: StopState) -> Result<(), qsu::AppErr> { + ss.report(Some(("Entered shutdown".to_string()).into())); tracing::trace!("Running shutdown()"); Ok(()) } } @@ -96,12 +100,12 @@ // Parse, and process, command line arguments. 
let mut argsproc = argp::AppArgsProc {}; let ap = ArgParser::new(&svcname, &mut argsproc); ap.proc(|| { let handler = Box::new(MyService {}); - SvcType::Sync(handler) + SrvAppRt::Sync(handler) })?; Ok(()) } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/argp.rs ================================================================== --- src/argp.rs +++ src/argp.rs @@ -3,14 +3,26 @@ use clap::{builder::Str, Arg, ArgAction, ArgMatches, Args, Command}; use crate::{ installer::{self, RegSvc}, lumberjack::LogLevel, - Error, RunCtx, SvcType + rt::{RunCtx, SrvAppRt}, + Error }; +/// Modify a `clap` [`Command`] instance to accept common service management +/// subcommands. +/// +/// If `inst_subcmd` is `Some()`, it should be the name of the subcommand used +/// to register a service. If `rm_subcmd_ is `Some()` it should be the name of +/// the subcommand used to deregister a service. Similarly `run_subcmd` is +/// used to add a subcommand for running the service under a service subsystem +/// [where applicable]. +/// +/// It is recommended that applications use the higher-level `ArgParser` +/// instead. pub fn add_subcommands( cli: Command, svcname: &str, inst_subcmd: Option<&str>, rm_subcmd: Option<&str>, @@ -28,21 +40,19 @@ cli.subcommand(sub) } else { cli }; - let cli = if let Some(subcmd) = run_subcmd { + if let Some(subcmd) = run_subcmd { let sub = mk_run_cmd(subcmd, svcname); cli.subcommand(sub) } else { cli - }; - - cli + } } -/// Register service. +/// Service registration context. #[derive(Debug, Args)] struct RegSvcArgs { /// Autostart service at boot. #[arg(short = 's', long)] auto_start: bool, @@ -77,10 +87,16 @@ #[arg(long, value_enum, hide(true), value_name = "FNAME")] trace_file: Option } +/// Create a `clap` [`Command`] object that accepts service registration +/// arguments. 
+/// +/// It is recommended that applications use the higher-level `ArgParser` +/// instead, but this call exists in case applications need finer grained +/// control. pub fn mk_inst_cmd(cmd: &str, svcname: &str) -> Command { let namearg = Arg::new("svcname") .short('n') .long("name") .action(ArgAction::Set) @@ -105,10 +121,16 @@ /// Deregister service. #[derive(Debug, Args)] struct DeregSvcArgs {} +/// Create a `clap` [`Command`] object that accepts service deregistration +/// arguments. +/// +/// It is recommended that applications use the higher-level `ArgParser` +/// instead, but this call exists in case applications need finer grained +/// control. pub fn mk_rm_cmd(cmd: &str, svcname: &str) -> Command { let namearg = Arg::new("svcname") .short('n') .long("name") .action(ArgAction::Set) @@ -120,10 +142,11 @@ DeregSvcArgs::augment_args(cli) } +/// Parsed service deregistration arguments. pub struct DeregSvc { pub svcname: String } impl DeregSvc { @@ -136,10 +159,16 @@ /// Run service. #[derive(Debug, Args)] struct RunSvcArgs {} + +/// Create a `clap` [`Command`] object that accepts service running arguments. +/// +/// It is recommended that applications use the higher-level `ArgParser` +/// instead, but this call exists in case applications need finer grained +/// control. pub fn mk_run_cmd(cmd: &str, svcname: &str) -> Command { let namearg = Arg::new("svcname") .short('n') .long("name") .action(ArgAction::Set) @@ -160,10 +189,11 @@ /// Nothing to do (service was probably registered/deregistred). Quit } +/// Parsed service running arguments. pub struct RunSvc { pub svcname: String } impl RunSvc { @@ -172,21 +202,39 @@ Self { svcname } } } +/// Allow application to customise behavior of an [`ArgParser`] instance. pub trait ArgsProc { /// Callback allowing application to configure service installation argument /// parser. 
- fn inst_subcmd(&mut self) {} - - fn rm_subcmd(&mut self) {} - - fn run_subcmd(&mut self) {} - - /// Callback allowing application to configure the service registry context - /// before the service is registered. + fn inst_subcmd(&mut self) { + todo!() + } + + fn rm_subcmd(&mut self) { + todo!() + } + + fn run_subcmd(&mut self) { + todo!() + } + + /// Callback allowing application to configure the service registration + /// context just before the service is registered. + /// + /// This trait method can, among other things, be used by an application to: + /// - Configure a service work directory. + /// - Add environment variables. + /// - Add command like arguments to the run command. + /// + /// The `sub_m` argument represents `clap`'s parsed subcommand context for + /// the service registration subcommand. Applications that want to add + /// custom arguments to the parser should implement the + /// [`ArgsProc::inst_subcmd()`] trait method and perform the subcommand + /// augmentation there. /// /// The default implementation does nothing but return `regsvc` unmodified. #[allow(unused_variables)] fn proc_inst( &self, @@ -347,15 +395,30 @@ } } /// Process command line arguments. /// - /// The `bldr` is a closure that will be called to yield the `SvcType` in + /// Calling this method will initialize the command line parser, parse the + /// command line, using the associated [`ArgsProc`] as appropriate to modify + /// the argument parser, and then take the appropriate action: + /// + /// - If a service registration was requested, the service will be registered + /// and then the function will return. + /// - If a service deregistration was requested, the service will be + /// deregistered and then the function will return. + /// - If a service run was requested, then set up the service subsystem and + /// launch the server application under it. 
+ /// - If an application-defined subcommand was called, then process it using + /// [`ArgsProc::proc_other()`] and then exit. + /// - If none of the above subcommands where issued, then run the server + /// application as a foreground process. + /// + /// The `bldr` is a closure that will be called to yield the `SrvAppRt` in /// case the service was requested to run. pub fn proc(mut self, bldr: F) -> Result<(), Error> where - F: FnOnce() -> SvcType + F: FnOnce() -> SrvAppRt { // Create registration subcommand let sub = mk_inst_cmd(&self.reg_subcmd, &self.svcname); self.cli = self.cli.subcommand(sub); Index: src/err.rs ================================================================== --- src/err.rs +++ src/err.rs @@ -1,17 +1,60 @@ -use std::{fmt, io}; +use std::{any::Any, fmt, io}; #[derive(Debug)] pub enum ArgsError { #[cfg(feature = "clap")] Clap(clap::Error), Msg(String) } + + +/// Indicate failure for server applicatino callbacks. +#[derive(Debug, Default)] +pub struct AppErrors { + pub init: Option, + pub run: Option, + pub shutdown: Option +} + +impl AppErrors { + pub fn init_failed(&self) -> bool { + self.init.is_some() + } + + pub fn run_failed(&self) -> bool { + self.run.is_some() + } + + pub fn shutdown_failed(&self) -> bool { + self.shutdown.is_some() + } + + pub fn origin(&self) -> CbOrigin { + if self.init_failed() { + CbOrigin::Init + } else if self.run_failed() { + CbOrigin::Run + } else if self.shutdown_failed() { + CbOrigin::Shutdown + } else { + // Can't happen + unimplemented!() + } + } +} + /// Errors that qsu will return to application. #[derive(Debug)] pub enum Error { + /// Application-defined error. + /// + /// Applications can use this variant to pass application-specific errors + /// through the runtime back to itself. + App(CbOrigin, AppErr), + ArgP(ArgsError), BadFormat(String), Internal(String), IO(String), @@ -20,17 +63,66 @@ /// This includes both initialization and actual logging. 
/// /// On Windows errors such as failure to register an event source will be /// treated as this error variant as well. LumberJack(String), + + /// Missing expected data. + Missing(String), + + /// Rocket-specific errors. #[cfg(feature = "rocket")] + #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] Rocket(String), SubSystem(String), + + /// Returned by [`RunCtx::run()`](crate::rt::RunCtx) to indicate which + /// server application callbacks that failed. + #[cfg(feature = "rt")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] + SrvApp(AppErrors), + Unsupported } + impl Error { + pub fn is_apperr(&self) -> bool { + matches!(self, Error::App(_, _)) + } + + /// Attempt to convert [`Error`] into application-specific error. + /// + /// If it's not an `Error::App()` nor can be downcast to type `E`, the error + /// will be returned back as an `Err()`. + pub fn try_into_apperr(self) -> Result<(CbOrigin, E), Error> { + match self { + Error::App(origin, e) => match e.try_into_inner::() { + Ok(e) => Ok((origin, e)), + Err(e) => Err(Error::App(origin, AppErr::new(e))) + }, + e => Err(e) + } + } + + /// Unwrap application-specific error from an [`Error`]. + /// + /// # Panic + /// Panics if `Error` variant is not `Error::App()`. 
+ pub fn unwrap_apperr(self) -> (CbOrigin, E) { + let Ok((origin, e)) = self.try_into_apperr::() else { + panic!("Unable to unwrap error E"); + }; + (origin, e) + } + + /* + pub(crate) fn app(origin: CbOrigin, e: E) -> Self { + Error::App(origin, AppErr::new(e)) + } + */ + pub fn bad_format(s: S) -> Self { Error::BadFormat(s.to_string()) } pub fn internal(s: S) -> Self { @@ -42,17 +134,24 @@ } pub fn lumberjack(s: S) -> Self { Error::LumberJack(s.to_string()) } + + pub fn missing(s: S) -> Self { + Error::Missing(s.to_string()) + } } impl std::error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Error::App(_origin, _e) => { + write!(f, "Application-defined error") + } Error::ArgP(s) => { // ToDo: Handle the ArgsError::Clap and ArgsError::Msg differently write!(f, "Argument parser; {:?}", s) } Error::BadFormat(s) => { @@ -65,17 +164,33 @@ write!(f, "I/O error; {}", s) } Error::LumberJack(s) => { write!(f, "LumberJack error; {}", s) } + Error::Missing(s) => { + write!(f, "Missing data; {}", s) + } #[cfg(feature = "rocket")] Error::Rocket(s) => { write!(f, "Rocket error; {}", s) } Error::SubSystem(s) => { write!(f, "Service subsystem error; {}", s) } + Error::SrvApp(e) => { + let mut v = vec![]; + if e.init.is_some() { + v.push("init"); + } + if e.run.is_some() { + v.push("run"); + } + if e.shutdown.is_some() { + v.push("shutdown"); + } + write!(f, "Server application failed [{}]", v.join(",")) + } Error::Unsupported => { write!(f, "Operation is unsupported [on this platform]") } } } @@ -139,6 +254,90 @@ fn from(err: windows_service::Error) -> Self { Error::SubSystem(err.to_string()) } } + +/// An error type used to store an application-specific error. +/// +/// Application call-backs return this type for the `Err()` case in order to +/// allow the errors to be passed back to the application call that started the +/// service runtime. 
+#[repr(transparent)] +#[derive(Debug)] +pub struct AppErr(Box); + +impl AppErr { + pub fn new(e: E) -> Self + where + E: Send + 'static + { + Self(Box::new(e)) + } + + /// Attempt to unpack and cast the inner error type. + /// + /// If it can't be downcast to `E`, `AppErr` will be returned in the `Err()` + /// case. + /// + /// ``` + /// use qsu::AppErr; + /// + /// enum MyErr { + /// Something(String) + /// } + /// let apperr = AppErr::new(MyErr::Something("hello".into())); + /// + /// let Ok(e) = apperr.try_into_inner::() else { + /// panic!("Unexpectedly not MyErr"); + /// }; + /// ``` + pub fn try_into_inner(self) -> Result { + match self.0.downcast::() { + Ok(e) => Ok(*e), + Err(e) => Err(AppErr(e)) + } + } + + /// Unwrap application-specific error from an [`Error`](crate::err::Error). + /// + /// ``` + /// use qsu::AppErr; + /// + /// enum MyErr { + /// Something(String) + /// } + /// let apperr = AppErr::new(MyErr::Something("hello".into())); + /// + /// let MyErr::Something(e) = apperr.unwrap_inner::() else { + /// panic!("Unexpectedly not MyErr::Something"); + /// }; + /// assert_eq!(e, "hello"); + /// ``` + /// + /// # Panic + /// Panics if the inner type is not castable to `E`. + pub fn unwrap_inner(self) -> E { + let Ok(e) = self.0.downcast::() else { + panic!("Unable to downcast to error type E"); + }; + *e + } +} + +/// Origin of an application callback error. +#[derive(Debug)] +pub enum CbOrigin { + /// The application error occurred in the service handler's `init()` + /// callback. + Init, + + /// The application error occurred in the service handler's `run()` + /// callback. + Run, + + /// The application error occurred in the service handler's `shutdown()`. + /// callback. 
+ Shutdown +} + // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/installer/winsvc.rs ================================================================== --- src/installer/winsvc.rs +++ src/installer/winsvc.rs @@ -8,11 +8,13 @@ service_manager::{ServiceManager, ServiceManagerAccess} }; use crate::{ err::Error, - winsvc::{create_service_params, write_service_subkey} + rt::winsvc::{ + create_service_params, get_service_params_subkey, write_service_subkey + } }; pub fn install(ctx: super::RegSvc) -> Result<(), Error> { let svcname = &ctx.svcname; @@ -125,10 +127,13 @@ //println!("==> Service installation successful"); let mut params = create_service_params(svcname)?; + // Just so the uninstaller will accept this service + params.set_value("Installer", &"qsu")?; + if let Some(wd) = ctx.workdir { params.set_value("WorkDir", &wd)?; } if let Some(ll) = ctx.log_level { @@ -152,11 +157,42 @@ *status.borrow_mut() = true; Ok(()) } + +/// Deregister a system service. +/// +/// # Constraints +/// Uninstalling the wrong service can have spectacularly bad side-effects, so +/// qsu goes to some lengths to ensure that only services it installed itself +/// can be uninstalled. +/// +/// Before attempting an actual uninstallation, this function will verify that +/// under the service's `Parameters` subkey there is an `Installer` key with +/// the value `qsu`. pub fn uninstall(svcname: &str) -> Result<(), Error> { + // Only allow uninstallation of services that have an Installer=qsu key in + // its Parameters subkey. + if let Ok(params) = get_service_params_subkey(svcname) { + if let Ok(val) = params.get_value::("Installer") { + if val != "qsu" { + Err(Error::missing( + "Refusing to uninstall service that doesn't appear to be installed \ + by qsu" + ))?; + } + } else { + Err(Error::missing( + "Service Parameters does not have a Installer key." 
+ ))?; + } + } else { + Err(Error::missing("Service does not have a Parameters subkey."))?; + } + + let manager_access = ServiceManagerAccess::CONNECT; let service_manager = ServiceManager::local_computer(None::<&str>, manager_access)?; let service_access = ServiceAccess::QUERY_STATUS | ServiceAccess::STOP | ServiceAccess::DELETE; Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,357 +1,77 @@ -//! _qsu_ is primarily a thin layer between the server application code and the -//! operating system's service subsystem. When the application is not running -//! under a service subsystem qsu may simulate parts of one so that the server -//! application code does not need to diverge too far between the service and -//! non-service cases. +//! _qsu_ is a set of tools for integrating a server application against a +//! service subsystem (such as +//! [Windows Services](https://learn.microsoft.com/en-us/windows/win32/services/services) [systemd](https://systemd.io/), or +//! [launchd](https://www.launchd.info/)). +//! +//! It offers a thin runtime wrapper layer with the purpose of abstracting away +//! differences between service subsystems (and also provides the same +//! interface when running the server application as a foreground process). +//! More information about the wrapper runtime can be found in the [rt] module +//! documentation. +//! +//! In addition _qsu_ offers helper functions to register/deregister an +//! executable with the system's service subsystem. These are documented +//! [installer] module. +//! +//! And finally it offers an argument parser to offer basic service +//! registration/deregistration and running using a consistent command line +//! interface. These are documented in the [argp] module. +//! +//! +//! # Features +//! | Feature | Function +//! |-------------|---------- +//! | `clap` | Enable `clap` (argument parser) integration. +//! 
| `installer` | Tools for registering/deregistering services. +//! | `rt` | Service wrapper (enabled by default). +//! | `systemd` | systemd integration support. +//! | `tokio` | Tokio server application type support. +//! | `rocket` | Rocket server application type support. +//! +//! In addition there's a special `wait-for-debugger` feature that is only used +//! on Windows. It will make the service runtime halt and wait for a debugger +//! to attach just before starting the Windows Service runtime. Once a +//! debugger has attached, it will voluntarily trigger a breakpoint. #![cfg_attr(docsrs, feature(doc_cfg))] mod err; mod lumberjack; -mod nosvc; -mod rttype; -pub mod signals; + +#[cfg(feature = "rt")] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub mod rt; #[cfg(feature = "clap")] #[cfg_attr(docsrs, doc(cfg(feature = "clap")))] pub mod argp; #[cfg(feature = "installer")] #[cfg_attr(docsrs, doc(cfg(feature = "installer")))] pub mod installer; -#[cfg(all(target_os = "linux", feature = "systemd"))] -#[cfg_attr(docsrs, doc(cfg(feature = "systemd")))] -mod systemd; - -#[cfg(windows)] -pub mod winsvc; - -use std::{ffi::OsStr, path::Path, sync::Arc}; - -use tokio::{runtime, sync::broadcast}; +use std::{ffi::OsStr, path::Path}; pub use async_trait::async_trait; pub use lumberjack::LumberJack; -pub use crate::err::Error; - - -/// Report the current startup/shutdown state to the platform service -/// subsystem. -pub(crate) trait StateReporter { - fn starting(&self, checkpoint: u32); - - fn started(&self); - - fn stopping(&self, checkpoint: u32); - - fn stopped(&self); -} - - -/// Report startup checkpoints to the service subsystem. -pub struct StartState { - sr: Arc -} -impl StartState { - pub fn report(&self, checkpoint: u32) { - self.sr.starting(checkpoint); - } -} - -/// Report shutdown checkpoints to the service subsystem. 
-pub struct StopState { - sr: Arc -} -impl StopState { - pub fn report(&self, checkpoint: u32) { - self.sr.stopping(checkpoint); - } -} - - -/// "Synchronous" (non-`async`) server application. -pub trait ServiceHandler { - fn init(&mut self, ss: StartState) -> Result<(), Error>; - - fn run(&mut self, ser: SvcEvtReader) -> Result<(), Error>; - - fn shutdown(&mut self, ss: StopState) -> Result<(), Error>; -} - - -/// Tokio (`async`) server application. -#[async_trait] -pub trait TokioServiceHandler { - async fn init(&mut self, ss: StartState) -> Result<(), Error>; - - async fn run(&mut self, ser: SvcEvtReader) -> Result<(), Error>; - - async fn shutdown(&mut self, ss: StopState) -> Result<(), Error>; -} - - -/// Rocket server application handler. -/// -/// While Rocket is built on top of tokio, it \[Rocket\] wants to initialize -/// tokio itself. -/// -/// There are two major ways to write Rocket services using qsu; either the -/// application can let qsu be aware of the server applications' `Rocket` -/// instances. It does this by creating the `Rocket` instances in -/// `RocketServiceHandler::init()` and returns them. _qsu_ will ignite these -/// rockets and pass them to `RocketServiceHandler::run()`. The application is -/// responsible for launching the rockets at this point. -/// -/// The other way to do it is to completely manage the `Rocket` instances in -/// application code (by not returning rocket instances from `init()`). -/// -/// Allowing _qsu_ to manage the `Rocket` instances will cause _qsu_ to request -/// graceful shutdown of all `Rocket` instances once a `SvcEvt::Shutdown` is -/// sent by the runtime. -/// -/// It is recommended that `ctrlc` shutdown and termination signals are -/// disabled in each `Rocket` instance's configuration, and allow the _qsu_ -/// runtime to be responsible for initiating the `Rocket` shutdown. 
-#[cfg(feature = "rocket")] -#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] -#[async_trait] -pub trait RocketServiceHandler { - /// Rocket service initialization. - /// - /// The returned `Rocket`s will be ignited and their shutdown handlers will - /// be triggered on shutdown. - async fn init( - &mut self, - ss: StartState - ) -> Result>, Error>; - - async fn run( - &mut self, - rockets: Vec>, - ser: SvcEvtReader - ) -> Result<(), Error>; - - async fn shutdown(&mut self, ss: StopState) -> Result<(), Error>; -} - - -/// Event notifications that originate from the service subsystem that is -/// controlling the server application. -#[derive(Clone, Debug)] -pub enum SvcEvt { - /// Service subsystem has requested that the server application should pause - /// its operations. - /// - /// Only the Windows service subsystem will emit these events. - Pause, - - /// Service subsystem has requested that the server application should - /// resume its operations. - /// - /// Only the Windows service subsystem will emit these events. - Resume, - - /// Service subsystem has requested that the services configuration should - /// be reread. - /// - /// On Unixy platforms this is triggered by SIGHUP, and is unsupported on - /// Windows. - ReloadConf, - - /// The service subsystem (or equivalent) has requested that the service - /// shutdown. - Shutdown, - - Terminate -} - - -/// Channel end-point used to receive events from the service subsystem. -pub struct SvcEvtReader { - rx: broadcast::Receiver -} - -impl SvcEvtReader { - /// Block and wait for an event. - /// - /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this - /// method should not be called again. - pub fn recv(&mut self) -> Option { - self.rx.blocking_recv().ok() - } - - /// Attemt to get next event. - /// - /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this - /// method should not be called again. 
- pub fn try_recv(&mut self) -> Option { - self.rx.try_recv().ok() - } - - /// Async wait for an event. - /// - /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this - /// method should not be called again. - pub async fn arecv(&mut self) -> Option { - self.rx.recv().await.ok() - } -} - - -/// The types of service types supported. -pub enum SvcType { - Sync(Box), - - /// Initializa a tokio runtime. - Tokio( - Option, - Box - ), - - #[cfg(feature = "rocket")] - #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] - /// Rocket 0.5rc.3 insists on initializing tokio itself. - Rocket(Box) -} - - -/// Service configuration context. -pub struct RunCtx { - service: bool, - svcname: String -} - -impl RunCtx { - /// Run as a systemd service. - #[cfg(all(target_os = "linux", feature = "systemd"))] - fn systemd(_svcname: &str, st: SvcType) -> Result<(), Error> { - LumberJack::default().init()?; - - let reporter = Arc::new(systemd::ServiceReporter {}); - - let res = match st { - SvcType::Sync(handler) => rttype::sync_main(handler, reporter, None), - SvcType::Tokio(rtbldr, handler) => { - rttype::tokio_main(rtbldr, handler, reporter, None) - } - SvcType::Rocket(handler) => rttype::rocket_main(handler, reporter, None) - }; - - res - } - - /// Run as a Windows service. 
- #[cfg(windows)] - fn winsvc(svcname: &str, st: SvcType) -> Result<(), Error> { - winsvc::run(svcname, st)?; - - Ok(()) - } - - /// Run as a foreground server - fn foreground(_svcname: &str, st: SvcType) -> Result<(), Error> { - LumberJack::default().init()?; - - let reporter = Arc::new(nosvc::ServiceReporter {}); - - match st { - SvcType::Sync(handler) => rttype::sync_main(handler, reporter, None), - SvcType::Tokio(rtbldr, handler) => { - rttype::tokio_main(rtbldr, handler, reporter, None) - } - - #[cfg(feature = "rocket")] - SvcType::Rocket(handler) => rttype::rocket_main(handler, reporter, None) - } - } -} - -impl RunCtx { - pub fn new(name: &str) -> Self { - Self { - service: false, - svcname: name.into() - } - } - - pub fn service(mut self) -> Self { - self.service = true; - self - } - - pub fn service_ref(&mut self) -> &mut Self { - self.service = true; - self - } - - /// Launch the application. - /// - /// If this `RunCtx` has been marked as a _service_ then it will perform the - /// appropriate service subsystem integration before running the actual - /// server application code. - /// - /// This function must only be called from the main thread of the process, - /// and must be called before any other threads are started. - pub fn run(self, st: SvcType) -> Result<(), Error> { - if self.service { - #[cfg(all(target_os = "linux", feature = "systemd"))] - Self::systemd(&self.svcname, st)?; - - #[cfg(windows)] - Self::winsvc(&self.svcname, st)?; - - // ToDo: We should check for other platforms here (like macOS/launchd) - } else { - // Do not run against any specific service subsystem. Despite its name - // this isn't necessarily running as a foreground process; some service - // subsystems do not make a distinction. Perhaps a better mental model - // is that certain service subsystems expects to run regular "foreground" - // processes. 
- Self::foreground(&self.svcname, st)?; - } - - Ok(()) - } - - /// Convenience method around [`Self::run()`] using [`SvcType::Sync`]. - pub fn run_sync( - self, - handler: Box - ) -> Result<(), Error> { - self.run(SvcType::Sync(handler)) - } - - /// Convenience method around [`Self::run()`] using [`SvcType::Tokio`]. - //#[cfg(feature = "tokio")] - pub fn run_tokio( - self, - rtbldr: Option, - handler: Box - ) -> Result<(), Error> { - self.run(SvcType::Tokio(rtbldr, handler)) - } - - /// Convenience method around [`Self::run()`] using [`SvcType::Rocket`]. - #[cfg(feature = "rocket")] - pub fn run_rocket( - self, - handler: Box - ) -> Result<(), Error> { - self.run(SvcType::Rocket(handler)) - } -} - - -/// Attempt to determine a default service name. +pub use err::{AppErr, CbOrigin, Error}; + +#[cfg(feature = "tokio")] +pub use tokio; + +#[cfg(feature = "rocket")] +pub use rocket; + + +/// Attempt to derive a default service name based on the executable's name. /// /// The idea is to get the current executable's file name and strip it's /// extension (if there is one). The file stem name is the default service -/// name. +/// name. On macos the name will be prefixed by `local.`. pub fn default_service_name() -> Option { let binary_path = ::std::env::current_exe().ok()?; let name = binary_path.file_name()?; let name = Path::new(name); @@ -368,12 +88,6 @@ #[cfg(target_os = "macos")] fn mkname(nm: &OsStr) -> Option { nm.to_str().map(|x| format!("local.{}", x)) } - -pub fn leak_default_service_name() -> Option<&'static str> { - let svcname = default_service_name()?; - Some(svcname.leak()) -} - // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lumberjack.rs ================================================================== --- src/lumberjack.rs +++ src/lumberjack.rs @@ -24,18 +24,30 @@ WinEvtLog { svcname: String } } /// Logging and tracing initialization. 
pub struct LumberJack { + init: bool, log_out: LogOut, log_level: LogLevel, trace_level: LogLevel, //log_file: Option, trace_file: Option } impl Default for LumberJack { + /// Create a default log/trace initialization. + /// + /// This will set the `log` log level to the value of the `LOG_LEVEL` + /// environment variable, or default to `warm` (if either not set or + /// invalid). + /// + /// The `tracing` trace level will use the environment variable `TRACE_LEVEL` + /// in a similar manner, but defaults to `off`. + /// + /// If the environment variable `TRACE_FILE` is set the value will be the + /// used as the file name to write the trace logs to. fn default() -> Self { let log_level = if let Ok(level) = std::env::var("LOG_LEVEL") { if let Ok(level) = level.parse::() { level } else { @@ -60,10 +72,11 @@ } else { LogLevel::Off }; Self { + init: true, log_out: Default::default(), log_level, trace_level, //log_file: None, trace_file @@ -70,18 +83,34 @@ } } } impl LumberJack { + /// Create a [`LumberJack::default()`] object. pub fn new() -> Self { Self::default() } + + /// Do not initialize logging/tracing. + /// + /// This is useful when running tests. + pub fn noinit() -> Self { + Self { + init: false, + ..Default::default() + } + } + + pub fn set_init(mut self, flag: bool) -> Self { + self.init = flag; + self + } /// Load logging/tracing information from a service Parameters subkey. #[cfg(windows)] pub fn from_winsvc(svcname: &str) -> Result { - let params = crate::winsvc::get_service_param(svcname)?; + let params = crate::rt::winsvc::get_service_param(svcname)?; let loglevel = params .get_value::("LogLevel") .unwrap_or(String::from("warn")) .parse::() .unwrap_or(LogLevel::Warn); @@ -101,47 +130,56 @@ }; Ok(this) } + /// Set the `log` logging level. pub fn log_level(mut self, level: LogLevel) -> Self { self.log_level = level; self } + /// Set the `tracing` log level. 
pub fn trace_level(mut self, level: LogLevel) -> Self { self.trace_level = level; self } + + /// Set a file to which `tracing` log entries are written (rather than + /// being written to the console). + pub fn trace_file

(mut self, fname: P) -> Self where P: AsRef { self.trace_file = Some(fname.as_ref().to_path_buf()); self } - pub fn init(self) -> Result<(), Error> { - match self.log_out { - LogOut::Console => { - init_console_logging()?; - } - #[cfg(windows)] - LogOut::WinEvtLog { svcname } => { - eventlog::init(&svcname, log::Level::Trace)?; - log::set_max_level(self.log_level.into()); - } - } - - if let Some(fname) = self.trace_file { - init_file_tracing(fname, self.trace_level); - } else { - init_console_tracing(self.trace_level); - } - - Ok(()) + /// Commit requested settings to `log` and `tracing`. + pub fn init(self) -> Result<(), Error> { + if self.init { + match self.log_out { + LogOut::Console => { + init_console_logging()?; + } + #[cfg(windows)] + LogOut::WinEvtLog { svcname } => { + eventlog::init(&svcname, log::Level::Trace)?; + log::set_max_level(self.log_level.into()); + } + } + + if let Some(fname) = self.trace_file { + init_file_tracing(fname, self.trace_level); + } else { + init_console_tracing(self.trace_level); + } + + Ok(()) + } else { + Ok(()) + } } } #[derive(Debug, PartialEq, Eq, Clone, Copy)] DELETED src/nosvc.rs Index: src/nosvc.rs ================================================================== --- src/nosvc.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Not a service module. -//! -//! It might seem odd to have a no-service in a service library, which is true. -//! But it simplifies to service aplication code to be able to treat all cases -//! (roughly) equally. - -pub struct ServiceReporter {} - -impl super::StateReporter for ServiceReporter { - fn starting(&self, _checkpoint: u32) {} - - fn started(&self) {} - - fn stopping(&self, _checkpoint: u32) {} - - fn stopped(&self) {} -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt.rs Index: src/rt.rs ================================================================== --- /dev/null +++ src/rt.rs @@ -0,0 +1,542 @@ +//! Server application wrapper runtime. +//! +//! # Overview +//! 
The _qsu_ runtime lives in a layer between the actual server application +//! and the "service subsystems" that server applications can integrate +//! against: +//! +//!

+//! +---------------------+
+//! |  Server Application |  (HTTP server, Matrix server, etc)
+//! +---------------------+
+//! |       qsu rt        |
+//! +---------------------+
+//! |  Platform service   |  (Windows Service subsystem, systemd, etc)
+//! |      subsystem      |
+//! +---------------------+
+//! 
+//! +//! The primary goal of the _qsu_ runtime is to provide a consistent interface +//! that will be mapped to whatever service subsystem it is actually running +//! on. This including no special runtime at all; such as when running the +//! server application as a regular foreground process (which is common during +//! development). +//! +//! # How server applications are implemented and used +//! _qsu_ needs to know what kind of runtime the server application expects. +//! Server applications pick the runtime type by implementing a trait, of which +//! there are currently three recognized types: +//! - [`ServiceHandler`] is used for "non-async" server applications. +//! - [`TokioServiceHandler`] is used for server applications that run under +//! the tokio executor. +//! - [`RocketServiceHandler`] is for server applications that are built on top +//! of the Rocket HTTP framework. +//! +//! Each of these implement three methods: +//! - `init()` is for initializing the service. +//! - `run()` is for running the actual server application. +//! - `shutdown()` is for shutting down the server application. +//! +//! The actual trait methods may look quite different, depending on the trait +//! being used. +//! +//! Once a service trait has been implemented, the application creates a +//! [`RunCtx`] object and calls its [`run()`](RunCtx::run()) method, passing in +//! an service implementation object. +//! +//! Note: Only one service wrapper must be initialized per process. +//! +//! ## Service Handler semantics +//! When a handler is run through the [`RunCtx`] it will first call the +//! handler's `init()` method. If it returns `Ok()`, its `run()` method will +//! be run. +//! +//! The handler's `shutdown()` will be called regardless of whether `init()` or +//! `run()` was successful (the only precondition for `shutdown()` to be called +//! is that `init()` was called). +//! +//! # Application errors +//! 
The _qsu_ runtime is initialized and run from an application that is called +//! back to from the _qsu_ runtime. This has the unfortunate side effect of +//! creating a kind of barrier between the application's "outside" (the part +//! that sets up and runs the _qsu_ runtime) and the "inside" (the service +//! trait callback methods). Specifically, the problem this causes is that if +//! an error occurs in the "inner" server application code, the "outer" +//! application code may want to know exactly what the inner error was. +//! +//! _qsu_ bridges this gap by providing the [`AppErr`] type for the `Err()` +//! case of the callbacks. The `AppErr` is a newtype over a boxed `Any` type. +//! In order to get at the original error value from the "inside" the `AppErr` +//! needs to be unwrapped. See the [`AppErr`] documentation for more +//! information. +//! +//! Presumably the application has it's own `Error` type. To allow the +//! callbacks to return application-defined errors using the regular question +//! mark, it may be helpful to add the following error conversion to the error +//! module: +//! +//! ``` +//! // Application-specific Error type +//! enum Error { +//! // .. application-defined error variants +//! } +//! +//! impl From for qsu::AppErr { +//! fn from(err: Error) -> qsu::AppErr { +//! qsu::AppErr::new(err) +//! } +//! } +//! ``` +//! +//! # Using the argument parser +//! _qsu_ offers an [argument parser](crate::argp::ArgParser), which can +//! abstract away much of the runtime management. 
+ +mod nosvc; +mod rttype; +mod signals; + +#[cfg(all(target_os = "linux", feature = "systemd"))] +#[cfg_attr(docsrs, doc(cfg(feature = "systemd")))] +mod systemd; + +#[cfg(windows)] +pub mod winsvc; + +use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc +}; + +#[cfg(any(feature = "tokio", feature = "rocket"))] +use async_trait::async_trait; + +#[cfg(feature = "tokio")] +use tokio::runtime; + +use tokio::sync::broadcast; + + +use crate::{err::Error, lumberjack::LumberJack, AppErr}; + + +/// Used to pass an optional message to the service subsystem whenever a +/// startup or shutdown checkpoint as been reached. +pub enum StateMsg { + Ref(&'static str), + Owned(String) +} + +impl From<&'static str> for StateMsg { + fn from(msg: &'static str) -> Self { + Self::Ref(msg) + } +} + +impl From for StateMsg { + fn from(msg: String) -> Self { + Self::Owned(msg) + } +} + +impl AsRef for StateMsg { + fn as_ref(&self) -> &str { + match self { + StateMsg::Ref(s) => s, + StateMsg::Owned(s) => s + } + } +} + + +/// Report the current startup/shutdown state to the platform service +/// subsystem. +pub(crate) trait StateReporter { + fn starting(&self, checkpoint: u32, msg: Option); + + fn started(&self); + + fn stopping(&self, checkpoint: u32, msg: Option); + + fn stopped(&self); +} + + +/// Report startup checkpoints to the service subsystem. +/// +/// An instance of this is handed to the server application through the service +/// handler's `init()` trait method. +pub struct StartState { + sr: Arc, + cnt: Arc +} +impl StartState { + pub fn report(&self, status: Option) { + let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst); + self.sr.starting(checkpoint, status); + } +} + +/// Report shutdown checkpoints to the service subsystem. +/// +/// An instance of this is handed to the server application through the service +/// handler's `shutdown()` trait method. 
+pub struct StopState { + sr: Arc, + cnt: Arc +} +impl StopState { + pub fn report(&self, status: Option) { + let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst); + self.sr.stopping(checkpoint, status); + } +} + + +/// "Synchronous" (non-`async`) server application. +/// +/// Implement this for an object that wraps a server application that does not +/// use an async runtime. +pub trait ServiceHandler { + fn init(&mut self, ss: StartState) -> Result<(), AppErr>; + + fn run(&mut self, ser: SvcEvtReader) -> Result<(), AppErr>; + + fn shutdown(&mut self, ss: StopState) -> Result<(), AppErr>; +} + + +/// `async` server application built on the tokio runtime. +/// +/// Implement this for an object that wraps a server application that uses +/// tokio as an async runtime. +#[cfg(feature = "tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] +#[async_trait] +pub trait TokioServiceHandler { + async fn init(&mut self, ss: StartState) -> Result<(), AppErr>; + + async fn run(&mut self, ser: SvcEvtReader) -> Result<(), AppErr>; + + async fn shutdown(&mut self, ss: StopState) -> Result<(), AppErr>; +} + + +/// Rocket server application handler. +/// +/// While Rocket is built on top of tokio, it \[Rocket\] wants to initialize +/// tokio itself. +/// +/// There are two major ways to write Rocket services using qsu; either the +/// application can let qsu be aware of the server applications' `Rocket` +/// instances. It does this by creating the `Rocket` instances in +/// `RocketServiceHandler::init()` and returns them. _qsu_ will ignite these +/// rockets and pass them to `RocketServiceHandler::run()`. The application is +/// responsible for launching the rockets at this point. +/// +/// The other way to do it is to completely manage the `Rocket` instances in +/// application code (by not returning rocket instances from `init()`). 
+/// +/// Allowing _qsu_ to manage the `Rocket` instances will cause _qsu_ to request +/// graceful shutdown of all `Rocket` instances once a `SvcEvt::Shutdown` is +/// sent by the runtime. +/// +/// It is recommended that `ctrlc` shutdown and termination signals are +/// disabled in each `Rocket` instance's configuration, and allow the _qsu_ +/// runtime to be responsible for initiating the `Rocket` shutdown. +#[cfg(feature = "rocket")] +#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] +#[async_trait] +pub trait RocketServiceHandler { + /// Rocket service initialization. + /// + /// The returned `Rocket`s will be ignited and their shutdown handlers will + /// be triggered on shutdown. + async fn init( + &mut self, + ss: StartState + ) -> Result>, AppErr>; + + /// Server application main entry point. + /// + /// If the `init()` trait method returned `Rocket` instances to the + /// qsu runtime it will ignote these and pass them to `run()`. It is the + /// responsibility of this method to launch the rockets and await them. + async fn run( + &mut self, + rockets: Vec>, + ser: SvcEvtReader + ) -> Result<(), AppErr>; + + async fn shutdown(&mut self, ss: StopState) -> Result<(), AppErr>; +} + + +/// Event notifications that originate from the service subsystem that is +/// controlling the server application. +#[derive(Clone, Debug)] +pub enum SvcEvt { + /// Service subsystem has requested that the server application should pause + /// its operations. + /// + /// Only the Windows service subsystem will emit these events. + Pause, + + /// Service subsystem has requested that the server application should + /// resume its operations. + /// + /// Only the Windows service subsystem will emit these events. + Resume, + + /// Service subsystem has requested that the services configuration should + /// be reread. + /// + /// On Unixy platforms this is triggered by SIGHUP, and is unsupported on + /// Windows. 
+ ReloadConf, + + /// The service subsystem (or equivalent) has requested that the service + /// shutdown. + Shutdown, + + /// The service subsystem (or equivalent) has requested that the service + /// terminate. + Terminate +} + + +/// Channel end-point used to receive events from the service subsystem. +pub struct SvcEvtReader { + rx: broadcast::Receiver +} + +impl SvcEvtReader { + /// Block and wait for an event. + /// + /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this + /// method should not be called again. + pub fn recv(&mut self) -> Option { + self.rx.blocking_recv().ok() + } + + /// Attempt to get next event. + /// + /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this + /// method should not be called again. + pub fn try_recv(&mut self) -> Option { + self.rx.try_recv().ok() + } + + /// Async wait for an event. + /// + /// Once `SvcEvt::Shutdown` or `SvcEvt::Terminate` has been received, this + /// method should not be called again. + pub async fn arecv(&mut self) -> Option { + self.rx.recv().await.ok() + } +} + + +/// The server application runtime type. + +pub enum SrvAppRt { + /// A plain non-async (sometimes referred to as "blocking") server + /// application. + Sync(Box), + + /// Initializa a tokio runtime, and call each application handler from an + /// async context. + #[cfg(feature = "tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] + Tokio( + Option, + Box + ), + + /// Allow Rocket to initialize the tokio runtime, and call each application + /// handler from an async context. + #[cfg(feature = "rocket")] + #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] + Rocket(Box) +} + + +/// Service runner context. +pub struct RunCtx { + service: bool, + svcname: String, + log_init: bool, + test_mode: bool +} + +impl RunCtx { + /// Run as a systemd service. 
+ #[cfg(all(target_os = "linux", feature = "systemd"))] + fn systemd(self, st: SrvAppRt) -> Result<(), Error> { + LumberJack::default().set_init(self.log_init).init()?; + + tracing::debug!("Running service '{}'", self.svcname); + + let reporter = Arc::new(systemd::ServiceReporter {}); + + let res = match st { + SrvAppRt::Sync(handler) => { + rttype::sync_main(handler, reporter, None, self.test_mode) + } + SrvAppRt::Tokio(rtbldr, handler) => { + rttype::tokio_main(rtbldr, handler, reporter, None) + } + SrvAppRt::Rocket(handler) => rttype::rocket_main(handler, reporter, None) + }; + + res + } + + /// Run as a Windows service. + #[cfg(windows)] + fn winsvc(self, st: SrvAppRt) -> Result<(), Error> { + winsvc::run(&self.svcname, st)?; + + Ok(()) + } + + /// Run as a foreground server + fn foreground(self, st: SrvAppRt) -> Result<(), Error> { + LumberJack::default().set_init(self.log_init).init()?; + + tracing::debug!("Running service '{}'", self.svcname); + + let reporter = Arc::new(nosvc::ServiceReporter {}); + + match st { + SrvAppRt::Sync(handler) => { + rttype::sync_main(handler, reporter, None, self.test_mode) + } + + #[cfg(feature = "tokio")] + SrvAppRt::Tokio(rtbldr, handler) => { + rttype::tokio_main(rtbldr, handler, reporter, None) + } + + #[cfg(feature = "rocket")] + SrvAppRt::Rocket(handler) => rttype::rocket_main(handler, reporter, None) + } + } +} + +impl RunCtx { + /// Create a new service running context. + pub fn new(name: &str) -> Self { + Self { + service: false, + svcname: name.into(), + log_init: true, + test_mode: false + } + } + + /// Enable test mode. + /// + /// This method is intended for tests only. + /// + /// qsu performs a few global initialization that will fail if run repeatedly + /// within the same process. This causes some problem when running tests, + /// because rust may run tests in threads within the same process. 
+ #[doc(hidden)] + pub fn test_mode(mut self) -> Self { + self.log_init = false; + self.test_mode = true; + self + } + + /// Disable logging/tracing initialization. + /// + /// This is useful in tests because tests may run in different threads within + /// the same process, causing the log/tracing initialization to panic. + #[doc(hidden)] + pub fn log_init(mut self, flag: bool) -> Self { + self.log_init = flag; + self + } + + /// Reference version of [`RunCtx::log_init()`]. + #[doc(hidden)] + pub fn log_init_ref(&mut self, flag: bool) -> &mut Self { + self.log_init = flag; + self + } + + /// Mark this run context to run under the operating system's subservice, if + /// one is available on this platform. + pub fn service(mut self) -> Self { + self.service = true; + self + } + + /// Mark this run context to run under the operating system's subservice, if + /// one is available on this platform. + pub fn service_ref(&mut self) -> &mut Self { + self.service = true; + self + } + + /// Launch the application. + /// + /// If this `RunCtx` has been marked as a _service_ then it will perform the + /// appropriate service subsystem integration before running the actual + /// server application code. + /// + /// This function must only be called from the main thread of the process, + /// and must be called before any other threads are started. + pub fn run(self, st: SrvAppRt) -> Result<(), Error> { + if self.service { + #[cfg(all(target_os = "linux", feature = "systemd"))] + self.systemd(st)?; + + #[cfg(windows)] + self.winsvc(st)?; + + // ToDo: We should check for other platforms here (like macOS/launchd) + } else { + // Do not run against any specific service subsystem. Despite its name + // this isn't necessarily running as a foreground process; some service + // subsystems do not make a distinction. Perhaps a better mental model + // is that certain service subsystems expects to run regular "foreground" + // processes. 
+ self.foreground(st)?; + } + + Ok(()) + } + + /// Convenience method around [`Self::run()`] using [`SrvAppRt::Sync`]. + pub fn run_sync( + self, + handler: Box + ) -> Result<(), Error> { + self.run(SrvAppRt::Sync(handler)) + } + + /// Convenience method around [`Self::run()`] using [`SrvAppRt::Tokio`]. + #[cfg(feature = "tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] + pub fn run_tokio( + self, + rtbldr: Option, + handler: Box + ) -> Result<(), Error> { + self.run(SrvAppRt::Tokio(rtbldr, handler)) + } + + /// Convenience method around [`Self::run()`] using [`SrvAppRt::Rocket`]. + #[cfg(feature = "rocket")] + #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] + pub fn run_rocket( + self, + handler: Box + ) -> Result<(), Error> { + self.run(SrvAppRt::Rocket(handler)) + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/nosvc.rs Index: src/rt/nosvc.rs ================================================================== --- /dev/null +++ src/rt/nosvc.rs @@ -0,0 +1,23 @@ +//! Not a service module. +//! +//! It might seem odd to have a no-service in a service library, which is true. +//! But it simplifies to service aplication code to be able to treat all cases +//! (roughly) equally. + +use super::StateMsg; + +/// A no-op service reporter, used when no service subsystem is being used, or +/// that subsystem does not require/support any incoming notifications. +pub struct ServiceReporter {} + +impl super::StateReporter for ServiceReporter { + fn starting(&self, _checkpoint: u32, _status: Option) {} + + fn started(&self) {} + + fn stopping(&self, _checkpoint: u32, _status: Option) {} + + fn stopped(&self) {} +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/rttype.rs Index: src/rt/rttype.rs ================================================================== --- /dev/null +++ src/rt/rttype.rs @@ -0,0 +1,26 @@ +//! Runtime types. +//! +//! 
This module is used to collect the various supported "runtime types" or +//! "run contexts". + +mod sync; + +#[cfg(feature = "rocket")] +#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] +mod rocket; + +#[cfg(feature = "tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] +mod tokio; + +pub(crate) use sync::sync_main; + +#[cfg(feature = "rocket")] +#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] +pub(crate) use rocket::rocket_main; + +#[cfg(feature = "tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] +pub(crate) use tokio::tokio_main; + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/rttype/rocket.rs Index: src/rt/rttype/rocket.rs ================================================================== --- /dev/null +++ src/rt/rttype/rocket.rs @@ -0,0 +1,222 @@ +//! Rocket runtime module. +//! +//! While Rocket uses tokio, it [Rocket] insists on initializing tokio for +//! itself. Attempting to use the `TokioServiceHandler` will cause `Rocket`s +//! to issue a warning at startup. +//! +//! As a convenience _qsu_ can keep track of rockets and automatically shut +//! them down once the service subsystem requests a shutdown. To use this +//! feature, the server application should return a `Vec>` from +//! `RocketServiceHandler::init()`. Any `Rocket` instance in this vec will be +//! ignited before being passed to `RocketServiceHandler::run()`. +//! +//! Server applications do not need to use this feature and should return an +//! empty vector from `init()` in this case. This also requires the +//! application code to trigger a shutdown of each instance itself. 
+ +use std::sync::{atomic::AtomicU32, Arc}; + +use tokio::{sync::broadcast, task}; + +use killswitch::KillSwitch; + +use crate::{ + err::{AppErrors, Error}, + rt::{ + signals, RocketServiceHandler, StartState, StateReporter, StopState, + SvcEvt, SvcEvtReader + } +}; + + +/// Internal "main()" routine for server applications that run one or more +/// Rockets as their main application. +pub(crate) fn rocket_main( + handler: Box, + sr: Arc, + rx_svcevt: Option> +) -> Result<(), Error> { + rocket::execute(rocket_async_main(handler, sr, rx_svcevt))?; + + Ok(()) +} + +async fn rocket_async_main( + mut handler: Box, + sr: Arc, + rx_svcevt: Option> +) -> Result<(), Error> { + let ks = KillSwitch::new(); + + // If a SvcEvt receiver end-point was handed to us, then use it. Otherwise + // create our own and spawn the monitoring tasks that will generate events + // for it. + let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { + rx_svcevt + } else { + // Create channel used to signal events to application + let (tx, rx) = broadcast::channel(16); + + let ks2 = ks.clone(); + + // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event. + let txc = tx.clone(); + task::spawn(signals::wait_shutdown( + move || { + if let Err(e) = txc.send(SvcEvt::Shutdown) { + log::error!("Unable to send SvcEvt::Shutdown event; {}", e); + } + }, + ks2 + )); + + // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a + // Terminate event. 
+ let txc = tx.clone(); + let ks2 = ks.clone(); + task::spawn(signals::wait_term( + move || { + if let Err(e) = txc.send(SvcEvt::Terminate) { + log::error!("Unable to send SvcEvt::Terminate event; {}", e); + } + }, + ks2 + )); + + // There doesn't seem to be anything equivalent to SIGHUP for Windows + // (Services) + #[cfg(unix)] + { + let ks2 = ks.clone(); + + let txc = tx.clone(); + task::spawn(signals::wait_reload( + move || { + if let Err(e) = txc.send(SvcEvt::ReloadConf) { + log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); + } + }, + ks2 + )); + } + + rx + }; + + let mut rx_svcevt2 = rx_svcevt.resubscribe(); + + let set = Box::new(SvcEvtReader { rx: rx_svcevt }); + + // Call application's init() method. + let ss = StartState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let (rockets, init_apperr) = match handler.init(ss).await { + Ok(rockets) => (rockets, None), + Err(e) => (Vec::new(), Some(e)) + }; + + // Ignite rockets so we can get Shutdown contexts for each of the instances. + let mut ignited = vec![]; + let mut rocket_shutdowns = vec![]; + for rocket in rockets { + let rocket = rocket.ignite().await?; + rocket_shutdowns.push(rocket.shutdown()); + ignited.push(rocket); + } + + // Set the service's state to "started" + sr.started(); + + // Launch a task that waits for the SvtEvt::Shutdown event. Once it + // arrives, tell all rocket instances to gracefully shutdown. + // + // Note: We don't want to use the killswitch for this because the killswitch + // isn't triggered until run() has returned, and we might want the graceful + // shutdown to be the cause of the graceful shutdowns. + let jh_graceful_landing = task::spawn(async move { + loop { + match rx_svcevt2.recv().await { + Ok(SvcEvt::Shutdown) => { + tracing::trace!("Ask rocket instances to shut down gracefully"); + for shutdown in rocket_shutdowns { + // Tell this rocket instance to shut down gracefully. 
+ shutdown.notify(); + } + break; + } + Ok(SvcEvt::Terminate) => { + tracing::trace!("Ask rocket instances to shut down gracefully"); + for shutdown in rocket_shutdowns { + // Tell this rocket instance to shut down gracefully. + shutdown.notify(); + } + break; + } + Ok(_) => { + tracing::trace!("Ignored message in wask waiting for shutdown"); + continue; + } + Err(e) => { + log::error!("Unable to receive broadcast SvcEvt message, {}", e); + break; + } + } + } + }); + + let run_apperr = if init_apperr.is_none() { + sr.started(); + handler.run(ignited, *set).await.err() + } else { + None + }; + + + // Always send the first shutdown checkpoint here. Either init() failed or + // run retuned. Either way, we're shutting down. + sr.stopping(1, None); + + // Now that the main application has terminated kill off any remaining + // auxiliary tasks (read: signal waiters) + ks.trigger(); + + // .. and wait for all task that is waiting for a shutdown event to complete + if let Err(e) = jh_graceful_landing.await { + log::warn!( + "An error was returned from the graceful landing task; {}", + e + ); + } + + if (ks.finalize().await).is_err() { + log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet"); + } + + // Call the application's shutdown() function. + let ss = StopState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let term_apperr = handler.shutdown(ss).await.err(); + + // Inform the service subsystem that the the shutdown is complete + sr.stopped(); + + // There can be multiple failures, and we don't want to lose information + // about what went wrong, so return an error context that can contain all + // callback errors. 
+ if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() { + let apperrs = AppErrors { + init: init_apperr, + run: run_apperr, + shutdown: term_apperr + }; + Err(Error::SrvApp(apperrs))?; + } + + Ok(()) +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/rttype/sync.rs Index: src/rt/rttype/sync.rs ================================================================== --- /dev/null +++ src/rt/rttype/sync.rs @@ -0,0 +1,112 @@ +use std::sync::{atomic::AtomicU32, Arc}; + +use tokio::sync::broadcast; + +use crate::{ + err::{AppErrors, Error}, + rt::{ + signals, ServiceHandler, StartState, StateReporter, StopState, SvcEvt, + SvcEvtReader + } +}; + +#[cfg(unix)] +use crate::rt::signals::SigType; + +/// Internal "main()" routine for server applications that run plain old +/// non-`async` code. +pub(crate) fn sync_main( + mut handler: Box, + sr: Arc, + rx_svcevt: Option>, + test_mode: bool +) -> Result<(), Error> { + // Get rid of unused variable warning + #[cfg(unix)] + let _ = test_mode; + + let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { + // Use the broadcast receiver supplied by caller (it likely originates from + // a service runtime integration). + rx_svcevt + } else { + let (tx, rx) = broadcast::channel(16); + + #[cfg(unix)] + signals::sync_sigmon(move |st| match st { + SigType::Int => { + if let Err(e) = tx.send(SvcEvt::Shutdown) { + log::error!("Unable to send SvcEvt::Shutdown event; {}", e); + } + } + SigType::Term => { + if let Err(e) = tx.send(SvcEvt::Terminate) { + log::error!("Unable to send SvcEvt::Terminate event; {}", e); + } + } + SigType::Hup => { + if let Err(e) = tx.send(SvcEvt::ReloadConf) { + log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); + } + } + })?; + + // On Windows, if rx_svcevt is None, means we're not running under the + // service subsystem (i.e. we're running as a foreground process), so + // register a Ctrl+C handler. 
+ #[cfg(windows)] + signals::sync_kill_to_event(tx, test_mode)?; + + rx + }; + + let set = Box::new(SvcEvtReader { rx: rx_svcevt }); + + // Call server application's init() method, passing along a startup state + // reporter object. + let ss = StartState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let init_apperr = handler.init(ss).err(); + + // If init() was successful, set the service's state to "started" and then + // call the server application's run() method. + let run_apperr = if init_apperr.is_none() { + sr.started(); + handler.run(*set).err() + } else { + None + }; + + // Always send the first shutdown checkpoint here. Either init() failed or + // run retuned. Either way, we're shutting down. + sr.stopping(1, None); + + // Call the application's shutdown() function, passing along a shutdown state + // reporter object. + let ss = StopState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let term_apperr = handler.shutdown(ss).err(); + + // Inform the service subsystem that the the shutdown is complete + sr.stopped(); + + // There can be multiple failures, and we don't want to lose information + // about what went wrong, so return an error context that can contain all + // callback errors. 
+ if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() { + let apperrs = AppErrors { + init: init_apperr, + run: run_apperr, + shutdown: term_apperr + }; + Err(Error::SrvApp(apperrs))?; + } + + Ok(()) +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/rttype/tokio.rs Index: src/rt/rttype/tokio.rs ================================================================== --- /dev/null +++ src/rt/rttype/tokio.rs @@ -0,0 +1,155 @@ +use std::sync::{atomic::AtomicU32, Arc}; + +use tokio::{runtime, sync::broadcast, task}; + +use crate::{ + err::{AppErrors, Error}, + rt::{ + signals, StartState, StateReporter, StopState, SvcEvt, SvcEvtReader, + TokioServiceHandler + } +}; + +use killswitch::KillSwitch; + + +/// Internal "main()" routine for server applications that run the tokio +/// runtime for `async` code. +pub(crate) fn tokio_main( + rtbldr: Option, + handler: Box, + sr: Arc, + rx_svcevt: Option> +) -> Result<(), Error> { + let rt = if let Some(mut bldr) = rtbldr { + bldr.build()? + } else { + tokio::runtime::Runtime::new()? + }; + rt.block_on(tokio_async_main(handler, sr, rx_svcevt))?; + + Ok(()) +} + +/// The `async` main function for tokio servers. +/// +/// If `rx_svcevt` is `Some(_)` it means the channel was created elsewhere +/// (implied: The transmitting endpoint lives somewhere else). If it is `None` +/// the channel needs to be created. +async fn tokio_async_main( + mut handler: Box, + sr: Arc, + rx_svcevt: Option> +) -> Result<(), Error> { + let ks = KillSwitch::new(); + + // If a SvcEvt receiver end-point was handed to us, then use it. Otherwise + // create our own and spawn the monitoring tasks that will generate events + // for it. + let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { + rx_svcevt + } else { + // Create channel used to signal events to application + let (tx, rx) = broadcast::channel(16); + + let ks2 = ks.clone(); + + // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event. 
+ let txc = tx.clone(); + task::spawn(signals::wait_shutdown( + move || { + if let Err(e) = txc.send(SvcEvt::Shutdown) { + log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); + } + }, + ks2 + )); + + // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a + // Terminate event. + let txc = tx.clone(); + let ks2 = ks.clone(); + task::spawn(signals::wait_term( + move || { + if let Err(e) = txc.send(SvcEvt::Terminate) { + log::error!("Unable to send SvcEvt::Terminate event; {}", e); + } + }, + ks2 + )); + + // There doesn't seem to be anything equivalent to SIGHUP for Windows + // (Services) + #[cfg(unix)] + { + let ks2 = ks.clone(); + + let txc = tx.clone(); + task::spawn(signals::wait_reload( + move || { + if let Err(e) = txc.send(SvcEvt::ReloadConf) { + log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); + } + }, + ks2 + )); + } + + rx + }; + + let set = Box::new(SvcEvtReader { rx: rx_svcevt }); + + // Call application's init() method. + let ss = StartState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let init_apperr = handler.init(ss).await.err(); + + let run_apperr = if init_apperr.is_none() { + sr.started(); + handler.run(*set).await.err() + } else { + None + }; + + // Always send the first shutdown checkpoint here. Either init() failed or + // run retuned. Either way, we're shutting down. + sr.stopping(1, None); + + + // Now that the main application has terminated kill off any remaining + // auxiliary tasks (read: signal waiters) + ks.trigger(); + + if (ks.finalize().await).is_err() { + log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet"); + } + + // Call the application's shutdown() function. 
+ let ss = StopState { + sr: Arc::clone(&sr), + cnt: Arc::new(AtomicU32::new(2)) // 1 is used by the runtime, so start at 2 + }; + let term_apperr = handler.shutdown(ss).await.err(); + + // Inform the service subsystem that the the shutdown is complete + sr.stopped(); + + // There can be multiple failures, and we don't want to lose information + // about what went wrong, so return an error context that can contain all + // callback errors. + if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() { + let apperrs = AppErrors { + init: init_apperr, + run: run_apperr, + shutdown: term_apperr + }; + Err(Error::SrvApp(apperrs))?; + } + + Ok(()) +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/signals.rs Index: src/rt/signals.rs ================================================================== --- /dev/null +++ src/rt/signals.rs @@ -0,0 +1,21 @@ +//! Signal monitoring. + +#[cfg(unix)] +mod unix; + +#[cfg(windows)] +mod win; + +#[cfg(unix)] +pub use unix::{sync_sigmon, SigType}; + +#[cfg(all(unix, feature = "tokio"))] +pub use unix::{wait_reload, wait_shutdown, wait_term}; + +#[cfg(windows)] +pub use win::{wait_shutdown, wait_term}; + +#[cfg(windows)] +pub(crate) use win::sync_kill_to_event; + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/signals/unix.rs Index: src/rt/signals/unix.rs ================================================================== --- /dev/null +++ src/rt/signals/unix.rs @@ -0,0 +1,167 @@ +use std::thread; + +use nix::sys::signal::{SigSet, SigmaskHow, Signal}; + +#[cfg(feature = "tokio")] +use tokio::signal::unix::{signal, SignalKind}; + +#[cfg(feature = "tokio")] +use killswitch::KillSwitch; + +use crate::err::Error; + + +/// Async task used to wait for SIGINT/SIGTERM. +/// +/// Whenever a SIGINT or SIGTERM is signalled the closure in `f` is called and +/// the task is terminated. 
+#[cfg(feature = "tokio")] +pub async fn wait_shutdown(f: F, ks: KillSwitch) +where + F: FnOnce() +{ + tracing::trace!("SIGINT task launched"); + + let Ok(mut sigint) = signal(SignalKind::interrupt()) else { + log::error!("Unable to create SIGINT Future"); + return; + }; + + // Wait for SIGINT. + tokio::select! { + _ = sigint.recv() => { + tracing::debug!("Received SIGINT -- running closure"); + f(); + }, + _ = ks.wait() => { + tracing::debug!("killswitch triggered"); + } + } + + tracing::trace!("wait_shutdown() terminating"); +} + +#[cfg(feature = "tokio")] +pub async fn wait_term(f: F, ks: KillSwitch) +where + F: FnOnce() +{ + tracing::trace!("SIGTERM task launched"); + + let Ok(mut sigterm) = signal(SignalKind::terminate()) else { + log::error!("Unable to create SIGTERM Future"); + return; + }; + + // Wait for either SIGTERM. + tokio::select! { + _ = sigterm.recv() => { + tracing::debug!("Received SIGTERM -- running closure"); + f(); + } + _ = ks.wait() => { + tracing::debug!("killswitch triggered"); + } + } + + tracing::trace!("wait_term() terminating"); +} + +/// Async task used to wait for SIGHUP +/// +/// Whenever a SIGHUP is signalled the closure in `f` is called. +#[cfg(feature = "tokio")] +pub async fn wait_reload(f: F, ks: KillSwitch) +where + F: Fn() +{ + tracing::trace!("SIGHUP task launched"); + + let Ok(mut sighup) = signal(SignalKind::hangup()) else { + log::error!("Unable to create SIGHUP Future"); + return; + }; + loop { + tokio::select! { + _ = sighup.recv() => { + tracing::debug!("Received SIGHUP"); + f(); + }, + _ = ks.wait() => { + tracing::debug!("killswitch triggered"); + break; + } + } + } + + tracing::trace!("wait_reload() terminating"); +} + + +pub enum SigType { + Int, + Term, + Hup +} + +pub fn sync_sigmon(f: F) -> Result, Error> +where + F: Fn(SigType) + Send + 'static +{ + // + // Block signals-of-interest on main thread. 
+ // + let mut ss = SigSet::empty(); + ss.add(Signal::SIGINT); + ss.add(Signal::SIGTERM); + ss.add(Signal::SIGHUP); + + let mut oldset = SigSet::empty(); + nix::sys::signal::pthread_sigmask( + SigmaskHow::SIG_SETMASK, + Some(&ss), + Some(&mut oldset) + ) + .unwrap(); + + let jh = thread::Builder::new() + .name("sigmon".into()) + .spawn(move || { + // Note: Don't need to unblock signals in this thread, because sigwait() + // does it implicitly. + let mask = unsafe { + let mut mask: libc::sigset_t = std::mem::zeroed(); + libc::sigemptyset(&mut mask); + libc::sigaddset(&mut mask, libc::SIGINT); + libc::sigaddset(&mut mask, libc::SIGTERM); + libc::sigaddset(&mut mask, libc::SIGHUP); + mask + }; + + loop { + let mut sig: libc::c_int = 0; + let ret = unsafe { libc::sigwait(&mask, &mut sig) }; + if ret == 0 { + let signal = Signal::try_from(sig).unwrap(); + match signal { + Signal::SIGINT => { + f(SigType::Int); + break; + } + Signal::SIGTERM => { + f(SigType::Term); + break; + } + Signal::SIGHUP => { + f(SigType::Hup); + } + _ => {} + } + } + } + })?; + + Ok(jh) +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/signals/win.rs Index: src/rt/signals/win.rs ================================================================== --- /dev/null +++ src/rt/signals/win.rs @@ -0,0 +1,160 @@ +use std::sync::OnceLock; + +use tokio::{signal, sync::broadcast}; + +use windows_sys::Win32::{ + Foundation::{BOOL, FALSE, TRUE}, + System::Console::{ + SetConsoleCtrlHandler, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_C_EVENT + } +}; + +use killswitch::KillSwitch; + +use crate::{err::Error, rt::SvcEvt}; + + +static CELL: OnceLock BOOL + Send + Sync>> = + OnceLock::new(); + +/// Async task used to wait for Ctrl+C to be signalled. +/// +/// Whenever a Ctrl+C is signalled the closure in `f` is called and +/// the task is terminated. +pub async fn wait_shutdown(f: F, ks: KillSwitch) +where + F: FnOnce() +{ + tracing::trace!("CTRL+C task launched"); + + tokio::select! 
  // NOTE(review): the lines below complete `wait_shutdown()`, whose
  // signature and `tokio::select!` opener appear immediately above.
  {
    _ = signal::ctrl_c() => {
      tracing::debug!("Received Ctrl+C");
      // Once any process termination signal has been received post call the
      // callback.
      f();
    },
    _ = ks.wait() => {
      tracing::debug!("killswitch triggered");
    }
  }

  tracing::trace!("wait_shutdown() terminating");
}

/// Async task used to wait for Ctrl+Break or console Close.
///
/// Whichever arrives first triggers the closure in `f` once; the task also
/// terminates (without calling `f`) when the killswitch `ks` is triggered.
pub async fn wait_term<F>(f: F, ks: KillSwitch)
where
  F: FnOnce()
{
  tracing::trace!("CTRL+Break/Close task launched");

  let Ok(mut cbreak) = signal::windows::ctrl_break() else {
    log::error!("Unable to create Ctrl+Break monitor");
    return;
  };

  let Ok(mut cclose) = signal::windows::ctrl_close() else {
    log::error!("Unable to create Close monitor");
    return;
  };

  tokio::select! {
    _ = cbreak.recv() => {
      tracing::debug!("Received Ctrl+Break");
      // Once any process termination signal has been received post call the
      // callback.
      f();
    },
    _ = cclose.recv() => {
      tracing::debug!("Received Close");
      // Once any process termination signal has been received post call the
      // callback.
      f();
    },
    _ = ks.wait() => {
      tracing::debug!("killswitch triggered");
    }
  }

  tracing::trace!("wait_term() terminating");
}


/// Install a console control handler that translates Ctrl+C into
/// `SvcEvt::Shutdown` and Ctrl+Break/Close into `SvcEvt::Terminate`,
/// broadcast over `tx`.
///
/// Returns TRUE from the handler (event consumed) only when the broadcast
/// send succeeded; FALSE lets the default handling proceed.
pub(crate) fn sync_kill_to_event(
  tx: broadcast::Sender<SvcEvt>,
  test_mode: bool
) -> Result<(), Error> {
  setup_sync_fg_kill_handler(
    move |ty| {
      match ty {
        CTRL_C_EVENT => {
          tracing::trace!(
            "Received some kind of event that should trigger a shutdown."
          );
          if tx.send(SvcEvt::Shutdown).is_ok() {
            // We handled this event
            TRUE
          } else {
            FALSE
          }
        }
        CTRL_BREAK_EVENT | CTRL_CLOSE_EVENT => {
          tracing::trace!(
            "Received some kind of event that should trigger a termination."
          );
          if tx.send(SvcEvt::Terminate).is_ok() {
            // We handled this event
            TRUE
          } else {
            FALSE
          }
        }
        _ => FALSE
      }
    },
    test_mode
  )?;
  Ok(())
}


/// Store `f` in the process-global `CELL` and register the trampoline
/// `ctrlhandler` with `SetConsoleCtrlHandler`.
///
/// # Errors
/// Fails if the cell was already set (outside test mode) or if the Win32
/// registration call fails.
pub(crate) fn setup_sync_fg_kill_handler<F>(
  f: F,
  test_mode: bool
) -> Result<(), Error>
where
  F: Fn(u32) -> BOOL + Send + Sync + 'static
{
  // The proper way to do this is to use CELL.set(), because this can only
  // be done once for each process.  Tests may run in the same process (on
  // separate threads), which will cause globals like this to be initialized
  // multiple times (which is bad).
  //
  // The workaround is to use get_or_init() instead, but only do it if test
  // mode has been requested.
  if test_mode {
    let _ = CELL.get_or_init(|| Box::new(f));
  } else {
    CELL
      .set(Box::new(f))
      .map_err(|_| Error::internal("Unable to set shared OnceLock cell"))?;
  }

  let rc = unsafe { SetConsoleCtrlHandler(Some(ctrlhandler), 1) };
  // Returns non-zero on success
  (rc != 0)
    .then_some(())
    .ok_or(Error::internal("SetConsoleCtrlHandler failed"))?;

  Ok(())
}

/// Trampoline handed to `SetConsoleCtrlHandler`; forwards the control event
/// to the closure stored in `CELL`, or reports "not handled" (FALSE) if no
/// closure was registered.
unsafe extern "system" fn ctrlhandler(ty: u32) -> BOOL {
  let Some(f) = CELL.get() else {
    return FALSE;
  };

  f(ty)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :
+pub struct ServiceReporter {} + +impl super::StateReporter for ServiceReporter { + fn starting(&self, checkpoint: u32, status: Option) { + let text = if let Some(msg) = status { + format!("Starting[{}] {}", checkpoint, msg.as_ref()) + } else { + format!("Startup checkpoint {}", checkpoint) + }; + + if let Err(e) = sd_notify::notify(false, &[NotifyState::Status(&text)]) { + log::error!("Unable to report service started state; {}", e); + } + } + + fn started(&self) { + if let Err(e) = sd_notify::notify(false, &[NotifyState::Ready]) { + log::error!("Unable to report service started state; {}", e); + } + } + + fn stopping(&self, checkpoint: u32, status: Option) { + // For systemd, the notification is merely that stopping has been + // initiated. + // ToDo: First checkpoint is 1? + if checkpoint == 0 { + if let Err(e) = sd_notify::notify(false, &[NotifyState::Stopping]) { + log::error!("Unable to report service started state; {}", e); + } + } + + let text = if let Some(msg) = status { + format!("Stopping[{}] {}", checkpoint, msg.as_ref()) + } else { + format!("Stopping checkpoint {}", checkpoint) + }; + + // ToDo: Is it okay to set status after "Stopping" has been set? + if let Err(e) = sd_notify::notify(false, &[NotifyState::Status(&text)]) { + log::error!("Unable to report service started state; {}", e); + } + } + + fn stopped(&self) {} +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/rt/winsvc.rs Index: src/rt/winsvc.rs ================================================================== --- /dev/null +++ src/rt/winsvc.rs @@ -0,0 +1,513 @@ +//! Windows service module. 
use std::{
  ffi::OsString,
  sync::{Arc, OnceLock},
  thread,
  time::Duration
};

use parking_lot::Mutex;

use tokio::sync::{
  broadcast,
  mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
  oneshot
};

use windows_service::{
  define_windows_service,
  service::{
    ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState,
    ServiceStatus, ServiceType
  },
  service_control_handler::{
    self, ServiceControlHandlerResult, ServiceStatusHandle
  },
  service_dispatcher
};

use winreg::{enums::*, RegKey};

#[cfg(feature = "wait-for-debugger")]
use dbgtools_win::debugger;


use crate::{
  err::Error,
  lumberjack::LumberJack,
  rt::{SrvAppRt, SvcEvt}
};

use super::StateMsg;


// This service hosts the whole application in a single process.
const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS;
// Wait hints handed to the SCM while the service is start/stop pending.
//const SERVICE_STARTPENDING_TIME: Duration = Duration::from_secs(10);
const SERVICE_STARTPENDING_TIME: Duration = Duration::from_secs(300);
const SERVICE_STOPPENDING_TIME: Duration = Duration::from_secs(30);


/// Messages that are sent to the service subsystem thread from the
/// application.
enum ToSvcMsg {
  // Startup checkpoint (monotonically increasing).
  Starting(u32),
  Started,
  // Shutdown checkpoint (monotonically increasing).
  Stopping(u32),
  Stopped
}

/// Buffer passed from main thread to service subsystem thread via global
/// `OnceLock`.
pub(crate) struct Xfer {
  // Name the service was registered under.
  svcname: String,

  /// Used to send handshake message from the service handler.
  // NOTE(review): generic arguments reconstructed from usage in
  // `my_service_main()`/`srvapp_thread()` -- confirm against original.
  tx_fromsvc: oneshot::Sender<Result<HandshakeMsg, Error>>
}

/// Used as a "bridge" send information to service thread.
// `run()` initializes this exactly once before starting the dispatcher;
// `my_service_main()` takes the buffer back out.
static CELL: OnceLock<Mutex<Option<Xfer>>> = OnceLock::new();


/// Buffer passed back to the application thread from the service subsystem
/// thread.
struct HandshakeMsg {
  /// Channel end-point used to send messages to the service subsystem.
  tx: UnboundedSender<ToSvcMsg>,

  /// Channel end-point used to receive messages from the service subsystem.
  rx: broadcast::Receiver<SvcEvt>
}
+pub struct ServiceReporter { + tx: UnboundedSender +} + +impl super::StateReporter for ServiceReporter { + fn starting(&self, checkpoint: u32, status: Option) { + if let Err(e) = self.tx.send(ToSvcMsg::Starting(checkpoint)) { + log::error!("Unable to send Starting message; {}", e); + } + if let Some(status) = status { + log::trace!("Starting[{}] {}", checkpoint, status.as_ref()); + } + } + + fn started(&self) { + if let Err(e) = self.tx.send(ToSvcMsg::Started) { + log::error!("Unable to send Started message; {}", e); + } + log::trace!("Started"); + } + + fn stopping(&self, checkpoint: u32, status: Option) { + if let Err(e) = self.tx.send(ToSvcMsg::Stopping(checkpoint)) { + log::error!("Unable to send Stopping message; {}", e); + } + if let Some(status) = status { + log::trace!("Starting[{}] {}", checkpoint, status.as_ref()); + } + } + + fn stopped(&self) { + if let Err(e) = self.tx.send(ToSvcMsg::Stopped) { + log::error!("Unable to send Stopped message; {}", e); + } + log::trace!("Stopped"); + } +} + + +pub fn run(svcname: &str, st: SrvAppRt) -> Result<(), Error> { + #[cfg(feature = "wait-for-debugger")] + { + debugger::wait_for_then_break(); + debugger::output("Hello, debugger"); + } + + // Create a one-shot channel used to receive a an initial handshake from the + // service handler thread. + let (tx_fromsvc, rx_fromsvc) = oneshot::channel(); + + // Create a buffer that will be used to transfer data to the service + // subsystem's callback function. + let xfer = Xfer { + svcname: svcname.into(), + tx_fromsvc + }; + + // Store Xfer buffer in the shared state (so the service handler thread can + // take it out). + // This must be done _before_ launching the application runtime thread below. + CELL.get_or_init(|| Mutex::new(Some(xfer))); + + // Launch main application thread. + // + // The server application must be run on its own thread because the service + // dispatcher call below will block the thread. 
/// Application-runtime thread body: wait for the service subsystem's
/// handshake, then run the requested runtime flavor to completion.
fn srvapp_thread(
  st: SrvAppRt,
  // NOTE(review): generic arguments reconstructed from `run()`'s
  // `oneshot::channel()` usage -- confirm against original.
  rx_fromsvc: oneshot::Receiver<Result<HandshakeMsg, Error>>
) -> Result<(), Error> {
  // Wait for the service subsystem to report that it has initialized.
  // It passes along a channel end-point that can be used to send events to
  // the service manager.

  let Ok(res) = rx_fromsvc.blocking_recv() else {
    panic!("Unable to receive handshake");
  };

  let Ok(HandshakeMsg { tx, rx }) = res else {
    panic!("Unable to receive handshake");
  };

  // Reporter forwards application state changes to the service thread.
  let reporter = Arc::new(ServiceReporter { tx: tx.clone() });

  match st {
    SrvAppRt::Sync(handler) => {
      // Don't support test mode when running as a windows service
      crate::rt::rttype::sync_main(handler, reporter, Some(rx), false)
    }
    #[cfg(feature = "tokio")]
    SrvAppRt::Tokio(rtbldr, handler) => {
      crate::rt::rttype::tokio_main(rtbldr, handler, reporter, Some(rx))
    }
    #[cfg(feature = "rocket")]
    SrvAppRt::Rocket(handler) => {
      crate::rt::rttype::rocket_main(handler, reporter, Some(rx))
    }
  }
}


// Generate the windows service boilerplate.  The boilerplate contains the
// low-level service entry function (ffi_service_main) that parses incoming
// service arguments into Vec<OsString> and passes them to user defined
// service entry (my_service_main).
define_windows_service!(ffi_service_main, my_service_main);

/// Take the `Xfer` buffer out of the process-global `CELL`.
///
/// Panics if the cell was never initialized or the buffer was already taken;
/// `run()` initializes it exactly once before starting the dispatcher.
fn take_shared_buffer() -> Xfer {
  let Some(x) = CELL.get() else {
    panic!("Unable to get shared buffer");
  };
  x.lock().take().unwrap()
}
/// The `Ok()` return value from [`svcinit()`].
struct InitRes {
  /// Value returned to the server application thread.
  handshake_reply: HandshakeMsg,

  // Receiver for application state updates destined for the SCM.
  rx_tosvc: UnboundedReceiver<ToSvcMsg>,

  // Handle used to push status updates to the service control manager.
  status_handle: ServiceStatusHandle
}

/// Service entry point invoked by the dispatcher (via the generated
/// `ffi_service_main`): initialize, hand the handshake back to the
/// application thread, then run the status loop until stopped.
fn my_service_main(_arguments: Vec<OsString>) {
  // Start by pulling out the service name and the channel sender.
  let Xfer {
    svcname,
    tx_fromsvc
  } = take_shared_buffer();

  match svcinit(&svcname) {
    Ok(InitRes {
      handshake_reply,
      rx_tosvc,
      status_handle
    }) => {
      // If svcinit() returned Ok(), it should have initialized logging.

      // Return Ok() to main server app thread so it will kick off the main
      // server application.
      if tx_fromsvc.send(Ok(handshake_reply)).is_err() {
        log::error!("Unable to send handshake message");
        return;
      }

      // Enter a loop that waits to receive a service termination event.
      if let Err(e) = svcloop(rx_tosvc, status_handle) {
        log::error!("The service loop failed; {}", e);
      }
    }
    Err(e) => {
      // If svcinit() returns Err() we don't actually know if logging has been
      // enabled yet -- but we can't do much other than hope that it is and try
      // to output an error log.
      // ToDo: If dbgtools-win is used, then we should output to the debugger.
      if tx_fromsvc.send(Err(e)).is_err() {
        log::error!("Unable to send handshake message");
      }
    }
  }
}
/// Initialize the Windows service side: set up logging, honor an optional
/// `WorkDir` registry parameter, register the control handler and report
/// the initial start-pending status.
fn svcinit(svcname: &str) -> Result<InitRes, Error> {
  // Set up logging *before* sending the handshake back to the caller.
  // ToDo: Respect request not to initialize logging
  LumberJack::from_winsvc(svcname)?.init()?;


  // If the service has a WorkDir configured under its Parameters subkey, then
  // retrieve it and attempt to change directory to it.
  // This must be done _before_ sending the HandshakeMsg back to the service
  // main thread.
  // ToDo: Need proper error handling:
  // - If the Parameters subkey can not be loaded, do we abort?
  // - If the cwd can not be changed to the WorkDir we should abort.
  if let Ok(svcparams) = get_service_params_subkey(svcname) {
    // NOTE(review): the value type parameter was garbled in transit and is
    // reconstructed as String -- confirm against original.
    if let Ok(wd) = svcparams.get_value::<String, _>("WorkDir") {
      std::env::set_current_dir(wd).map_err(|e| {
        Error::internal(format!("Unable to switch to WorkDir; {}", e))
      })?;
    }
  }

  // Create channel that will be used to receive messages from the application.
  let (tx_tosvc, rx_tosvc) = unbounded_channel();

  // Create channel that will be used to send messages to the application.
  let (tx_svcevt, rx_svcevt) = broadcast::channel(16);

  //
  // Define system service event handler that will be receiving service events.
  //
  let event_handler = move |control_event| -> ServiceControlHandlerResult {
    match control_event {
      ServiceControl::Interrogate => {
        log::debug!("svc signal recieved: interrogate");
        // Notifies a service to report its current status information to the
        // service control manager.  Always return NoError even if not
        // implemented.
        ServiceControlHandlerResult::NoError
      }
      ServiceControl::Stop => {
        log::debug!("svc signal recieved: stop");

        // Message application that it's time to shutdown
        if let Err(e) = tx_svcevt.send(SvcEvt::Shutdown) {
          log::error!("Unable to send SvcEvt::Shutdown from winsvc; {}", e);
        }

        ServiceControlHandlerResult::NoError
      }
      ServiceControl::Continue => {
        log::debug!("svc signal recieved: continue");
        ServiceControlHandlerResult::NotImplemented
      }
      ServiceControl::Pause => {
        log::debug!("svc signal recieved: pause");
        ServiceControlHandlerResult::NotImplemented
      }
      _ => {
        log::debug!("svc signal recieved: other");
        ServiceControlHandlerResult::NotImplemented
      }
    }
  };


  let status_handle =
    service_control_handler::register(svcname, event_handler)?;

  // Report the very first start-pending checkpoint to the SCM.
  if let Err(e) = status_handle.set_service_status(ServiceStatus {
    service_type: SERVICE_TYPE,
    current_state: ServiceState::StartPending,
    controls_accepted: ServiceControlAccept::empty(),
    exit_code: ServiceExitCode::Win32(0),
    checkpoint: 0,
    wait_hint: SERVICE_STARTPENDING_TIME,
    process_id: None
  }) {
    log::error!(
      "Unable to set the sevice status to 'start pending 0'; {}",
      e
    );
    Err(e)?;
  }

  Ok(InitRes {
    handshake_reply: HandshakeMsg {
      tx: tx_tosvc,
      rx: rx_svcevt
    },
    rx_tosvc,
    status_handle
  })
}
/// Pump application state messages (`ToSvcMsg`) into the service control
/// manager until the application reports `Stopped` (or all senders drop).
fn svcloop(
  mut rx_tosvc: UnboundedReceiver<ToSvcMsg>,
  status_handle: ServiceStatusHandle
) -> Result<(), Error> {
  //
  // Enter loop that waits for application state changes that should be
  // reported to the service subsystem.
  // Once the application reports that it has stopped, then break out of the
  // loop.
  //
  tracing::trace!("enter app state monitoring loop");
  loop {
    match rx_tosvc.blocking_recv() {
      Some(ev) => {
        match ev {
          ToSvcMsg::Starting(checkpoint) => {
            // NOTE(review): this debug line reads like it belongs in the
            // Started arm -- confirm intent.
            log::debug!("app reported that it is running");
            if let Err(e) = status_handle.set_service_status(ServiceStatus {
              service_type: SERVICE_TYPE,
              current_state: ServiceState::StartPending,
              controls_accepted: ServiceControlAccept::empty(),
              exit_code: ServiceExitCode::Win32(0),
              checkpoint,
              wait_hint: SERVICE_STARTPENDING_TIME,
              process_id: None
            }) {
              log::error!(
                "Unable to set service status to 'start pending {}'; {}",
                checkpoint,
                e
              );
            }
          }
          ToSvcMsg::Started => {
            // Running: from here on the SCM may deliver Stop requests.
            if let Err(e) = status_handle.set_service_status(ServiceStatus {
              service_type: SERVICE_TYPE,
              current_state: ServiceState::Running,
              controls_accepted: ServiceControlAccept::STOP,
              exit_code: ServiceExitCode::Win32(0),
              checkpoint: 0,
              wait_hint: Duration::default(),
              process_id: None
            }) {
              log::error!("Unable to set service status to 'started'; {}", e);
            }
          }
          ToSvcMsg::Stopping(checkpoint) => {
            log::debug!("app is shutting down");
            if let Err(e) = status_handle.set_service_status(ServiceStatus {
              service_type: SERVICE_TYPE,
              current_state: ServiceState::StopPending,
              controls_accepted: ServiceControlAccept::empty(),
              exit_code: ServiceExitCode::Win32(0),
              checkpoint,
              wait_hint: SERVICE_STOPPENDING_TIME,
              process_id: None
            }) {
              log::error!(
                "Unable to set service status to 'stop pending {}'; {}",
                checkpoint,
                e
              );
            }
          }
          ToSvcMsg::Stopped => {
            if let Err(e) = status_handle.set_service_status(ServiceStatus {
              service_type: SERVICE_TYPE,
              current_state: ServiceState::Stopped,
              controls_accepted: ServiceControlAccept::empty(),
              exit_code: ServiceExitCode::Win32(0),
              checkpoint: 0,
              wait_hint: Duration::default(),
              process_id: None
            }) {
              log::error!("Unable to set service status to 'stopped'; {}", e);
            }

            // Break out of loop to terminate service subsystem
            break;
          }
        }
      }
      None => {
        // All the sender halves have been deallocated
        log::error!("Sender endpoints unexpectedly disappeared");
        break;
      }
    }
  }

  tracing::trace!("service terminated");

  Ok(())
}


// Registry location of installed services and the conventional Parameters
// subkey name.
const SVCPATH: &str = "SYSTEM\\CurrentControlSet\\Services";
const PARAMS: &str = "Parameters";


/// Open a service's registry subkey (read-only).
// NOTE(review): the error type in these signatures was garbled in transit;
// reconstructed as std::io::Error (winreg's native error) -- confirm.
pub fn read_service_subkey(
  service_name: &str
) -> Result<RegKey, std::io::Error> {
  let hklm = RegKey::predef(HKEY_LOCAL_MACHINE);
  let services = hklm.open_subkey(SVCPATH)?;
  let subkey = services.open_subkey(service_name)?;
  Ok(subkey)
}

/// Open a service's registry subkey with write access.
pub fn write_service_subkey(
  service_name: &str
) -> Result<RegKey, std::io::Error> {
  let hklm = RegKey::predef(HKEY_LOCAL_MACHINE);
  let services = hklm.open_subkey(SVCPATH)?;
  let subkey =
    services.open_subkey_with_flags(service_name, winreg::enums::KEY_WRITE)?;
  Ok(subkey)
}

/// Create a Parameters subkey for a service.
pub fn create_service_params(
  service_name: &str
) -> Result<RegKey, std::io::Error> {
  let hklm = RegKey::predef(HKEY_LOCAL_MACHINE);
  let services = hklm.open_subkey(SVCPATH)?;
  let asrv = services.open_subkey(service_name)?;
  let (subkey, _disp) = asrv.create_subkey(PARAMS)?;

  Ok(subkey)
}
+pub fn get_service_params_subkey( + service_name: &str +) -> Result { + let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); + let services = hklm.open_subkey(SVCPATH)?; + let asrv = services.open_subkey(service_name)?; + let subkey = asrv.open_subkey(PARAMS)?; + + Ok(subkey) +} + +/// Load a service Parameter from the registry. +pub fn get_service_param(service_name: &str) -> Result { + let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); + let services = hklm.open_subkey(SVCPATH)?; + let asrv = services.open_subkey(service_name)?; + let params = asrv.open_subkey(PARAMS)?; + + Ok(params) +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rttype.rs Index: src/rttype.rs ================================================================== --- src/rttype.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! Runtime types. -//! -//! This module is used to collect the various supported "runtime types" or -//! "run contexts". - -#[cfg(feature = "rocket")] -#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] -mod rocket; - -mod sync; -mod tokio; - -#[cfg(feature = "rocket")] -#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))] -pub(crate) use rocket::rocket_main; - -pub(crate) use sync::sync_main; -pub(crate) use tokio::tokio_main; - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rttype/rocket.rs Index: src/rttype/rocket.rs ================================================================== --- src/rttype/rocket.rs +++ /dev/null @@ -1,196 +0,0 @@ -//! Rocket runtime module. -//! -//! While Rocket uses tokio, it [Rocket] insists on initializing tokio for -//! itself. Attempting to use the `TokioServiceHandler` will cause `Rocket`s -//! to issue a warning at startup. -//! -//! As a convenience _qsu_ can keep track of rockets and automatically shut -//! them down once the service subsystem requests a shutdown. To use this -//! feature, the server application should return a `Vec>` from -//! `RocketServiceHandler::init()`. 
Any `Rocket` instance in this vec will be -//! ignited before being passed to `RocketServiceHandler::run()`. -//! -//! Server applications do not need to use this feature and should return an -//! empty vector from `init()` in this case. This also requires the -//! application code to trigger a shutdown of each instance itself. - -use std::sync::Arc; - -use tokio::{sync::broadcast, task}; - -use killswitch::KillSwitch; - -use crate::{ - err::Error, signals, RocketServiceHandler, StartState, StateReporter, - StopState, SvcEvt, SvcEvtReader -}; - - -pub(crate) fn rocket_main( - handler: Box, - sr: Arc, - rx_svcevt: Option> -) -> Result<(), Error> { - rocket::execute(rocket_async_main(handler, sr, rx_svcevt))?; - - Ok(()) -} - -async fn rocket_async_main( - mut handler: Box, - sr: Arc, - rx_svcevt: Option> -) -> Result<(), Error> { - let ks = KillSwitch::new(); - - // If a SvcEvt receiver end-point was handed to us, then use it. Otherwise - // create our own and spawn the monitoring tasks that will generate events - // for it. - let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { - rx_svcevt - } else { - // Create channel used to signal events to application - let (tx, rx) = broadcast::channel(16); - - let ks2 = ks.clone(); - - // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event. - let txc = tx.clone(); - task::spawn(signals::wait_shutdown( - move || { - if let Err(e) = txc.send(SvcEvt::Shutdown) { - log::error!("Unable to send SvcEvt::Shutdown event; {}", e); - } - }, - ks2 - )); - - // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a - // Terminate event. 
- let txc = tx.clone(); - let ks2 = ks.clone(); - task::spawn(signals::wait_term( - move || { - if let Err(e) = txc.send(SvcEvt::Terminate) { - log::error!("Unable to send SvcEvt::Terminate event; {}", e); - } - }, - ks2 - )); - - // There doesn't seem to be anything equivalent to SIGHUP for Windows - // (Services) - #[cfg(unix)] - { - let ks2 = ks.clone(); - - let txc = tx.clone(); - task::spawn(signals::wait_reload( - move || { - if let Err(e) = txc.send(SvcEvt::ReloadConf) { - log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); - } - }, - ks2 - )); - } - - rx - }; - - let mut rx_svcevt2 = rx_svcevt.resubscribe(); - - let set = Box::new(SvcEvtReader { rx: rx_svcevt }); - - // Call application's init() method. - let ss = StartState { - sr: Arc::clone(&sr) - }; - let rockets = handler.init(ss).await?; - - // Set the service's state to "started" - sr.started(); - - // Ignite rockets so we can get Shutdown contexts for each of the instances - // (so we can tell them to - let mut ignited = vec![]; - let mut rocket_shutdowns = vec![]; - for rocket in rockets { - let rocket = rocket.ignite().await?; - rocket_shutdowns.push(rocket.shutdown()); - ignited.push(rocket); - } - - // Launch a task that waits for the SvtEvt::Shutdown event. Once it - // arrives, tell all rocket instances to gracefully shutdown. - // - // Note: We don't want to use the killswitch for this because the killswitch - // isn't triggered until run() has returned, and we might want the graceful - // shutdown to cause the graceful shutdowns. - let jh_graceful_landing = task::spawn(async move { - loop { - match rx_svcevt2.recv().await { - Ok(SvcEvt::Shutdown) => { - tracing::trace!("Ask rocket instances to shut down gracefully"); - for shutdown in rocket_shutdowns { - // Tell this rocket instance to shut down gracefully. 
- shutdown.notify(); - } - break; - } - Ok(SvcEvt::Terminate) => { - tracing::trace!("Ask rocket instances to shut down gracefully"); - for shutdown in rocket_shutdowns { - // Tell this rocket instance to shut down gracefully. - shutdown.notify(); - } - break; - } - Ok(_) => { - tracing::trace!("Ignored message in wask waiting for shutdown"); - continue; - } - Err(e) => { - log::error!("Unable to receive broadcast SvcEvt message, {}", e); - break; - } - } - } - }); - - // Call the application's main application function. - if let Err(e) = handler.run(ignited, *set).await { - log::error!("Service application returned error; {}", e); - } - - // Now that the main application has terminated kill off any remaining - // auxiliary tasks (read: signal waiters) - ks.trigger(); - - // .. and wait for - if let Err(e) = jh_graceful_landing.await { - log::warn!( - "An error was returned from the graceful landing task; {}", - e - ); - } - - if let Err(_) = ks.finalize().await { - log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet"); - } - - // Call the application's shutdown() function. - let ss = StopState { - sr: Arc::clone(&sr) - }; - if let Err(e) = handler.shutdown(ss).await { - log::error!("Service shutdown handler returned error; {}", e); - } - - // Inform the service subsystem that the the shutdown is complete - sr.stopped(); - - Ok(()) -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rttype/sync.rs Index: src/rttype/sync.rs ================================================================== --- src/rttype/sync.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::sync::Arc; - -#[cfg(unix)] -use std::thread; - -use tokio::sync::broadcast; - -#[cfg(unix)] -use nix::sys::signal::{SigSet, SigmaskHow, Signal}; - -use crate::{ - err::Error, ServiceHandler, StartState, StateReporter, StopState, SvcEvt, - SvcEvtReader -}; - -// ToDo: Set up a signal handling so we can catch SIGINT, SIGTERM and SIGHUP in -// sync/blocking land as well. 
-pub(crate) fn sync_main( - mut handler: Box, - sr: Arc, - rx_svcevt: Option> -) -> Result<(), Error> { - let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { - rx_svcevt - } else { - let (tx, rx) = broadcast::channel(16); - - #[cfg(unix)] - init_signals(tx)?; - - // On Windows, if rx_svcevt is None, means we're not running under the - // service subsystem (i.e. we're running as a foreground process), so - // register a Ctrl+C handler. - #[cfg(windows)] - crate::signals::sync_kill_to_event(tx)?; - - rx - }; - - let set = Box::new(SvcEvtReader { rx: rx_svcevt }); - - // Call application's init() method. - let ss = StartState { - sr: Arc::clone(&sr) - }; - handler.init(ss)?; - - // Set the service's state to "started" - sr.started(); - - // Call the application's main application function. - if let Err(e) = handler.run(*set) { - log::error!("Service application returned error; {}", e); - } - - // Call the application's shutdown() function. - let ss = StopState { - sr: Arc::clone(&sr) - }; - if let Err(e) = handler.shutdown(ss) { - log::error!("Service shutdown handler returned error; {}", e); - } - - // Inform the service subsystem that the the shutdown is complete - sr.stopped(); - - Ok(()) -} - - -#[cfg(unix)] -/// Set up signal management. -/// -/// Block SIGINT, SIGTERM and SIGHUP then launch a thread to catch them and -/// turn them into messages instead. -/// -/// This function must be called on the main thread. -fn init_signals( - tx_svcevt: broadcast::Sender -) -> Result, Error> { - // - // Block signals-of-interest on main thread. 
- // - let mut ss = SigSet::empty(); - ss.add(Signal::SIGINT); - ss.add(Signal::SIGTERM); - ss.add(Signal::SIGHUP); - - let mut oldset = SigSet::empty(); - nix::sys::signal::pthread_sigmask( - SigmaskHow::SIG_SETMASK, - Some(&ss), - Some(&mut oldset) - ) - .unwrap(); - - let jh = thread::Builder::new() - .name("sigmon".into()) - .spawn(move || { - // Note: Don't need to unblock signals in this thread, because sigwait() - // does it implicitly. - let mask = unsafe { - let mut mask: libc::sigset_t = std::mem::zeroed(); - libc::sigemptyset(&mut mask); - libc::sigaddset(&mut mask, libc::SIGINT); - libc::sigaddset(&mut mask, libc::SIGTERM); - libc::sigaddset(&mut mask, libc::SIGHUP); - mask - }; - - loop { - let mut sig: libc::c_int = 0; - let ret = unsafe { libc::sigwait(&mask, &mut sig) }; - if ret == 0 { - let signal = Signal::try_from(sig).unwrap(); - match signal { - Signal::SIGINT => { - if let Err(e) = tx_svcevt.send(SvcEvt::Shutdown) { - log::error!("Unable to send SvcEvt::Shutdown event; {}", e); - } - break; - } - Signal::SIGTERM => { - if let Err(e) = tx_svcevt.send(SvcEvt::Terminate) { - log::error!("Unable to send SvcEvt::Terminate event; {}", e); - } - break; - } - Signal::SIGHUP => { - if let Err(e) = tx_svcevt.send(SvcEvt::ReloadConf) { - log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); - } - } - _ => {} - } - } - } - })?; - - Ok(jh) -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/rttype/tokio.rs Index: src/rttype/tokio.rs ================================================================== --- src/rttype/tokio.rs +++ /dev/null @@ -1,133 +0,0 @@ -use std::sync::Arc; - -use tokio::{runtime, sync::broadcast, task}; - -use crate::{ - err::Error, signals, StartState, StateReporter, StopState, SvcEvt, - SvcEvtReader, TokioServiceHandler -}; - -use killswitch::KillSwitch; - -pub(crate) fn tokio_main( - rtbldr: Option, - handler: Box, - sr: Arc, - rx_svcevt: Option> -) -> Result<(), Error> { - let rt = if let Some(mut 
bldr) = rtbldr { - bldr.build()? - } else { - tokio::runtime::Runtime::new()? - }; - rt.block_on(tokio_async_main(handler, sr, rx_svcevt))?; - - Ok(()) -} - -/// The `async` main function for tokio servers. -/// -/// If `rx_svcevt` is `Some(_)` it means the channel was created elsewhere -/// (implied: The transmitting endpoint lives somewhere else). If it is `None` -/// the channel needs to be created. -async fn tokio_async_main( - mut handler: Box, - sr: Arc, - rx_svcevt: Option> -) -> Result<(), Error> { - let ks = KillSwitch::new(); - - // If a SvcEvt receiver end-point was handed to us, then use it. Otherwise - // create our own and spawn the monitoring tasks that will generate events - // for it. - let rx_svcevt = if let Some(rx_svcevt) = rx_svcevt { - rx_svcevt - } else { - // Create channel used to signal events to application - let (tx, rx) = broadcast::channel(16); - - let ks2 = ks.clone(); - - // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event. - let txc = tx.clone(); - task::spawn(signals::wait_shutdown( - move || { - if let Err(e) = txc.send(SvcEvt::Shutdown) { - log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); - } - }, - ks2 - )); - - // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a - // Terminate event. - let txc = tx.clone(); - let ks2 = ks.clone(); - task::spawn(signals::wait_term( - move || { - if let Err(e) = txc.send(SvcEvt::Terminate) { - log::error!("Unable to send SvcEvt::Terminate event; {}", e); - } - }, - ks2 - )); - - // There doesn't seem to be anything equivalent to SIGHUP for Windows - // (Services) - #[cfg(unix)] - { - let ks2 = ks.clone(); - - let txc = tx.clone(); - task::spawn(signals::wait_reload( - move || { - if let Err(e) = txc.send(SvcEvt::ReloadConf) { - log::error!("Unable to send SvcEvt::ReloadConf event; {}", e); - } - }, - ks2 - )); - } - - rx - }; - - let set = Box::new(SvcEvtReader { rx: rx_svcevt }); - - // Call application's init() method. 
- let ss = StartState { - sr: Arc::clone(&sr) - }; - handler.init(ss).await?; - - // Set the service's state to "started" - sr.started(); - - // Call the application's main application function. - if let Err(e) = handler.run(*set).await { - log::error!("Service application returned error; {}", e); - } - - // Now that the main application has terminated kill off any remaining - // auxiliary tasks (read: signal waiters) - ks.trigger(); - - if (ks.finalize().await).is_err() { - log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet"); - } - - // Call the application's shutdown() function. - let ss = StopState { - sr: Arc::clone(&sr) - }; - if let Err(e) = handler.shutdown(ss).await { - log::error!("Service shutdown handler returned error; {}", e); - } - - // Inform the service subsystem that the the shutdown is complete - sr.stopped(); - - Ok(()) -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/signals.rs Index: src/signals.rs ================================================================== --- src/signals.rs +++ /dev/null @@ -1,18 +0,0 @@ -//! Signal monitoring. - -#[cfg(unix)] -mod unix; - -#[cfg(windows)] -mod win; - -#[cfg(unix)] -pub use unix::{wait_reload, wait_shutdown, wait_term}; - -#[cfg(windows)] -pub use win::{wait_shutdown, wait_term}; - -#[cfg(windows)] -pub(crate) use win::sync_kill_to_event; - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/signals/unix.rs Index: src/signals/unix.rs ================================================================== --- src/signals/unix.rs +++ /dev/null @@ -1,88 +0,0 @@ -use tokio::signal::unix::{signal, SignalKind}; - -use killswitch::KillSwitch; - -/// Async task used to wait for SIGINT/SIGTERM. -/// -/// Whenever a SIGINT or SIGTERM is signalled the closure in `f` is called and -/// the task is terminated. 
-pub async fn wait_shutdown(f: F, ks: KillSwitch) -where - F: FnOnce() -{ - tracing::trace!("SIGINT task launched"); - - let Ok(mut sigint) = signal(SignalKind::interrupt()) else { - log::error!("Unable to create SIGINT Future"); - return; - }; - - // Wait for SIGINT. - tokio::select! { - _ = sigint.recv() => { - tracing::debug!("Received SIGINT -- running closure"); - f(); - }, - _ = ks.wait() => { - tracing::debug!("killswitch triggered"); - } - } - - tracing::trace!("wait_shutdown() terminating"); -} - -pub async fn wait_term(f: F, ks: KillSwitch) -where - F: FnOnce() -{ - tracing::trace!("SIGTERM task launched"); - - let Ok(mut sigterm) = signal(SignalKind::terminate()) else { - log::error!("Unable to create SIGTERM Future"); - return; - }; - - // Wait for either SIGTERM. - tokio::select! { - _ = sigterm.recv() => { - tracing::debug!("Received SIGTERM -- running closure"); - f(); - } - _ = ks.wait() => { - tracing::debug!("killswitch triggered"); - } - } - - tracing::trace!("wait_term() terminating"); -} - -/// Async task used to wait for SIGHUP -/// -/// Whenever a SIGHUP is signalled the closure in `f` is called. -pub async fn wait_reload(f: F, ks: KillSwitch) -where - F: Fn() -{ - tracing::trace!("SIGHUP task launched"); - - let Ok(mut sighup) = signal(SignalKind::hangup()) else { - log::error!("Unable to create SIGHUP Future"); - return; - }; - loop { - tokio::select! 
{ - _ = sighup.recv() => { - tracing::debug!("Received SIGHUP"); - f(); - }, - _ = ks.wait() => { - tracing::debug!("killswitch triggered"); - break; - } - } - } - - tracing::trace!("wait_reload() terminating"); -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/signals/win.rs Index: src/signals/win.rs ================================================================== --- src/signals/win.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::sync::OnceLock; - -use tokio::{signal, sync::broadcast}; - -use windows_sys::Win32::{ - Foundation::{BOOL, FALSE, TRUE}, - System::Console::{ - SetConsoleCtrlHandler, CTRL_BREAK_EVENT, CTRL_CLOSE_EVENT, CTRL_C_EVENT - } -}; - -use killswitch::KillSwitch; - -use crate::{err::Error, SvcEvt}; - - -static CELL: OnceLock BOOL + Send + Sync>> = - OnceLock::new(); - -/// Async task used to wait for Ctrl+C to be signalled. -/// -/// Whenever a Ctrl+C is signalled the closure in `f` is called and -/// the task is terminated. -pub async fn wait_shutdown(f: F, ks: KillSwitch) -where - F: FnOnce() -{ - tracing::trace!("CTRL+C task launched"); - - tokio::select! { - _ = signal::ctrl_c() => { - tracing::debug!("Received Ctrl+C"); - // Once any process termination signal has been received post call the - // callback. - f(); - }, - _ = ks.wait() => { - tracing::debug!("killswitch triggered"); - } - } - - tracing::trace!("wait_shutdown() terminating"); -} - -pub async fn wait_term(f: F, ks: KillSwitch) -where - F: FnOnce() -{ - tracing::trace!("CTRL+Break/Close task launched"); - - let Ok(mut cbreak) = signal::windows::ctrl_break() else { - log::error!("Unable to create Ctrl+Break monitor"); - return; - }; - - let Ok(mut cclose) = signal::windows::ctrl_close() else { - log::error!("Unable to create Close monitor"); - return; - }; - - tokio::select! { - _ = cbreak.recv() => { - tracing::debug!("Received Ctrl+Break"); - // Once any process termination signal has been received post call the - // callback. 
- f(); - }, - _ = cclose.recv() => { - tracing::debug!("Received Close"); - // Once any process termination signal has been received post call the - // callback. - f(); - }, - _ = ks.wait() => { - tracing::debug!("killswitch triggered"); - } - } - - tracing::trace!("wait_term() terminating"); -} - - -pub(crate) fn sync_kill_to_event( - tx: broadcast::Sender -) -> Result<(), Error> { - setup_sync_fg_kill_handler(move |ty| { - match ty { - CTRL_C_EVENT => { - tracing::trace!( - "Received some kind of event that should trigger a shutdown." - ); - if tx.send(SvcEvt::Shutdown).is_ok() { - // We handled this event - TRUE - } else { - FALSE - } - } - CTRL_BREAK_EVENT | CTRL_CLOSE_EVENT => { - tracing::trace!( - "Received some kind of event that should trigger a termination." - ); - if tx.send(SvcEvt::Terminate).is_ok() { - // We handled this event - TRUE - } else { - FALSE - } - } - _ => FALSE - } - })?; - Ok(()) -} - - -pub(crate) fn setup_sync_fg_kill_handler(f: F) -> Result<(), Error> -where - F: Fn(u32) -> BOOL + Send + Sync + 'static -{ - CELL - .set(Box::new(f)) - .map_err(|_| Error::internal("Unable to set shared OnceLock cell"))?; - - let rc = unsafe { SetConsoleCtrlHandler(Some(ctrlhandler), 1) }; - (rc == 0) - .then_some(()) - .ok_or(Error::internal("SetConsoleCtrlHandler failed"))?; - - Ok(()) -} - -unsafe extern "system" fn ctrlhandler(ty: u32) -> BOOL { - let Some(f) = CELL.get() else { - return FALSE; - }; - - f(ty) -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/systemd.rs Index: src/systemd.rs ================================================================== --- src/systemd.rs +++ /dev/null @@ -1,34 +0,0 @@ -//! systemd service module. -//! -//! Implements systemd-specific service subsystem interactions. 
- -use sd_notify::NotifyState; - -pub struct ServiceReporter {} - -impl super::StateReporter for ServiceReporter { - fn starting(&self, checkpoint: u32) { - let text = format!("Startup checkpoint {}", checkpoint); - if let Err(e) = sd_notify::notify(false, &[NotifyState::Status(&text)]) { - log::error!("Unable to report service started state; {}", e); - } - } - - fn started(&self) { - if let Err(e) = sd_notify::notify(false, &[NotifyState::Ready]) { - log::error!("Unable to report service started state; {}", e); - } - } - - fn stopping(&self, checkpoint: u32) { - if checkpoint == 0 { - if let Err(e) = sd_notify::notify(false, &[NotifyState::Stopping]) { - log::error!("Unable to report service started state; {}", e); - } - } - } - - fn stopped(&self) {} -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : DELETED src/winsvc.rs Index: src/winsvc.rs ================================================================== --- src/winsvc.rs +++ /dev/null @@ -1,493 +0,0 @@ -//! Windows service module. 
- -use std::{ - ffi::OsString, - sync::{Arc, OnceLock}, - thread, - time::Duration -}; - -use parking_lot::Mutex; - -use tokio::sync::{ - broadcast, - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot -}; - -use windows_service::{ - define_windows_service, - service::{ - ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, - ServiceStatus, ServiceType - }, - service_control_handler::{ - self, ServiceControlHandlerResult, ServiceStatusHandle - }, - service_dispatcher -}; - -use winreg::{enums::*, RegKey}; - -#[cfg(feature = "wait-for-debugger")] -use dbgtools_win::debugger; - - -use crate::{err::Error, lumberjack::LumberJack, SvcEvt, SvcType}; - -const SERVICE_TYPE: ServiceType = ServiceType::OWN_PROCESS; -//const SERVICE_STARTPENDING_TIME: Duration = Duration::from_secs(10); -const SERVICE_STARTPENDING_TIME: Duration = Duration::from_secs(300); -const SERVICE_STOPPENDING_TIME: Duration = Duration::from_secs(30); - - -/// Messages that are sent to the service subsystem thread from the -/// application. -enum ToSvcMsg { - Starting(u32), - Started, - Stopping(u32), - Stopped -} - -/// Buffer passed from main thread to service subsystem thread via global -/// `OnceLock`. -pub(crate) struct Xfer { - svcname: String, - - /// Used to send handhake message from the service handler. - tx_fromsvc: oneshot::Sender> -} - -/// Used as a "bridge" send information to service thread. -static CELL: OnceLock>> = OnceLock::new(); - - -/// Buffer passed back to the application thread from the service subsystem -/// thread. -struct HandshakeMsg { - /// Channel end-point used to send messages to the service subsystem. - tx: UnboundedSender, - - /// Channel end-point used to receive messages from the service subsystem. 
- rx: broadcast::Receiver -} - - -pub struct ServiceReporter { - tx: UnboundedSender -} - -impl super::StateReporter for ServiceReporter { - fn starting(&self, checkpoint: u32) { - if let Err(e) = self.tx.send(ToSvcMsg::Starting(checkpoint)) { - log::error!("Unable to send Starting message; {}", e); - } - } - - fn started(&self) { - if let Err(e) = self.tx.send(ToSvcMsg::Started) { - log::error!("Unable to send Started message; {}", e); - } - } - - fn stopping(&self, checkpoint: u32) { - if let Err(e) = self.tx.send(ToSvcMsg::Stopping(checkpoint)) { - log::error!("Unable to send Stopping message; {}", e); - } - } - - fn stopped(&self) { - if let Err(e) = self.tx.send(ToSvcMsg::Stopped) { - log::error!("Unable to send Stopped message; {}", e); - } - } -} - - -pub fn run(svcname: &str, st: SvcType) -> Result<(), Error> { - #[cfg(feature = "wait-for-debugger")] - { - debugger::wait_for_then_break(); - debugger::output("Hello, debugger"); - } - - // Create a one-shot channel used to receive a an initial handshake from the - // service handler thread. - let (tx_fromsvc, rx_fromsvc) = oneshot::channel(); - - // Create a buffer that will be used to transfer data to the service - // subsystem's callback function. - let xfer = Xfer { - svcname: svcname.into(), - tx_fromsvc - }; - - // Store Xfer buffer in the shared state (so the service handler thread can - // take it out). - // This must be done _before_ launching the application runtime thread below. - CELL.get_or_init(|| Mutex::new(Some(xfer))); - - // Launch main application thread. - // - // The server application must be run on its own thread because the service - // dispatcher call below will block the thread. - let jh = thread::Builder::new() - .name("svcapp".into()) - .spawn(move || srvapp_thread(st, rx_fromsvc))?; - - // Register generated `ffi_service_main` with the system and start the - // service, blocking this thread until the service is stopped. 
- service_dispatcher::start(svcname, ffi_service_main)?; - - match jh.join() { - Ok(_) => Ok(()), - Err(e) => *e - .downcast::>() - .expect("Unable to downcast error from svcapp thread") - } -} - -fn srvapp_thread( - st: SvcType, - rx_fromsvc: oneshot::Receiver> -) -> Result<(), Error> { - // Wait for the service subsystem to report that it has initialized. - // It passes along a channel end-point that can be used to send events to - // the service manager. - - let Ok(res) = rx_fromsvc.blocking_recv() else { - panic!("Unable to receive handshake"); - }; - - let Ok(HandshakeMsg { tx, rx }) = res else { - panic!("Unable to receive handshake"); - }; - - let reporter = Arc::new(ServiceReporter { tx: tx.clone() }); - - match st { - SvcType::Sync(handler) => { - crate::rttype::sync_main(handler, reporter, Some(rx)) - } - SvcType::Tokio(rtbldr, handler) => { - crate::rttype::tokio_main(rtbldr, handler, reporter, Some(rx)) - } - #[cfg(feature = "rocket")] - SvcType::Rocket(handler) => { - crate::rttype::rocket_main(handler, reporter, Some(rx)) - } - } -} - - -// Generate the windows service boilerplate. The boilerplate contains the -// low-level service entry function (ffi_service_main) that parses incoming -// service arguments into Vec and passes them to user defined service -// entry (my_service_main). -define_windows_service!(ffi_service_main, my_service_main); - -fn take_shared_buffer() -> Xfer { - let Some(x) = CELL.get() else { - panic!("Unable to get shared buffer"); - }; - x.lock().take().unwrap() -} - -/// The `Ok()` return value from [`svcinit()`]. -struct InitRes { - /// Value returned to the server application thread. - handshake_reply: HandshakeMsg, - - rx_tosvc: UnboundedReceiver, - - status_handle: ServiceStatusHandle -} - -fn my_service_main(_arguments: Vec) { - // Start by pulling out the service name and the channel sender. 
- let Xfer { - svcname, - tx_fromsvc - } = take_shared_buffer(); - - match svcinit(&svcname) { - Ok(InitRes { - handshake_reply, - rx_tosvc, - status_handle - }) => { - // If svcinit() returned Ok(), it should have initialized logging. - - // Return Ok() to main server app thread so it will kick off the main - // server application. - if tx_fromsvc.send(Ok(handshake_reply)).is_err() { - log::error!("Unable to send handshake message"); - return; - } - - // Enter a loop that waits to receive a service termination event. - if let Err(e) = svcloop(rx_tosvc, status_handle) { - log::error!("The service loop failed; {}", e); - } - } - Err(e) => { - // If svcinit() returns Err() we don't actually know if logging has been - // enabled yet -- but we can't do much other than hope that it is and try - // to output an error log. - // ToDo: If dbgtools-win is used, then we should output to the debugger. - if tx_fromsvc.send(Err(e)).is_err() { - log::error!("Unable to send handshake message"); - } - } - } -} - - -fn svcinit(svcname: &str) -> Result { - // Set up logging *before* telling sending SvcRunning to caller - LumberJack::from_winsvc(svcname)?.init()?; - - - // If the service has a WorkDir configured under it's Parameters subkey, then - // retreive it and attempt to change directory to it. - // This must be done _before_ sending the HandskageMsg back to the service - // main thread. - // ToDo: Need proper error handling: - // - If the Paramters subkey can not be loaded, do we abort? - // - If the cwd can not be changed to the WorkDir we should abort. - if let Ok(svcparams) = get_service_params_subkey(svcname) { - if let Ok(wd) = svcparams.get_value::("WorkDir") { - std::env::set_current_dir(wd).map_err(|e| { - Error::internal(format!("Unable to switch to WorkDir; {}", e)) - })?; - } - } - - // Create channel that will be used to receive messages from the application. 
- let (tx_tosvc, rx_tosvc) = unbounded_channel(); - - // Create channel that will be used to send messages to the application. - let (tx_svcevt, rx_svcevt) = broadcast::channel(16); - - // - // Define system service event handler that will be receiving service events. - // - let event_handler = move |control_event| -> ServiceControlHandlerResult { - match control_event { - ServiceControl::Interrogate => { - log::debug!("svc signal recieved: interrogate"); - // Notifies a service to report its current status information to the - // service control manager. Always return NoError even if not - // implemented. - ServiceControlHandlerResult::NoError - } - ServiceControl::Stop => { - log::debug!("svc signal recieved: stop"); - - // Message application that it's time to shutdown - if let Err(e) = tx_svcevt.send(SvcEvt::Shutdown) { - log::error!("Unable to send SvcEvt::Shutdown from winsvc; {}", e); - } - - ServiceControlHandlerResult::NoError - } - ServiceControl::Continue => { - log::debug!("svc signal recieved: continue"); - ServiceControlHandlerResult::NotImplemented - } - ServiceControl::Pause => { - log::debug!("svc signal recieved: pause"); - ServiceControlHandlerResult::NotImplemented - } - _ => { - log::debug!("svc signal recieved: other"); - ServiceControlHandlerResult::NotImplemented - } - } - }; - - - let status_handle = - service_control_handler::register(svcname, event_handler)?; - - if let Err(e) = status_handle.set_service_status(ServiceStatus { - service_type: SERVICE_TYPE, - current_state: ServiceState::StartPending, - controls_accepted: ServiceControlAccept::empty(), - exit_code: ServiceExitCode::Win32(0), - checkpoint: 0, - wait_hint: SERVICE_STARTPENDING_TIME, - process_id: None - }) { - log::error!( - "Unable to set the sevice status to 'start pending 0'; {}", - e - ); - Err(e)?; - } - - Ok(InitRes { - handshake_reply: HandshakeMsg { - tx: tx_tosvc, - rx: rx_svcevt - }, - rx_tosvc, - status_handle - }) -} - -fn svcloop( - mut rx_tosvc: 
UnboundedReceiver, - status_handle: ServiceStatusHandle -) -> Result<(), Error> { - // - // Enter loop that waits for application state changes that should be - // reported to the service subsystem. - // Once the application reports that it has stopped, then break out of the - // loop. - // - tracing::trace!("enter app state monitoring loop"); - loop { - match rx_tosvc.blocking_recv() { - Some(ev) => { - match ev { - ToSvcMsg::Starting(checkpoint) => { - log::debug!("app reported that it is running"); - if let Err(e) = status_handle.set_service_status(ServiceStatus { - service_type: SERVICE_TYPE, - current_state: ServiceState::StartPending, - controls_accepted: ServiceControlAccept::empty(), - exit_code: ServiceExitCode::Win32(0), - checkpoint, - wait_hint: SERVICE_STARTPENDING_TIME, - process_id: None - }) { - log::error!( - "Unable to set service status to 'start pending {}'; {}", - checkpoint, - e - ); - } - } - ToSvcMsg::Started => { - if let Err(e) = status_handle.set_service_status(ServiceStatus { - service_type: SERVICE_TYPE, - current_state: ServiceState::Running, - controls_accepted: ServiceControlAccept::STOP, - exit_code: ServiceExitCode::Win32(0), - checkpoint: 0, - wait_hint: Duration::default(), - process_id: None - }) { - log::error!("Unable to set service status to 'started'; {}", e); - } - } - ToSvcMsg::Stopping(checkpoint) => { - log::debug!("app is shutting down"); - if let Err(e) = status_handle.set_service_status(ServiceStatus { - service_type: SERVICE_TYPE, - current_state: ServiceState::StopPending, - controls_accepted: ServiceControlAccept::empty(), - exit_code: ServiceExitCode::Win32(0), - checkpoint, - wait_hint: SERVICE_STOPPENDING_TIME, - process_id: None - }) { - log::error!( - "Unable to set service status to 'stop pending {}'; {}", - checkpoint, - e - ); - } - } - ToSvcMsg::Stopped => { - if let Err(e) = status_handle.set_service_status(ServiceStatus { - service_type: SERVICE_TYPE, - current_state: ServiceState::Stopped, - 
controls_accepted: ServiceControlAccept::empty(), - exit_code: ServiceExitCode::Win32(0), - checkpoint: 0, - wait_hint: Duration::default(), - process_id: None - }) { - log::error!("Unable to set service status to 'stopped'; {}", e); - } - - // Break out of loop to terminate service subsystem - break; - } - } - } - None => { - // All the sender halves have been deallocated - log::error!("Sender endpoints unexpectedly disappeared"); - break; - } - } - } - - tracing::trace!("service terminated"); - - Ok(()) -} - - -const SVCPATH: &str = "SYSTEM\\CurrentControlSet\\Services"; -const PARAMS: &str = "Parameters"; - - -pub fn read_service_subkey( - service_name: &str -) -> Result { - let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); - let services = hklm.open_subkey(SVCPATH)?; - let subkey = services.open_subkey(service_name)?; - Ok(subkey) -} - -pub fn write_service_subkey( - service_name: &str -) -> Result { - let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); - let services = hklm.open_subkey(SVCPATH)?; - let subkey = - services.open_subkey_with_flags(service_name, winreg::enums::KEY_WRITE)?; - Ok(subkey) -} - -/// Create a Parameters subkey for a service. -pub fn create_service_params( - service_name: &str -) -> Result { - let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); - let services = hklm.open_subkey(SVCPATH)?; - let asrv = services.open_subkey(service_name)?; - let (subkey, _disp) = asrv.create_subkey(PARAMS)?; - - Ok(subkey) -} - -/// Create a Parameters subkey for a service. -pub fn get_service_params_subkey( - service_name: &str -) -> Result { - let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); - let services = hklm.open_subkey(SVCPATH)?; - let asrv = services.open_subkey(service_name)?; - let subkey = asrv.open_subkey(PARAMS)?; - - Ok(subkey) -} - -/// Load a service Parameter from the registry. 
-pub fn get_service_param(service_name: &str) -> Result { - let hklm = RegKey::predef(HKEY_LOCAL_MACHINE); - let services = hklm.open_subkey(SVCPATH)?; - let asrv = services.open_subkey(service_name)?; - let params = asrv.open_subkey(PARAMS)?; - - Ok(params) -} - -// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/apperr.rs Index: tests/apperr.rs ================================================================== --- /dev/null +++ tests/apperr.rs @@ -0,0 +1,395 @@ +mod apps; +mod err; + +use std::sync::Arc; + +use parking_lot::Mutex; + +use qsu::rt::RunCtx; + +use err::Error; + +const SVCNAME: &str = "svctest"; + +/// Returning an `Err(AppErr)` from `init()` should return it back to the +/// application's initial call to kick off the runtime. +#[test] +fn error_from_sync_init() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MySyncService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_init() + ); + + // Call RunCtx::run(), expecting an server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_sync(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only init() failed + assert!(errs.init.is_some()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.init.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Sync::init()"); + + // Failing init should cause run() not to be called, but shutdown() should + // sitll be called. 
+ let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(!visited.run); + assert!(visited.shutdown); +} + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. +#[test] +fn error_from_sync_run() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MySyncService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_run() + ); + + // Call RunCtx::run(), expecting an server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_sync(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only run() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_some()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.run.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Sync::run()"); + + // Failing run should not hinder shutdown() from being called. + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. 
+#[test] +fn error_from_sync_shutdown() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MySyncService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_shutdown() + ); + + // Call RunCtx::run(), expecting an server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_sync(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only shutdown() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_some()); + + let Error::Hello(s) = errs.shutdown.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Sync::shutdown()"); + + // All callbacks should have been visited + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + + +/// Returning an `Err(AppErr)` from `init()` should return it back to the +/// application's initial call to kick off the runtime. +#[cfg(feature = "tokio")] +#[test] +fn error_from_tokio_init() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyTokioService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_init() + ); + + // Call RunCtx::run(), expecting an server application callback error. 
+ let Err(qsu::Error::SrvApp(errs)) = runctx.run_tokio(None, handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only init() failed + assert!(errs.init.is_some()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.init.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Tokio::init()"); + + // Failing init should cause run() not to be called, but shutdown() should + still be called. + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(!visited.run); + assert!(visited.shutdown); +} + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. +#[cfg(feature = "tokio")] +#[test] +fn error_from_tokio_run() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyTokioService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_run() + ); + + // Call RunCtx::run(), expecting a server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_tokio(None, handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only run() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_some()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.run.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Tokio::run()"); + + // Failing run should not hinder shutdown() from being called. 
+ let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. +#[cfg(feature = "tokio")] +#[test] +fn error_from_tokio_shutdown() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyTokioService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_shutdown() + ); + + // Call RunCtx::run(), expecting an server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_tokio(None, handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only shutdown() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_some()); + + let Error::Hello(s) = errs.shutdown.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Tokio::shutdown()"); + + // All callbacks should have been visited + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + + +/// Returning an `Err(AppErr)` from `init()` should return it back to the +/// application's initial call to kick off the runtime. 
+#[cfg(feature = "rocket")] +#[test] +fn error_from_rocket_init() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + // Prepare a server application context which keeps track of which callbacks + have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyRocketService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_init() + ); + + // Call RunCtx::run(), expecting a server application callback error. + let Err(qsu::Error::SrvApp(errs)) = runctx.run_rocket(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only init() failed + assert!(errs.init.is_some()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.init.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Rocket::init()"); + + // Failing init should cause run() not to be called, but shutdown() should + still be called. + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(!visited.run); + assert!(visited.shutdown); +} + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. +#[cfg(feature = "rocket")] +#[test] +fn error_from_rocket_run() { + let runctx = RunCtx::new(SVCNAME).log_init(false); + + // Prepare a server application context which keeps track of which callbacks + have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyRocketService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_run() + ); + + // Call RunCtx::run(), expecting a server application callback error. 
+ let Err(qsu::Error::SrvApp(errs)) = runctx.run_rocket(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only run() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_some()); + assert!(errs.shutdown.is_none()); + + let Error::Hello(s) = errs.run.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Rocket::run()"); + + // Failing run should not hinder shutdown() from being called. + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + +/// Returning an `Err(AppErr)` from `run()` should return it back to the +/// application's initial call to kick off the runtime. +#[cfg(feature = "rocket")] +#[test] +fn error_from_rocket_shutdown() { + let runctx = RunCtx::new(SVCNAME).log_init(false); + + // Prepare a server application context which keeps track of which callbacks + // have been called + let visited = Arc::new(Mutex::new(apps::Visited::default())); + let handler = Box::new( + apps::MyRocketService { + visited: Arc::clone(&visited), + ..Default::default() + } + .fail_shutdown() + ); + + // Call RunCtx::run(), expecting an server application callback error. 
+ let Err(qsu::Error::SrvApp(errs)) = runctx.run_rocket(handler) else { + panic!("Not expected Err(qsu::Error::SrvApp(_))"); + }; + + // Verify that only shutdown() failed + assert!(errs.init.is_none()); + assert!(errs.run.is_none()); + assert!(errs.shutdown.is_some()); + + let Error::Hello(s) = errs.shutdown.unwrap().unwrap_inner::() else { + panic!("Not expected Error::Hello"); + }; + assert_eq!(s, "From Rocket::shutdown()"); + + // All callbacks should have been visited + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/apps/mod.rs Index: tests/apps/mod.rs ================================================================== --- /dev/null +++ tests/apps/mod.rs @@ -0,0 +1,211 @@ +use std::sync::Arc; + +use parking_lot::Mutex; + +use qsu::rt::{ServiceHandler, StartState, StopState, SvcEvtReader}; + +#[cfg(feature = "tokio")] +use qsu::rt::TokioServiceHandler; + + +#[cfg(feature = "rocket")] +use qsu::{ + rocket::{Build, Ignite, Rocket}, + rt::RocketServiceHandler +}; + +use crate::err::Error; + +#[derive(Default)] +pub struct FailMode { + init: bool, + run: bool, + shutdown: bool +} + +#[allow(unused)] +impl FailMode { + pub fn init(&mut self) -> &mut Self { + self.init = true; + self + } + pub fn run(&mut self) -> &mut Self { + self.run = true; + self + } + pub fn shutdown(&mut self) -> &mut Self { + self.shutdown = true; + self + } +} + +#[derive(Default)] +pub struct Visited { + pub init: bool, + pub run: bool, + pub shutdown: bool +} + +#[derive(Default)] +pub struct MySyncService { + pub fail: FailMode, + pub visited: Arc> +} + +#[allow(unused)] +impl MySyncService { + pub fn fail_init(mut self) -> Self { + self.fail.init(); + self + } + pub fn fail_run(mut self) -> Self { + self.fail.run(); + self + } + pub fn fail_shutdown(mut self) -> Self { + 
self.fail.shutdown(); + self + } +} + + +impl ServiceHandler for MySyncService { + fn init(&mut self, _ss: StartState) -> Result<(), qsu::AppErr> { + self.visited.lock().init = true; + if self.fail.init { + Err(Error::hello("From Sync::init()"))?; + } + Ok(()) + } + + fn run(&mut self, _set: SvcEvtReader) -> Result<(), qsu::AppErr> { + self.visited.lock().run = true; + if self.fail.run { + Err(Error::hello("From Sync::run()"))?; + } + Ok(()) + } + + fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::AppErr> { + self.visited.lock().shutdown = true; + if self.fail.shutdown { + Err(Error::hello("From Sync::shutdown()"))?; + } + Ok(()) + } +} + + +#[cfg(feature = "tokio")] +#[derive(Default)] +pub struct MyTokioService { + pub fail: FailMode, + pub visited: Arc> +} + +#[cfg(feature = "tokio")] +#[allow(unused)] +impl MyTokioService { + pub fn fail_init(mut self) -> Self { + self.fail.init(); + self + } + pub fn fail_run(mut self) -> Self { + self.fail.run(); + self + } + pub fn fail_shutdown(mut self) -> Self { + self.fail.shutdown(); + self + } +} + +#[cfg(feature = "tokio")] +#[qsu::async_trait] +impl TokioServiceHandler for MyTokioService { + async fn init(&mut self, _ss: StartState) -> Result<(), qsu::AppErr> { + self.visited.lock().init = true; + if self.fail.init { + Err(Error::hello("From Tokio::init()"))?; + } + Ok(()) + } + + async fn run(&mut self, _set: SvcEvtReader) -> Result<(), qsu::AppErr> { + self.visited.lock().run = true; + if self.fail.run { + Err(Error::hello("From Tokio::run()"))?; + } + Ok(()) + } + + async fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::AppErr> { + self.visited.lock().shutdown = true; + if self.fail.shutdown { + Err(Error::hello("From Tokio::shutdown()"))?; + } + Ok(()) + } +} + + +#[cfg(feature = "rocket")] +#[derive(Default)] +pub struct MyRocketService { + pub fail: FailMode, + pub visited: Arc> +} + +#[cfg(feature = "rocket")] +#[allow(unused)] +impl MyRocketService { + pub fn fail_init(mut self) -> Self { + 
self.fail.init(); + self + } + pub fn fail_run(mut self) -> Self { + self.fail.run(); + self + } + pub fn fail_shutdown(mut self) -> Self { + self.fail.shutdown(); + self + } +} + +#[cfg(feature = "rocket")] +#[qsu::async_trait] +impl RocketServiceHandler for MyRocketService { + async fn init( + &mut self, + _ss: StartState + ) -> Result>, qsu::AppErr> { + self.visited.lock().init = true; + if self.fail.init { + Err(Error::hello("From Rocket::init()"))?; + } + Ok(Vec::new()) + } + + async fn run( + &mut self, + _rockets: Vec>, + _set: SvcEvtReader + ) -> Result<(), qsu::AppErr> { + self.visited.lock().run = true; + if self.fail.run { + Err(Error::hello("From Rocket::run()"))?; + } + Ok(()) + } + + async fn shutdown(&mut self, _ss: StopState) -> Result<(), qsu::AppErr> { + self.visited.lock().shutdown = true; + if self.fail.shutdown { + Err(Error::hello("From Rocket::shutdown()"))?; + } + Ok(()) + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/err/mod.rs Index: tests/err/mod.rs ================================================================== --- /dev/null +++ tests/err/mod.rs @@ -0,0 +1,62 @@ +use std::{fmt, io}; + +#[derive(Debug)] +pub enum Error { + Hello(String), + IO(String), + Qsu(String) +} + +impl std::error::Error for Error {} + +impl Error { + pub fn hello(msg: impl ToString) -> Self { + Error::Hello(msg.to_string()) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Hello(s) => { + write!(f, "Hello error; {}", s) + } + Error::IO(s) => { + write!(f, "I/O error; {}", s) + } + Error::Qsu(s) => { + write!(f, "qsu error; {}", s) + } + } + } +} + +impl From for Error { + fn from(err: io::Error) -> Self { + Error::IO(err.to_string()) + } +} + +impl From for Error { + fn from(err: qsu::Error) -> Self { + Error::Qsu(err.to_string()) + } +} + +/* +/// Convenience converter used to pass an application-defined errors from the +/// qsu inner runtime back out from 
the qsu runtime. +impl From for qsu::Error { + fn from(err: Error) -> qsu::Error { + qsu::Error::app(err) + } +} +*/ + +impl From for qsu::AppErr { + fn from(err: Error) -> qsu::AppErr { + qsu::AppErr::new(err) + } +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED tests/initrunshutdown.rs Index: tests/initrunshutdown.rs ================================================================== --- /dev/null +++ tests/initrunshutdown.rs @@ -0,0 +1,108 @@ +mod apps; +mod err; + +use std::sync::Arc; + +use parking_lot::Mutex; + +use qsu::rt::RunCtx; + +const SVCNAME: &str = "svctest"; + +/// Make sure that `init()`, `run()` and `shutdown()` are called for the sync +/// case. +#[test] +fn all_sync_callbacks() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + let visited = apps::Visited::default(); + + assert!(!visited.init); + assert!(!visited.run); + assert!(!visited.shutdown); + + let visited = Arc::new(Mutex::new(visited)); + + let handler = Box::new(apps::MySyncService { + visited: Arc::clone(&visited), + ..Default::default() + }); + + let Ok(_) = runctx.run_sync(handler) else { + panic!("run_sync() unexpectedly failed"); + }; + + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + +/// Make sure that `init()`, `run()` and `shutdown()` are called for the tokio +/// case. 
+#[cfg(feature = "tokio")] +#[test] +fn all_tokio_callbacks() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + let visited = apps::Visited::default(); + + assert!(!visited.init); + assert!(!visited.run); + assert!(!visited.shutdown); + + let visited = Arc::new(Mutex::new(visited)); + + let handler = Box::new(apps::MyTokioService { + visited: Arc::clone(&visited), + ..Default::default() + }); + + let Ok(_) = runctx.run_tokio(None, handler) else { + panic!("run_sync() unexpectedly failed"); + }; + + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + + +/// Make sure that `init()`, `run()` and `shutdown()` are called for the Rocket +/// case. +#[cfg(feature = "rocket")] +#[test] +fn all_rocket_callbacks() { + let runctx = RunCtx::new(SVCNAME).test_mode(); + + let visited = apps::Visited::default(); + + assert!(!visited.init); + assert!(!visited.run); + assert!(!visited.shutdown); + + let visited = Arc::new(Mutex::new(visited)); + + let handler = Box::new(apps::MyRocketService { + visited: Arc::clone(&visited), + ..Default::default() + }); + + let Ok(_) = runctx.run_rocket(handler) else { + panic!("run_sync() unexpectedly failed"); + }; + + let visited = Arc::into_inner(visited) + .expect("Unable to into_inner Arc") + .into_inner(); + assert!(visited.init); + assert!(visited.run); + assert!(visited.shutdown); +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,10 +1,51 @@ # Change log ## [Unreleased] ### Added + +### Changed + +### Removed + +--- + +## [0.0.3] - 2023-10-23 + +### Added + +- Introduce an `AppErr` type that can wrap application-specific errors that + the service runtime callbacks return for the `Err()` case. 
+- Make the `Error::App()` take two values: a `CbOrigin` that is used to + identify which callback returned an error, and an `AppErr` containing the + application-specific error. + +### Changed + +- Make it possible to instruct LumberJack not to initialize logging/tracing + (because otherwise tests that initialize the _qsu_ runtime will panic). +- Major refactoring. Moved runtime to its own `rt` submodule, and put it + behind a (default) `rt` feature. +- Put the tokio server application runtime behind a `tokio` feature. + Note: qsu still depends on tokio without tokio runtime support (albeit only + with the `sync` feature for channels). + +### Removed + +- `leak_default_service_name()` was removed because it no longer serves a + purpose. +- The `signals` module is no longer public. (It still exists, but is + considered an implementation detail). + + + +--- + +## [0.0.2] - 2023-10-19 + +### Added - Added some optional clap integration convenience functionality, that can be enabled using the 'clap' feature. - Added `SvcEvt::Terminate`. - Argument parser allows setting default service logging/tracing settings when @@ -17,13 +58,11 @@ - SIGTERM/Ctrl+Break/Close sends `SvcEvt::Terminate` rather than `SvcEvt::Shutdown`. - `eventlog` errors are mapped to `Error::LumberJack` (instead of `Error::SubSystem`). -### Removed - --- ## [0.0.1] - 2023-10-15 Initial release. Index: www/index.md ================================================================== --- www/index.md +++ www/index.md @@ -77,10 +77,20 @@ variables `TRACE_FILE` and `TRACE_LEVEL`. Both these must be configured in the registry to enable tracing. - Logging through `log` will log to the Windows Events Log. - Logging using `trace` will write trace logs to a file. + +## When to use it, and when not to use it + +To be frank, if you're writing a systemd-only service, then the value of using +_qsu_ is negligible (or it might even be wasteful to pull in _qsu_). 
The +benefits of using _qsu_ will be noticed mostly when targeting the Windows +Services subsystem. But mostly the benefits become apparent when targeting +multiple service subsystems in the same project, and wanting to have a similar +API when developing non-async and async services. + ## Feature labels in documentation The crate's documentation uses automatically generated feature labels, which currently requires nightly featuers. To build the documentation locally use: @@ -104,21 +114,23 @@ logs and traces every 30 seconds until the service is terminated. - `hellosvc-tokio` is the same as `hellosvc`, but is an `async` server that runs on top of tokio. - `hellosvc-rocket` is a Rocket server that writes logs and traces each time a request it made to the index page. +- The [staticrocket](https://crates.io/crates/staticrocket) crate uses qsu. + In particular it implements the `Rocket` service handler, and it adds a + custom application-specific subcommand to the `ArgParser`. ## Change log The details of changes can always be found in the timeline, but for a high-level view of changes between released versions there's a manually maintained [Change Log](./changelog.md). - ## Project status This crate is a work-in-progress, still in early prototyping stage. It works for basic use-cases, but the API and some of the semantics are likely to -change. The error handling needs work. +change.