fix: properly set WAL, eliminate (?) intermittent `database locked` errors
parent
ee8dc40577
commit
051e95d640
|
@ -40,24 +40,27 @@ use tracing::{debug, error, trace};
|
|||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
pub enable_foreign_keys: bool,
|
||||
pub busy_timeout: Option<Duration>,
|
||||
pub enable_wal_mode: bool,
|
||||
}
|
||||
|
||||
impl ConnectionOptions {
|
||||
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
|
||||
if self.enable_foreign_keys {
|
||||
debug!("Enabling foreign keys");
|
||||
conn.execute("PRAGMA foreign_keys = ON;")?;
|
||||
}
|
||||
|
||||
pub fn apply(&self, connection: &SqliteConnection) -> QueryResult<()> {
|
||||
if let Some(duration) = self.busy_timeout {
|
||||
debug!("Setting busy_timeout to {:?}", duration);
|
||||
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
||||
connection.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
||||
}
|
||||
|
||||
connection.execute(if self.enable_wal_mode {
|
||||
debug!("Enabling WAL journal mode & truncating WAL log...");
|
||||
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
|
||||
} else {
|
||||
debug!("Enabling TRUNCATE journal mode");
|
||||
"PRAGMA journal_mode = TRUNCATE;"
|
||||
})?;
|
||||
|
||||
debug!(r#"Setting "synchronous" to NORMAL"#);
|
||||
conn.execute("PRAGMA synchronous = NORMAL;")?;
|
||||
connection.execute("PRAGMA synchronous = NORMAL;")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -124,8 +127,8 @@ impl UpEndDatabase {
|
|||
);
|
||||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionOptions {
|
||||
enable_foreign_keys: true,
|
||||
busy_timeout: Some(Duration::from_secs(30)),
|
||||
enable_wal_mode: true,
|
||||
}))
|
||||
.error_handler(Box::new(LoggingHandler { name: "main" }))
|
||||
.build(manager)?;
|
||||
|
@ -145,16 +148,6 @@ impl UpEndDatabase {
|
|||
}
|
||||
}
|
||||
|
||||
trace!("Running initial config.");
|
||||
let enable_wal_mode = true;
|
||||
connection.execute(if enable_wal_mode {
|
||||
debug!("Enabling WAL journal mode & truncating WAL log...");
|
||||
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
|
||||
} else {
|
||||
debug!("Enabling TRUNCATE journal mode");
|
||||
"PRAGMA journal_mode = TRUNCATE;"
|
||||
})?;
|
||||
|
||||
trace!("Running migrations...");
|
||||
|
||||
embedded_migrations::run_with_output(
|
||||
|
@ -190,12 +183,6 @@ pub struct UpEndConnection {
|
|||
}
|
||||
|
||||
impl UpEndConnection {
|
||||
pub fn execute<S: AsRef<str>>(&self, query: S) -> Result<usize> {
|
||||
let _lock = self.lock.write().unwrap();
|
||||
let conn = self.pool.get()?;
|
||||
Ok(conn.execute(query.as_ref())?)
|
||||
}
|
||||
|
||||
pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
|
||||
where
|
||||
F: FnOnce() -> Result<T, E>,
|
||||
|
|
|
@ -10,7 +10,9 @@ use crate::database::entry::{Entry, InvariantEntry};
|
|||
use crate::database::hierarchies::{
|
||||
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
|
||||
};
|
||||
use crate::database::{LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
|
||||
use crate::database::{
|
||||
ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
|
||||
};
|
||||
use crate::util::hash::{b58_encode, Hash, Hashable};
|
||||
use crate::util::jobs::{JobContainer, JobHandle};
|
||||
use anyhow::{anyhow, Error, Result};
|
||||
|
@ -26,7 +28,7 @@ use std::convert::{TryFrom, TryInto};
|
|||
use std::path::PathBuf;
|
||||
use std::path::{Component, Path};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::{Instant, SystemTime, UNIX_EPOCH};
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use std::{fs, iter};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use walkdir::WalkDir;
|
||||
|
@ -47,40 +49,6 @@ lazy_static! {
|
|||
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
|
||||
}
|
||||
|
||||
struct PragmaSynchronousGuard<'a>(&'a SqliteConnection);
|
||||
|
||||
impl Drop for PragmaSynchronousGuard<'_> {
|
||||
fn drop(&mut self) {
|
||||
debug!("Re-enabling synchronous mode.");
|
||||
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
|
||||
if let Err(err) = res {
|
||||
error!(
|
||||
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ConnectionOptions;
|
||||
|
||||
impl ConnectionOptions {
|
||||
pub fn apply(&self, conn: &SqliteConnection) -> diesel::QueryResult<()> {
|
||||
debug!(r#"Setting "synchronous" to NORMAL"#);
|
||||
conn.execute("PRAGMA synchronous = NORMAL;")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
||||
for ConnectionOptions
|
||||
{
|
||||
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
|
||||
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FsStore {
|
||||
path: PathBuf,
|
||||
pool: r2d2::Pool<ConnectionManager<SqliteConnection>>,
|
||||
|
@ -121,7 +89,10 @@ impl FsStore {
|
|||
)?;
|
||||
|
||||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionOptions {}))
|
||||
.connection_customizer(Box::new(ConnectionOptions {
|
||||
busy_timeout: Some(Duration::from_secs(30)),
|
||||
enable_wal_mode: true,
|
||||
}))
|
||||
.error_handler(Box::new(LoggingHandler { name: "fs_store" }))
|
||||
.build(manager)?;
|
||||
|
||||
|
@ -137,14 +108,13 @@ impl FsStore {
|
|||
db: D,
|
||||
job_handle: JobHandle,
|
||||
quick_check: bool,
|
||||
disable_synchronous: bool,
|
||||
_disable_synchronous: bool,
|
||||
) -> Result<Vec<UpdatePathOutcome>> {
|
||||
let start = Instant::now();
|
||||
info!("Vault rescan started.");
|
||||
|
||||
let db = db.borrow();
|
||||
let upconnection = db.connection()?;
|
||||
let connection = self.pool.get()?;
|
||||
|
||||
// Initialize types, etc...
|
||||
debug!("Initializing DB types.");
|
||||
|
@ -155,14 +125,6 @@ impl FsStore {
|
|||
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
|
||||
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
|
||||
|
||||
// Disable syncing in SQLite for the duration of the import
|
||||
let mut _guard: Option<PragmaSynchronousGuard> = None;
|
||||
if disable_synchronous {
|
||||
debug!("Disabling SQLite synchronous mode");
|
||||
connection.execute("PRAGMA synchronous = OFF;")?;
|
||||
_guard = Some(PragmaSynchronousGuard(&connection));
|
||||
}
|
||||
|
||||
// Walk through the vault, find all paths
|
||||
debug!("Traversing vault directory");
|
||||
let absolute_dir_path = fs::canonicalize(&*self.path)?;
|
||||
|
@ -235,9 +197,6 @@ impl FsStore {
|
|||
}
|
||||
});
|
||||
|
||||
// Re-enable SQLite syncing
|
||||
drop(_guard);
|
||||
|
||||
// Reporting
|
||||
let all_outcomes = path_outcomes
|
||||
.into_iter()
|
||||
|
|
Loading…
Reference in New Issue