fix: add global locks to db, fix sqlite errors (?)
parent
38e6d6b6df
commit
41d0e55ca7
|
@ -32,7 +32,7 @@ use log::{debug, trace};
|
|||
use std::convert::{TryFrom, TryInto};
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -77,6 +77,8 @@ pub struct OpenResult {
|
|||
|
||||
pub struct UpEndDatabase {
|
||||
pool: DbPool,
|
||||
lock: Arc<RwLock<()>>,
|
||||
transaction_lock: Arc<Mutex<()>>,
|
||||
pub vault_path: Arc<PathBuf>,
|
||||
pub db_path: Arc<PathBuf>,
|
||||
}
|
||||
|
@ -119,6 +121,8 @@ impl UpEndDatabase {
|
|||
|
||||
let db = UpEndDatabase {
|
||||
pool,
|
||||
lock: Arc::new(RwLock::new(())),
|
||||
transaction_lock: Arc::new(Mutex::new(())),
|
||||
vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
|
||||
db_path: Arc::new(upend_path),
|
||||
};
|
||||
|
@ -164,6 +168,8 @@ impl UpEndDatabase {
|
|||
pub fn connection(&self) -> Result<UpEndConnection> {
|
||||
Ok(UpEndConnection {
|
||||
conn: self.pool.get()?,
|
||||
lock: self.lock.clone(),
|
||||
transaction_lock: self.transaction_lock.clone(),
|
||||
vault_path: self.vault_path.clone(),
|
||||
})
|
||||
}
|
||||
|
@ -171,6 +177,8 @@ impl UpEndDatabase {
|
|||
|
||||
pub struct UpEndConnection {
|
||||
conn: PooledConnection<ConnectionManager<SqliteConnection>>,
|
||||
transaction_lock: Arc<Mutex<()>>,
|
||||
lock: Arc<RwLock<()>>,
|
||||
vault_path: Arc<PathBuf>,
|
||||
}
|
||||
|
||||
|
@ -184,7 +192,8 @@ impl UpEndConnection {
|
|||
F: FnOnce() -> Result<T, E>,
|
||||
E: From<Error>,
|
||||
{
|
||||
self.conn.transaction(f)
|
||||
let _lock = self.transaction_lock.lock().unwrap();
|
||||
self.conn.exclusive_transaction(f)
|
||||
}
|
||||
|
||||
pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
|
||||
|
@ -193,6 +202,8 @@ impl UpEndConnection {
|
|||
|
||||
debug!("Querying META:{key}");
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
dsl::meta
|
||||
.filter(dsl::key.eq(key))
|
||||
.load::<models::MetaValue>(&self.conn)?
|
||||
|
@ -210,6 +221,8 @@ impl UpEndConnection {
|
|||
Address::Hash(Hash((&file.hash).clone()))
|
||||
);
|
||||
|
||||
let _lock = self.lock.write().unwrap();
|
||||
|
||||
diesel::insert_into(files::table)
|
||||
.values(&file)
|
||||
.execute(&self.conn)?;
|
||||
|
@ -226,6 +239,8 @@ impl UpEndConnection {
|
|||
pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
|
||||
use crate::database::inner::schema::files::dsl::*;
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let matches = files
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(&obj_hash.0))
|
||||
|
@ -249,6 +264,7 @@ impl UpEndConnection {
|
|||
|
||||
pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
|
||||
use crate::database::inner::schema::files::dsl::*;
|
||||
let _lock = self.lock.read().unwrap();
|
||||
let matches = files.load::<models::File>(&self.conn)?;
|
||||
Ok(matches)
|
||||
}
|
||||
|
@ -258,6 +274,8 @@ impl UpEndConnection {
|
|||
|
||||
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
|
||||
|
||||
let _lock = self.lock.write().unwrap();
|
||||
|
||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||
.set(mtime.eq(m_time))
|
||||
.execute(&self.conn)?)
|
||||
|
@ -268,6 +286,8 @@ impl UpEndConnection {
|
|||
|
||||
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
||||
|
||||
let _lock = self.lock.write().unwrap();
|
||||
|
||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||
.set(valid.eq(is_valid))
|
||||
.execute(&self.conn)?)
|
||||
|
@ -283,6 +303,8 @@ impl UpEndConnection {
|
|||
pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
|
||||
use crate::database::inner::schema::data::dsl::*;
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let entry = data
|
||||
.filter(identity.eq(Address::Hash(hash.clone()).encode()?))
|
||||
.load::<models::Entry>(&self.conn)?;
|
||||
|
@ -301,6 +323,8 @@ impl UpEndConnection {
|
|||
pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
|
||||
use crate::database::inner::schema::data::dsl::*;
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let primary = data
|
||||
.filter(entity.eq(object_address.encode()?))
|
||||
.or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
|
||||
|
@ -337,6 +361,8 @@ impl UpEndConnection {
|
|||
|
||||
debug!("Deleting {}!", object_address);
|
||||
|
||||
let _lock = self.lock.write().unwrap();
|
||||
|
||||
let matches = data
|
||||
.filter(identity.eq(object_address.encode()?))
|
||||
.or_filter(entity.eq(object_address.encode()?))
|
||||
|
@ -348,6 +374,8 @@ impl UpEndConnection {
|
|||
pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
|
||||
trace!("Querying: {:?}", query);
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let entries = execute(&self.conn, query)?;
|
||||
let entries = entries
|
||||
.iter()
|
||||
|
@ -374,6 +402,7 @@ impl UpEndConnection {
|
|||
}
|
||||
|
||||
fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
|
||||
let _lock = self.lock.write().unwrap();
|
||||
let result = diesel::insert_into(data::table)
|
||||
.values(&entry)
|
||||
.execute(&self.conn);
|
||||
|
@ -391,6 +420,8 @@ impl UpEndConnection {
|
|||
pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
|
||||
use crate::database::inner::schema::data::dsl::*;
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let result = data
|
||||
.select(entity)
|
||||
.distinct()
|
||||
|
@ -406,6 +437,8 @@ impl UpEndConnection {
|
|||
pub fn get_all_attributes(&self) -> Result<Vec<String>> {
|
||||
use crate::database::inner::schema::data::dsl::*;
|
||||
|
||||
let _lock = self.lock.read().unwrap();
|
||||
|
||||
let result = data
|
||||
.select(attribute)
|
||||
.distinct()
|
||||
|
|
Loading…
Reference in New Issue