lock update pool behind r/w lock; increase timeout to 30s
to-do - integrate r/w lock into pool directly?feat/vaults
parent
400b0d11f2
commit
63ffe42907
|
@ -302,7 +302,7 @@ pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<Open
|
||||||
let pool = r2d2::Pool::builder()
|
let pool = r2d2::Pool::builder()
|
||||||
.connection_customizer(Box::new(ConnectionOptions {
|
.connection_customizer(Box::new(ConnectionOptions {
|
||||||
enable_foreign_keys: true,
|
enable_foreign_keys: true,
|
||||||
busy_timeout: Some(Duration::from_secs(3)),
|
busy_timeout: Some(Duration::from_secs(30)),
|
||||||
}))
|
}))
|
||||||
.build(manager)
|
.build(manager)
|
||||||
.expect("Failed to create pool.");
|
.expect("Failed to create pool.");
|
||||||
|
|
|
@ -14,6 +14,7 @@ use rayon::prelude::*;
|
||||||
use serde::export::Formatter;
|
use serde::export::Formatter;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use std::path::{Component, Path, PathBuf};
|
use std::path::{Component, Path, PathBuf};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
use std::{fs, iter};
|
use std::{fs, iter};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
@ -287,13 +288,13 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
||||||
let result = actix_web::web::block(move || _reimport_directory(&pool, directory)).await;
|
let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
let err = result.err().unwrap();
|
let err = result.err().unwrap();
|
||||||
error!("Update did not succeed! {}", err);
|
error!("Update did not succeed! {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()> {
|
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<()> {
|
||||||
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
|
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|e| e.ok())
|
.filter_map(|e| e.ok())
|
||||||
|
@ -301,29 +302,26 @@ fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()
|
||||||
.map(|e| fs::canonicalize(e.into_path()).unwrap())
|
.map(|e| fs::canonicalize(e.into_path()).unwrap())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let mutex_pool = Arc::new(RwLock::new(pool));
|
||||||
let absolute_path = fs::canonicalize(&directory)?;
|
let absolute_path = fs::canonicalize(&directory)?;
|
||||||
path_entries
|
path_entries
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|path| {
|
.map(|path| Ok(_process_directory_entry(&mutex_pool, path, &absolute_path)?))
|
||||||
Ok(_process_directory_entry(
|
|
||||||
&pool.get()?,
|
|
||||||
path,
|
|
||||||
&absolute_path,
|
|
||||||
)?)
|
|
||||||
})
|
|
||||||
.collect::<Result<()>>()?;
|
.collect::<Result<()>>()?;
|
||||||
info!("Finished updating {}.", directory.as_ref().display());
|
info!("Finished updating {}.", directory.as_ref().display());
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
connection: &C,
|
pool: &Arc<RwLock<DbPool>>,
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
directory_path: &P,
|
directory_path: &P,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!("Processing: {:?}", path);
|
info!("Processing: {:?}", path);
|
||||||
|
|
||||||
|
let pool = Arc::clone(&pool);
|
||||||
|
|
||||||
let metadata = fs::metadata(&path)?;
|
let metadata = fs::metadata(&path)?;
|
||||||
let size = metadata.len() as i64;
|
let size = metadata.len() as i64;
|
||||||
if size < 0 {
|
if size < 0 {
|
||||||
|
@ -349,7 +347,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||||
size,
|
size,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = insert_file(connection, new_file)?;
|
let _ = insert_file(&pool.write().unwrap().get()?, new_file)?;
|
||||||
|
|
||||||
let components = path
|
let components = path
|
||||||
.strip_prefix(&directory_path)?
|
.strip_prefix(&directory_path)?
|
||||||
|
@ -366,9 +364,11 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||||
}))
|
}))
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
let resolved_path = resolve_path(connection, &upath, true)?;
|
let resolved_path = resolve_path(&pool.write().unwrap().get()?, &upath, true)?;
|
||||||
let parent_dir = resolved_path.last().unwrap();
|
let parent_dir = resolved_path.last().unwrap();
|
||||||
|
|
||||||
|
let _pool = &pool.write().unwrap();
|
||||||
|
let connection = _pool.get()?;
|
||||||
connection.transaction::<_, Error, _>(|| {
|
connection.transaction::<_, Error, _>(|| {
|
||||||
let file_address = Address::UUID(Uuid::new_v4());
|
let file_address = Address::UUID(Uuid::new_v4());
|
||||||
|
|
||||||
|
@ -379,7 +379,7 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||||
filename.as_os_str().to_string_lossy().to_string(),
|
filename.as_os_str().to_string_lossy().to_string(),
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
let _ = insert_entry(connection, name_entry)?;
|
let _ = insert_entry(&connection, name_entry)?;
|
||||||
|
|
||||||
let identity_entry = InnerEntry {
|
let identity_entry = InnerEntry {
|
||||||
target: file_address.clone(),
|
target: file_address.clone(),
|
||||||
|
@ -387,14 +387,14 @@ fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||||
value: EntryValue::Address(Address::Hash(digest.clone())),
|
value: EntryValue::Address(Address::Hash(digest.clone())),
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = insert_entry(connection, identity_entry)?;
|
let _ = insert_entry(&connection, identity_entry)?;
|
||||||
|
|
||||||
let dir_has_entry = InnerEntry {
|
let dir_has_entry = InnerEntry {
|
||||||
target: parent_dir.clone(),
|
target: parent_dir.clone(),
|
||||||
key: DIR_HAS_KEY.to_string(),
|
key: DIR_HAS_KEY.to_string(),
|
||||||
value: EntryValue::Address(file_address),
|
value: EntryValue::Address(file_address),
|
||||||
};
|
};
|
||||||
let _ = insert_entry(connection, dir_has_entry)?;
|
let _ = insert_entry(&connection, dir_has_entry)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue