lock update pool behind r/w lock; increase timeout to 30s
to-do - integrate r/w lock into pool directly?feat/vaults
parent
400b0d11f2
commit
63ffe42907
|
@ -302,7 +302,7 @@ pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<Open
|
|||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionOptions {
|
||||
enable_foreign_keys: true,
|
||||
busy_timeout: Some(Duration::from_secs(3)),
|
||||
busy_timeout: Some(Duration::from_secs(30)),
|
||||
}))
|
||||
.build(manager)
|
||||
.expect("Failed to create pool.");
|
||||
|
|
|
@ -14,6 +14,7 @@ use rayon::prelude::*;
|
|||
use serde::export::Formatter;
|
||||
use serde_json::Value;
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{fs, iter};
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
@ -287,13 +288,13 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
|||
Ok(result)
|
||||
}
|
||||
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
||||
let result = actix_web::web::block(move || _reimport_directory(&pool, directory)).await;
|
||||
let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
|
||||
if result.is_err() {
|
||||
let err = result.err().unwrap();
|
||||
error!("Update did not succeed! {}", err);
|
||||
}
|
||||
}
|
||||
fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()> {
|
||||
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<()> {
|
||||
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
|
@ -301,29 +302,26 @@ fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()
|
|||
.map(|e| fs::canonicalize(e.into_path()).unwrap())
|
||||
.collect();
|
||||
|
||||
let mutex_pool = Arc::new(RwLock::new(pool));
|
||||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
path_entries
|
||||
.into_par_iter()
|
||||
.map(|path| {
|
||||
Ok(_process_directory_entry(
|
||||
&pool.get()?,
|
||||
path,
|
||||
&absolute_path,
|
||||
)?)
|
||||
})
|
||||
.map(|path| Ok(_process_directory_entry(&mutex_pool, path, &absolute_path)?))
|
||||
.collect::<Result<()>>()?;
|
||||
info!("Finished updating {}.", directory.as_ref().display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||
connection: &C,
|
||||
fn _process_directory_entry<P: AsRef<Path>>(
|
||||
pool: &Arc<RwLock<DbPool>>,
|
||||
path: PathBuf,
|
||||
directory_path: &P,
|
||||
) -> Result<()> {
|
||||
info!("Processing: {:?}", path);
|
||||
|
||||
let pool = Arc::clone(&pool);
|
||||
|
||||
let metadata = fs::metadata(&path)?;
|
||||
let size = metadata.len() as i64;
|
||||
if size < 0 {
|
||||
|
@ -349,7 +347,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
|||
size,
|
||||
};
|
||||
|
||||
let _ = insert_file(connection, new_file)?;
|
||||
let _ = insert_file(&pool.write().unwrap().get()?, new_file)?;
|
||||
|
||||
let components = path
|
||||
.strip_prefix(&directory_path)?
|
||||
|
@ -366,9 +364,11 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
|||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = resolve_path(connection, &upath, true)?;
|
||||
let resolved_path = resolve_path(&pool.write().unwrap().get()?, &upath, true)?;
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
|
||||
let _pool = &pool.write().unwrap();
|
||||
let connection = _pool.get()?;
|
||||
connection.transaction::<_, Error, _>(|| {
|
||||
let file_address = Address::UUID(Uuid::new_v4());
|
||||
|
||||
|
@ -379,7 +379,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
|||
filename.as_os_str().to_string_lossy().to_string(),
|
||||
)),
|
||||
};
|
||||
let _ = insert_entry(connection, name_entry)?;
|
||||
let _ = insert_entry(&connection, name_entry)?;
|
||||
|
||||
let identity_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
|
@ -387,14 +387,14 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
|||
value: EntryValue::Address(Address::Hash(digest.clone())),
|
||||
};
|
||||
|
||||
let _ = insert_entry(connection, identity_entry)?;
|
||||
let _ = insert_entry(&connection, identity_entry)?;
|
||||
|
||||
let dir_has_entry = InnerEntry {
|
||||
target: parent_dir.clone(),
|
||||
key: DIR_HAS_KEY.to_string(),
|
||||
value: EntryValue::Address(file_address),
|
||||
};
|
||||
let _ = insert_entry(connection, dir_has_entry)?;
|
||||
let _ = insert_entry(&connection, dir_has_entry)?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue