non-duplicate updates (without updating virtual fs structure yet)

feat/vaults
Tomáš Mládek 2020-09-22 00:41:59 +02:00
parent 63ffe42907
commit 80541fe978
6 changed files with 132 additions and 41 deletions
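
In outline, a re-import now reuses the database record of any file whose size and modification time are unchanged, and marks the records of files that have disappeared as no longer valid, instead of unconditionally re-inserting every path it walks. A rough, self-contained sketch of that decision in Rust; the KnownFile map is illustrative and stands in for the real models::File rows:

    use std::collections::HashMap;
    use std::path::{Path, PathBuf};
    use std::time::SystemTime;

    // Illustrative stand-in for one row of the `files` table.
    struct KnownFile {
        size: u64,
        mtime: Option<SystemTime>,
    }

    /// Returns true when the on-disk file matches its stored record and can be skipped.
    /// Matched entries are removed from the map, so whatever remains after the walk
    /// belongs to files that have vanished and should be invalidated.
    fn can_skip(
        known: &mut HashMap<PathBuf, KnownFile>,
        path: &Path,
        size: u64,
        mtime: Option<SystemTime>,
    ) -> bool {
        let unchanged = matches!(
            known.get(path),
            Some(existing) if existing.size == size && existing.mtime == mtime
        );
        if unchanged {
            known.remove(path);
        }
        unchanged
    }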

View File

@@ -13,9 +13,10 @@ CREATE TABLE files
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
created DATETIME NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE
mtime DATETIME NULL
);
CREATE INDEX files_hash ON files (hash);
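
The reworked files table swaps created for added and gains a nullable mtime, alongside the existing size and valid columns; size plus mtime is what later lets the importer skip unchanged files without rehashing them. mtime is stored at second precision, converted from std::fs::Metadata::modified() (which can be unavailable on some platforms); a minimal sketch of that conversion, assuming chrono 0.4 as the rest of the crate uses:

    use chrono::NaiveDateTime;
    use std::path::Path;
    use std::time::UNIX_EPOCH;
    use std::{fs, io};

    // None when the platform or filesystem cannot report a modification time.
    fn mtime_for(path: &Path) -> io::Result<Option<NaiveDateTime>> {
        let metadata = fs::metadata(path)?;
        Ok(metadata.modified().ok().map(|t| {
            let secs = t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
            // Second precision, matching the DATETIME column.
            NaiveDateTime::from_timestamp(secs, 0)
        }))
    }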

View File

@@ -150,7 +150,15 @@ pub fn insert_file<C: Connection<Backend = Sqlite>>(
.execute(connection)?)
}
pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
connection: &C,
) -> Result<Vec<models::File>> {
use crate::schema::files::dsl::*;
let matches = files.load::<models::File>(connection)?;
Ok(matches)
}
pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
connection: &C,
obj_hash: Hash,
) -> Result<Option<String>> {
@@ -164,6 +172,20 @@ pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
Ok(matches.get(0).map(|f| f.path.clone()))
}
pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
connection: &C,
file_id: i32,
is_valid: bool,
) -> Result<usize> {
use crate::schema::files::dsl::*;
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(connection)?)
}
pub fn lookup_by_filename<C: Connection<Backend = Sqlite>>(
connection: &C,
query: String,
@@ -197,6 +219,21 @@ pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
Ok(entries)
}
pub fn remove_object<C: Connection<Backend = Sqlite>>(
connection: &C,
object_address: Address,
) -> Result<usize> {
use crate::schema::data::dsl::*;
debug!("Deleting {}!", object_address);
let matches = data
.filter(target.eq(object_address.encode()?))
.or_filter(value.eq(EntryValue::Address(object_address).to_str()?));
Ok(diesel::delete(matches).execute(connection)?)
}
pub struct EntryQuery {
pub target: Option<Address>,
pub key: Option<String>,
@@ -287,7 +324,7 @@ pub struct OpenResult {
pub new: bool,
}
const DATABASE_FILENAME: &str = "upend.sqlite3";
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
embed_migrations!("./migrations/upend/");

View File

@@ -1,10 +1,11 @@
use crate::addressing::Address;
use crate::database::{
insert_entry, insert_file, query_entries, retrieve_object, DbPool, Entry, EntryQuery,
EntryValue, InnerEntry,
file_set_valid, insert_entry, insert_file, query_entries, retrieve_all_files, retrieve_object,
DbPool, Entry, EntryQuery, EntryValue, InnerEntry, DATABASE_FILENAME,
};
use crate::hash::Hashable;
use crate::models;
use crate::models::File;
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::sqlite::Sqlite;
@@ -15,6 +16,7 @@ use serde::export::Formatter;
use serde_json::Value;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::UNIX_EPOCH;
use std::{fs, iter};
use uuid::Uuid;
use walkdir::WalkDir;
@@ -294,65 +296,113 @@ pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
error!("Update did not succeed! {}", err);
}
}
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<()> {
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<Vec<Result<()>>> {
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
.map(|e| fs::canonicalize(e.into_path()).unwrap())
.collect();
let mutex_pool = Arc::new(RwLock::new(pool));
let rw_pool = Arc::new(RwLock::new(pool.clone()));
let absolute_path = fs::canonicalize(&directory)?;
path_entries
let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
let path_results: Vec<Result<()>> = path_entries
.into_par_iter()
.map(|path| Ok(_process_directory_entry(&mutex_pool, path, &absolute_path)?))
.collect::<Result<()>>()?;
.map(|path| {
Ok(_process_directory_entry(
&rw_pool,
path,
&absolute_path,
&existing_files,
)?)
})
.collect();
let cleanup_results: Vec<Result<()>> = existing_files
.write()
.unwrap()
.iter()
.filter(|f| f.valid)
.map(|file| {
let connection = pool.get()?;
connection.transaction::<_, Error, _>(|| {
let _ = file_set_valid(&connection, file.id, false)?;
// remove_object(&connection, )?
Ok(())
})
})
.collect();
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
Ok(path_results
.into_iter()
.chain(cleanup_results.into_iter())
.collect())
}
fn _process_directory_entry<P: AsRef<Path>>(
pool: &Arc<RwLock<DbPool>>,
db_pool: &Arc<RwLock<DbPool>>,
path: PathBuf,
directory_path: &P,
existing_files: &Arc<RwLock<Vec<File>>>,
) -> Result<()> {
info!("Processing: {:?}", path);
let pool = Arc::clone(&pool);
let db_pool = Arc::clone(&db_pool);
let existing_files = Arc::clone(&existing_files);
let normalized_path = path.strip_prefix(&directory_path)?;
let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
})
.ok();
{
let mut existing_files = existing_files.write().unwrap();
let maybe_existing_file = existing_files
.iter()
.enumerate()
.find(|(_, file)| file.path == normalized_path_str)
.clone();
if let Some((idx, existing_file)) = maybe_existing_file {
if size == existing_file.size && mtime == existing_file.mtime {
if !existing_file.valid {
let _ =
file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?;
}
existing_files.remove(idx);
return Ok(());
}
}
}
let digest = path.hash()?;
// let existing_file: Option<String> = db_executor
// .send(RetrieveByHash {
// hash: digest.clone(),
// })
// .await??;
let new_file = models::NewFile {
path: path
.strip_prefix(&directory_path)?
.to_str()
.expect("path not valid unicode?!")
.to_string(),
path: normalized_path_str.to_string(),
hash: (digest.clone()).0,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
mtime,
};
let _ = insert_file(&pool.write().unwrap().get()?, new_file)?;
let _ = insert_file(&db_pool.write().unwrap().get()?, new_file)?;
let components = path
.strip_prefix(&directory_path)?
.components()
.collect::<Vec<Component>>();
let components = normalized_path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let upath = UPath(
@@ -364,10 +414,10 @@ fn _process_directory_entry<P: AsRef<Path>>(
}))
.collect(),
);
let resolved_path = resolve_path(&pool.write().unwrap().get()?, &upath, true)?;
let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?;
let parent_dir = resolved_path.last().unwrap();
let _pool = &pool.write().unwrap();
let _pool = &db_pool.write().unwrap();
let connection = _pool.get()?;
connection.transaction::<_, Error, _>(|| {
let file_address = Address::UUID(Uuid::new_v4());
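
The re-import now runs in two phases: a parallel walk in which each rayon worker compares size and mtime against the shared list of known files and removes the entries it accounts for, and a cleanup pass that sets valid = false on whatever is left (removing the associated objects is still commented out). The shared list lives behind an Arc<RwLock<Vec<File>>>; a stripped-down sketch of that claim-and-drain pattern, assuming rayon, which the into_par_iter calls above already imply:

    use rayon::prelude::*;
    use std::sync::{Arc, RwLock};

    fn demo() {
        // Records loaded from the database before the walk starts.
        let leftovers = Arc::new(RwLock::new(vec!["a.txt", "b.txt", "c.txt"]));
        // Paths actually encountered on disk.
        let seen_on_disk = vec!["a.txt", "c.txt"];

        // Each worker claims the path it processed by removing it from the shared list.
        seen_on_disk.into_par_iter().for_each(|path| {
            let mut remaining = leftovers.write().unwrap();
            if let Some(idx) = remaining.iter().position(|p| *p == path) {
                remaining.remove(idx);
            }
        });

        // Whatever survived the walk corresponds to files that no longer exist
        // and whose records should be invalidated.
        assert_eq!(*leftovers.read().unwrap(), vec!["b.txt"]);
    }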

View File

@@ -8,9 +8,10 @@ pub struct File {
pub id: i32,
pub hash: Vec<u8>,
pub path: String,
pub size: i64,
pub created: NaiveDateTime,
pub valid: bool,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
#[derive(Insertable, Debug)]
@@ -18,8 +19,9 @@ pub struct File {
pub struct NewFile {
pub hash: Vec<u8>,
pub path: String,
pub added: NaiveDateTime,
pub size: i64,
pub created: NaiveDateTime,
pub mtime: Option<NaiveDateTime>,
}
#[derive(Queryable, Insertable, Serialize, Debug)]
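
File and NewFile drop created and pick up added and mtime, reordered to match the reworked columns; with Diesel 1.x, #[derive(Queryable)] maps columns to fields by position, which is why the struct and the table! definition below change in lockstep. A small sketch of building the insertable half the way the importer does, with the timestamp truncated to seconds:

    use chrono::{NaiveDateTime, Utc};

    use crate::models::NewFile;

    // Hypothetical constructor mirroring the importer's use of the new fields.
    fn new_file_record(hash: Vec<u8>, path: String, size: i64, mtime: Option<NaiveDateTime>) -> NewFile {
        NewFile {
            path,
            hash,
            added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
            size,
            mtime,
        }
    }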

View File

@@ -1,5 +1,5 @@
use crate::addressing::Address;
use crate::database::{lookup_by_filename, retrieve_by_hash, retrieve_object, DbPool, Entry};
use crate::database::{lookup_by_filename, retrieve_file, retrieve_object, DbPool, Entry};
use crate::filesystem::{list_directory, UPath};
use crate::hash::{decode, encode};
use actix_files::NamedFile;
@@ -23,7 +23,7 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result
.map_err(ErrorInternalServerError)?;
if let Address::Hash(hash) = address {
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
let response = retrieve_by_hash(&connection, hash);
let response = retrieve_file(&connection, hash);
debug!("{:?}", response);
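
On the routes side only the helper's name changes: get_raw still resolves a hash address to the stored path, now via retrieve_file. A hedged sketch of the usual follow-up, turning the returned Option<String> into an actix response (the surrounding handler and its error mapping are in the existing code; this exact continuation is an assumption):

    use actix_files::NamedFile;
    use actix_web::error::{ErrorInternalServerError, ErrorNotFound};

    // Assumed continuation: serve the file if a path was found, 404 otherwise.
    fn serve_lookup(response: anyhow::Result<Option<String>>) -> actix_web::Result<NamedFile> {
        match response.map_err(ErrorInternalServerError)? {
            Some(path) => NamedFile::open(path).map_err(ErrorInternalServerError),
            None => Err(ErrorNotFound("file not found")),
        }
    }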

View File

@@ -12,9 +12,10 @@ table! {
id -> Integer,
hash -> Binary,
path -> Text,
size -> BigInt,
created -> Timestamp,
valid -> Bool,
added -> Timestamp,
size -> BigInt,
mtime -> Nullable<Timestamp>,
}
}
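
With the table! definition updated, the new columns are reachable through the generated DSL. A small illustrative query against them (a hypothetical helper, assuming it sits in this crate next to the other database functions):

    use diesel::prelude::*;
    use diesel::sqlite::SqliteConnection;

    use crate::models::File;

    // Hypothetical helper: the file records still marked valid, newest first.
    fn valid_files(connection: &SqliteConnection) -> diesel::QueryResult<Vec<File>> {
        use crate::schema::files::dsl::*;

        files
            .filter(valid.eq(true))
            .order(added.desc())
            .load::<File>(connection)
    }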