Skip to content

Commit

Permalink
started work on meta raft log storage impl
Browse files Browse the repository at this point in the history
* added tvec! macro
* fixed error hierarchy
* fixed mutability in traits
* added vote support

Signed-off-by: Sienna Lloyd <[email protected]>
  • Loading branch information
siennathesane committed Sep 5, 2024
1 parent 2136dca commit 6049a98
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 116 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ criterion = "0.5.1"
serial_test = "2.0.0"
tempdir = "0.3.7"

# todo (sienna): update this at a later point
[target.bin]
rustflags = ["-C target-feature=+crt-static"]

[[bench]]
name = "data"
harness = false
5 changes: 5 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ mod network;
mod typedef;
pub mod storage;
pub mod utils;

#[macro_export]
macro_rules! tvec [
($t:ty; $($e:expr),*) => { vec![$($e as $t),*] as Vec<$t> }
];
128 changes: 76 additions & 52 deletions src/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -13,31 +14,16 @@ use rocksdb::{
TransactionDBOptions,
};

use nova_api::raft::v1::{
ColumnFamilies,
ColumnFamilyDescriptor as nColumnFamilyDescriptor,
KeyValuePair,
LogState,
MetaKeyValuePair,
MetaLogId,
MetaVote,
Vote,
};
use nova_api::raft::v1::{ColumnFamilies, ColumnFamilyDescriptor as nColumnFamilyDescriptor, KeyValuePair, LogId, LogState, MetaKeyValuePair, MetaVote, Vote};

use crate::storage::{
COLUMN_FAMILY_DESCRIPTOR_KEY,
ColumnFamilyEncoding,
DEFAULT_DB_PATH,
MetaKeyValueStore,
MetaRaftLogStorage,
StorageError,
};
use crate::storage::MetaKeyValueStoreError::*;
use crate::storage::MetaRaftLogStorageError::{
MissingVote,
ZeroLengthVote,
};
use crate::storage::StorageError::{MetaKeyValueStoreError, MissingShardId};
use crate::storage::{COLUMN_FAMILY_DESCRIPTOR_KEY, ColumnFamilyEncoding, DEFAULT_DB_PATH, MetaKeyValueStore, MetaRaftLogStorage, StorageError};
use crate::storage::IOError::{DiskEngineError, MissingColumnFamily};
use crate::storage::MetaKeyValueStoreError::{GeneralKeyValueError, MissingKeyValuePair, ZeroLengthKey};
use crate::storage::MetaRaftLogStorageError::{MissingVote, ZeroLengthVote, };
use crate::storage::StorageError::{IOError, MetaKeyValueStoreError, MissingShardId, TranscodeError};
use crate::storage::TranscodeError::{KeyValuePairDecodeError, KeyValuePairEncodeError, VoteDecodeError, VoteEncodeError};
use crate::tvec;
use crate::typedef::ShardId;

/// The default disk storage implementation in Pleiades. The underlying storage is provided by
/// RocksDB.
Expand Down Expand Up @@ -78,10 +64,10 @@ impl DiskStorage {
None => if create_if_not_exists {
match self.create_cf(key.shard) {
Ok(v) => v,
Err(_) => return Err(MetaKeyValueStoreError(GeneralError("cannot create column family")))
Err(_) => return Err(MetaKeyValueStoreError(GeneralKeyValueError("cannot create column family")))
}
} else {
return Err(MetaKeyValueStoreError(MissingColumnFamily(key.shard)));
return Err(IOError(MissingColumnFamily(key.shard)));
},
Some(v) => v,
};
Expand Down Expand Up @@ -112,7 +98,7 @@ impl DiskStorage {
let cf_handle = match self.db.cf_handle(&vote.shard_id.to_string()) {
None => match self.create_cf(vote.shard_id) {
Ok(v) => v,
Err(_) => return Err(MetaKeyValueStoreError(GeneralError("cannot create column family")))
Err(_) => return Err(MetaKeyValueStoreError(GeneralKeyValueError("cannot create column family")))
},
Some(v) => v,
};
Expand All @@ -127,38 +113,76 @@ impl DiskStorage {
fn create_cf(&self, shard_id: u64) -> Result<Arc<BoundColumnFamily>, StorageError> {
match self.db.create_cf(shard_id.to_string(), &Options::default()) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
};

match self.db.cf_handle(&shard_id.to_string()) {
None => Err(MetaKeyValueStoreError(GeneralError("cannot create column family"))),
None => Err(MetaKeyValueStoreError(GeneralKeyValueError("cannot create column family"))),
Some(v) => Ok(v),
}
}
}

