Index: .efiles ================================================================== --- .efiles +++ .efiles @@ -5,6 +5,7 @@ src/err.rs src/lib.rs src/wrconn.rs src/rawhook.rs src/changehook.rs +src/utils.rs examples/simple.rs Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "sqlsrv" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "0BSD" categories = [ "database" ] keywords = [ "sqlite", "server" ] repository = "https://repos.qrnch.tech/pub/sqlsrv" @@ -17,11 +17,10 @@ "www", "rustfmt.toml" ] [features] -default = ["tpool"] tpool = ["dep:swctx", "dep:threadpool"] [dependencies] parking_lot = { version = "0.12.1" } r2d2 = { version = "0.8.10" } Index: examples/simple.rs ================================================================== --- examples/simple.rs +++ examples/simple.rs @@ -1,5 +1,8 @@ +#[cfg(feature = "tpool")] +use std::sync::Arc; + use rusqlite::{functions::FunctionFlags, params, Connection, ToSql}; use rand::Rng; use sha2::{Digest, Sha256}; @@ -58,11 +61,15 @@ .reg_scalar_fn(RegOn::RO(register_pwhash)); // If 'tpool' is enabled, then create a thread pool with as many workers as // there are connections in the pool. #[cfg(feature = "tpool")] - bldr.worker_threads_r(None); + let tpool = { + let tpool = Arc::new(threadpool::Builder::new().build()); + bldr.thread_pool_r(Arc::clone(&tpool)); + tpool + }; let connpool = bldr.build("test.sqlite").unwrap(); // Acquire the writer connection and use it to conditionally create the // `stuff` table. 
@@ -143,37 +150,37 @@ assert!(have); } #[cfg(feature = "tpool")] connpool - .ro_run(|conn| { + .run_ro_thrd(|conn| { const SQL: &str = "SELECT * FROM snarks;"; let mut _stmt = conn.prepare_cached(SQL).unwrap(); }) .unwrap(); #[cfg(feature = "tpool")] - connpool.rw_run(|conn| { + connpool.run_rw_thrd(|conn| { const SQL: &str = "INSERT INTO whoop (name) VALUES (?);"; let mut stmt = conn.prepare_cached(SQL).unwrap(); stmt.execute(params!["test"]).unwrap(); Some(1) }); #[cfg(feature = "tpool")] - connpool.rw_run(|conn| { + connpool.run_rw_thrd(|conn| { const SQL: &str = "INSERT INTO uids (uid) VALUES (genuid(8));"; let mut stmt = conn.prepare_cached(SQL).unwrap(); stmt.execute(params![]).unwrap(); Some(1) }); #[cfg(feature = "tpool")] { - let wctx: swctx::WaitCtx<_, _, rusqlite::Error> = - connpool.rw_run_result(|conn| { + let wctx: swctx::WaitCtx<_, _, rusqlite::Error> = connpool + .run_rw_thrd_result(|conn| { const SQL: &str = "INSERT INTO uids (uid) VALUES (genuid(8));"; let mut stmt = conn.prepare_cached(SQL)?; stmt.execute([])?; Ok(()) }); @@ -180,11 +187,11 @@ wctx.wait().unwrap(); } #[cfg(feature = "tpool")] - connpool.shutdown(); + tpool.join(); } /// Register a pwhash() SQL function which returns a hex-encoded version of /// the SHA256 hash of the input string. Index: src/lib.rs ================================================================== --- src/lib.rs +++ src/lib.rs @@ -1,14 +1,14 @@ //! A library for implementing an in-process SQLite database server. //! //! # Connection pooling //! sqlsrv implements connection pooling that reflects the concurrency model -//! for SQLite: It supports only one writer but multiple readers. +//! of SQLite: It supports multiple parallel readers, but only one writer. //! -//! # Connection task pooling -//! In addition to pooling connections, the library can pool threads to run -//! database operations on. +//! # Thread pooling +//! In addition to pooling connections, the library supports optionally using +//! 
a thread pool for dispatching database operations onto threads. //! //! # Incremental auto-clean //! The connection pool has built-in support for setting up incremental //! autovacuum, and can be configured to implicitly run incremental vacuuming. //! @@ -20,21 +20,20 @@ //! has been reached, an incremental autovacuum is performed. //! //! # Features //! | Feature | Function //! |----------|---------- -//! | `tpool` | Enable internal thread pool. -//! -//! The `tpool` feature is enabled by default, and will allow the connection -//! pool to dispatch database work to threads within that pool. +//! | `tpool` | Enable functions/methods that use a thread pool. #![cfg_attr(docsrs, feature(doc_cfg))] mod changehook; mod err; mod rawhook; mod wrconn; + +pub mod utils; use std::{ fmt, mem::ManuallyDrop, num::NonZeroUsize, path::Path, str::FromStr, sync::Arc }; @@ -43,10 +42,11 @@ use r2d2::{CustomizeConnection, PooledConnection}; use r2d2_sqlite::SqliteConnectionManager; +pub use r2d2; pub use rusqlite; use rusqlite::{params, Connection, OpenFlags}; #[cfg(feature = "tpool")] @@ -56,16 +56,26 @@ pub use err::Error; pub use rawhook::{Action, Hook}; pub use wrconn::WrConn; +/// Wrapper around a SQL functions registration callback used to select which +/// connection types to perform registrations on. pub enum RegOn where F: Fn(&Connection) -> Result<(), rusqlite::Error> + Sync + Sync { + /// This registration callback should only be called for read-only + /// connections. RO(F), + + /// This registration callback should only be called for the read/write + /// connections. RW(F), + + /// This registration callback should be called for both the read-only and + /// read/write connections. 
Both(F) } type RegCb = dyn Fn(&Connection) -> Result<(), rusqlite::Error> + Send + Sync; @@ -167,30 +177,20 @@ fn on_release(&self, _conn: rusqlite::Connection) {} } -#[cfg(feature = "tpool")] -#[derive(Default)] -enum ThrdPool { - #[default] - Disable, - Enable { - nthreads: Option - } -} - /// Builder for constructing a [`ConnPool`] object. pub struct Builder { schmgr: Box, full_vacuum: bool, max_readers: usize, - #[cfg(feature = "tpool")] - thrdpool: ThrdPool, autoclean: Option, hook: Option>, - regfuncs: Option> + regfuncs: Option>, + #[cfg(feature = "tpool")] + tpool: Option> } /// Internal methods. impl Builder { /// Open the writer connection. @@ -227,26 +227,10 @@ r2d2::Pool::builder() .max_size(max_readers) .connection_customizer(Box::new(roconn_initterm)) .build(manager) } - - #[cfg(feature = "tpool")] - fn init_tpool(&self) -> Option { - match self.thrdpool { - ThrdPool::Disable => None, - ThrdPool::Enable { nthreads } => { - let nthreads = if let Some(nthreads) = nthreads { - nthreads - } else { - self.max_readers + 1 - }; - let tpool = ThreadPool::new(nthreads); - Some(tpool) - } - } - } } impl Builder { /// Create a new `Builder` for constructing a [`ConnPool`] object. @@ -257,15 +241,15 @@ pub fn new(schmgr: Box) -> Self { Self { schmgr, full_vacuum: false, max_readers: 2, - #[cfg(feature = "tpool")] - thrdpool: ThrdPool::default(), autoclean: None, hook: None, - regfuncs: None + regfuncs: None, + #[cfg(feature = "tpool")] + tpool: None } } /// Trigger a full vacuum when initializing the connection pool. /// @@ -297,41 +281,10 @@ pub fn max_readers_r(&mut self, n: usize) -> &mut Self { self.max_readers = n; self } - /// Enable a thread pool for running connection tasks on. - /// - /// Unless this is called, no thread pool will be allocated and the - /// [`ConnPool::ro_run()`] and [`ConnPool::rw_run()`] (and associated - /// methods) will panic. - /// - /// The `nthreads` can be used to specify the number of worker threads to - /// allocate. 
If this is `None`, the number of threads will default to the - /// number of reader connections plus one (for the writer). - /// - /// # Panic - /// Panics if `nthreads` is set to `Some(0)`. - #[cfg(feature = "tpool")] - #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn worker_threads(mut self, nthreads: Option) -> Self { - self.worker_threads_r(nthreads); - self - } - - /// Enable a thread pool for running connection tasks on. - /// - /// This does the same as [`Builder::worker_threads()`], but operates on a - /// borrowed object. - #[cfg(feature = "tpool")] - #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn worker_threads_r(&mut self, nthreads: Option) -> &mut Self { - assert_ne!(nthreads, Some(0)); - self.thrdpool = ThrdPool::Enable { nthreads }; - self - } - /// Request that a "raw" update hook be added to the writer connection. /// /// Operates on an owned `Builder` object. pub fn hook(mut self, hook: Arc) -> Self { self.hook = Some(hook); @@ -406,10 +359,21 @@ } } self } + #[cfg(feature = "tpool")] + pub fn thread_pool(mut self, tpool: Arc) -> Self { + self.tpool = Some(tpool); + self + } + + #[cfg(feature = "tpool")] + pub fn thread_pool_r(&mut self, tpool: Arc) -> &mut Self { + self.tpool = Some(tpool); + self + } /// Construct a connection pool. pub fn build

