split business code in reimport_directory into a separate function

feat/vaults
Tomáš Mládek 2020-09-20 16:29:16 +02:00
parent 00e0dc288c
commit e0a03b30fa
1 changed file with 103 additions and 94 deletions


@@ -289,100 +289,6 @@ pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
Ok(result)
}
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
connection: &C,
directory: T,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
let absolute_path = fs::canonicalize(&directory)?;
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.map(|e| fs::canonicalize(e.into_path()))
.collect();
for path in path_entries? {
info!("Processing: {:?}", path);
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
let digest = hasher_worker
.send(ComputeHash { path: path.clone() })
.await??;
// let existing_file: Option<String> = db_executor
// .send(RetrieveByHash {
// hash: digest.clone(),
// })
// .await??;
let new_file = models::NewFile {
path: path
.strip_prefix(&absolute_path)?
.to_str()
.expect("path not valid unicode?!")
.to_string(),
hash: (digest.clone()).0,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
};
let _ = insert_file(connection, new_file)?;
let components = path
.strip_prefix(&absolute_path)?
.components()
.collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let file_address = Address::UUID(Uuid::new_v4());
let name_entry = InnerEntry {
target: file_address.clone(),
key: FILENAME_KEY.to_string(),
value: EntryValue::Value(Value::String(
filename.as_os_str().to_string_lossy().to_string(),
)),
};
let _ = insert_entry(connection, name_entry)?;
let identity_entry = InnerEntry {
target: file_address.clone(),
key: FILE_IDENTITY_KEY.to_string(),
value: EntryValue::Address(Address::Hash(digest.clone())),
};
let _ = insert_entry(connection, identity_entry)?;
let upath = UPath(
iter::once(UDirectory {
name: "NATIVE".to_string(),
})
.chain(dir_path.iter().map(|component| UDirectory {
name: component.as_os_str().to_string_lossy().to_string(),
}))
.collect(),
);
let resolved_path = resolve_path(connection, &upath, true).await?;
let parent_dir = resolved_path.last().unwrap();
let dir_has_entry = InnerEntry {
target: parent_dir.clone(),
key: DIR_HAS_KEY.to_string(),
value: EntryValue::Address(file_address),
};
let _ = insert_entry(connection, dir_has_entry)?;
}
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
}
pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
connection: C,
directory: PathBuf,
@@ -393,6 +299,109 @@ pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
error!("Update did not succeed! {}", result.err().unwrap());
}
}
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
connection: &C,
directory: T,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
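    // Walk the directory tree and collect the canonical paths of all regular files.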
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.map(|e| fs::canonicalize(e.into_path()))
.collect();
let absolute_path = fs::canonicalize(&directory)?;
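    // Each file is handled by the per-entry helper below; the first error aborts the reimport.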
for path in path_entries? {
_process_directory_entry(connection, path, &absolute_path, &hasher_worker).await?;
}
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
}
async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
connection: &C,
path: PathBuf,
directory_path: &P,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
info!("Processing: {:?}", path);
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
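    // Ask the hasher worker for the file's content digest.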
let digest = hasher_worker
.send(ComputeHash { path: path.clone() })
.await??;
// let existing_file: Option<String> = db_executor
// .send(RetrieveByHash {
// hash: digest.clone(),
// })
// .await??;
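    // Insert the file record, storing its path relative to the scanned directory root.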
let new_file = models::NewFile {
path: path
.strip_prefix(&directory_path)?
.to_str()
.expect("path not valid unicode?!")
.to_string(),
hash: (digest.clone()).0,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
};
let _ = insert_file(connection, new_file)?;
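    // Split the relative path into its directory components and the file name itself.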
let components = path
.strip_prefix(&directory_path)?
.components()
.collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
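    // Assign the file a fresh UUID address and attach its file name as an entry.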
let file_address = Address::UUID(Uuid::new_v4());
let name_entry = InnerEntry {
target: file_address.clone(),
key: FILENAME_KEY.to_string(),
value: EntryValue::Value(Value::String(
filename.as_os_str().to_string_lossy().to_string(),
)),
};
let _ = insert_entry(connection, name_entry)?;
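    // Tie the UUID address to the file's content hash.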
let identity_entry = InnerEntry {
target: file_address.clone(),
key: FILE_IDENTITY_KEY.to_string(),
value: EntryValue::Address(Address::Hash(digest.clone())),
};
let _ = insert_entry(connection, identity_entry)?;
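    // Place the file in the virtual tree: resolve its directory path under "NATIVE" and link it to the resolved parent directory.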
let upath = UPath(
iter::once(UDirectory {
name: "NATIVE".to_string(),
})
.chain(dir_path.iter().map(|component| UDirectory {
name: component.as_os_str().to_string_lossy().to_string(),
}))
.collect(),
);
let resolved_path = resolve_path(connection, &upath, true).await?;
let parent_dir = resolved_path.last().unwrap();
let dir_has_entry = InnerEntry {
target: parent_dir.clone(),
key: DIR_HAS_KEY.to_string(),
value: EntryValue::Address(file_address),
};
let _ = insert_entry(connection, dir_has_entry)?;
Ok(())
}
#[cfg(test)]
mod tests {