impl MetaRaftLogStorage for DiskStorage {
fn get_log_state(&mut self) -> std::result::Result<LogState, StorageError> {
fn get_log_state(&self, shard_id: ShardId) -> std::result::Result<LogState, StorageError> {
todo!()
}

fn save_vote(&self, vote: &MetaVote) -> std::result::Result<(), StorageError> {
todo!()
fn save_vote(&self, shard_id: ShardId, vote: Vote) -> std::result::Result<(), StorageError> {
let cf_handle = self.db.cf_handle(&shard_id.to_string())
.unwrap_or(match self.create_cf(shard_id) {
Ok(v) => v,
Err(_) => return Err(IOError(MissingColumnFamily(shard_id))),
});

let tx = self.db.transaction();
let mut buf = tvec![u8; vote.encoded_len()];
match vote.encode(&mut buf) {
Ok(_) => {}
Err(e) => return Err(TranscodeError(VoteEncodeError(e)))
};

match tx.put_cf(&cf_handle, b"vote", buf) {
Ok(_) => {}
Err(e) => return Err(IOError(DiskEngineError(e)))
};

match tx.commit() {
Ok(_) => Ok(()),
Err(e) => Err(IOError(DiskEngineError(e)))
}
}

fn read_vote(&mut self) -> std::result::Result<Option<Vote>, StorageError> {
todo!()
fn read_vote(&self, shard_id: ShardId) -> std::result::Result<Option<Vote>, StorageError> {
let cf_handle = match self.db.cf_handle(&shard_id.to_string()) {
None => return Err(IOError(MissingColumnFamily(shard_id))),
Some(v) => v,
};

let vote_bytes = match self.db.get_cf(&cf_handle, b"vote") {
Ok(v) => match v {
None => return Ok(None),
Some(v) => v,
}
Err(e) => return Err(IOError(DiskEngineError(e)))
};

let buf = Bytes::from(vote_bytes);
match Vote::decode(buf) {
Ok(v) => Ok(Some(v)),
Err(e) => Err(TranscodeError(VoteDecodeError(e)))
}
}

fn append<I>(&mut self, entries: Vec<I>) -> std::result::Result<(), StorageError> {
fn append<I>(&self, shard_id: ShardId, entries: Vec<I>) -> std::result::Result<(), StorageError> {
todo!()
}

fn truncate(&mut self, log_id: &MetaLogId) -> std::result::Result<(), StorageError> {
fn truncate(&self, shard_id: ShardId, log_id: &LogId) -> std::result::Result<(), StorageError> {
todo!()
}

fn purge(&mut self, log_id: &MetaLogId) -> std::result::Result<(), StorageError> {
fn purge(&self, shard_id: ShardId, log_id: &LogId) -> std::result::Result<(), StorageError> {
todo!()
}
}
Expand All @@ -175,7 +199,7 @@ impl MetaKeyValueStore for DiskStorage {
let buf = Bytes::from(b);
match KeyValuePair::decode(buf) {
Ok(v) => v,
Err(e) => return Err(MetaKeyValueStoreError(KeyValuePairDecodeError(e)))
Err(e) => return Err(TranscodeError(KeyValuePairDecodeError(e)))
}
}
}
Expand All @@ -191,18 +215,18 @@ impl MetaKeyValueStore for DiskStorage {
let mut buf = vec![];
match kvp.encode(&mut buf) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(KeyValuePairEncodeError(e)))
Err(e) => return Err(TranscodeError(KeyValuePairEncodeError(e)))
}

let tx = self.db.transaction();
match tx.put_cf(&cf_handle, kvp.key, buf) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
};

match tx.commit() {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
};

Ok(())
Expand All @@ -214,12 +238,12 @@ impl MetaKeyValueStore for DiskStorage {
let tx = self.db.transaction();
match tx.delete_cf(&cf_handle, kvp.key) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
};

match tx.commit() {
Ok(_) => Ok(()),
Err(e) => Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => Err(IOError(DiskEngineError(e)))
}
}
}
Expand Down Expand Up @@ -317,9 +341,9 @@ mod test {
use nova_api::raft::v1::{ColumnFamilies, ColumnFamilyDescriptor, KeyValuePair, MetaKeyValuePair};
use nova_api::raft::v1::ColumnFamilyType::{Config, Data, RaftLog, Unspecified};

use crate::storage::{COLUMN_FAMILY_DESCRIPTOR_KEY, ColumnFamilyEncoding, MetaKeyValueStore,StorageError};
use crate::storage::MetaKeyValueStoreError::{DiskEngineError, IoError};
use crate::storage::StorageError::MetaKeyValueStoreError;
use crate::storage::{COLUMN_FAMILY_DESCRIPTOR_KEY, ColumnFamilyEncoding, MetaKeyValueStore, StorageError};
use crate::storage::IOError::{DiskEngineError, IoError};
use crate::storage::StorageError::{IOError, MetaKeyValueStoreError};
use crate::utils::disk::{clear_tmp_dir, TEST_ROCKSDB_PATH};

use super::{ColumnFamilyEncoder, DiskStorage};
Expand All @@ -328,7 +352,7 @@ mod test {
fn open_blank_db() -> Result<(), StorageError> {
let temp_dir = match TempDir::new("open_existing_column") {
Ok(v) => v,
Err(e) => return Err(MetaKeyValueStoreError(IoError(e)))
Err(e) => return Err(IOError(IoError(e)))
};
let db_path = temp_dir.path().to_str().unwrap().to_string();

Expand All @@ -343,7 +367,7 @@ mod test {
// clear the directory so we can write a new db, then open an existing one
let temp_dir = match TempDir::new("open_existing_column") {
Ok(v) => v,
Err(e) => return Err(MetaKeyValueStoreError(IoError(e)))
Err(e) => return Err(IOError(IoError(e)))
};
let db_path = temp_dir.path().to_str().unwrap().to_string();

Expand Down Expand Up @@ -374,7 +398,7 @@ mod test {

match db.create_cf(ColumnFamilyEncoder::default().encode(&cfd), &opts) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
}
cfs.column_families.push(cfd);
}
Expand All @@ -385,7 +409,7 @@ mod test {

match db.put(COLUMN_FAMILY_DESCRIPTOR_KEY.as_bytes(), buf) {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(DiskEngineError(e)))
Err(e) => return Err(IOError(DiskEngineError(e)))
}

// close
Expand All @@ -407,7 +431,7 @@ mod test {

match clear_tmp_dir() {
Ok(_) => {}
Err(e) => return Err(MetaKeyValueStoreError(IoError(e)))
Err(e) => return Err(IOError(IoError(e)))
}

let ds = DiskStorage::new(TEST_ROCKSDB_PATH.to_string());
Expand Down Expand Up @@ -453,7 +477,7 @@ mod test {
// cleanup
match clear_tmp_dir() {
Ok(_) => Ok(()),
Err(e) => Err(MetaKeyValueStoreError(IoError(e)))
Err(e) => Err(IOError(IoError(e)))
}
}
}
Loading

0 comments on commit 6049a98

Please sign in to comment.