(mut self, fname: P) -> Result where P: AsRef @@ -467,16 +431,10 @@ // if let Some(ref hook) = self.hook { rawhook::hook(&conn, Arc::clone(hook)); } - // - // Create thread pool if requested to do so - // - #[cfg(feature = "tpool")] - let tpool = self.init_tpool(); - // // Set up connection pool for read-only connections. // let rpool = self.create_ro_pool(fname, regfuncs)?; @@ -490,14 +448,14 @@ signal: Condvar::new(), autoclean: self.autoclean.clone() }); Ok(ConnPool { - rpool, sh, + rpool, #[cfg(feature = "tpool")] - tpool + tpool: self.tpool }) } /// Construct a connection pool. @@ -578,16 +536,10 @@ // // Register a callback hook // changehook::hook(&conn, hook); - // - // Create thread pool if requested to do so - // - #[cfg(feature = "tpool")] - let tpool = self.init_tpool(); - // // Set up connection pool for read-only connections. // let rpool = self.create_ro_pool(fname, regfuncs)?; @@ -601,14 +553,14 @@ signal: Condvar::new(), autoclean: self.autoclean.clone() }); Ok(ConnPool { - rpool, sh, + rpool, #[cfg(feature = "tpool")] - tpool + tpool: self.tpool }) } } @@ -642,20 +594,28 @@ } /// SQLite connection pool. /// -/// This is a somewhat specialized connection pool that only allows a single -/// writer but multiple readers. +/// This is a specialized connection pool that is defined specifically for +/// sqlite, and only allows a single writer but multiple readers. pub struct ConnPool { sh: Arc, rpool: r2d2::Pool, #[cfg(feature = "tpool")] - tpool: Option + tpool: Option> } impl ConnPool { + /// Return the pool size. + /// + /// In effect, this is the size of the read-only pool plus one (for the + /// read/write connection). + pub fn size(&self) -> usize { + (self.rpool.max_size() + 1) as usize + } + /// Acquire a read-only connection. pub fn reader( &self ) -> Result, r2d2::Error> { self.rpool.get() @@ -685,156 +645,207 @@ /// /// Returns `Some(conn)` if the writer connection was available at the time /// of the request. 
Returns `None` if the writer has already been taken. pub fn try_writer(&self) -> Option { let mut g = self.sh.inner.lock(); - let Some(conn) = g.conn.take() else { - return None; - }; + let conn = g.conn.take()?; Some(WrConn { sh: Arc::clone(&self.sh), inner: ManuallyDrop::new(conn) }) } +} + + +/// Special queries. +impl ConnPool { + /// Return the number of unused pages. + pub fn freelist_count(&self) -> Result { + Ok(self.reader()?.query_row_and_then( + "PRAGMA freelist_count;'", + [], + |row| row.get(0) + )?) + } +} + + +pub enum RunError { + R2D2(r2d2::Error), + App(E) +} + +/// Read-only connection processing. +impl ConnPool { + /// Run a read-only database operation. + /// + /// # Errors + /// If a connection could not be acquired from the connection pool, + /// `Err(RunError::R2D2(r2d2::Error))` will be returned. If the application + /// callback fails, this function will return `Err(RunError::App(E))`. + pub fn run_ro(&self, f: F) -> Result> + where + T: Send + 'static, + E: fmt::Debug + Send + 'static, + F: FnOnce(&Connection) -> Result + Send + 'static + { + // Acquire a read-only connection from the pool + let conn = self.reader().map_err(|e| RunError::R2D2(e))?; + + // Run caller-provided closure. On error map error to RunError::App(). + f(&conn).map_err(|e| RunError::App(e)) + } - /// Run a closure with a read-only connection + /// Run a read-only database operation on a thread. /// - /// # Panic - /// Panics if the connection pool was not configured to use a thread pool. + /// # Panics + /// A thread pool must be associated with the [`ConnPool`] or this method + /// will panic. 
#[cfg(feature = "tpool")] #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn ro_run(&self, f: F) -> Result<(), r2d2::Error> + pub fn run_ro_thrd(&self, f: F) -> Result<(), r2d2::Error> where F: FnOnce(&Connection) + Send + 'static { let Some(ref tpool) = self.tpool else { - panic!("Connection pool does not have a thread pool"); + panic!("ConnPool does to have a thread pool"); }; - let roconn = self.reader()?; - - tpool.execute(move || f(&roconn)); - + // Acquire a read-only connection from the pool and then run the provided + // closure on a thread from the thread pool. + let conn = self.reader()?; + tpool.execute(move || { + f(&conn); + }); Ok(()) } - /// Run a closure with read-only connection, returning a channel end-point - /// for retreiving result. + /// Run a read-only database operation on a thread, allowing the caller to + /// receive the `Result` of the supplied closure using a + /// one-shot channel. + /// + /// The supplied closure in `f` should return a `Result` where the `Ok` + /// case will be passed as a "set" value through the `swctx` channel, and the + /// `Err` case will be passed as a "fail" value. /// - /// # Panic - /// Panics if the connection pool was not configured to use a thread pool. + /// # Panics + /// A thread pool must be associated with the [`ConnPool`] or this method + /// will panic. 
#[cfg(feature = "tpool")] #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn ro_run_result( + pub fn run_ro_thrd_result( &self, f: F ) -> Result, r2d2::Error> where - F: FnOnce(&Connection) -> Result + Send + 'static, T: Send + 'static, - E: Send + 'static + E: fmt::Debug + Send + 'static, + F: FnOnce(&Connection) -> Result + Send + 'static { let Some(ref tpool) = self.tpool else { - panic!("Connection pool does not have a thread pool"); + panic!("ConnPool does to have a thread pool"); }; - let roconn = self.reader()?; + let conn = self.reader()?; let (sctx, wctx) = swctx::mkpair(); - tpool.execute(move || match f(&roconn) { + tpool.execute(move || match f(&conn) { Ok(t) => sctx.set(t), Err(e) => sctx.fail(e) }); Ok(wctx) } +} +/// Read/Write connection processing. +impl ConnPool { + /// Run a read/write database operation. + pub fn run_rw(&self, f: F) -> Result + where + T: Send + 'static, + E: fmt::Debug + Send + 'static, + F: FnOnce(&mut WrConn) -> Result + Send + 'static + { + let mut conn = self.writer(); + f(&mut conn) + } - /// Run a closure with read/write connection + /// Run a read/write database operation on a thread. + /// + /// The supplied closure should return an `Option`, where the `Some()` + /// case denotes the specified amount of "dirt" should be added to the write + /// connection. `None` means no dirt should be added. /// - /// # Panic - /// Panics if the connection pool was not configured to use a thread pool. + /// # Panics + /// A thread pool must be associated with the [`ConnPool`] or this method + /// will panic. 
#[cfg(feature = "tpool")] #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn rw_run(&self, f: F) + pub fn run_rw_thrd(&self, f: F) where - F: FnOnce(&mut Connection) -> Option + Send + 'static + F: FnOnce(&mut WrConn) -> Option + Send + 'static { let Some(ref tpool) = self.tpool else { - panic!("Connection pool does not have a thread pool"); + panic!("ConnPool does to have a thread pool"); }; - let mut rwconn = self.writer(); - + let mut conn = self.writer(); tpool.execute(move || { - let dirt = f(&mut rwconn); + let dirt = f(&mut conn); if let Some(dirt) = dirt { - rwconn.add_dirt(dirt); + conn.add_dirt(dirt); } }); } - /// Run a closure with read/write connection, returning a channel end-point - /// for retreiving result. + /// Run a read/write database operation on a thread, allowing the + /// caller to receive the `Result` of the supplied closure using a + /// one-shot channel. + /// + /// The supplied closure in `f` should return a `Result` where the `Ok` + /// case will be passed as a "set" value through the `swctx` channel, and the + /// `Err` case will be passed as a "fail" value. /// - /// # Panic - /// Panics if the connection pool was not configured to use a thread pool. + /// # Panics + /// A thread pool must be associated with the [`ConnPool`] or this method + /// will panic. 
#[cfg(feature = "tpool")] #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn rw_run_result(&self, f: F) -> swctx::WaitCtx + pub fn run_rw_thrd_result(&self, f: F) -> swctx::WaitCtx where - F: FnOnce(&mut WrConn) -> Result + Send + 'static, T: Send + 'static, - E: Send + 'static + E: fmt::Debug + Send + 'static, + F: FnOnce(&mut WrConn) -> Result + Send + 'static { let Some(ref tpool) = self.tpool else { - panic!("Connection pool does not have a thread pool"); + panic!("ConnPool does to have a thread pool"); }; - let mut rwconn = self.writer(); + let mut conn = self.writer(); let (sctx, wctx) = swctx::mkpair(); - tpool.execute(move || match f(&mut rwconn) { + tpool.execute(move || match f(&mut conn) { Ok(t) => sctx.set(t), Err(e) => sctx.fail(e) }); wctx } - - /// Perform an incremental vacuum. - /// - /// `n` is the number of freelist nodes to reclaim. If `None` all nodes will - /// be reclaimed. - /// - /// # Panic - /// Panics if the connection pool was not configured to use a thread pool. +} + + +impl ConnPool { #[cfg(feature = "tpool")] #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] pub fn incremental_vacuum( &self, n: Option ) -> swctx::WaitCtx<(), (), rusqlite::Error> { - self.rw_run_result(move |wconn| { - if let Some(n) = n { - wconn.execute("PRAGMA incremental_vacuum(?);", params![n]) - } else { - wconn.execute("PRAGMA incremental_vacuum;", params![]) - } - .map(|_| ()) - }) - } - - /// Consume self and wait for all threads in thread pool to complete. - #[cfg(feature = "tpool")] - #[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] - pub fn shutdown(self) { - if let Some(tpool) = self.tpool { - tpool.join(); - } + self.run_rw_thrd_result(move |conn| conn.incremental_vacuum(n)) } } // vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : ADDED src/utils.rs Index: src/utils.rs ================================================================== --- /dev/null +++ src/utils.rs @@ -0,0 +1,51 @@ +//! Utility functions around SQL commands. 
+ +use rusqlite::{params, Connection}; + +#[cfg(feature = "tpool")] +use threadpool::ThreadPool; + +#[cfg(feature = "tpool")] +use super::ConnPool; + +/// Return the number of pages in the freelist. +pub fn freelist_count(conn: &Connection) -> Result { + conn.query_row_and_then("PRAGMA freelist_count;'", [], |row| row.get(0)) +} + +/// Run an incremental vacuum. +/// +/// If `n` is `None` the entire list of free pages will be processed. If it +/// is `Some(n)` then only up to `n` pages will be processed. +pub fn incremental_vacuum( + conn: &Connection, + n: Option +) -> Result<(), rusqlite::Error> { + if let Some(n) = n { + conn.execute("PRAGMA incremental_vacuum(?);", params![n]) + } else { + conn.execute("PRAGMA incremental_vacuum;", params![]) + } + .map(|_| ()) +} + +#[cfg(feature = "tpool")] +#[cfg_attr(docsrs, doc(cfg(feature = "tpool")))] +pub fn pooled_incremental_vacuum( + cpool: &ConnPool, + tpool: &ThreadPool, + n: Option +) -> swctx::WaitCtx<(), (), rusqlite::Error> { + let (sctx, wctx) = swctx::mkpair(); + + let conn = cpool.writer(); + + tpool.execute(move || match conn.incremental_vacuum(n) { + Ok(_) => sctx.set(()), + Err(e) => sctx.fail(e) + }); + + wctx +} + +// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 : Index: src/wrconn.rs ================================================================== --- src/wrconn.rs +++ src/wrconn.rs @@ -31,10 +31,30 @@ /// Add dirt to the writer connection. 
pub fn add_dirt(&mut self, weight: usize) { self.inner.dirt = self.inner.dirt.saturating_add(weight); } } + +impl WrConn { + pub fn incremental_vacuum( + &self, + n: Option + ) -> Result<(), rusqlite::Error> { + if let Some(n) = n { + self + .inner + .conn + .execute("PRAGMA incremental_vacuum(?);", params![n]) + } else { + self + .inner + .conn + .execute("PRAGMA incremental_vacuum;", params![]) + } + .map(|_| ()) + } +} impl Deref for WrConn { type Target = Connection; #[inline(always)] Index: www/changelog.md ================================================================== --- www/changelog.md +++ www/changelog.md @@ -1,16 +1,44 @@ # Change Log ## [Unreleased] -[Details](/vdiff?from=sqlsrv-0.2.0&to=trunk) +[Details](/vdiff?from=sqlsrv-0.3.0&to=trunk) + +### Added + +### Changed + +### Removed + +--- + +## [0.3.0] - 2024-02-10 + +[Details](/vdiff?from=sqlsrv-0.2.0&to=sqlsrv-0.3.0) ### Added +- Re-export `r2d2`. +- Add `ConnPool::size()` for getting number of connections in connection pool. +- Add `ConnPool::freelist_count()` for getting number of unused pages in the + database. +- `WrConn::incremental_vacuum()` added. +- Add a `utils` submodule for collecting SQL command wrappers. +- Add several new `ConnPool` wrappers for running closures with either + read-only or read/write connections. + ### Changed +- `tpool` feature is no longer the default. +- Redesigned thread pooling to support sharing a thread pool with others. + ### Removed + +- Removed support for the thread pool in the `ConnPool` and opt for providing + functions/methods that take in a reference to a `threadpool::ThreadPool` + instead. --- ## [0.2.0] - 2024-01-28