guard update pool behind an r/w lock; increase busy timeout to 30s

TODO: integrate the r/w lock into the pool directly?
feat/vaults
Tomáš Mládek 2020-09-20 20:14:05 +02:00
parent 400b0d11f2
commit 63ffe42907
2 changed files with 17 additions and 17 deletions

View File

@@ -302,7 +302,7 @@ pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<Open
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
enable_foreign_keys: true,
busy_timeout: Some(Duration::from_secs(3)),
busy_timeout: Some(Duration::from_secs(30)),
}))
.build(manager)
.expect("Failed to create pool.");

View File

@@ -14,6 +14,7 @@ use rayon::prelude::*;
use serde::export::Formatter;
use serde_json::Value;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::{fs, iter};
use uuid::Uuid;
use walkdir::WalkDir;
@@ -287,13 +288,13 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
Ok(result)
}
/// Re-scan `directory` and update the database from its contents.
///
/// The (blocking) filesystem walk and DB work are moved off the async
/// executor via `actix_web::web::block`. Failures are logged rather than
/// propagated — the return type is `()`, so callers cannot observe an error.
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
    // The pool is moved into the closure by value (post-refactor signature of
    // `_reimport_directory` takes `DbPool`, not `&DbPool`).
    let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
    // `if let Err` instead of `is_err()` + `err().unwrap()`: same behavior,
    // no unwrap.
    if let Err(err) = result {
        error!("Update did not succeed! {}", err);
    }
}
fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()> {
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<()> {
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
@@ -301,29 +302,26 @@ fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()
.map(|e| fs::canonicalize(e.into_path()).unwrap())
.collect();
let mutex_pool = Arc::new(RwLock::new(pool));
let absolute_path = fs::canonicalize(&directory)?;
path_entries
.into_par_iter()
.map(|path| {
Ok(_process_directory_entry(
&pool.get()?,
path,
&absolute_path,
)?)
})
.map(|path| Ok(_process_directory_entry(&mutex_pool, path, &absolute_path)?))
.collect::<Result<()>>()?;
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
}
fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
connection: &C,
fn _process_directory_entry<P: AsRef<Path>>(
pool: &Arc<RwLock<DbPool>>,
path: PathBuf,
directory_path: &P,
) -> Result<()> {
info!("Processing: {:?}", path);
let pool = Arc::clone(&pool);
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
@@ -349,7 +347,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
size,
};
let _ = insert_file(connection, new_file)?;
let _ = insert_file(&pool.write().unwrap().get()?, new_file)?;
let components = path
.strip_prefix(&directory_path)?
@@ -366,9 +364,11 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
}))
.collect(),
);
let resolved_path = resolve_path(connection, &upath, true)?;
let resolved_path = resolve_path(&pool.write().unwrap().get()?, &upath, true)?;
let parent_dir = resolved_path.last().unwrap();
let _pool = &pool.write().unwrap();
let connection = _pool.get()?;
connection.transaction::<_, Error, _>(|| {
let file_address = Address::UUID(Uuid::new_v4());
@@ -379,7 +379,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
filename.as_os_str().to_string_lossy().to_string(),
)),
};
let _ = insert_entry(connection, name_entry)?;
let _ = insert_entry(&connection, name_entry)?;
let identity_entry = InnerEntry {
target: file_address.clone(),
@@ -387,14 +387,14 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
value: EntryValue::Address(Address::Hash(digest.clone())),
};
let _ = insert_entry(connection, identity_entry)?;
let _ = insert_entry(&connection, identity_entry)?;
let dir_has_entry = InnerEntry {
target: parent_dir.clone(),
key: DIR_HAS_KEY.to_string(),
value: EntryValue::Address(file_address),
};
let _ = insert_entry(connection, dir_has_entry)?;
let _ = insert_entry(&connection, dir_has_entry)?;
Ok(())
})