parallelize hashing with rayon

feat/vaults
Tomáš Mládek 2020-09-20 19:28:44 +02:00
parent 9823f646cd
commit 0e0afae089
6 changed files with 109 additions and 53 deletions

67
Cargo.lock generated
View File

@ -537,6 +537,30 @@ dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-epoch"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memoffset 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
"scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
@ -1019,11 +1043,24 @@ name = "matches"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "memchr"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "memoffset"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "migrations_internals"
version = "1.4.1"
@ -1342,6 +1379,29 @@ dependencies = [
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rayon"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon-core 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rayon-core"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
@ -1754,6 +1814,7 @@ dependencies = [
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"filebuffer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2026,6 +2087,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum copyless 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a2df960f5d869b2dd8532793fde43eb5427cceb126c929747a26823ab0eeb536"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum crunchy 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
"checksum derive_more 0.99.9 (registry+https://github.com/rust-lang/crates.io-index)" = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76"
@ -2084,7 +2147,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
"checksum match_cfg 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
"checksum memoffset 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
"checksum migrations_internals 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2b4fc84e4af020b837029e017966f86a1c2d5e83e64b589963d5047525995860"
"checksum migrations_macros 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c"
"checksum mime 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
@ -2123,6 +2188,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
"checksum rayon 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cfd016f0c045ad38b5251be2c9c0ab806917f82da4d36b2a327e5166adad9270"
"checksum rayon-core 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8c4fec834fb6e6d2dd5eece3c7b432a52f0ba887cf40e595190c4107edc08bf"
"checksum redox_syscall 0.1.57 (registry+https://github.com/rust-lang/crates.io-index)" = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
"checksum regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
"checksum regex-syntax 0.6.18 (registry+https://github.com/rust-lang/crates.io-index)" = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"

View File

@ -27,6 +27,7 @@ actix_derive = "0.3.2"
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rayon = "1.4.0"
bs58 = "0.3.1"
filebuffer = "0.4.0"

View File

@ -1,23 +1,23 @@
use crate::addressing::Address;
use crate::database::{
insert_entry, insert_file, query_entries, retrieve_object, Entry, EntryQuery, EntryValue,
InnerEntry,
insert_entry, insert_file, query_entries, retrieve_object, DbPool, Entry, EntryQuery,
EntryValue, InnerEntry,
};
use crate::hash::{ComputeHash, HasherWorker};
use crate::hash::{hash, ComputeHash, Hashable, HasherWorker};
use crate::models;
use actix::prelude::*;
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::sqlite::Sqlite;
use diesel::Connection;
use log::{error, info, trace};
use rayon::prelude::*;
use serde::export::Formatter;
use serde_json::Value;
use std::path::{Component, Path, PathBuf};
use std::{fs, iter};
use walkdir::WalkDir;
use actix::prelude::*;
use chrono::prelude::*;
use diesel::sqlite::Sqlite;
use diesel::Connection;
use uuid::Uuid;
use walkdir::WalkDir;
const DIR_KEY: &str = "DIR";
const DIR_HAS_KEY: &str = "DIR_HAS";
@ -127,7 +127,7 @@ impl EntryList for Vec<Entry> {
}
}
pub async fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
pub fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
let all_directories: Vec<Entry> = query_entries(
connection,
EntryQuery {
@ -158,13 +158,12 @@ pub async fn list_directory<C: Connection<Backend = Sqlite>>(
path: &UPath,
) -> Result<Vec<Entry>> {
let entry_addresses = match path.0.len() {
0 => list_roots(connection)
.await?
0 => list_roots(connection)?
.into_iter()
.map(|e| e.target)
.collect(),
_ => {
let resolved_path: Vec<Address> = resolve_path(connection, path, false).await?;
let resolved_path: Vec<Address> = resolve_path(connection, path, false)?;
let last = resolved_path.last().unwrap();
query_entries(
@ -191,7 +190,7 @@ pub async fn list_directory<C: Connection<Backend = Sqlite>>(
Ok(result)
}
pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
connection: &C,
parent: Option<Address>,
directory: UDirectory,
@ -267,7 +266,7 @@ pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
}
}
pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
pub fn resolve_path<C: Connection<Backend = Sqlite>>(
connection: &C,
path: &UPath,
create: bool,
@ -282,49 +281,47 @@ pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
result.last().cloned(),
path_stack.pop().unwrap(),
create,
)
.await?;
)?;
result.push(dir_address);
}
Ok(result)
}
pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
connection: C,
directory: PathBuf,
hasher_worker: Addr<HasherWorker>,
) {
let result = _reimport_directory(&connection, directory, &hasher_worker).await;
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
let result = actix_web::web::block(move || _reimport_directory(&pool, directory)).await;
if result.is_err() {
error!("Update did not succeed! {}", result.err().unwrap());
let err = result.err().unwrap();
error!("Update did not succeed! {}", err);
}
}
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
connection: &C,
directory: T,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()> {
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.map(|e| fs::canonicalize(e.into_path()))
.filter_map(|e| fs::canonicalize(e.into_path()).ok()) // ???
.collect();
let absolute_path = fs::canonicalize(&directory)?;
for path in path_entries? {
_process_directory_entry(connection, path, &absolute_path, &hasher_worker).await?;
}
path_entries
.into_par_iter()
.map(|path| {
Ok(_process_directory_entry(
&pool.get()?,
path,
&absolute_path,
)?)
})
.collect::<Result<()>>()?;
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
}
async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
connection: &C,
path: PathBuf,
directory_path: &P,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
info!("Processing: {:?}", path);
@ -334,9 +331,7 @@ async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path
panic!("File {} too large?!", path.display());
}
let digest = hasher_worker
.send(ComputeHash { path: path.clone() })
.await??;
let digest = path.hash()?;
// let existing_file: Option<String> = db_executor
// .send(RetrieveByHash {
@ -372,7 +367,7 @@ async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path
}))
.collect(),
);
let resolved_path = resolve_path(connection, &upath, true).await?;
let resolved_path = resolve_path(connection, &upath, true)?;
let parent_dir = resolved_path.last().unwrap();
connection.transaction::<_, Error, _>(|| {

View File

@ -47,8 +47,8 @@ impl Handler<ComputeHash> for HasherWorker {
}
}
impl Hashable for Path {
fn hash(self: &Path) -> Result<Hash> {
impl Hashable for PathBuf {
fn hash(self: &PathBuf) -> Result<Hash> {
let fbuffer = FileBuffer::open(self)?;
Ok(hash(&fbuffer))
}

View File

@ -99,10 +99,7 @@ fn main() -> Result<()> {
if open_result.new {
info!("The vault has been just created, running initial update...");
let connection = db_pool.get()?;
actix::spawn(filesystem::reimport_directory(
connection, vault_path, hash_addr,
));
actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
}
if !matches.is_present("NO_BROWSER") {

View File

@ -98,12 +98,8 @@ pub async fn get_lookup(
#[post("/api/refresh")]
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
actix::spawn(crate::filesystem::reimport_directory(
connection,
state.directory.clone(),
state.hasher.clone(),
));
let _pool = state.db_pool.clone();
let _directory = state.directory.clone();
actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
Ok(HttpResponse::Ok().finish())
}