refactor(db): refactor rescan process

New blobs are only placed into the hierarchy if they aren't already in any groups (checked via an `ATTR_IN` query against the database).
Get rid of `add_file`; `store()` now calls `insert_file_with_metadata` directly.
Remove the `DepthFirst` blob mode (enum variant, scan logic, serialization, UI option, and its test).
Refactor error handling (drop the per-file transaction wrapper around `file_set_valid` and the unused `anyhow::Error` import).
Tomáš Mládek 2023-11-05 16:37:09 +01:00
parent dc9a626a4e
commit ba8d272bc2
3 changed files with 117 additions and 204 deletions

View File

@ -242,7 +242,6 @@ impl UpEndConnection {
if let Some(blob_mode) = options.blob_mode {
let tree_mode = match blob_mode {
BlobMode::Flat => "FLAT".to_string(),
BlobMode::DepthFirst => "DEPTH_FIRST".to_string(),
BlobMode::Mirror => "MIRROR".to_string(),
BlobMode::Incoming(None) => "INCOMING".to_string(),
BlobMode::Incoming(Some(group)) => format!("INCOMING:{}", group),
@ -257,7 +256,6 @@ impl UpEndConnection {
let blob_mode = match self.get_meta("VAULT_BLOB_MODE")? {
Some(mode) => match mode.as_str() {
"FLAT" => Some(BlobMode::Flat),
"DEPTH_FIRST" => Some(BlobMode::DepthFirst),
"MIRROR" => Some(BlobMode::Mirror),
"INCOMING" => Some(BlobMode::Incoming(None)),
"STORE_ONLY" => Some(BlobMode::StoreOnly),
@ -614,8 +612,6 @@ pub enum BlobMode {
#[default]
/// Mirror the original tree
Mirror,
/// Like Mirror, but chooses the shortest path
DepthFirst,
/// Use only the last level of the tree as a group
Flat,
/// Place all files in a single group

View File

@ -7,7 +7,7 @@ use crate::util::hash_at_path;
use crate::{
BlobMode, ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
};
use anyhow::{anyhow, Error, Result};
use anyhow::{anyhow, Result};
use chrono::prelude::*;
use diesel::r2d2::{self, ConnectionManager, ManageConnection};
use diesel::ExpressionMethods;
@ -17,7 +17,6 @@ use lru::LruCache;
use rayon::prelude::*;
use serde_json::json;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::convert::TryInto;
use std::path::PathBuf;
use std::path::{Component, Path};
@ -123,7 +122,7 @@ impl FsStore {
// Walk through the vault, find all paths
trace!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*self.path)?;
let pathbufs: Vec<PathBuf> = WalkDir::new(&*self.path)
let paths: Vec<PathBuf> = WalkDir::new(&*self.path)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
@ -132,92 +131,6 @@ impl FsStore {
.filter(|e| !e.starts_with(absolute_dir_path.join(UPEND_SUBDIR)))
.collect();
let mut upaths: HashMap<PathBuf, UHierPath> = HashMap::new();
match options.tree_mode {
BlobMode::Flat => {
for pb in &pathbufs {
let normalized_path = self.normalize_path(pb).unwrap();
let dirname = normalized_path.parent().and_then(|p| p.components().last());
let upath = UHierPath(if let Some(dirname) = dirname {
vec![
"NATIVE".parse().unwrap(),
UNode::from_str(&dirname.as_os_str().to_string_lossy()).unwrap(),
]
} else {
vec!["NATIVE".parse().unwrap()]
});
upaths.insert(pb.clone(), upath);
}
}
BlobMode::DepthFirst => {
let mut shallowest: HashMap<String, PathBuf> = HashMap::new();
for path in &pathbufs {
let normalized_path = self.normalize_path(path).unwrap();
let dirname = normalized_path.parent().and_then(|p| p.components().last());
if let Some(dirname) = dirname {
let dirname = dirname.as_os_str().to_string_lossy().to_string();
if let Some(existing_path) = shallowest.get_mut(&dirname) {
if existing_path.components().count() > path.components().count() {
*existing_path = path.clone();
}
} else {
shallowest.insert(dirname.clone(), path.clone());
}
let shallowest_path = shallowest.get(&dirname).unwrap();
let upath = iter::once("NATIVE".parse().unwrap())
.chain(
self.normalize_path(shallowest_path)
.unwrap()
.parent()
.unwrap()
.iter()
.map(|component: &std::ffi::OsStr| {
UNode::from_str(&component.to_string_lossy()).unwrap()
}),
)
.collect::<Vec<UNode>>();
upaths.insert(path.clone(), UHierPath(upath));
} else {
upaths.insert(path.clone(), UHierPath(vec!["NATIVE".parse().unwrap()]));
}
}
}
BlobMode::Mirror => {
for pb in &pathbufs {
let normalized_path = self.normalize_path(&pb).unwrap();
let path = normalized_path.parent().unwrap();
let upath = iter::once("NATIVE".parse().unwrap())
.chain(path.iter().map(|component| {
UNode::from_str(&component.to_string_lossy()).unwrap()
}))
.collect::<Vec<UNode>>();
upaths.insert(pb.clone(), UHierPath(upath));
}
}
BlobMode::Incoming(group) => {
let upath = UHierPath(vec![group
.unwrap_or("INCOMING".to_string())
.parse()
.unwrap()]);
for pb in &pathbufs {
upaths.insert(pb.clone(), upath.clone());
}
}
BlobMode::StoreOnly => {}
};
let path_entries = pathbufs
.into_iter()
.map(|pb| {
let upath = upaths.remove(&pb).unwrap();
(pb, upath)
})
.collect::<Vec<(PathBuf, UHierPath)>>();
// Prepare for processing
let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));
@ -225,18 +138,18 @@ impl FsStore {
let count = RwLock::new(0_usize);
let resolve_cache: Arc<Mutex<LruCache<(Option<Address>, UNode), Address>>> =
Arc::new(Mutex::new(LruCache::new(256)));
let total = path_entries.len() as f32;
let total = paths.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
let path_outcomes: Vec<UpdatePathOutcome> = paths
.into_par_iter()
.map(|(path, upath)| {
.map(|path| {
let result = self.process_directory_entry(
db,
&resolve_cache,
path.clone(),
upath,
options.tree_mode.clone(),
&existing_files,
&resolve_cache,
quick_check,
);
@ -263,10 +176,7 @@ impl FsStore {
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = upconnection.transaction::<_, Error, _>(|| {
self.file_set_valid(file.id, false)?;
Ok(())
});
let trans_result = self.file_set_valid(file.id, false);
match trans_result {
Ok(_) => {
@ -330,10 +240,10 @@ impl FsStore {
fn process_directory_entry<D: Borrow<UpEndDatabase>>(
&self,
db: D,
resolve_cache: &Arc<Mutex<ResolveCache>>,
path: PathBuf,
upath: UHierPath,
mode: BlobMode,
existing_files: &Arc<RwLock<Vec<db::File>>>,
resolve_cache: &Arc<Mutex<ResolveCache>>,
quick_check: bool,
) -> Result<UpdatePathOutcome> {
trace!("Processing: {:?}", path);
@ -414,21 +324,38 @@ impl FsStore {
drop(existing_files_read);
}
// If not, add it!
// If not, hash it.
if file_hash.is_none() {
file_hash = Some(hash_at_path(&path)?);
}
let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());
let file_hash = file_hash.unwrap();
let connection: UpEndConnection = db.borrow().connection()?;
let file_is_known = !connection
.query(
format!(
"(matches @{} \"{}\" ?)",
Address::Hash(file_hash.clone()),
ATTR_IN
)
.parse()?,
)?
.is_empty();
let upath = if !file_is_known {
self.path_to_upath(&path, mode)?
} else {
None
};
self.insert_file_with_metadata(
&db.borrow().connection()?,
&normalized_path,
&path,
upath,
file_hash.unwrap(),
file_hash,
None,
size,
mtime,
mime_type,
Some(resolve_cache),
)
.map(|_| {
@ -437,55 +364,57 @@ impl FsStore {
})
}
fn add_file(
&self,
connection: &UpEndConnection,
path: &Path,
upath: UHierPath,
hash: UpMultihash,
name_hint: Option<String>,
) -> Result<Address> {
let normalized_path = self.normalize_path(path)?;
let metadata = fs::metadata(path)?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp_opt(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok()
.flatten();
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
fn path_to_upath(&self, path: &Path, mode: BlobMode) -> Result<Option<UHierPath>> {
match mode {
BlobMode::Flat => {
let normalized_path = self.normalize_path(path).unwrap();
let dirname = normalized_path.parent().and_then(|p| p.components().last());
self.insert_file_with_metadata(
connection,
&normalized_path,
upath,
hash,
name_hint,
size,
mtime,
mime_type,
None,
)
let upath = UHierPath(if let Some(dirname) = dirname {
vec![
"NATIVE".parse().unwrap(),
UNode::from_str(&dirname.as_os_str().to_string_lossy()).unwrap(),
]
} else {
vec!["NATIVE".parse().unwrap()]
});
Ok(Some(upath))
}
BlobMode::Mirror => {
let normalized_path = self.normalize_path(path).unwrap();
let path = normalized_path.parent().unwrap();
let upath =
iter::once("NATIVE".parse().unwrap())
.chain(path.iter().map(|component| {
UNode::from_str(&component.to_string_lossy()).unwrap()
}))
.collect::<Vec<UNode>>();
Ok(Some(UHierPath(upath)))
}
BlobMode::Incoming(group) => {
let upath = UHierPath(vec![group.unwrap_or("INCOMING".to_string()).parse()?]);
Ok(Some(upath))
}
BlobMode::StoreOnly => Ok(None),
}
}
#[allow(clippy::too_many_arguments)]
fn insert_file_with_metadata(
&self,
connection: &UpEndConnection,
normalized_path: &Path,
upath: UHierPath,
path: &Path,
upath: Option<UHierPath>,
hash: UpMultihash,
name: Option<String>,
size: i64,
mtime: Option<NaiveDateTime>,
mime_type: Option<String>,
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
let normalized_path = self.normalize_path(path)?;
let new_file = db::NewFile {
path: normalized_path
.to_str()
@ -508,6 +437,7 @@ impl FsStore {
timestamp: chrono::Utc::now().naive_utc(),
};
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
let mime_entry = mime_type.map(|mime_type| Entry {
entity: blob_address.clone(),
attribute: FILE_MIME_KEY.to_string(),
@ -528,18 +458,10 @@ impl FsStore {
timestamp: chrono::Utc::now().naive_utc(),
};
// Add the appropriate entries w/r/t virtual filesystem location
let components = normalized_path.components().collect::<Vec<Component>>();
let filename = components.last().unwrap();
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
// Insert all
let file_count = self.insert_file(new_file)?;
let file_count = self.insert_file_record(new_file)?;
connection.insert_entry_immutable(size_entry)?;
if file_count == 1 {
@ -549,15 +471,6 @@ impl FsStore {
connection.insert_entry(mime_entry)?;
}
let dir_has_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_IN.to_string(),
value: parent_dir.clone().into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let label_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_LABEL.to_string(),
@ -569,19 +482,36 @@ impl FsStore {
};
let label_entry_addr = connection.insert_entry(label_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ATTR_BY.to_string(),
value: label_entry_addr.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
connection.insert_entry(alias_entry)?;
if let Some(upath) = upath {
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
let dir_has_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_IN.to_string(),
value: parent_dir.clone().into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ATTR_BY.to_string(),
value: label_entry_addr.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
connection.insert_entry(alias_entry)?;
}
Ok(blob_address)
}
pub fn insert_file(&self, file: db::NewFile) -> Result<u32> {
fn insert_file_record(&self, file: db::NewFile) -> Result<u32> {
trace!(
"Inserting {} ({})...",
&file.path,
@ -732,15 +662,31 @@ impl UpStore for FsStore {
};
let final_path = self.path.join(final_name);
fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
self.add_file(
let metadata =
fs::metadata(&final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp_opt(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok()
.flatten();
self.insert_file_with_metadata(
&connection,
&final_path,
UHierPath(vec!["NATIVE".parse().unwrap(), "INCOMING".parse().unwrap()]),
None,
hash.clone(),
name_hint,
size,
mtime,
None,
)
.map_err(|e| StoreError::Unknown(e.to_string()))?;
}
@ -1102,20 +1048,6 @@ mod test {
);
}
#[test]
fn test_depth_mode() {
test_initial_scan(
BlobMode::DepthFirst,
vec![
"NATIVE",
"NATIVE/nested_directory/nested_two/nested_four/baz.txt",
"NATIVE/nested_directory/nested_three/foo.txt",
"NATIVE/nested_directory/nested_three/bar.txt",
"NATIVE/in_root.txt",
],
);
}
#[test]
fn test_incoming_mode() {
test_initial_scan(

View File

@ -49,21 +49,6 @@
)}
</p>
</div>
<div class="option">
<IconButton
name="vertical-bottom"
outline
on:click={() => (mode = "DepthFirst")}
active={mode === "DepthFirst"}
>
Depth-First
</IconButton>
<p>
{$i18n.t(
"Like Mirror, but the shortest path with the same end group is chosen. ",
)}
</p>
</div>
<div class="option">
<IconButton
name="checkbox-minus"