Skip to content

Commit

Permalink
added append method and updated interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Sienna Lloyd <[email protected]>
  • Loading branch information
siennathesane committed Sep 12, 2024
1 parent 33221bf commit abeb4f2
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 2,746 deletions.
2,713 changes: 0 additions & 2,713 deletions Cargo.lock

This file was deleted.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ bytes = "1.7.1"
mimalloc = { version = "0.1.43", default-features = false }
moka = { version = "0.12.8", features = ["sync"] }
nova-api = { path = "crates/nova-api/rust" }
openraft = { version = "0.9.16", features = ["storage-v2"] }
openraft = { version = "0.9.16", features = ["storage-v2", "serde"] }
prost = "0.13.2"
protobuf = { version = "3.5.1" }
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", rev = "834f60d", features = ["multi-threaded-cf"] }
serde = { version = "1.0.209", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
sysinfo = "0.31.4"
thiserror = "1.0.63"
rand = "0.8.5"
tokio = "1.40.0"
tempfile = "3.12.0"
log = "0.4.22"
tracing = "0.1.40"
bincode = "1.3.3"

[dev-dependencies]
criterion = "0.5.1"
Expand Down
3 changes: 2 additions & 1 deletion src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use serde::{Deserialize, Serialize};

mod raft;

/// Information about the host node.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct HostNode {
pub addr: String,
}
Expand Down
2 changes: 1 addition & 1 deletion src/network/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use openraft::raft::{
};
use openraft::{OptionalSend, RaftNetwork, RaftTypeConfig, Snapshot, Vote};
use std::future::Future;

use serde::{Deserialize, Serialize};
use crate::network::HostNode;
use crate::typedef::{NodeId, RaftShardConfig};

Expand Down
224 changes: 203 additions & 21 deletions src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use bytes::{BufMut, Bytes, BytesMut};
use openraft::Entry;
use nova_api::raft::v1::{
ColumnFamilies, ColumnFamilyDescriptor as nColumnFamilyDescriptor, KeyValuePair, LogId,
LogState, MetaKeyValuePair, MetaLogId, MetaVote, Vote,
};
use prost::Message;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor as rColumnFamilyDescriptor, IteratorMode, Options,
DB, DB as ReadOnlyDB,
};
use tracing::{error, instrument, span};
use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor as rColumnFamilyDescriptor, Error, IteratorMode, Options, DB, DB as ReadOnlyDB};
use tracing::{debug, error, instrument, span, trace};

