Skip to content

Commit d034c74

Browse files
authored
Merge branch 'main' into ST_YMAX
2 parents b9e08ac + fab743b commit d034c74

File tree

35 files changed

+566
-297
lines changed

35 files changed

+566
-297
lines changed

Cargo.lock

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/binaries/metactl/snapshot.rs

+27-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use anyhow::anyhow;
3131
use databend_common_base::base::tokio;
3232
use databend_common_meta_raft_store::config::RaftConfig;
3333
use databend_common_meta_raft_store::key_spaces::RaftStoreEntry;
34+
use databend_common_meta_raft_store::key_spaces::SMEntry;
3435
use databend_common_meta_raft_store::ondisk::DataVersion;
3536
use databend_common_meta_raft_store::ondisk::OnDisk;
3637
use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
@@ -175,17 +176,26 @@ async fn import_v002(
175176

176177
let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, raft_config);
177178

178-
let (tx, join_handle) = snapshot_store.spawn_writer_thread("import_v002");
179+
let writer = snapshot_store.new_writer()?;
180+
181+
let (tx, join_handle) = writer.spawn_writer_thread("import_v002");
182+
183+
let mut last_applied = None;
179184

180185
for line in lines {
181186
let l = line?;
182187
let (tree_name, kv_entry): (String, RaftStoreEntry) = serde_json::from_str(&l)?;
183188

184189
if tree_name.starts_with("state_machine/") {
185190
// Write to snapshot
186-
let sm_entry = kv_entry.try_into().map_err(|err_str| {
191+
let sm_entry: SMEntry = kv_entry.try_into().map_err(|err_str| {
187192
anyhow::anyhow!("Failed to convert RaftStoreEntry to SMEntry: {}", err_str)
188193
})?;
194+
195+
if let Some(last) = sm_entry.last_applied() {
196+
last_applied = Some(last);
197+
}
198+
189199
tx.send(WriteEntry::Data(sm_entry)).await?;
190200
} else {
191201
// Write to sled tree
@@ -214,9 +224,21 @@ async fn import_v002(
214224

215225
tx.send(WriteEntry::Finish).await?;
216226

217-
let (_snapshot_store, snapshot_stat) = join_handle.await??;
218-
219-
eprintln!("Imported {} records, snapshot: {}", n, snapshot_stat,);
227+
let (temp_snapshot_data, snapshot_stat) = join_handle.await??;
228+
229+
let (snapshot_id, snapshot_data) = snapshot_store.commit_snapshot_data_gen_id(
230+
temp_snapshot_data,
231+
last_applied,
232+
snapshot_stat.entry_cnt,
233+
)?;
234+
235+
eprintln!(
236+
"Imported {} records, snapshot: {}; snapshot_path: {}; snapshot_stat: {}",
237+
n,
238+
snapshot_id.to_string(),
239+
snapshot_data.path(),
240+
snapshot_stat,
241+
);
220242
Ok(max_log_id)
221243
}
222244

src/meta/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ History versions that are not included in the above chart:
9696

9797
## Compatibility between databend-meta
9898

99-
| Meta version | Backward compatible with |
100-
|:---------------------|:-------------------------|
101-
| [0.9.41, 1.2.212) | [0.9.41, 1.2.212) |
102-
| [1.2.212, 1.2.476?) | [0.9.41, 1.2.476?) |
103-
| [1.2.476?, +∞) | [1.2.212, +∞) |
99+
| Meta version | Backward compatible with |
100+
|:--------------------|:-------------------------|
101+
| [0.9.41, 1.2.212) | [0.9.41, 1.2.212) |
102+
| [1.2.212, 1.2.479) | [0.9.41, 1.2.479) |
103+
| [1.2.479, +∞) | [1.2.212, +∞) |
104104

105105
TODO: fix the above version when merged
106106

@@ -116,7 +116,7 @@ TODO: fix the above version when merged
116116
In this version, databend-meta raft-server introduced a new API `install_snapshot_v1()`.
117117
The raft-client will try to use either this new API or the original `install_snapshot()`.
118118

119-
- `1.2.476?` Remove: `install_snapshot()`(v0) from client and server.
119+
- `1.2.479` Remove: `install_snapshot()`(v0) from client and server.
120120
The `install_snapshot_v1()` is the only API to install snapshot, and becomes **REQUIRED** for the client.
121121

122122

src/meta/raft-store/src/config.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl RaftConfig {
194194
///
195195
/// Raft will choose a random timeout in this range for next election.
196196
pub fn election_timeout(&self) -> (u64, u64) {
197-
(self.heartbeat_interval * 5, self.heartbeat_interval * 7)
197+
(self.heartbeat_interval * 2, self.heartbeat_interval * 3)
198198
}
199199

200200
pub fn check(&self) -> std::result::Result<(), MetaStartupError> {

src/meta/raft-store/src/key_spaces.rs

+15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_meta_sled_store::SledOrderedSerde;
2121
use databend_common_meta_sled_store::SledSerde;
2222
use databend_common_meta_stoerr::MetaStorageError;
2323
use databend_common_meta_types::Entry;
24+
use databend_common_meta_types::LogId;
2425
use databend_common_meta_types::LogIndex;
2526
use databend_common_meta_types::Node;
2627
use databend_common_meta_types::NodeId;
@@ -209,6 +210,20 @@ impl SMEntry {
209210

210211
unreachable!("unknown prefix: {}", prefix);
211212
}
213+
214+
pub fn last_applied(&self) -> Option<LogId> {
215+
match self {
216+
Self::StateMachineMeta { key, value } => {
217+
if *key == StateMachineMetaKey::LastApplied {
218+
let last: LogId = value.clone().try_into().unwrap();
219+
Some(last)
220+
} else {
221+
None
222+
}
223+
}
224+
_ => None,
225+
}
226+
}
212227
}
213228

214229
/// Enum of key-value pairs that are used in the raft storage impl for meta-service.

src/meta/raft-store/src/ondisk/mod.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,13 @@ impl OnDisk {
285285
let tree = self.db.open_tree(sm_tree_name)?;
286286

287287
let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, self.config.clone());
288+
let mut last_applied = None;
288289

289-
let (tx, join_handle) = snapshot_store.spawn_writer_thread("upgrade-v001-to-v002-snapshot");
290+
let writer = snapshot_store
291+
.new_writer()
292+
.map_err(|e| snap_err(e, "new snapshot writer"))?;
293+
294+
let (tx, join_handle) = writer.spawn_writer_thread("upgrade-v001-to-v002-snapshot");
290295

291296
for ivec_pair_res in tree.iter() {
292297
let sm_entry = {
@@ -311,6 +316,11 @@ impl OnDisk {
311316
continue;
312317
}
313318

319+
if let Some(last) = sm_entry.last_applied() {
320+
self.progress(format_args!("found state machine last_applied: {}", last));
321+
last_applied = Some(last);
322+
}
323+
314324
tx.send(WriteEntry::Data(sm_entry))
315325
.await
316326
.map_err(|e| snap_err(e, "send SMEntry"))?;
@@ -320,15 +330,24 @@ impl OnDisk {
320330
.await
321331
.map_err(|e| snap_err(e, "send Commit"))?;
322332

323-
let (snapshot_store, snapshot_stat) = join_handle
333+
let (temp_snapshot_data, snapshot_stat) = join_handle
324334
.await
325335
.map_err(|e| snap_err(e, "join snapshot writer thread"))?
326336
.map_err(|e| snap_err(e, "writer error"))?;
327337

338+
if snapshot_stat.entry_cnt > 0 {
339+
assert!(last_applied.is_some(), "last_applied must be Some");
340+
}
341+
342+
let (snapshot_id, snapshot_data) = snapshot_store
343+
.commit_snapshot_data_gen_id(temp_snapshot_data, last_applied, snapshot_stat.entry_cnt)
344+
.map_err(|e| snap_err(e, "commit snapshot data"))?;
345+
328346
self.progress(format_args!(
329-
"Written to snapshot: {}, path: {}",
347+
"Written to snapshot: {}, {}; path: {}",
348+
snapshot_id.to_string(),
330349
snapshot_stat,
331-
snapshot_store.snapshot_path(&snapshot_stat.snapshot_id.to_string())
350+
snapshot_data.path()
332351
));
333352

334353
Ok(())

src/meta/raft-store/src/sm_v002/leveled_store/arc_level_impl.rs src/meta/raft-store/src/sm_v002/leveled_store/immutable.rs

+46-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::borrow::Borrow;
1616
use std::io;
17+
use std::ops::Deref;
1718
use std::ops::RangeBounds;
1819
use std::sync::Arc;
1920

@@ -27,10 +28,50 @@ use crate::sm_v002::leveled_store::map_api::MarkedOf;
2728
use crate::sm_v002::marked::Marked;
2829
use crate::state_machine::ExpireKey;
2930

30-
impl Level {
31+
/// A single **immutable** level of state machine data.
32+
///
33+
/// Immutable level implement only [`MapApiRO`], but not [`MapApi`].
34+
///
35+
/// [`MapApi`]: crate::sm_v002::leveled_store::map_api::MapApi
36+
#[derive(Debug, Clone)]
37+
pub struct Immutable {
38+
level: Arc<Level>,
39+
}
40+
41+
impl Immutable {
42+
pub fn new(level: Arc<Level>) -> Self {
43+
Self { level }
44+
}
45+
46+
pub fn new_from_level(level: Level) -> Self {
47+
Self {
48+
level: Arc::new(level),
49+
}
50+
}
51+
52+
pub fn inner(&self) -> &Arc<Level> {
53+
&self.level
54+
}
55+
}
56+
57+
impl AsRef<Level> for Immutable {
58+
fn as_ref(&self) -> &Level {
59+
self.level.as_ref()
60+
}
61+
}
62+
63+
impl Deref for Immutable {
64+
type Target = Level;
65+
66+
fn deref(&self) -> &Self::Target {
67+
self.level.as_ref()
68+
}
69+
}
70+
71+
impl Immutable {
3172
/// Build a static stream that yields key values for primary index
3273
#[futures_async_stream::try_stream(boxed, ok = MapKV<String>, error = io::Error)]
33-
async fn str_range<Q, R>(self: Arc<Level>, range: R)
74+
async fn str_range<Q, R>(self: Immutable, range: R)
3475
where
3576
String: Borrow<Q>,
3677
Q: Ord + Send + Sync + ?Sized,
@@ -45,7 +86,7 @@ impl Level {
4586

4687
/// Build a static stream that yields expire key and key for the secondary expiration index
4788
#[futures_async_stream::try_stream(boxed, ok = MapKV<ExpireKey>, error = io::Error)]
48-
async fn expire_range<Q, R>(self: Arc<Level>, range: R)
89+
async fn expire_range<Q, R>(self: Immutable, range: R)
4990
where
5091
ExpireKey: Borrow<Q>,
5192
Q: Ord + Send + Sync + ?Sized,
@@ -60,7 +101,7 @@ impl Level {
60101
}
61102

62103
#[async_trait::async_trait]
63-
impl MapApiRO<String> for Arc<Level> {
104+
impl MapApiRO<String> for Immutable {
64105
async fn get<Q>(&self, key: &Q) -> Result<Marked<<String as MapKey>::V>, io::Error>
65106
where
66107
String: Borrow<Q>,
@@ -78,7 +119,7 @@ impl MapApiRO<String> for Arc<Level> {
78119
}
79120

80121
#[async_trait::async_trait]
81-
impl MapApiRO<ExpireKey> for Arc<Level> {
122+
impl MapApiRO<ExpireKey> for Immutable {
82123
async fn get<Q>(&self, key: &Q) -> Result<MarkedOf<ExpireKey>, io::Error>
83124
where
84125
ExpireKey: Borrow<Q>,

src/meta/raft-store/src/sm_v002/leveled_store/static_levels.rs src/meta/raft-store/src/sm_v002/leveled_store/immutable_levels.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
use std::borrow::Borrow;
1616
use std::io;
1717
use std::ops::RangeBounds;
18-
use std::sync::Arc;
1918

19+
use crate::sm_v002::leveled_store::immutable::Immutable;
2020
use crate::sm_v002::leveled_store::level::Level;
2121
use crate::sm_v002::leveled_store::map_api::compacted_get;
2222
use crate::sm_v002::leveled_store::map_api::compacted_range;
@@ -28,20 +28,20 @@ use crate::sm_v002::marked::Marked;
2828

2929
/// A readonly leveled map that owns the data.
3030
#[derive(Debug, Default, Clone)]
31-
pub struct StaticLevels {
31+
pub struct ImmutableLevels {
3232
/// From oldest to newest, i.e., levels[0] is the oldest
33-
levels: Vec<Arc<Level>>,
33+
levels: Vec<Immutable>,
3434
}
3535

36-
impl StaticLevels {
37-
pub(in crate::sm_v002) fn new(levels: impl IntoIterator<Item = Arc<Level>>) -> Self {
36+
impl ImmutableLevels {
37+
pub(in crate::sm_v002) fn new(levels: impl IntoIterator<Item = Immutable>) -> Self {
3838
Self {
3939
levels: levels.into_iter().collect(),
4040
}
4141
}
4242

4343
/// Return an iterator of all Arc of levels from newest to oldest.
44-
pub(in crate::sm_v002) fn iter_arc_levels(&self) -> impl Iterator<Item = &Arc<Level>> {
44+
pub(in crate::sm_v002) fn iter_immutable_levels(&self) -> impl Iterator<Item = &Immutable> {
4545
self.levels.iter().rev()
4646
}
4747

@@ -50,11 +50,11 @@ impl StaticLevels {
5050
self.levels.iter().map(|x| x.as_ref()).rev()
5151
}
5252

53-
pub(in crate::sm_v002) fn newest(&self) -> Option<&Arc<Level>> {
53+
pub(in crate::sm_v002) fn newest(&self) -> Option<&Immutable> {
5454
self.levels.last()
5555
}
5656

57-
pub(in crate::sm_v002) fn push(&mut self, level: Arc<Level>) {
57+
pub(in crate::sm_v002) fn push(&mut self, level: Immutable) {
5858
self.levels.push(level);
5959
}
6060

@@ -69,24 +69,24 @@ impl StaticLevels {
6969
}
7070

7171
#[async_trait::async_trait]
72-
impl<K> MapApiRO<K> for StaticLevels
72+
impl<K> MapApiRO<K> for ImmutableLevels
7373
where
7474
K: MapKey,
7575
Level: MapApiRO<K>,
76-
Arc<Level>: MapApiRO<K>,
76+
Immutable: MapApiRO<K>,
7777
{
7878
async fn get<Q>(&self, key: &Q) -> Result<Marked<K::V>, io::Error>
7979
where
8080
K: Borrow<Q>,
8181
Q: Ord + Send + Sync + ?Sized,
8282
{
83-
let levels = self.iter_arc_levels();
83+
let levels = self.iter_immutable_levels();
8484
compacted_get(key, levels).await
8585
}
8686

8787
async fn range<R>(&self, range: R) -> Result<KVResultStream<K>, io::Error>
8888
where R: RangeBounds<K> + Clone + Send + Sync + 'static {
89-
let levels = self.iter_arc_levels();
89+
let levels = self.iter_immutable_levels();
9090
compacted_range::<_, _, _, Level>(range, None, levels).await
9191
}
9292
}

0 commit comments

Comments
 (0)