split business code in reimport_directory in a separate function
parent
00e0dc288c
commit
e0a03b30fa
|
@ -289,100 +289,6 @@ pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
|
|||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
|
||||
connection: &C,
|
||||
directory: T,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.path().is_file())
|
||||
.map(|e| fs::canonicalize(e.into_path()))
|
||||
.collect();
|
||||
|
||||
for path in path_entries? {
|
||||
info!("Processing: {:?}", path);
|
||||
|
||||
let metadata = fs::metadata(&path)?;
|
||||
let size = metadata.len() as i64;
|
||||
if size < 0 {
|
||||
panic!("File {} too large?!", path.display());
|
||||
}
|
||||
|
||||
let digest = hasher_worker
|
||||
.send(ComputeHash { path: path.clone() })
|
||||
.await??;
|
||||
|
||||
// let existing_file: Option<String> = db_executor
|
||||
// .send(RetrieveByHash {
|
||||
// hash: digest.clone(),
|
||||
// })
|
||||
// .await??;
|
||||
|
||||
let new_file = models::NewFile {
|
||||
path: path
|
||||
.strip_prefix(&absolute_path)?
|
||||
.to_str()
|
||||
.expect("path not valid unicode?!")
|
||||
.to_string(),
|
||||
hash: (digest.clone()).0,
|
||||
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
|
||||
size,
|
||||
};
|
||||
|
||||
let _ = insert_file(connection, new_file)?;
|
||||
|
||||
let components = path
|
||||
.strip_prefix(&absolute_path)?
|
||||
.components()
|
||||
.collect::<Vec<Component>>();
|
||||
let (filename, dir_path) = components.split_last().unwrap();
|
||||
|
||||
let file_address = Address::UUID(Uuid::new_v4());
|
||||
|
||||
let name_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
key: FILENAME_KEY.to_string(),
|
||||
value: EntryValue::Value(Value::String(
|
||||
filename.as_os_str().to_string_lossy().to_string(),
|
||||
)),
|
||||
};
|
||||
let _ = insert_entry(connection, name_entry)?;
|
||||
|
||||
let identity_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
key: FILE_IDENTITY_KEY.to_string(),
|
||||
value: EntryValue::Address(Address::Hash(digest.clone())),
|
||||
};
|
||||
|
||||
let _ = insert_entry(connection, identity_entry)?;
|
||||
|
||||
let upath = UPath(
|
||||
iter::once(UDirectory {
|
||||
name: "NATIVE".to_string(),
|
||||
})
|
||||
.chain(dir_path.iter().map(|component| UDirectory {
|
||||
name: component.as_os_str().to_string_lossy().to_string(),
|
||||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = resolve_path(connection, &upath, true).await?;
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
let dir_has_entry = InnerEntry {
|
||||
target: parent_dir.clone(),
|
||||
key: DIR_HAS_KEY.to_string(),
|
||||
value: EntryValue::Address(file_address),
|
||||
};
|
||||
let _ = insert_entry(connection, dir_has_entry)?;
|
||||
}
|
||||
info!("Finished updating {}.", directory.as_ref().display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
|
||||
connection: C,
|
||||
directory: PathBuf,
|
||||
|
@ -393,6 +299,109 @@ pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
|
|||
error!("Update did not succeed! {}", result.err().unwrap());
|
||||
}
|
||||
}
|
||||
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
|
||||
connection: &C,
|
||||
directory: T,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.path().is_file())
|
||||
.map(|e| fs::canonicalize(e.into_path()))
|
||||
.collect();
|
||||
|
||||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
for path in path_entries? {
|
||||
_process_directory_entry(connection, path, &absolute_path, &hasher_worker).await?;
|
||||
}
|
||||
info!("Finished updating {}.", directory.as_ref().display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||
connection: &C,
|
||||
path: PathBuf,
|
||||
directory_path: &P,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
info!("Processing: {:?}", path);
|
||||
|
||||
let metadata = fs::metadata(&path)?;
|
||||
let size = metadata.len() as i64;
|
||||
if size < 0 {
|
||||
panic!("File {} too large?!", path.display());
|
||||
}
|
||||
|
||||
let digest = hasher_worker
|
||||
.send(ComputeHash { path: path.clone() })
|
||||
.await??;
|
||||
|
||||
// let existing_file: Option<String> = db_executor
|
||||
// .send(RetrieveByHash {
|
||||
// hash: digest.clone(),
|
||||
// })
|
||||
// .await??;
|
||||
|
||||
let new_file = models::NewFile {
|
||||
path: path
|
||||
.strip_prefix(&directory_path)?
|
||||
.to_str()
|
||||
.expect("path not valid unicode?!")
|
||||
.to_string(),
|
||||
hash: (digest.clone()).0,
|
||||
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
|
||||
size,
|
||||
};
|
||||
|
||||
let _ = insert_file(connection, new_file)?;
|
||||
|
||||
let components = path
|
||||
.strip_prefix(&directory_path)?
|
||||
.components()
|
||||
.collect::<Vec<Component>>();
|
||||
let (filename, dir_path) = components.split_last().unwrap();
|
||||
|
||||
let file_address = Address::UUID(Uuid::new_v4());
|
||||
|
||||
let name_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
key: FILENAME_KEY.to_string(),
|
||||
value: EntryValue::Value(Value::String(
|
||||
filename.as_os_str().to_string_lossy().to_string(),
|
||||
)),
|
||||
};
|
||||
let _ = insert_entry(connection, name_entry)?;
|
||||
|
||||
let identity_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
key: FILE_IDENTITY_KEY.to_string(),
|
||||
value: EntryValue::Address(Address::Hash(digest.clone())),
|
||||
};
|
||||
|
||||
let _ = insert_entry(connection, identity_entry)?;
|
||||
|
||||
let upath = UPath(
|
||||
iter::once(UDirectory {
|
||||
name: "NATIVE".to_string(),
|
||||
})
|
||||
.chain(dir_path.iter().map(|component| UDirectory {
|
||||
name: component.as_os_str().to_string_lossy().to_string(),
|
||||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = resolve_path(connection, &upath, true).await?;
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
let dir_has_entry = InnerEntry {
|
||||
target: parent_dir.clone(),
|
||||
key: DIR_HAS_KEY.to_string(),
|
||||
value: EntryValue::Address(file_address),
|
||||
};
|
||||
let _ = insert_entry(connection, dir_has_entry)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
Loading…
Reference in New Issue