use crate::storage::IOError::{DiskEngineError, MissingColumnFamily};
use crate::storage::MetaKeyValueStoreError::{
Expand All @@ -24,20 +22,18 @@ use crate::storage::MetaRaftLogStorageError::{MissingRaftLog, MissingVote, ZeroL
use crate::storage::StorageError::{
IOError, MetaKeyValueStoreError, MetaRaftLogStorageError, MissingShardId, TranscodeError,
};
use crate::storage::TranscodeError::{
KeyValuePairDecodeError, KeyValuePairEncodeError, RaftLogDecodeError, RaftLogEncodeError,
VoteDecodeError, VoteEncodeError,
};
use crate::storage::TranscodeError::{BincodeError, KeyValuePairDecodeError, KeyValuePairEncodeError, RaftLogDecodeError, RaftLogEncodeError, VoteDecodeError, VoteEncodeError};
use crate::storage::{
ColumnFamilyEncoding, MetaKeyValueStore, MetaRaftLogStorage, StorageError,
COLUMN_FAMILY_DESCRIPTOR_KEY, DEFAULT_DB_PATH,
};
use crate::typedef::ShardId;
use crate::typedef::{RaftShardConfig, ShardId};

const SEPARATOR: &'static str = "/";
const SEPARATOR: &[u8; 1] = b"/";
const VOTE_KEY: &'static [u8] = b"/system/vote";
const LAST_PURGED_LOG_KEY: &'static [u8] = b"/system/last_purged_log";
const LOG_PREFIX_KEY: &'static [u8] = b"/system/logs";
const DATA_PREFIX_KEY: &'static [u8] = b"/system/data";

/// The default disk storage implementation in Pleiades. The underlying storage is provided by
/// RocksDB.
Expand Down Expand Up @@ -241,6 +237,8 @@ impl MetaRaftLogStorage for DiskStorage {

#[instrument]
fn save_vote(&self, vote: MetaVote) -> std::result::Result<(), StorageError> {
debug!("saving vote for shard: {}", vote.shard_id);

let shard_id = vote.shard_id;
let cf_handle =
self.db
Expand All @@ -264,6 +262,8 @@ impl MetaRaftLogStorage for DiskStorage {

#[instrument]
fn read_vote(&self, shard_id: ShardId) -> std::result::Result<Option<Vote>, StorageError> {
debug!("reading vote for shard: {}", shard_id);

let cf_handle = match self.db.cf_handle(&shard_id.to_string()) {
None => return Err(IOError(MissingColumnFamily(shard_id))),
Some(v) => v,
Expand All @@ -285,22 +285,89 @@ impl MetaRaftLogStorage for DiskStorage {
}

#[instrument]
fn append<I: Debug>(
fn append<I: Debug + IntoIterator>(
&self,
shard_id: ShardId,
entries: Vec<I>,
) -> std::result::Result<(), StorageError> {
todo!()
entries: I,
) -> Result<(), StorageError> where I: IntoIterator<Item = Entry<RaftShardConfig>> {
debug!("appending logs to shard: {}", shard_id);

let cf_handle = match self.db.cf_handle(&shard_id.to_string()) {
None => return Err(IOError(MissingColumnFamily(shard_id))),
Some(v) => v,
};

for entry in entries {
trace!("appending entry: {:?}", entry);

let mut key = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
key.put_slice(LOG_PREFIX_KEY);
key.put_slice(SEPARATOR);
key.put_u64_le(entry.log_id.index);

let val = match bincode::serialize(&entry) {
Ok(v) => v,
Err(e) => return Err(TranscodeError(BincodeError(e))),
};

match self.db.put_cf(&cf_handle, key, val) {
Ok(_) => {}
Err(e) => return Err(IOError(DiskEngineError(e))),
}
}

Ok(())
}

#[instrument]
fn truncate(&self, log_id: &MetaLogId) -> std::result::Result<(), StorageError> {
todo!()
fn truncate(&self, log_id: &MetaLogId) -> Result<(), StorageError> {
debug!("truncating logs: [{:?}, +oo)", log_id);

let cf_handle = match self.db.cf_handle(&log_id.shard_id.to_string()) {
None => return Err(IOError(MissingColumnFamily(log_id.shard_id))),
Some(v) => v,
};

let log_id = match log_id.log_id {
Some(v) => v,
None => return Err(MetaRaftLogStorageError(MissingRaftLog(log_id.shard_id))),
};

let mut from = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
from.put_slice(LOG_PREFIX_KEY);
from.put_slice(SEPARATOR);
from.put_u64_le(log_id.index);

let mut to = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
to.put_slice(LOG_PREFIX_KEY);
to.put_slice(SEPARATOR);
to.put_u64_le(u64::MAX);

self.db.delete_range_cf(&cf_handle, from, to).map_err(|e| IOError(DiskEngineError(e)))
}

#[instrument]
fn purge(&self, log_id: &MetaLogId) -> std::result::Result<(), StorageError> {
todo!()
fn purge(&self, log_id: &MetaLogId) -> Result<(), StorageError> {
debug!("purging logs: [0, {:?}]", log_id);

let cf_handle = match self.db.cf_handle(&log_id.shard_id.to_string()) {
None => return Err(IOError(MissingColumnFamily(log_id.shard_id))),
Some(v) => v,
};

self.set_last_purged_log(log_id)?;

let mut from = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
from.put_slice(LOG_PREFIX_KEY);
from.put_slice(SEPARATOR);
from.put_u64_le(0);

let mut to = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
to.put_slice(LOG_PREFIX_KEY);
to.put_slice(SEPARATOR);
to.put_u64_le(log_id.log_id.unwrap().index);

self.db.delete_range_cf(&cf_handle, from, to).map_err(|e| IOError(DiskEngineError(e)))
}
}

Expand Down Expand Up @@ -441,6 +508,8 @@ mod tests {
use prost::Message;
use rocksdb::{Options, DB};
use std::ops::Rem;
use bytes::{BufMut, BytesMut};
use openraft::{Entry, EntryPayload};
use tempfile::TempDir;

use nova_api::raft::v1::ColumnFamilyType::{Config, Data, RaftLog, Unspecified};
Expand All @@ -455,9 +524,10 @@ mod tests {
ColumnFamilyEncoding, MetaKeyValueStore, MetaRaftLogStorage, StorageError,
COLUMN_FAMILY_DESCRIPTOR_KEY,
};
use crate::typedef::{NodeId, RaftShardConfig};
use crate::utils::disk::{clear_tmp_dir, TEST_ROCKSDB_PATH};

use super::{ColumnFamilyEncoder, DiskStorage, LAST_PURGED_LOG_KEY, VOTE_KEY};
use super::{ColumnFamilyEncoder, DiskStorage, LAST_PURGED_LOG_KEY, LOG_PREFIX_KEY, SEPARATOR, VOTE_KEY};

fn setup_db() -> DiskStorage {
let temp_dir = TempDir::new().unwrap();
Expand Down Expand Up @@ -865,4 +935,116 @@ mod tests {
let result = db.get_log_state(1);
assert!(matches!(result, Err(MetaRaftLogStorageError(_))));
}

#[test]
fn test_purge_success() {
let db = setup_db();
db.db.create_cf("1", &Options::default()).unwrap();
let cf_handle = db.db.cf_handle("1").unwrap();

// Insert some logs to be purged
for i in 0..10 {
let log_id = LogId {
leader_id: Some(CommittedLeaderId {
term: 1,
node_id: 1,
}),
index: i,
};
let mut buf = Vec::with_capacity(log_id.encoded_len());
log_id.encode(&mut buf).unwrap();
let mut key = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
key.put_slice(LOG_PREFIX_KEY);
key.put_slice(SEPARATOR);
key.put_u64_le(i);
db.db.put_cf(&cf_handle, key, buf).unwrap();
}

let log_id = LogId {
leader_id: Some(CommittedLeaderId {
term: 1,
node_id: 1,
}),
index: 10,
};
let meta_log_id = MetaLogId {
shard_id: 1,
log_id: Some(log_id),
};

let result = db.purge(&meta_log_id);
assert!(result.is_ok());

// Verify logs are purged
for i in 0..10 {
let mut key = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
key.put_slice(LOG_PREFIX_KEY);
key.put_slice(SEPARATOR);
key.put_u64_le(i);

let log = db.db.get_cf(&cf_handle, key).unwrap();
assert!(log.is_none());
}
}

#[test]
fn test_purge_missing_column_family() {
let db = setup_db();
let log_id = LogId {
leader_id: Some(CommittedLeaderId {
term: 1,
node_id: 1,
}),
index: 1,
};
let meta_log_id = MetaLogId {
shard_id: 1,
log_id: Some(log_id),
};

let result = db.purge(&meta_log_id);
assert!(matches!(result, Err(StorageError::IOError(_))));
}

#[test]
fn test_purge_missing_log_id() {
let db = setup_db();
db.db.create_cf("1", &Options::default()).unwrap();

let meta_log_id = MetaLogId {
shard_id: 1,
log_id: None,
};

let result = db.purge(&meta_log_id);
assert!(matches!(result, Err(StorageError::MetaRaftLogStorageError(_))));
}

#[test]
// nb (sienna): I have no idea if this is the correct way to test this
fn test_append_success() {
let db = setup_db();
db.db.create_cf("1", &Options::default()).unwrap();
let cf_handle = db.db.cf_handle("1").unwrap();

let entries: Vec<Entry<RaftShardConfig>> = (0..10).map(|i| {
Entry {
log_id: Entry::<RaftShardConfig>::default().log_id,
payload: EntryPayload::Blank,
}
}).collect();

let result = db.append(1, entries.clone());
assert!(result.is_ok());

for entry in entries {
let mut key = BytesMut::with_capacity(LOG_PREFIX_KEY.len() + SEPARATOR.len() + size_of::<u64>());
key.put_slice(LOG_PREFIX_KEY);
key.put_slice(SEPARATOR);
key.put_u64_le(entry.log_id.index);

let log = db.db.get_cf(&cf_handle, key).unwrap();
assert!(log.is_some());
}
}
}
8 changes: 3 additions & 5 deletions src/storage/memcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use crate::storage::IOError::{DiskEngineError, IoError, MissingColumnFamily};
use crate::storage::MetaKeyValueStoreError::{GeneralKeyValueError, MissingKeyValuePair};
use crate::storage::MetaRaftLogStorageError::{MissingVote, ZeroLengthLeaderId, ZeroLengthVote};
use crate::storage::StorageError::{MetaKeyValueStoreError, MetaRaftLogStorageError};
use crate::storage::TranscodeError::{
KeyValuePairDecodeError, KeyValuePairEncodeError, RaftLogDecodeError, RaftLogEncodeError,
VoteDecodeError, VoteEncodeError,
};
use crate::storage::TranscodeError::{BincodeError, KeyValuePairDecodeError, KeyValuePairEncodeError, RaftLogDecodeError, RaftLogEncodeError, VoteDecodeError, VoteEncodeError};
use crate::storage::{MetaKeyValueStore, MetaRaftLogStorage, StorageError};
use crate::typedef::{ShardId, SYSTEM_SHARD_RANGE_START, SYSTEM_SHARD_RANGE_STOP};
use crate::utils::math::between;
Expand Down Expand Up @@ -201,7 +198,7 @@ impl MetaRaftLogStorage for WriteBackCache {
}

#[instrument]
fn append<I: Debug>(&self, shard_id: ShardId, entries: Vec<I>) -> Result<(), StorageError> {
fn append<I: Debug>(&self, shard_id: ShardId, entries: I) -> Result<(), StorageError> {
todo!()
}

Expand Down Expand Up @@ -268,6 +265,7 @@ impl KVPCache {
VoteEncodeError(vee) => error!(error = vee.to_string(), "POSSIBLE DATA CORRUPTION: failed to evict cache item due to encode error"),
RaftLogDecodeError(rde) => error!(error = rde.to_string(), "POSSIBLE DATA CORRUPTION: failed to evict cache item due to decode error"),
RaftLogEncodeError(ree) => error!(error = ree.to_string(), "POSSIBLE DATA CORRUPTION: failed to evict cache item due to encode error"),
BincodeError(se) => error!(error = se.to_string(), "POSSIBLE DATA CORRUPTION: failed to evict cache item due to bincode error"),
},
_ => error!(error = e.to_string(), "failed to evict cache item from disk storage"),
}
Expand Down
Loading

0 comments on commit abeb4f2

Please sign in to comment.