parallelize hashing with rayon
parent
9823f646cd
commit
0e0afae089
|
@ -537,6 +537,30 @@ dependencies = [
|
|||
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"memoffset 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.7.2"
|
||||
|
@ -1019,11 +1043,24 @@ name = "matches"
|
|||
version = "0.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "maybe-uninit"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.5.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "migrations_internals"
|
||||
version = "1.4.1"
|
||||
|
@ -1342,6 +1379,29 @@ dependencies = [
|
|||
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rayon-core 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon-core"
|
||||
version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.1.57"
|
||||
|
@ -1754,6 +1814,7 @@ dependencies = [
|
|||
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"filebuffer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rayon 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde_json 1.0.57 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -2026,6 +2087,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
"checksum copyless 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a2df960f5d869b2dd8532793fde43eb5427cceb126c929747a26823ab0eeb536"
|
||||
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
|
||||
"checksum crossbeam-channel 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6"
|
||||
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
|
||||
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
||||
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
|
||||
"checksum crunchy 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
|
||||
"checksum derive_more 0.99.9 (registry+https://github.com/rust-lang/crates.io-index)" = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76"
|
||||
|
@ -2084,7 +2147,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
|
||||
"checksum match_cfg 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
|
||||
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
|
||||
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
|
||||
"checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
|
||||
"checksum memoffset 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "043175f069eda7b85febe4a74abbaeff828d9f8b448515d3151a14a3542811aa"
|
||||
"checksum migrations_internals 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2b4fc84e4af020b837029e017966f86a1c2d5e83e64b589963d5047525995860"
|
||||
"checksum migrations_macros 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9753f12909fd8d923f75ae5c3258cae1ed3c8ec052e1b38c93c21a6d157f789c"
|
||||
"checksum mime 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
||||
|
@ -2123,6 +2188,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
|
||||
"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
|
||||
"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
|
||||
"checksum rayon 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cfd016f0c045ad38b5251be2c9c0ab806917f82da4d36b2a327e5166adad9270"
|
||||
"checksum rayon-core 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8c4fec834fb6e6d2dd5eece3c7b432a52f0ba887cf40e595190c4107edc08bf"
|
||||
"checksum redox_syscall 0.1.57 (registry+https://github.com/rust-lang/crates.io-index)" = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
|
||||
"checksum regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
|
||||
"checksum regex-syntax 0.6.18 (registry+https://github.com/rust-lang/crates.io-index)" = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
|
||||
|
|
|
@ -27,6 +27,7 @@ actix_derive = "0.3.2"
|
|||
chrono = { version = "0.4", features = ["serde"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
rayon = "1.4.0"
|
||||
|
||||
bs58 = "0.3.1"
|
||||
filebuffer = "0.4.0"
|
||||
|
|
|
@ -1,23 +1,23 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::{
|
||||
insert_entry, insert_file, query_entries, retrieve_object, Entry, EntryQuery, EntryValue,
|
||||
InnerEntry,
|
||||
insert_entry, insert_file, query_entries, retrieve_object, DbPool, Entry, EntryQuery,
|
||||
EntryValue, InnerEntry,
|
||||
};
|
||||
use crate::hash::{ComputeHash, HasherWorker};
|
||||
use crate::hash::{hash, ComputeHash, Hashable, HasherWorker};
|
||||
use crate::models;
|
||||
use actix::prelude::*;
|
||||
use anyhow::{anyhow, Error, Result};
|
||||
use chrono::prelude::*;
|
||||
use diesel::sqlite::Sqlite;
|
||||
use diesel::Connection;
|
||||
use log::{error, info, trace};
|
||||
use rayon::prelude::*;
|
||||
use serde::export::Formatter;
|
||||
use serde_json::Value;
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
use std::{fs, iter};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use actix::prelude::*;
|
||||
use chrono::prelude::*;
|
||||
use diesel::sqlite::Sqlite;
|
||||
use diesel::Connection;
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
const DIR_KEY: &str = "DIR";
|
||||
const DIR_HAS_KEY: &str = "DIR_HAS";
|
||||
|
@ -127,7 +127,7 @@ impl EntryList for Vec<Entry> {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
|
||||
pub fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
|
||||
let all_directories: Vec<Entry> = query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
|
@ -158,13 +158,12 @@ pub async fn list_directory<C: Connection<Backend = Sqlite>>(
|
|||
path: &UPath,
|
||||
) -> Result<Vec<Entry>> {
|
||||
let entry_addresses = match path.0.len() {
|
||||
0 => list_roots(connection)
|
||||
.await?
|
||||
0 => list_roots(connection)?
|
||||
.into_iter()
|
||||
.map(|e| e.target)
|
||||
.collect(),
|
||||
_ => {
|
||||
let resolved_path: Vec<Address> = resolve_path(connection, path, false).await?;
|
||||
let resolved_path: Vec<Address> = resolve_path(connection, path, false)?;
|
||||
let last = resolved_path.last().unwrap();
|
||||
|
||||
query_entries(
|
||||
|
@ -191,7 +190,7 @@ pub async fn list_directory<C: Connection<Backend = Sqlite>>(
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||
pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
parent: Option<Address>,
|
||||
directory: UDirectory,
|
||||
|
@ -267,7 +266,7 @@ pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||
pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
path: &UPath,
|
||||
create: bool,
|
||||
|
@ -282,49 +281,47 @@ pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
|
|||
result.last().cloned(),
|
||||
path_stack.pop().unwrap(),
|
||||
create,
|
||||
)
|
||||
.await?;
|
||||
)?;
|
||||
result.push(dir_address);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
|
||||
connection: C,
|
||||
directory: PathBuf,
|
||||
hasher_worker: Addr<HasherWorker>,
|
||||
) {
|
||||
let result = _reimport_directory(&connection, directory, &hasher_worker).await;
|
||||
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
||||
let result = actix_web::web::block(move || _reimport_directory(&pool, directory)).await;
|
||||
if result.is_err() {
|
||||
error!("Update did not succeed! {}", result.err().unwrap());
|
||||
let err = result.err().unwrap();
|
||||
error!("Update did not succeed! {}", err);
|
||||
}
|
||||
}
|
||||
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
|
||||
connection: &C,
|
||||
directory: T,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
let path_entries: Result<Vec<PathBuf>, std::io::Error> = WalkDir::new(&directory)
|
||||
fn _reimport_directory<T: AsRef<Path>>(pool: &DbPool, directory: T) -> Result<()> {
|
||||
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok())
|
||||
.filter(|e| e.path().is_file())
|
||||
.map(|e| fs::canonicalize(e.into_path()))
|
||||
.filter_map(|e| fs::canonicalize(e.into_path()).ok()) // ???
|
||||
.collect();
|
||||
|
||||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
for path in path_entries? {
|
||||
_process_directory_entry(connection, path, &absolute_path, &hasher_worker).await?;
|
||||
}
|
||||
path_entries
|
||||
.into_par_iter()
|
||||
.map(|path| {
|
||||
Ok(_process_directory_entry(
|
||||
&pool.get()?,
|
||||
path,
|
||||
&absolute_path,
|
||||
)?)
|
||||
})
|
||||
.collect::<Result<()>>()?;
|
||||
info!("Finished updating {}.", directory.as_ref().display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||
fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path>>(
|
||||
connection: &C,
|
||||
path: PathBuf,
|
||||
directory_path: &P,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
info!("Processing: {:?}", path);
|
||||
|
||||
|
@ -334,9 +331,7 @@ async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path
|
|||
panic!("File {} too large?!", path.display());
|
||||
}
|
||||
|
||||
let digest = hasher_worker
|
||||
.send(ComputeHash { path: path.clone() })
|
||||
.await??;
|
||||
let digest = path.hash()?;
|
||||
|
||||
// let existing_file: Option<String> = db_executor
|
||||
// .send(RetrieveByHash {
|
||||
|
@ -372,7 +367,7 @@ async fn _process_directory_entry<C: Connection<Backend = Sqlite>, P: AsRef<Path
|
|||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = resolve_path(connection, &upath, true).await?;
|
||||
let resolved_path = resolve_path(connection, &upath, true)?;
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
|
||||
connection.transaction::<_, Error, _>(|| {
|
||||
|
|
|
@ -47,8 +47,8 @@ impl Handler<ComputeHash> for HasherWorker {
|
|||
}
|
||||
}
|
||||
|
||||
impl Hashable for Path {
|
||||
fn hash(self: &Path) -> Result<Hash> {
|
||||
impl Hashable for PathBuf {
|
||||
fn hash(self: &PathBuf) -> Result<Hash> {
|
||||
let fbuffer = FileBuffer::open(self)?;
|
||||
Ok(hash(&fbuffer))
|
||||
}
|
||||
|
|
|
@ -99,10 +99,7 @@ fn main() -> Result<()> {
|
|||
|
||||
if open_result.new {
|
||||
info!("The vault has been just created, running initial update...");
|
||||
let connection = db_pool.get()?;
|
||||
actix::spawn(filesystem::reimport_directory(
|
||||
connection, vault_path, hash_addr,
|
||||
));
|
||||
actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
|
||||
}
|
||||
|
||||
if !matches.is_present("NO_BROWSER") {
|
||||
|
|
|
@ -98,12 +98,8 @@ pub async fn get_lookup(
|
|||
|
||||
#[post("/api/refresh")]
|
||||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
|
||||
actix::spawn(crate::filesystem::reimport_directory(
|
||||
connection,
|
||||
state.directory.clone(),
|
||||
state.hasher.clone(),
|
||||
));
|
||||
let _pool = state.db_pool.clone();
|
||||
let _directory = state.directory.clone();
|
||||
actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
|
||||
Ok(HttpResponse::Ok().finish())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue