diff --git a/src/database/mod.rs b/src/database/mod.rs index d3f77c8..2706147 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -32,7 +32,7 @@ use log::{debug, trace}; use std::convert::{TryFrom, TryInto}; use std::fs; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; #[derive(Debug)] @@ -77,6 +77,8 @@ pub struct OpenResult { pub struct UpEndDatabase { pool: DbPool, + lock: Arc>, + transaction_lock: Arc>, pub vault_path: Arc, pub db_path: Arc, } @@ -119,6 +121,8 @@ impl UpEndDatabase { let db = UpEndDatabase { pool, + lock: Arc::new(RwLock::new(())), + transaction_lock: Arc::new(Mutex::new(())), vault_path: Arc::new(dirpath.as_ref().canonicalize()?), db_path: Arc::new(upend_path), }; @@ -164,6 +168,8 @@ impl UpEndDatabase { pub fn connection(&self) -> Result { Ok(UpEndConnection { conn: self.pool.get()?, + lock: self.lock.clone(), + transaction_lock: self.transaction_lock.clone(), vault_path: self.vault_path.clone(), }) } @@ -171,6 +177,8 @@ impl UpEndDatabase { pub struct UpEndConnection { conn: PooledConnection>, + transaction_lock: Arc>, + lock: Arc>, vault_path: Arc, } @@ -184,7 +192,8 @@ impl UpEndConnection { F: FnOnce() -> Result, E: From, { - self.conn.transaction(f) + let _lock = self.transaction_lock.lock().unwrap(); + self.conn.exclusive_transaction(f) } pub fn get_meta>(&self, key: S) -> Result { @@ -193,6 +202,8 @@ impl UpEndConnection { debug!("Querying META:{key}"); + let _lock = self.lock.read().unwrap(); + dsl::meta .filter(dsl::key.eq(key)) .load::(&self.conn)? @@ -210,6 +221,8 @@ impl UpEndConnection { Address::Hash(Hash((&file.hash).clone())) ); + let _lock = self.lock.write().unwrap(); + diesel::insert_into(files::table) .values(&file) .execute(&self.conn)?; @@ -226,6 +239,8 @@ impl UpEndConnection { pub fn retrieve_file(&self, obj_hash: &Hash) -> Result> { use crate::database::inner::schema::files::dsl::*; + let _lock = self.lock.read().unwrap(); + let matches = files .filter(valid.eq(true)) .filter(hash.eq(&obj_hash.0)) @@ -249,6 +264,7 @@ impl UpEndConnection { pub fn retrieve_all_files(&self) -> Result> { use crate::database::inner::schema::files::dsl::*; + let _lock = self.lock.read().unwrap(); let matches = files.load::(&self.conn)?; Ok(matches) } @@ -258,6 +274,8 @@ impl UpEndConnection { debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); + let _lock = self.lock.write().unwrap(); + Ok(diesel::update(files.filter(id.eq(file_id))) .set(mtime.eq(m_time)) .execute(&self.conn)?) @@ -268,6 +286,8 @@ impl UpEndConnection { debug!("Setting file ID {} to valid = {}", file_id, is_valid); + let _lock = self.lock.write().unwrap(); + Ok(diesel::update(files.filter(id.eq(file_id))) .set(valid.eq(is_valid)) .execute(&self.conn)?) @@ -283,6 +303,8 @@ impl UpEndConnection { pub fn retrieve_entry(&self, hash: &Hash) -> Result> { use crate::database::inner::schema::data::dsl::*; + let _lock = self.lock.read().unwrap(); + let entry = data .filter(identity.eq(Address::Hash(hash.clone()).encode()?)) .load::(&self.conn)?; @@ -301,6 +323,8 @@ impl UpEndConnection { pub fn retrieve_object(&self, object_address: &Address) -> Result> { use crate::database::inner::schema::data::dsl::*; + let _lock = self.lock.read().unwrap(); + let primary = data .filter(entity.eq(object_address.encode()?)) .or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?)) @@ -337,6 +361,8 @@ impl UpEndConnection { debug!("Deleting {}!", object_address); + let _lock = self.lock.write().unwrap(); + let matches = data .filter(identity.eq(object_address.encode()?)) .or_filter(entity.eq(object_address.encode()?)) @@ -348,6 +374,8 @@ impl UpEndConnection { pub fn query(&self, query: Query) -> Result> { trace!("Querying: {:?}", query); + let _lock = self.lock.read().unwrap(); + let entries = execute(&self.conn, query)?; let entries = entries .iter() @@ -374,6 +402,7 @@ impl UpEndConnection { } fn insert_model_entry(&self, entry: models::Entry) -> Result { + let _lock = self.lock.write().unwrap(); let result = diesel::insert_into(data::table) .values(&entry) .execute(&self.conn); @@ -391,6 +420,8 @@ impl UpEndConnection { pub fn get_all_addresses(&self) -> Result> { use crate::database::inner::schema::data::dsl::*; + let _lock = self.lock.read().unwrap(); + let result = data .select(entity) .distinct() @@ -406,6 +437,8 @@ impl UpEndConnection { pub fn get_all_attributes(&self) -> Result> { use crate::database::inner::schema::data::dsl::*; + let _lock = self.lock.read().unwrap(); + let result = data .select(attribute) .distinct()