perf: implement speed-ups for vault db
use an r2d2 connection pool; enable WAL journal mode; adjust PRAGMA synchronous handling
parent 0b0c6f2ec3
commit 7f519d9de8
1 changed file with 25 additions and 26 deletions

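Taken together, the changes follow a common diesel + SQLite recipe: build an r2d2 pool once, hand out pooled connections instead of reconnecting for every query, and switch the database to write-ahead logging at startup. Below is a minimal sketch of that recipe, assuming diesel 1.x with its r2d2 feature enabled; the open_pool helper, its error type, and the checkpoint settings are illustrative and not code from this commit.

// Minimal sketch (assumed diesel 1.x + r2d2 feature); names are illustrative.
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use diesel::SqliteConnection;

fn open_pool(
    db_path: &str,
) -> Result<r2d2::Pool<ConnectionManager<SqliteConnection>>, Box<dyn std::error::Error>> {
    // One manager, many connections: callers borrow connections from the pool
    // instead of reconnecting for every operation.
    let manager = ConnectionManager::<SqliteConnection>::new(db_path);
    let pool = r2d2::Pool::builder().build(manager)?;

    // Switch SQLite to write-ahead logging once, up front. The journal mode is
    // persisted in the database file, so later pooled connections see it too.
    let connection = pool.get()?;
    connection.execute(
        "PRAGMA journal_mode = WAL; \
         PRAGMA wal_autocheckpoint = 1000; \
         PRAGMA wal_checkpoint(TRUNCATE);",
    )?;

    Ok(pool)
}

Because journal_mode = WAL is stored in the database file, it only needs to be set once; wal_autocheckpoint = 1000 and the explicit wal_checkpoint(TRUNCATE) keep the write-ahead log from growing unboundedly between checkpoints.
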
@@ -15,7 +15,7 @@ use crate::util::hash::{b58_encode, Hash, Hashable};
 use crate::util::jobs::{JobContainer, JobHandle};
 use anyhow::{anyhow, Error, Result};
 use chrono::prelude::*;
-use diesel::r2d2::{ConnectionManager, ManageConnection};
+use diesel::r2d2::{self, ConnectionManager};
 use diesel::ExpressionMethods;
 use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
 use log::{debug, error, info, warn};
@@ -46,7 +46,7 @@ lazy_static! {
     static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
 }
 
-struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
+struct PragmaSynchronousGuard<'a>(&'a SqliteConnection);
 
 impl Drop for PragmaSynchronousGuard<'_> {
     fn drop(&mut self) {
@@ -63,7 +63,7 @@ impl Drop for PragmaSynchronousGuard<'_> {
 
 pub struct FsStore {
     path: PathBuf,
-    manager: ConnectionManager<SqliteConnection>,
+    pool: r2d2::Pool<ConnectionManager<SqliteConnection>>,
     lock: Arc<RwLock<()>>,
 }
 
@@ -76,7 +76,8 @@ impl FsStore {
                 .to_str()
                 .unwrap(),
         );
-        let connection = manager.connect()?;
+        let pool = r2d2::Pool::builder().build(manager)?;
+        let connection = pool.get()?;
 
         // while diesel doesn't support multiple embedded migrations...
         connection.execute(
@@ -95,16 +96,13 @@ impl FsStore {
             CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
             CREATE INDEX IF NOT EXISTS files_valid ON files (valid);
 
+            PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);
             "#,
         )?;
 
         let lock = Arc::new(RwLock::new(()));
 
-        Ok(FsStore {
-            path,
-            manager,
-            lock,
-        })
+        Ok(FsStore { path, pool, lock })
     }
 
     fn rescan_vault<D: Borrow<UpEndDatabase>>(
@@ -118,16 +116,17 @@ impl FsStore {
         info!("Vault rescan started.");
 
         let db = db.borrow();
-        let connection = db.connection()?;
+        let upconnection = db.connection()?;
+        let connection = self.pool.get()?;
 
         // Initialize types, etc...
         debug!("Initializing DB types.");
-        connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
-        upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
-        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
-        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
-        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
-        upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
+        upconnection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
+        upend_insert_addr!(upconnection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
+        upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
+        upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
+        upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
+        upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
 
         // Disable syncing in SQLite for the duration of the import
         let mut _guard: Option<PragmaSynchronousGuard> = None;
@@ -161,7 +160,7 @@ impl FsStore {
             .into_par_iter()
             .map(|path| {
                 let result = self.process_directory_entry(
-                    db.connection().unwrap(),
+                    db,
                     &resolve_cache,
                     path.clone(),
                     &existing_files,
@@ -191,7 +190,7 @@ impl FsStore {
         let existing_files = existing_files.read().unwrap();
 
         let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
-            let trans_result = connection.transaction::<_, Error, _>(|| {
+            let trans_result = upconnection.transaction::<_, Error, _>(|| {
                 self.file_set_valid(file.id, false)?;
                 // remove_object(&connection, )?
                 Ok(())
@@ -256,9 +255,9 @@ impl FsStore {
         Ok(all_outcomes)
     }
 
-    fn process_directory_entry(
+    fn process_directory_entry<D: Borrow<UpEndDatabase>>(
         &self,
-        connection: UpEndConnection,
+        db: D,
         resolve_cache: &Arc<Mutex<ResolveCache>>,
         path: PathBuf,
         existing_files: &Arc<RwLock<Vec<db::File>>>,
@@ -345,7 +344,7 @@ impl FsStore {
         let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());
 
         self.insert_file_with_metadata(
-            &connection,
+            &db.borrow().connection()?,
             &normalized_path,
             file_hash.unwrap(),
             size,
@@ -498,7 +497,7 @@ impl FsStore {
         );
 
         let _lock = self.lock.write().unwrap();
-        let conn = self.manager.connect()?;
+        let conn = self.pool.get()?;
 
         diesel::insert_into(files::table)
             .values(&file)
@@ -517,7 +516,7 @@ impl FsStore {
         use self::db::files::dsl::*;
 
         let _lock = self.lock.read().unwrap();
-        let conn = self.manager.connect()?;
+        let conn = self.pool.get()?;
 
         let matches = files
             .filter(valid.eq(true))
@@ -544,7 +543,7 @@ impl FsStore {
         use self::db::files::dsl::*;
 
         let _lock = self.lock.read().unwrap();
-        let conn = self.manager.connect()?;
+        let conn = self.pool.get()?;
 
         let matches = files.load::<db::File>(&conn)?;
         Ok(matches)
@@ -555,7 +554,7 @@ impl FsStore {
         debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
 
         let _lock = self.lock.write().unwrap();
-        let conn = self.manager.connect()?;
+        let conn = self.pool.get()?;
 
         Ok(diesel::update(files.filter(id.eq(file_id)))
             .set(mtime.eq(m_time))
@@ -567,7 +566,7 @@ impl FsStore {
         debug!("Setting file ID {} to valid = {}", file_id, is_valid);
 
         let _lock = self.lock.write().unwrap();
-        let conn = self.manager.connect()?;
+        let conn = self.pool.get()?;
 
         Ok(diesel::update(files.filter(id.eq(file_id)))
             .set(valid.eq(is_valid))