226 lines
6.6 KiB
Rust
226 lines
6.6 KiB
Rust
use std::{
|
|
borrow::Borrow,
|
|
collections::VecDeque,
|
|
marker::PhantomData,
|
|
path::Path,
|
|
sync::{Arc, RwLock, atomic::AtomicBool},
|
|
};
|
|
|
|
use futures::task::AtomicWaker;
|
|
use redb::{Error, Key, ReadableDatabase, TableDefinition, Value};
|
|
use serde::{Serialize, de::DeserializeOwned};
|
|
|
|
const USERS: TableDefinition<uuid::Uuid, Vec<u8>> = TableDefinition::new("users");
|
|
const SERVERS: TableDefinition<uuid::Uuid, Vec<u8>> = TableDefinition::new("servers");
|
|
const SETTINGS: TableDefinition<uuid::Uuid, Vec<u8>> = TableDefinition::new("settings");
|
|
|
|
#[derive(Debug)]
|
|
pub struct TableInner<T> {
|
|
db: Arc<T>,
|
|
}
|
|
|
|
impl<T> Clone for TableInner<T> {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
db: Arc::clone(&self.db),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<T> TableInner<T> {
|
|
fn new(db: Arc<T>) -> Self {
|
|
Self { db }
|
|
}
|
|
}
|
|
|
|
impl TableInner<DatabaseHandle> {
|
|
async fn get<'a, K: Key, V: Serialize + DeserializeOwned>(
|
|
&self,
|
|
table: TableDefinition<'static, K, Vec<u8>>,
|
|
key: impl Borrow<K::SelfType<'a>>,
|
|
) -> Result<Option<V>> {
|
|
let db: &redb::Database = &self.db.as_ref().database;
|
|
let db_reader = db.begin_read()?;
|
|
let table = db_reader.open_table(table)?;
|
|
table
|
|
.get(key)?
|
|
.map(|value| bson::deserialize_from_slice(&value.value()))
|
|
.transpose()
|
|
.map_err(|e| redb::Error::Io(std::io::Error::other(e)))
|
|
}
|
|
|
|
async fn insert<
|
|
'a,
|
|
'b,
|
|
K: Key + Send + Sync,
|
|
V: Serialize + DeserializeOwned + Send + Sync + 'a,
|
|
>(
|
|
&'b self,
|
|
table: TableDefinition<'static, K, Vec<u8>>,
|
|
key: impl Borrow<K::SelfType<'a>> + Send + 'b,
|
|
value: V,
|
|
) -> Result<Option<V>> {
|
|
let db: &redb::Database = &self.db.as_ref().database;
|
|
// self.db
|
|
// .writing
|
|
// .store(true, std::sync::atomic::Ordering::SeqCst);
|
|
|
|
// let out = tokio::task::spawn_blocking(move || -> Result<Option<V>>
|
|
|
|
let out = tokio::task::spawn_blocking(|| -> Result<Option<V>> {
|
|
let db_writer = db.begin_write()?;
|
|
let out = {
|
|
let mut table = db_writer.open_table(table)?;
|
|
let serialized_value = bson::serialize_to_vec(&value)
|
|
.map_err(|e| redb::Error::Io(std::io::Error::other(e)))?;
|
|
let previous = table.insert(key, &serialized_value)?;
|
|
let out = previous
|
|
.map(|value| bson::deserialize_from_slice(&value.value()))
|
|
.transpose()
|
|
.map_err(|e| redb::Error::Io(std::io::Error::other(e)));
|
|
out
|
|
};
|
|
db_writer.commit()?;
|
|
out
|
|
})
|
|
.await
|
|
.expect("Task panicked");
|
|
|
|
out
|
|
}
|
|
}
|
|
|
|
// impl<K: Key, V: Serialize + DeserializeOwned> Table<K, V> for TableInner {
|
|
// async fn get(&self, key: K) -> Result<Option<Value>> {}
|
|
// async fn insert(&self, key: K, value: V) -> Result<Option<Value>> {}
|
|
// async fn modify(&self, key: K, v: FnOnce(V) -> V) -> Result<bool> {}
|
|
// async fn remove(&self, key: K) -> Result<Option<Value>> {}
|
|
// }
|
|
|
|
#[derive(Debug)]
|
|
pub struct Users<T>(TableInner<T>);
|
|
|
|
impl<T> Clone for Users<T> {
|
|
fn clone(&self) -> Self {
|
|
Self(self.0.clone())
|
|
}
|
|
}
|
|
impl<T> Users<T> {
|
|
const TABLE: TableDefinition<'static, uuid::Uuid, Vec<u8>> = USERS;
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Servers<T>(TableInner<T>);
|
|
impl<T> Clone for Servers<T> {
|
|
fn clone(&self) -> Self {
|
|
Self(self.0.clone())
|
|
}
|
|
}
|
|
impl<T> Servers<T> {
|
|
const TABLE: TableDefinition<'static, uuid::Uuid, Vec<u8>> = SERVERS;
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct Settings<T>(TableInner<T>);
|
|
impl<T> Clone for Settings<T> {
|
|
fn clone(&self) -> Self {
|
|
Self(self.0.clone())
|
|
}
|
|
}
|
|
impl<T> Settings<T> {
|
|
const TABLE: TableDefinition<'static, uuid::Uuid, Vec<u8>> = SETTINGS;
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct Database {
|
|
users: Users<DatabaseHandle>,
|
|
servers: Servers<DatabaseHandle>,
|
|
settings: Settings<DatabaseHandle>,
|
|
handle: Arc<DatabaseHandle>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct DatabaseHandle {
|
|
database: redb::Database,
|
|
writing: AtomicBool,
|
|
wakers: RwLock<VecDeque<AtomicWaker>>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct DatabaseWriterGuard<'a> {
|
|
handle: &'a DatabaseHandle,
|
|
dropper: Arc<AtomicBool>,
|
|
}
|
|
|
|
// impl Drop for DatabaseWriterGuard<'_> {
|
|
// fn drop(&mut self) {
|
|
// self.handle
|
|
// .writing
|
|
// .store(false, std::sync::atomic::Ordering::SeqCst);
|
|
// let is_panicking = std::thread::panicking();
|
|
// let Ok(writer) = self.handle.wakers.write() else {
|
|
// if is_panicking {
|
|
// return;
|
|
// } else {
|
|
// panic!("Wakers lock poisoned");
|
|
// }
|
|
// }
|
|
// if let Some(waker) = (self.handle.wakers.write()).pop() {
|
|
// waker.wake();
|
|
// };
|
|
// // let mut wakers = self.handle.wakers.write().expect();
|
|
// // if let Some(waker) = self.handle.wakers.write().expect("Wakers lock poisoned").pop_front() {
|
|
// // waker.wake();
|
|
// // }
|
|
// // while let Some(waker) = wakers.pop_front() {
|
|
// // waker.wake();
|
|
// // }
|
|
// }
|
|
// }
|
|
|
|
type Result<O, E = redb::Error> = core::result::Result<O, E>;
|
|
|
|
pub trait Table<K: Key> {
|
|
fn insert<V: Serialize + DeserializeOwned>(
|
|
&self,
|
|
key: K,
|
|
value: V,
|
|
) -> impl Future<Output = Result<Option<V>>> + Send;
|
|
fn modify<V: Serialize + DeserializeOwned, O: Serialize + DeserializeOwned>(
|
|
&self,
|
|
key: K,
|
|
v: impl FnOnce(V) -> O,
|
|
) -> impl Future<Output = Result<bool>> + Send;
|
|
fn remove<V: Serialize + DeserializeOwned>(
|
|
&self,
|
|
key: K,
|
|
) -> impl Future<Output = Result<Option<V>>> + Send;
|
|
fn get<V: Serialize + DeserializeOwned>(
|
|
&self,
|
|
key: K,
|
|
) -> impl Future<Output = Result<Option<V>>> + Send;
|
|
}
|
|
|
|
impl Database {
|
|
pub fn create(path: impl AsRef<Path>) -> Result<Self, Error> {
|
|
let writing = AtomicBool::new(false);
|
|
let wakers = RwLock::new(VecDeque::new());
|
|
let db = redb::Database::create(path)?;
|
|
let db = Arc::new(DatabaseHandle {
|
|
database: db,
|
|
writing,
|
|
wakers,
|
|
});
|
|
let table_inner = TableInner::new(Arc::clone(&db));
|
|
let users = Users(table_inner.clone());
|
|
let servers = Servers(table_inner.clone());
|
|
let settings = Settings(table_inner.clone());
|
|
Ok(Self {
|
|
servers,
|
|
users,
|
|
settings,
|
|
handle: db,
|
|
})
|
|
}
|
|
}
|