1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use super::{Slice, SyncDatabase};
use agilulf_protocol::{DatabaseError, DatabaseResult as Result};
use crate::storage::error::StorageResult;
use agilulf_protocol::Command;
use agilulf_skiplist::SkipMap;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering;
#[derive(Clone)]
enum Value {
NotExist,
Slice(Slice),
}
impl Default for Value {
fn default() -> Self {
Value::NotExist
}
}
pub struct MemDatabase {
inner: AtomicPtr<SkipMap<Value>>,
}
impl MemDatabase {
pub fn restore_from_iterator<I: Iterator<Item = Command>>(
iter: I,
) -> StorageResult<MemDatabase> {
let mem_db = MemDatabase::default();
for command in iter {
match command {
Command::PUT(command) => {
SyncDatabase::put_sync(&mem_db, command.key, command.value)?;
}
Command::DELETE(command) => {
SyncDatabase::delete_sync(&mem_db, command.key)?;
}
_ => unreachable!(),
}
}
Ok(mem_db)
}
pub fn large_enough(&self) -> bool {
unsafe { (*self.inner.load(Ordering::SeqCst)).len() > 4 * 1024 }
}
}
impl Default for MemDatabase {
fn default() -> Self {
MemDatabase {
inner: AtomicPtr::new(Box::into_raw(box SkipMap::default())),
}
}
}
impl Drop for MemDatabase {
fn drop(&mut self) {
let skip_map = self.inner.load(Ordering::SeqCst);
unsafe { drop(Box::from_raw(skip_map)) }
}
}
impl SyncDatabase for MemDatabase {
fn get_sync(&self, key: Slice) -> Result<Slice> {
unsafe {
match (*self.inner.load(Ordering::SeqCst)).find(&key) {
Some(value) => match value {
Value::NotExist => Err(DatabaseError::KeyNotFound),
Value::Slice(value) => Ok(value),
},
None => Err(DatabaseError::KeyNotFound),
}
}
}
fn put_sync(&self, key: Slice, value: Slice) -> Result<()> {
unsafe {
(*self.inner.load(Ordering::SeqCst)).insert(&key, &Value::Slice(value));
}
Ok(())
}
fn scan_sync(&self, start: Slice, end: Slice) -> Vec<(Slice, Slice)> {
unsafe {
(*self.inner.load(Ordering::SeqCst))
.scan(start..end)
.into_iter()
.filter_map(|(key, value)| match value {
Value::Slice(value) => Some((key, value)),
Value::NotExist => None,
})
.collect()
}
}
fn delete_sync(&self, key: Slice) -> Result<()> {
unsafe {
(*self.inner.load(Ordering::SeqCst)).insert(&key, &Value::NotExist);
}
Ok(())
}
}