Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "sqlsrv" -version = "0.0.4" +version = "0.1.0" edition = "2021" license = "0BSD" categories = [ "database" ] keywords = [ "sqlite", "server" ] repository = "https://repos.qrnch.tech/pub/sqlsrv" @@ -10,21 +10,27 @@ rust-version = "1.56" exclude = [ ".fossil-settings", ".efiles", ".fslckout", + "build_docs.sh", "examples", "www", "rustfmt.toml" ] +[features] +default = ["tpool"] +tpool = ["dep:swctx", "dep:threadpool"] + [dependencies] parking_lot = { version = "0.12.1" } r2d2 = { version = "0.8.10" } r2d2_sqlite = { version = "0.23.0" } rusqlite = { version = "0.30.0", features = ["hooks"] } -swctx = { version = "0.2.2" } -threadpool = { version = "1.8.1" } +swctx = { version = "0.2.2", optional = true } +threadpool = { version = "1.8.1", optional = true } [package.metadata.docs.rs] -rustdoc-args = ["--generate-link-to-definition"] +all-features = true +rustdoc-args = ["--cfg", "docsrs", "--generate-link-to-definition"] ADDED build_docs.sh Index: build_docs.sh ================================================================== --- /dev/null +++ build_docs.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +RUSTFLAGS="--cfg docsrs" RUSTDOCFLAGS="--cfg docsrs" \ +cargo +nightly doc --all-features + +# vim: set ft=sh et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: examples/simple.rs ================================================================== --- examples/simple.rs +++ examples/simple.rs @@ -18,20 +18,34 @@ ts DATETIME DEFAULT CURRENT_TIMESTAMP, CHECK (length(data) == 64) );", &[] as &[&dyn ToSql] )?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS whoop ( + id INTEGER PRIMARY KEY, + name TEXT, + ts DATETIME DEFAULT CURRENT_TIMESTAMP +);", + &[] as &[&dyn ToSql] + )?; } Ok(()) } } fn main() { let schema = Box::new(Schema {}); - let bldr = sqlsrv::Builder::new(schema); + #[allow(unused_mut)] + let mut bldr = 
sqlsrv::Builder::new(schema).incremental_autoclean(10, None); + + #[cfg(feature = "tpool")] + bldr.worker_threads_r(None); + let connpool = bldr.build("test.sqlite").unwrap(); // Acquire the writer connection and use it to conditionally create the // `stuff` table. { @@ -108,8 +122,27 @@ let have = stmt .query_row(params!("stuff"), |row| row.get::(0)) .unwrap(); assert!(have); } + + #[cfg(feature = "tpool")] + connpool + .ro_run(|conn| { + const SQL: &str = "SELECT * FROM snarks;"; + let mut _stmt = conn.prepare_cached(SQL).unwrap(); + }) + .unwrap(); + + #[cfg(feature = "tpool")] + connpool.rw_run(|conn| { + const SQL: &str = "INSERT INTO whoop (name) VALUES (?);"; + let mut stmt = conn.prepare_cached(SQL).unwrap(); + stmt.execute(params!["test"]).unwrap(); + Some(1) + }); + + #[cfg(feature = "tpool")] + connpool.shutdown(); } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,10 +1,14 @@ //! A library for implementing an in-process SQLite database server. //! //! # Connection pooling //! sqlsrv implements connection pooling that reflects the concurrency model //! for SQLite: It supports only one writer but multiple readers. +//! +//! # Connection task pooling +//! In addition to pooling connections, the library can pool threads to run +//! database operations on. //! //! # Incremental auto-clean //! The connection pool has built-in support for setting up incremental //! autovacuum, and can be configured to implicitly run incremental vacuuming. //! @@ -12,10 +16,12 @@ //! pool. Whenever the writer connection performs changes to the database it //! can add "dirt" to the connection. When the writer connection is returned //! to the connection pool it checks to see if the amount of dirt is equal to //! or greater than the configured "maximum dirt" threshold. If the threshold //! has been reached, an incremental autovacuum is performed. 
+ +#![cfg_attr(docsrs, feature(doc_cfg))] mod changehook; mod err; mod rawhook; mod wrconn; @@ -32,10 +38,11 @@ pub use rusqlite; use rusqlite::{params, Connection, OpenFlags}; +#[cfg(feature = "tpool")] use threadpool::ThreadPool; pub use changehook::ChangeLogHook; pub use err::Error; pub use rawhook::{Action, Hook}; @@ -112,16 +119,28 @@ } fn on_release(&self, _conn: rusqlite::Connection) {} } + +#[cfg(feature = "tpool")] +#[derive(Default)] +enum ThrdPool { + #[default] + Disable, + Enable { + nthreads: Option + } +} /// Builder for constructing a [`ConnPool`] object. pub struct Builder { schmgr: Box, full_vacuum: bool, max_readers: usize, + #[cfg(feature = "tpool")] + thrdpool: ThrdPool, autoclean: Option, hook: Option> } /// Internal methods. @@ -159,23 +178,42 @@ r2d2::Pool::builder() .max_size(max_readers) .connection_customizer(Box::new(roconn_initterm)) .build(manager) } + + #[cfg(feature = "tpool")] + fn init_tpool(&self) -> Option { + match self.thrdpool { + ThrdPool::Disable => None, + ThrdPool::Enable { nthreads } => { + let nthreads = if let Some(nthreads) = nthreads { + nthreads + } else { + self.max_readers + 1 + }; + let tpool = ThreadPool::new(nthreads); + Some(tpool) + } + } + } } impl Builder { /// Create a new `Builder` for constructing a [`ConnPool`] object. /// /// Default to not run a full vacuum of the database on initialization and /// create 2 read-only connections for the pool. + /// No workers thread pool will be used. pub fn new(schmgr: Box) -> Self { Self { schmgr, full_vacuum: false, max_readers: 2, + #[cfg(feature = "tpool")] + thrdpool: ThrdPool::default(), autoclean: None, hook: None } } @@ -208,10 +246,41 @@ /// Operates on a borrowed `Builder` object. pub fn max_readers_r(&mut self, n: usize) -> &mut Self { self.max_readers = n; self } + + /// Enable a thread pool for running connection tasks on. 
+ /// + /// Unless this is called, no thread pool will be allocated and the + /// [`ConnPool::ro_run()`] and [`ConnPool::rw_run()`] (and associated + /// methods) will panic. + /// + /// The `nthreads` can be used to specify the number of worker threads to + /// allocate. If this is `None`, the number of threads will default to the + /// number of reader connections plus one (for the writer). + /// + /// # Panic + /// Panics if `nthreads` is set to `Some(0)`. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] + pub fn worker_threads(mut self, nthreads: Option) -> Self { + self.worker_threads_r(nthreads); + self + } + + /// Enable a thread pool for running connection tasks on. + /// + /// This does the same as [`Builder::worker_threads()`], but operates on a + /// borrowed object. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] + pub fn worker_threads_r(&mut self, nthreads: Option) -> &mut Self { + assert_ne!(nthreads, Some(0)); + self.thrdpool = ThrdPool::Enable { nthreads }; + self + } /// Request that a "raw" update hook be added to the writer connection. /// /// Operates on an owned `Builder` object. pub fn hook(mut self, hook: Arc) -> Self { @@ -286,13 +355,14 @@ if let Some(ref hook) = self.hook { rawhook::hook(&conn, Arc::clone(hook)); } // - // Create a threadpool with one thread for each reader and the writer + // Create thread pool if requested to do so // - let tpool = ThreadPool::new(self.max_readers + 1); + #[cfg(feature = "tpool")] + let tpool = self.init_tpool(); // // Set up connection pool for read-only connections. // let rpool = self.create_ro_pool(fname)?; @@ -306,11 +376,16 @@ inner: Mutex::new(inner), signal: Condvar::new(), autoclean: self.autoclean.clone() }); - Ok(ConnPool { rpool, sh, tpool }) + Ok(ConnPool { + rpool, + sh, + #[cfg(feature = "tpool")] + tpool + }) } /// Construct a connection pool. 
/// @@ -373,13 +448,14 @@ // Register a callback hook // changehook::hook(&conn, hook); // - // Create a threadpool with one thread for each reader and the writer + // Create thread pool if requested to do so // - let tpool = ThreadPool::new(self.max_readers + 1); + #[cfg(feature = "tpool")] + let tpool = self.init_tpool(); // // Set up connection pool for read-only connections. // let rpool = self.create_ro_pool(fname)?; @@ -393,11 +469,16 @@ inner: Mutex::new(inner), signal: Condvar::new(), autoclean: self.autoclean.clone() }); - Ok(ConnPool { rpool, sh, tpool }) + Ok(ConnPool { + rpool, + sh, + #[cfg(feature = "tpool")] + tpool + }) } } /// Inner writer connection object. @@ -435,11 +516,12 @@ /// This is a somewhat specialized connection pool that only allows a single /// writer but multiple readers. pub struct ConnPool { sh: Arc, rpool: r2d2::Pool, - tpool: ThreadPool + #[cfg(feature = "tpool")] + tpool: Option } impl ConnPool { /// Acquire a read-only connection. pub fn reader( @@ -483,72 +565,107 @@ inner: ManuallyDrop::new(conn) }) } /// Run a closure with a read-only connection + /// + /// # Panic + /// Panics if the connection pool was not configured to use a thread pool. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn ro_run(&self, f: F) -> Result<(), r2d2::Error> where F: FnOnce(&Connection) + Send + 'static { + let Some(ref tpool) = self.tpool else { + panic!("Connection pool does not have a thread pool"); + }; + let roconn = self.reader()?; - self.tpool.execute(move || f(&roconn)); + tpool.execute(move || f(&roconn)); Ok(()) } /// Run a closure with read-only connection, returning a channel end-point /// for retreiving result. + /// + /// # Panic + /// Panics if the connection pool was not configured to use a thread pool. 
+ #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn ro_run_result( &self, f: F ) -> Result, r2d2::Error> where F: FnOnce(&Connection, swctx::SetCtx) + Send + 'static, T: Send + 'static, E: Send + 'static { + let Some(ref tpool) = self.tpool else { + panic!("Connection pool does not have a thread pool"); + }; + let roconn = self.reader()?; let (sctx, wctx) = swctx::mkpair(); - self.tpool.execute(move || f(&roconn, sctx)); + tpool.execute(move || f(&roconn, sctx)); Ok(wctx) } /// Run a closure with read/write connection + /// + /// # Panic + /// Panics if the connection pool was not configured to use a thread pool. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn rw_run(&self, f: F) where F: FnOnce(&mut Connection) -> Option + Send + 'static { + let Some(ref tpool) = self.tpool else { + panic!("Connection pool does not have a thread pool"); + }; + let mut rwconn = self.writer(); - self.tpool.execute(move || { + tpool.execute(move || { let dirt = f(&mut rwconn); if let Some(dirt) = dirt { rwconn.add_dirt(dirt); } }); } - /// Run a closure with read/write connection, returning a channel end-point /// for retreiving result. + /// + /// # Panic + /// Panics if the connection pool was not configured to use a thread pool. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn rw_run_result(&self, f: F) -> swctx::WaitCtx where F: FnOnce(&mut Connection, swctx::SetCtx) -> Option + Send + 'static, T: Send + 'static, E: Send + 'static { + let Some(ref tpool) = self.tpool else { + panic!("Connection pool does not have a thread pool"); + }; + let mut rwconn = self.writer(); let (sctx, wctx) = swctx::mkpair(); - self.tpool.execute(move || { + tpool.execute(move || { let dirt = f(&mut rwconn, sctx); if let Some(dirt) = dirt { rwconn.add_dirt(dirt); } }); @@ -558,10 +675,12 @@ /// Perform an incremental vacuum. /// /// `n` is the number of freelist nodes to reclaim. 
If `None` all nodes will /// be reclaimed. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn incremental_vacuum( &self, n: Option ) -> swctx::WaitCtx<(), (), rusqlite::Error> { self.rw_run_result(move |wconn, sctx| { @@ -578,11 +697,15 @@ None }) } /// Consume self and wait for all threads in thread pool to complete. + #[cfg(feature = "tpool")] + #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn shutdown(self) { - self.tpool.join(); + if let Some(tpool) = self.tpool { + tpool.join(); + } } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,20 +1,32 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=sqlsrv-0.0.4&to=trunk) +[Details](/vdiff?from=sqlsrv-0.1.0&to=trunk) ### Added ### Changed ### Removed --- -## [0.0.4] +## [0.1.0] - 2024-01-21 + +[Details](/vdiff?from=sqlsrv-0.0.4&to=sqlsrv-0.1.0) + +### Changed + +- Make the internal thread pool optional. It is disabled by default. +- Put thread pool features behind a feature gate (`tpool`), which is enabled + by default. + +--- + +## [0.0.4] - 2024-01-19 [Details](/vdiff?from=sqlsrv-0.0.3&to=sqlsrv-0.0.4) ### Changed