Skip to content

Commit 13ce09e

Browse files
authored
chore: KV Router start using Store instead of etcd (#3945)
Signed-off-by: Graham King <[email protected]>
1 parent eb3a486 commit 13ce09e

File tree

5 files changed

+53
-65
lines changed

5 files changed

+53
-65
lines changed

lib/llm/src/discovery/model_manager.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@ use std::{
99
use parking_lot::{Mutex, RwLock};
1010
use tokio::sync::oneshot;
1111

12-
use dynamo_runtime::component::{Component, Endpoint};
1312
use dynamo_runtime::prelude::DistributedRuntimeProvider;
13+
use dynamo_runtime::{
14+
component::{Component, Endpoint},
15+
storage::key_value_store::Key,
16+
};
1417

1518
use crate::{
1619
discovery::KV_ROUTERS_ROOT_PATH,
@@ -309,24 +312,15 @@ impl ModelManager {
309312
return Ok(kv_chooser);
310313
}
311314

312-
// Create new KV router with etcd registration
313-
let etcd_client = component
314-
.drt()
315-
.etcd_client()
316-
.ok_or_else(|| anyhow::anyhow!("KV routing requires etcd (dynamic mode)"))?;
315+
let store = component.drt().store();
316+
let router_bucket = store
317+
.get_or_create_bucket(KV_ROUTERS_ROOT_PATH, None)
318+
.await?;
317319
let router_uuid = uuid::Uuid::new_v4();
318-
let router_key = format!(
319-
"{}/{}/{}",
320-
KV_ROUTERS_ROOT_PATH,
321-
component.path(),
322-
router_uuid
323-
);
324-
etcd_client
325-
.kv_create(
326-
&router_key,
327-
serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?,
328-
None, // use primary lease
329-
)
320+
let router_key = Key::from_raw(format!("{}/{router_uuid}", component.path()));
321+
let json_router_config = serde_json::to_vec_pretty(&kv_router_config.unwrap_or_default())?;
322+
router_bucket
323+
.insert(&router_key, json_router_config.into(), 0)
330324
.await?;
331325

332326
let selector = Box::new(DefaultWorkerSelector::new(kv_router_config));

lib/runtime/src/storage/key_value_store.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,10 @@ impl KeyValueStoreManager {
290290
key: &Key,
291291
obj: &mut T,
292292
) -> anyhow::Result<StoreOutcome> {
293-
let obj_json = serde_json::to_string(obj)?;
293+
let obj_json = serde_json::to_vec(obj)?;
294294
let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?;
295295

296-
let outcome = bucket.insert(key, &obj_json, obj.revision()).await?;
296+
let outcome = bucket.insert(key, obj_json.into(), obj.revision()).await?;
297297

298298
match outcome {
299299
StoreOutcome::Created(revision) | StoreOutcome::Exists(revision) => {
@@ -313,7 +313,7 @@ pub trait KeyValueBucket: Send + Sync {
313313
async fn insert(
314314
&self,
315315
key: &Key,
316-
value: &str,
316+
value: bytes::Bytes,
317317
revision: u64,
318318
) -> Result<StoreOutcome, StoreError>;
319319

@@ -434,14 +434,14 @@ mod tests {
434434
let s2 = Arc::clone(&s);
435435

436436
let bucket = s.get_or_create_bucket(BUCKET_NAME, None).await?;
437-
let res = bucket.insert(&"test1".into(), "value1", 0).await?;
437+
let res = bucket.insert(&"test1".into(), "value1".into(), 0).await?;
438438
assert_eq!(res, StoreOutcome::Created(0));
439439

440440
let mut expected = Vec::with_capacity(3);
441441
for i in 1..=3 {
442442
let item = WatchEvent::Put(KeyValue::new(
443443
format!("test{i}"),
444-
bytes::Bytes::from(format!("value{i}").into_bytes()),
444+
format!("value{i}").into(),
445445
));
446446
expected.push(item);
447447
}
@@ -472,18 +472,18 @@ mod tests {
472472
// wouldn't be testing the watch behavior.
473473
got_first_rx.await?;
474474

475-
let res = bucket.insert(&"test2".into(), "value2", 0).await?;
475+
let res = bucket.insert(&"test2".into(), "value2".into(), 0).await?;
476476
assert_eq!(res, StoreOutcome::Created(0));
477477

478478
// Repeat a key and revision. Ignored.
479-
let res = bucket.insert(&"test2".into(), "value2", 0).await?;
479+
let res = bucket.insert(&"test2".into(), "value2".into(), 0).await?;
480480
assert_eq!(res, StoreOutcome::Exists(0));
481481

482482
// Increment revision
483-
let res = bucket.insert(&"test2".into(), "value2", 1).await?;
483+
let res = bucket.insert(&"test2".into(), "value2".into(), 1).await?;
484484
assert_eq!(res, StoreOutcome::Created(1));
485485

486-
let res = bucket.insert(&"test3".into(), "value3", 0).await?;
486+
let res = bucket.insert(&"test3".into(), "value3".into(), 0).await?;
487487
assert_eq!(res, StoreOutcome::Created(0));
488488

489489
// ingress exits once it has received all values
@@ -500,7 +500,7 @@ mod tests {
500500
let bucket: &'static _ =
501501
Box::leak(Box::new(s.get_or_create_bucket(BUCKET_NAME, None).await?));
502502

503-
let res = bucket.insert(&"test1".into(), "value1", 0).await?;
503+
let res = bucket.insert(&"test1".into(), "value1".into(), 0).await?;
504504
assert_eq!(res, StoreOutcome::Created(0));
505505

506506
let stream = bucket.watch().await?;
@@ -509,10 +509,7 @@ mod tests {
509509
let mut rx1 = tap.subscribe();
510510
let mut rx2 = tap.subscribe();
511511

512-
let item = WatchEvent::Put(KeyValue::new(
513-
"test1".to_string(),
514-
bytes::Bytes::from(b"GK".as_slice()),
515-
));
512+
let item = WatchEvent::Put(KeyValue::new("test1".to_string(), "GK".into()));
516513
let item_clone = item.clone();
517514
let handle1 = tokio::spawn(async move {
518515
let b = rx1.recv().await.unwrap();
@@ -523,7 +520,7 @@ mod tests {
523520
assert_eq!(b, item);
524521
});
525522

526-
bucket.insert(&"test1".into(), "GK", 1).await?;
523+
bucket.insert(&"test1".into(), "GK".into(), 1).await?;
527524

528525
let _ = futures::join!(handle1, handle2);
529526
Ok(())

lib/runtime/src/storage/key_value_store/etcd.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl KeyValueBucket for EtcdBucket {
6666
async fn insert(
6767
&self,
6868
key: &Key,
69-
value: &str,
69+
value: bytes::Bytes,
7070
// "version" in etcd speak. revision is a global cluster-wide value
7171
revision: u64,
7272
) -> Result<StoreOutcome, StoreError> {
@@ -169,7 +169,11 @@ impl KeyValueBucket for EtcdBucket {
169169
}
170170

171171
impl EtcdBucket {
172-
async fn create(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> {
172+
async fn create(
173+
&self,
174+
key: &Key,
175+
value: impl Into<Vec<u8>>,
176+
) -> Result<StoreOutcome, StoreError> {
173177
let k = make_key(&self.bucket_name, key);
174178
tracing::trace!("etcd create: {k}");
175179

@@ -215,7 +219,7 @@ impl EtcdBucket {
215219
async fn update(
216220
&self,
217221
key: &Key,
218-
value: &str,
222+
value: impl AsRef<[u8]>,
219223
revision: u64,
220224
) -> Result<StoreOutcome, StoreError> {
221225
let version = revision;
@@ -328,7 +332,7 @@ mod concurrent_create_tests {
328332
let result = bucket_clone
329333
.lock()
330334
.await
331-
.insert(&key_clone, &value_clone, 0)
335+
.insert(&key_clone, value_clone.into(), 0)
332336
.await;
333337

334338
match result {

lib/runtime/src/storage/key_value_store/mem.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use super::{KeyValueBucket, KeyValueStore, StoreError, StoreOutcome};
1717

1818
#[derive(Clone, Debug)]
1919
enum MemoryEvent {
20-
Put { key: String, value: String },
20+
Put { key: String, value: bytes::Bytes },
2121
Delete { key: String },
2222
}
2323

@@ -45,7 +45,7 @@ pub struct MemoryBucketRef {
4545
}
4646

4747
struct MemoryBucket {
48-
data: HashMap<String, (u64, String)>,
48+
data: HashMap<String, (u64, bytes::Bytes)>,
4949
}
5050

5151
impl MemoryBucket {
@@ -114,7 +114,7 @@ impl KeyValueBucket for MemoryBucketRef {
114114
async fn insert(
115115
&self,
116116
key: &Key,
117-
value: &str,
117+
value: bytes::Bytes,
118118
revision: u64,
119119
) -> Result<StoreOutcome, StoreError> {
120120
let mut locked_data = self.inner.data.lock();
@@ -124,10 +124,10 @@ impl KeyValueBucket for MemoryBucketRef {
124124
};
125125
let outcome = match bucket.data.entry(key.to_string()) {
126126
Entry::Vacant(e) => {
127-
e.insert((revision, value.to_string()));
127+
e.insert((revision, value.clone()));
128128
let _ = self.inner.change_sender.send(MemoryEvent::Put {
129129
key: key.to_string(),
130-
value: value.to_string(),
130+
value,
131131
});
132132
StoreOutcome::Created(revision)
133133
}
@@ -136,7 +136,7 @@ impl KeyValueBucket for MemoryBucketRef {
136136
if *rev == revision {
137137
StoreOutcome::Exists(revision)
138138
} else {
139-
entry.insert((revision, value.to_string()));
139+
entry.insert((revision, value));
140140
StoreOutcome::Created(revision)
141141
}
142142
}
@@ -149,10 +149,7 @@ impl KeyValueBucket for MemoryBucketRef {
149149
let Some(bucket) = locked_data.get(&self.name) else {
150150
return Ok(None);
151151
};
152-
Ok(bucket
153-
.data
154-
.get(&key.0)
155-
.map(|(_, v)| bytes::Bytes::from(v.clone())))
152+
Ok(bucket.data.get(&key.0).map(|(_, v)| v.clone()))
156153
}
157154

158155
async fn delete(&self, key: &Key) -> Result<(), StoreError> {
@@ -183,7 +180,7 @@ impl KeyValueBucket for MemoryBucketRef {
183180
};
184181
for (key, (_rev, v)) in &bucket.data {
185182
seen_keys.insert(key.clone());
186-
let item = KeyValue::new(key.clone(), bytes::Bytes::from(v.clone().into_bytes()));
183+
let item = KeyValue::new(key.clone(), v.clone());
187184
existing_items.push(WatchEvent::Put(item));
188185
}
189186
drop(data_lock);
@@ -204,7 +201,7 @@ impl KeyValueBucket for MemoryBucketRef {
204201
if seen_keys.contains(&key) {
205202
continue;
206203
}
207-
let item = KeyValue::new(key, bytes::Bytes::from(value));
204+
let item = KeyValue::new(key, value);
208205
yield WatchEvent::Put(item);
209206
},
210207
Some(MemoryEvent::Delete { key }) => {
@@ -222,7 +219,7 @@ impl KeyValueBucket for MemoryBucketRef {
222219
Some(bucket) => Ok(bucket
223220
.data
224221
.iter()
225-
.map(|(k, (_rev, v))| (k.to_string(), bytes::Bytes::from(v.clone())))
222+
.map(|(k, (_rev, v))| (k.to_string(), v.clone()))
226223
.collect()),
227224
None => Err(StoreError::MissingBucket(self.name.clone())),
228225
}

lib/runtime/src/storage/key_value_store/nats.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl KeyValueBucket for NATSBucket {
119119
async fn insert(
120120
&self,
121121
key: &Key,
122-
value: &str,
122+
value: bytes::Bytes,
123123
revision: u64,
124124
) -> Result<StoreOutcome, StoreError> {
125125
if revision == 0 {
@@ -195,8 +195,8 @@ impl KeyValueBucket for NATSBucket {
195195
}
196196

197197
impl NATSBucket {
198-
async fn create(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> {
199-
match self.nats_store.create(&key, value.to_string().into()).await {
198+
async fn create(&self, key: &Key, value: bytes::Bytes) -> Result<StoreOutcome, StoreError> {
199+
match self.nats_store.create(&key, value).await {
200200
Ok(revision) => Ok(StoreOutcome::Created(revision)),
201201
Err(err) if err.kind() == async_nats::jetstream::kv::CreateErrorKind::AlreadyExists => {
202202
// key exists, get the revsion
@@ -219,14 +219,10 @@ impl NATSBucket {
219219
async fn update(
220220
&self,
221221
key: &Key,
222-
value: &str,
222+
value: bytes::Bytes,
223223
revision: u64,
224224
) -> Result<StoreOutcome, StoreError> {
225-
match self
226-
.nats_store
227-
.update(key, value.to_string().into(), revision)
228-
.await
229-
{
225+
match self.nats_store.update(key, value.clone(), revision).await {
230226
Ok(revision) => Ok(StoreOutcome::Created(revision)),
231227
Err(err)
232228
if err.kind() == async_nats::jetstream::kv::UpdateErrorKind::WrongLastRevision =>
@@ -240,16 +236,16 @@ impl NATSBucket {
240236

241237
/// We have the wrong revision for a key. Fetch it's entry to get the correct revision,
242238
/// and try the update again.
243-
async fn resync_update(&self, key: &Key, value: &str) -> Result<StoreOutcome, StoreError> {
239+
async fn resync_update(
240+
&self,
241+
key: &Key,
242+
value: bytes::Bytes,
243+
) -> Result<StoreOutcome, StoreError> {
244244
match self.nats_store.entry(key).await {
245245
Ok(Some(entry)) => {
246246
// Re-try the update with new version number
247247
let next_rev = entry.revision + 1;
248-
match self
249-
.nats_store
250-
.update(key, value.to_string().into(), next_rev)
251-
.await
252-
{
248+
match self.nats_store.update(key, value, next_rev).await {
253249
Ok(correct_revision) => Ok(StoreOutcome::Created(correct_revision)),
254250
Err(err) => Err(StoreError::NATSError(format!(
255251
"Error during update of key {key} after resync: {err}"

0 commit comments

Comments
 (0)