fix: add global locks to db, fix sqlite errors (?)

feat/vaults
Tomáš Mládek 2022-07-31 22:49:31 +02:00
parent 38e6d6b6df
commit 41d0e55ca7
1 changed files with 35 additions and 2 deletions

View File

@ -32,7 +32,7 @@ use log::{debug, trace};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration; use std::time::Duration;
#[derive(Debug)] #[derive(Debug)]
@ -77,6 +77,8 @@ pub struct OpenResult {
pub struct UpEndDatabase { pub struct UpEndDatabase {
pool: DbPool, pool: DbPool,
lock: Arc<RwLock<()>>,
transaction_lock: Arc<Mutex<()>>,
pub vault_path: Arc<PathBuf>, pub vault_path: Arc<PathBuf>,
pub db_path: Arc<PathBuf>, pub db_path: Arc<PathBuf>,
} }
@ -119,6 +121,8 @@ impl UpEndDatabase {
let db = UpEndDatabase { let db = UpEndDatabase {
pool, pool,
lock: Arc::new(RwLock::new(())),
transaction_lock: Arc::new(Mutex::new(())),
vault_path: Arc::new(dirpath.as_ref().canonicalize()?), vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
db_path: Arc::new(upend_path), db_path: Arc::new(upend_path),
}; };
@ -164,6 +168,8 @@ impl UpEndDatabase {
pub fn connection(&self) -> Result<UpEndConnection> { pub fn connection(&self) -> Result<UpEndConnection> {
Ok(UpEndConnection { Ok(UpEndConnection {
conn: self.pool.get()?, conn: self.pool.get()?,
lock: self.lock.clone(),
transaction_lock: self.transaction_lock.clone(),
vault_path: self.vault_path.clone(), vault_path: self.vault_path.clone(),
}) })
} }
@ -171,6 +177,8 @@ impl UpEndDatabase {
pub struct UpEndConnection { pub struct UpEndConnection {
conn: PooledConnection<ConnectionManager<SqliteConnection>>, conn: PooledConnection<ConnectionManager<SqliteConnection>>,
transaction_lock: Arc<Mutex<()>>,
lock: Arc<RwLock<()>>,
vault_path: Arc<PathBuf>, vault_path: Arc<PathBuf>,
} }
@ -184,7 +192,8 @@ impl UpEndConnection {
F: FnOnce() -> Result<T, E>, F: FnOnce() -> Result<T, E>,
E: From<Error>, E: From<Error>,
{ {
self.conn.transaction(f) let _lock = self.transaction_lock.lock().unwrap();
self.conn.exclusive_transaction(f)
} }
pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> { pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
@ -193,6 +202,8 @@ impl UpEndConnection {
debug!("Querying META:{key}"); debug!("Querying META:{key}");
let _lock = self.lock.read().unwrap();
dsl::meta dsl::meta
.filter(dsl::key.eq(key)) .filter(dsl::key.eq(key))
.load::<models::MetaValue>(&self.conn)? .load::<models::MetaValue>(&self.conn)?
@ -210,6 +221,8 @@ impl UpEndConnection {
Address::Hash(Hash((&file.hash).clone())) Address::Hash(Hash((&file.hash).clone()))
); );
let _lock = self.lock.write().unwrap();
diesel::insert_into(files::table) diesel::insert_into(files::table)
.values(&file) .values(&file)
.execute(&self.conn)?; .execute(&self.conn)?;
@ -226,6 +239,8 @@ impl UpEndConnection {
pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> { pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
use crate::database::inner::schema::files::dsl::*; use crate::database::inner::schema::files::dsl::*;
let _lock = self.lock.read().unwrap();
let matches = files let matches = files
.filter(valid.eq(true)) .filter(valid.eq(true))
.filter(hash.eq(&obj_hash.0)) .filter(hash.eq(&obj_hash.0))
@ -249,6 +264,7 @@ impl UpEndConnection {
pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> { pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
use crate::database::inner::schema::files::dsl::*; use crate::database::inner::schema::files::dsl::*;
let _lock = self.lock.read().unwrap();
let matches = files.load::<models::File>(&self.conn)?; let matches = files.load::<models::File>(&self.conn)?;
Ok(matches) Ok(matches)
} }
@ -258,6 +274,8 @@ impl UpEndConnection {
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
let _lock = self.lock.write().unwrap();
Ok(diesel::update(files.filter(id.eq(file_id))) Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time)) .set(mtime.eq(m_time))
.execute(&self.conn)?) .execute(&self.conn)?)
@ -268,6 +286,8 @@ impl UpEndConnection {
debug!("Setting file ID {} to valid = {}", file_id, is_valid); debug!("Setting file ID {} to valid = {}", file_id, is_valid);
let _lock = self.lock.write().unwrap();
Ok(diesel::update(files.filter(id.eq(file_id))) Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid)) .set(valid.eq(is_valid))
.execute(&self.conn)?) .execute(&self.conn)?)
@ -283,6 +303,8 @@ impl UpEndConnection {
pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> { pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
use crate::database::inner::schema::data::dsl::*; use crate::database::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let entry = data let entry = data
.filter(identity.eq(Address::Hash(hash.clone()).encode()?)) .filter(identity.eq(Address::Hash(hash.clone()).encode()?))
.load::<models::Entry>(&self.conn)?; .load::<models::Entry>(&self.conn)?;
@ -301,6 +323,8 @@ impl UpEndConnection {
pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> { pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
use crate::database::inner::schema::data::dsl::*; use crate::database::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let primary = data let primary = data
.filter(entity.eq(object_address.encode()?)) .filter(entity.eq(object_address.encode()?))
.or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?)) .or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
@ -337,6 +361,8 @@ impl UpEndConnection {
debug!("Deleting {}!", object_address); debug!("Deleting {}!", object_address);
let _lock = self.lock.write().unwrap();
let matches = data let matches = data
.filter(identity.eq(object_address.encode()?)) .filter(identity.eq(object_address.encode()?))
.or_filter(entity.eq(object_address.encode()?)) .or_filter(entity.eq(object_address.encode()?))
@ -348,6 +374,8 @@ impl UpEndConnection {
pub fn query(&self, query: Query) -> Result<Vec<Entry>> { pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
trace!("Querying: {:?}", query); trace!("Querying: {:?}", query);
let _lock = self.lock.read().unwrap();
let entries = execute(&self.conn, query)?; let entries = execute(&self.conn, query)?;
let entries = entries let entries = entries
.iter() .iter()
@ -374,6 +402,7 @@ impl UpEndConnection {
} }
fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> { fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
let _lock = self.lock.write().unwrap();
let result = diesel::insert_into(data::table) let result = diesel::insert_into(data::table)
.values(&entry) .values(&entry)
.execute(&self.conn); .execute(&self.conn);
@ -391,6 +420,8 @@ impl UpEndConnection {
pub fn get_all_addresses(&self) -> Result<Vec<Address>> { pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
use crate::database::inner::schema::data::dsl::*; use crate::database::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let result = data let result = data
.select(entity) .select(entity)
.distinct() .distinct()
@ -406,6 +437,8 @@ impl UpEndConnection {
pub fn get_all_attributes(&self) -> Result<Vec<String>> { pub fn get_all_attributes(&self) -> Result<Vec<String>> {
use crate::database::inner::schema::data::dsl::*; use crate::database::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let result = data let result = data
.select(attribute) .select(attribute)
.distinct() .distinct()