fix: properly set WAL, eliminate (?) intermittent `database locked` errors

feat/type-attributes
Tomáš Mládek 2022-10-24 19:29:41 +02:00
parent ee8dc40577
commit 051e95d640
2 changed files with 22 additions and 76 deletions

View File

@ -40,24 +40,27 @@ use tracing::{debug, error, trace};
#[derive(Debug)]
pub struct ConnectionOptions {
pub enable_foreign_keys: bool,
pub busy_timeout: Option<Duration>,
pub enable_wal_mode: bool,
}
impl ConnectionOptions {
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
if self.enable_foreign_keys {
debug!("Enabling foreign keys");
conn.execute("PRAGMA foreign_keys = ON;")?;
}
pub fn apply(&self, connection: &SqliteConnection) -> QueryResult<()> {
if let Some(duration) = self.busy_timeout {
debug!("Setting busy_timeout to {:?}", duration);
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
connection.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
}
connection.execute(if self.enable_wal_mode {
debug!("Enabling WAL journal mode & truncating WAL log...");
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
} else {
debug!("Enabling TRUNCATE journal mode");
"PRAGMA journal_mode = TRUNCATE;"
})?;
debug!(r#"Setting "synchronous" to NORMAL"#);
conn.execute("PRAGMA synchronous = NORMAL;")?;
connection.execute("PRAGMA synchronous = NORMAL;")?;
Ok(())
}
@ -124,8 +127,8 @@ impl UpEndDatabase {
);
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
enable_foreign_keys: true,
busy_timeout: Some(Duration::from_secs(30)),
enable_wal_mode: true,
}))
.error_handler(Box::new(LoggingHandler { name: "main" }))
.build(manager)?;
@ -145,16 +148,6 @@ impl UpEndDatabase {
}
}
trace!("Running initial config.");
let enable_wal_mode = true;
connection.execute(if enable_wal_mode {
debug!("Enabling WAL journal mode & truncating WAL log...");
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
} else {
debug!("Enabling TRUNCATE journal mode");
"PRAGMA journal_mode = TRUNCATE;"
})?;
trace!("Running migrations...");
embedded_migrations::run_with_output(
@ -190,12 +183,6 @@ pub struct UpEndConnection {
}
impl UpEndConnection {
pub fn execute<S: AsRef<str>>(&self, query: S) -> Result<usize> {
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
Ok(conn.execute(query.as_ref())?)
}
pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
where
F: FnOnce() -> Result<T, E>,

View File

@ -10,7 +10,9 @@ use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::{LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::database::{
ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
};
use crate::util::hash::{b58_encode, Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
@ -26,7 +28,7 @@ use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::path::{Component, Path};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use tracing::{debug, error, info, trace, warn};
use walkdir::WalkDir;
@ -47,40 +49,6 @@ lazy_static! {
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
struct PragmaSynchronousGuard<'a>(&'a SqliteConnection);
impl Drop for PragmaSynchronousGuard<'_> {
fn drop(&mut self) {
debug!("Re-enabling synchronous mode.");
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
if let Err(err) = res {
error!(
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
err
);
}
}
}
#[derive(Debug)]
struct ConnectionOptions;
impl ConnectionOptions {
pub fn apply(&self, conn: &SqliteConnection) -> diesel::QueryResult<()> {
debug!(r#"Setting "synchronous" to NORMAL"#);
conn.execute("PRAGMA synchronous = NORMAL;")?;
Ok(())
}
}
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
for ConnectionOptions
{
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
}
}
pub struct FsStore {
path: PathBuf,
pool: r2d2::Pool<ConnectionManager<SqliteConnection>>,
@ -121,7 +89,10 @@ impl FsStore {
)?;
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {}))
.connection_customizer(Box::new(ConnectionOptions {
busy_timeout: Some(Duration::from_secs(30)),
enable_wal_mode: true,
}))
.error_handler(Box::new(LoggingHandler { name: "fs_store" }))
.build(manager)?;
@ -137,14 +108,13 @@ impl FsStore {
db: D,
job_handle: JobHandle,
quick_check: bool,
disable_synchronous: bool,
_disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let start = Instant::now();
info!("Vault rescan started.");
let db = db.borrow();
let upconnection = db.connection()?;
let connection = self.pool.get()?;
// Initialize types, etc...
debug!("Initializing DB types.");
@ -155,14 +125,6 @@ impl FsStore {
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
// Disable syncing in SQLite for the duration of the import
let mut _guard: Option<PragmaSynchronousGuard> = None;
if disable_synchronous {
debug!("Disabling SQLite synchronous mode");
connection.execute("PRAGMA synchronous = OFF;")?;
_guard = Some(PragmaSynchronousGuard(&connection));
}
// Walk through the vault, find all paths
debug!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*self.path)?;
@ -235,9 +197,6 @@ impl FsStore {
}
});
// Re-enable SQLite syncing
drop(_guard);
// Reporting
let all_outcomes = path_outcomes
.into_iter()