non-duplicate updates (without updating virtual fs structure yet)

parent 63ffe42907
commit 80541fe978

6 changed files with 132 additions and 41 deletions
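The gist of the change, ahead of the per-file hunks: instead of unconditionally re-hashing and re-inserting every file found during the directory walk, _reimport_directory now loads all known File records up front (retrieve_all_files), _process_directory_entry skips any path whose size and mtime still match its stored record (re-marking it valid via file_set_valid where needed), and records left unmatched after the walk are flagged invalid. A minimal, self-contained Rust sketch of the skip decision follows; KnownFile and needs_reimport are hypothetical stand-ins, not code from this commit.

use std::collections::HashMap;
use std::time::SystemTime;

// Hypothetical stand-in for the File model in the diff below.
struct KnownFile {
    size: u64,
    mtime: Option<SystemTime>,
    valid: bool,
}

// Decide whether a path found on disk must be re-imported, given records
// loaded up front by something like retrieve_all_files.
fn needs_reimport(
    known: &mut HashMap<String, KnownFile>,
    path: &str,
    size: u64,
    mtime: Option<SystemTime>,
) -> bool {
    match known.get_mut(path) {
        // Unchanged: revalidate the existing record and skip the hash.
        Some(file) if file.size == size && file.mtime == mtime => {
            file.valid = true; // analogous to file_set_valid(.., true)
            false
        }
        // New or changed: fall through to hashing and insert_file.
        _ => true,
    }
}

fn main() {
    let mut known = HashMap::new();
    known.insert("a.txt".into(), KnownFile { size: 3, mtime: None, valid: false });
    assert!(!needs_reimport(&mut known, "a.txt", 3, None)); // unchanged: skip
    assert!(needs_reimport(&mut known, "a.txt", 4, None)); // size changed: reimport
    assert!(needs_reimport(&mut known, "b.txt", 1, None)); // unseen: reimport
    println!("skip logic behaves as expected");
}

The actual commit performs this check against diesel-backed records held in an Arc<RwLock<Vec<File>>>, as the hunks below show.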
migrations/upend/…/up.sql
@@ -13,9 +13,10 @@ CREATE TABLE files
     id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
     hash BLOB NOT NULL,
     path VARCHAR NOT NULL,
+    valid BOOLEAN NOT NULL DEFAULT TRUE,
     added DATETIME NOT NULL,
     size BIGINT NOT NULL,
     created DATETIME NOT NULL,
-    valid BOOLEAN NOT NULL DEFAULT TRUE
+    mtime DATETIME NULL
 );
 
 CREATE INDEX files_hash ON files (hash);
src/database.rs
@@ -150,7 +150,15 @@ pub fn insert_file<C: Connection<Backend = Sqlite>>(
         .execute(connection)?)
 }
 
-pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
+pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+) -> Result<Vec<models::File>> {
+    use crate::schema::files::dsl::*;
+    let matches = files.load::<models::File>(connection)?;
+    Ok(matches)
+}
+
+pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
     connection: &C,
     obj_hash: Hash,
 ) -> Result<Option<String>> {
@@ -164,6 +172,20 @@ pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
     Ok(matches.get(0).map(|f| f.path.clone()))
 }
 
+pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    file_id: i32,
+    is_valid: bool,
+) -> Result<usize> {
+    use crate::schema::files::dsl::*;
+
+    debug!("Setting file ID {} to valid = {}", file_id, is_valid);
+
+    Ok(diesel::update(files.filter(id.eq(file_id)))
+        .set(valid.eq(is_valid))
+        .execute(connection)?)
+}
+
 pub fn lookup_by_filename<C: Connection<Backend = Sqlite>>(
     connection: &C,
     query: String,
@@ -197,6 +219,21 @@ pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
     Ok(entries)
 }
 
+pub fn remove_object<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    object_address: Address,
+) -> Result<usize> {
+    use crate::schema::data::dsl::*;
+
+    debug!("Deleting {}!", object_address);
+
+    let matches = data
+        .filter(target.eq(object_address.encode()?))
+        .or_filter(value.eq(EntryValue::Address(object_address).to_str()?));
+
+    Ok(diesel::delete(matches).execute(connection)?)
+}
+
 pub struct EntryQuery {
     pub target: Option<Address>,
     pub key: Option<String>,
@@ -287,7 +324,7 @@ pub struct OpenResult {
     pub new: bool,
 }
 
-const DATABASE_FILENAME: &str = "upend.sqlite3";
+pub const DATABASE_FILENAME: &str = "upend.sqlite3";
 
 pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
     embed_migrations!("./migrations/upend/");
src/filesystem.rs
@@ -1,10 +1,11 @@
 use crate::addressing::Address;
 use crate::database::{
-    insert_entry, insert_file, query_entries, retrieve_object, DbPool, Entry, EntryQuery,
-    EntryValue, InnerEntry,
+    file_set_valid, insert_entry, insert_file, query_entries, retrieve_all_files, retrieve_object,
+    DbPool, Entry, EntryQuery, EntryValue, InnerEntry, DATABASE_FILENAME,
 };
 use crate::hash::Hashable;
 use crate::models;
+use crate::models::File;
 use anyhow::{anyhow, Error, Result};
 use chrono::prelude::*;
 use diesel::sqlite::Sqlite;
@@ -15,6 +16,7 @@ use serde::export::Formatter;
 use serde_json::Value;
 use std::path::{Component, Path, PathBuf};
 use std::sync::{Arc, RwLock};
+use std::time::UNIX_EPOCH;
 use std::{fs, iter};
 use uuid::Uuid;
 use walkdir::WalkDir;
@@ -294,65 +296,113 @@ pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
         error!("Update did not succeed! {}", err);
     }
 }
-fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<()> {
+fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<Vec<Result<()>>> {
     let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
         .into_iter()
         .filter_map(|e| e.ok())
-        .filter(|e| e.path().is_file())
+        .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
         .map(|e| fs::canonicalize(e.into_path()).unwrap())
         .collect();
 
-    let mutex_pool = Arc::new(RwLock::new(pool));
+    let rw_pool = Arc::new(RwLock::new(pool.clone()));
     let absolute_path = fs::canonicalize(&directory)?;
-    path_entries
+    let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
+
+    let path_results: Vec<Result<()>> = path_entries
         .into_par_iter()
-        .map(|path| Ok(_process_directory_entry(&mutex_pool, path, &absolute_path)?))
-        .collect::<Result<()>>()?;
+        .map(|path| {
+            Ok(_process_directory_entry(
+                &rw_pool,
+                path,
+                &absolute_path,
+                &existing_files,
+            )?)
+        })
+        .collect();
+
+    let cleanup_results: Vec<Result<()>> = existing_files
+        .write()
+        .unwrap()
+        .iter()
+        .filter(|f| f.valid)
+        .map(|file| {
+            let connection = pool.get()?;
+            connection.transaction::<_, Error, _>(|| {
+                let _ = file_set_valid(&connection, file.id, false)?;
+                // remove_object(&connection, )?
+                Ok(())
+            })
+        })
+        .collect();
 
     info!("Finished updating {}.", directory.as_ref().display());
 
-    Ok(())
+    Ok(path_results
+        .into_iter()
+        .chain(cleanup_results.into_iter())
+        .collect())
 }
 
 fn _process_directory_entry<P: AsRef<Path>>(
-    pool: &Arc<RwLock<DbPool>>,
+    db_pool: &Arc<RwLock<DbPool>>,
     path: PathBuf,
     directory_path: &P,
+    existing_files: &Arc<RwLock<Vec<File>>>,
 ) -> Result<()> {
     info!("Processing: {:?}", path);
 
-    let pool = Arc::clone(&pool);
+    let db_pool = Arc::clone(&db_pool);
+    let existing_files = Arc::clone(&existing_files);
 
+    let normalized_path = path.strip_prefix(&directory_path)?;
+    let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");
+
     let metadata = fs::metadata(&path)?;
     let size = metadata.len() as i64;
     if size < 0 {
         panic!("File {} too large?!", path.display());
     }
+    let mtime = metadata
+        .modified()
+        .map(|t| {
+            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
+        })
+        .ok();
+
+    {
+        let mut existing_files = existing_files.write().unwrap();
+
+        let maybe_existing_file = existing_files
+            .iter()
+            .enumerate()
+            .find(|(_, file)| file.path == normalized_path_str)
+            .clone();
+
+        if let Some((idx, existing_file)) = maybe_existing_file {
+            if size == existing_file.size && mtime == existing_file.mtime {
+                if !existing_file.valid {
+                    let _ =
+                        file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?;
+                }
+                existing_files.remove(idx);
+                return Ok(());
            }
        }
    }
 
     let digest = path.hash()?;
 
     // let existing_file: Option<String> = db_executor
     //     .send(RetrieveByHash {
     //         hash: digest.clone(),
     //     })
     //     .await??;
 
     let new_file = models::NewFile {
-        path: path
-            .strip_prefix(&directory_path)?
-            .to_str()
-            .expect("path not valid unicode?!")
-            .to_string(),
+        path: normalized_path_str.to_string(),
         hash: (digest.clone()).0,
         created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
         added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
         size,
+        mtime,
     };
 
-    let _ = insert_file(&pool.write().unwrap().get()?, new_file)?;
+    let _ = insert_file(&db_pool.write().unwrap().get()?, new_file)?;
 
-    let components = path
-        .strip_prefix(&directory_path)?
-        .components()
-        .collect::<Vec<Component>>();
+    let components = normalized_path.components().collect::<Vec<Component>>();
     let (filename, dir_path) = components.split_last().unwrap();
 
     let upath = UPath(
@@ -364,10 +414,10 @@ fn _process_directory_entry<P: AsRef<Path>>(
         }))
         .collect(),
     );
-    let resolved_path = resolve_path(&pool.write().unwrap().get()?, &upath, true)?;
+    let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?;
     let parent_dir = resolved_path.last().unwrap();
 
-    let _pool = &pool.write().unwrap();
+    let _pool = &db_pool.write().unwrap();
     let connection = _pool.get()?;
     connection.transaction::<_, Error, _>(|| {
         let file_address = Address::UUID(Uuid::new_v4());
src/models.rs
@@ -8,9 +8,10 @@ pub struct File {
     pub id: i32,
     pub hash: Vec<u8>,
     pub path: String,
-    pub size: i64,
     pub created: NaiveDateTime,
     pub valid: bool,
     pub added: NaiveDateTime,
+    pub size: i64,
+    pub mtime: Option<NaiveDateTime>,
 }
 
 #[derive(Insertable, Debug)]
@@ -18,8 +19,9 @@ pub struct File {
 pub struct NewFile {
     pub hash: Vec<u8>,
     pub path: String,
     pub added: NaiveDateTime,
     pub size: i64,
     pub created: NaiveDateTime,
+    pub mtime: Option<NaiveDateTime>,
 }
 
 #[derive(Queryable, Insertable, Serialize, Debug)]
src/routes.rs
@@ -1,5 +1,5 @@
 use crate::addressing::Address;
-use crate::database::{lookup_by_filename, retrieve_by_hash, retrieve_object, DbPool, Entry};
+use crate::database::{lookup_by_filename, retrieve_file, retrieve_object, DbPool, Entry};
 use crate::filesystem::{list_directory, UPath};
 use crate::hash::{decode, encode};
 use actix_files::NamedFile;
@@ -23,7 +23,7 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result
         .map_err(ErrorInternalServerError)?;
     if let Address::Hash(hash) = address {
         let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
-        let response = retrieve_by_hash(&connection, hash);
+        let response = retrieve_file(&connection, hash);
 
         debug!("{:?}", response);
 
src/schema.rs
@@ -12,9 +12,10 @@ table! {
         id -> Integer,
         hash -> Binary,
         path -> Text,
-        size -> BigInt,
         created -> Timestamp,
         valid -> Bool,
         added -> Timestamp,
+        size -> BigInt,
+        mtime -> Nullable<Timestamp>,
     }
 }
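And the cleanup half of _reimport_directory, reduced to plain types as a hedged sketch (Record and ids_to_invalidate are illustrative, not the commit's code): after the walk, the records still present in existing_files and still marked valid are exactly the ones handed to file_set_valid(&connection, file.id, false).

// Illustrative sketch of the post-walk sweep; Record is a hypothetical
// stand-in for the File rows left over in existing_files.
struct Record {
    id: i32,
    valid: bool,
}

// Returns the ids that would be passed to file_set_valid(.., false).
fn ids_to_invalidate(unmatched: &[Record]) -> Vec<i32> {
    unmatched
        .iter()
        .filter(|r| r.valid) // rows already invalid need no update
        .map(|r| r.id)
        .collect()
}

fn main() {
    let leftover = vec![
        Record { id: 1, valid: true },
        Record { id: 2, valid: false },
        Record { id: 3, valid: true },
    ];
    assert_eq!(ids_to_invalidate(&leftover), vec![1, 3]);
    println!("would invalidate: {:?}", ids_to_invalidate(&leftover));
}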