Skip to content

Commit e2c90e6

Browse files
committed
Rename things to progress code review
`*Storage` -> `*Store` `storage_client` -> `store` `etcd_root` -> `instance_root` Add a comment explaining that although `store` is `dyn KeyValueStore` right now, it has ambitions to be the one-ring of Dynamo storage. Signed-off-by: Graham King <[email protected]>
1 parent c882f29 commit e2c90e6

File tree

11 files changed

+150
-150
lines changed

11 files changed

+150
-150
lines changed

lib/llm/src/http/service/health.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ async fn live_handler(
5252
async fn health_handler(
5353
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
5454
) -> impl IntoResponse {
55-
let instances = match list_all_instances(state.storage_client()).await {
55+
let instances = match list_all_instances(state.store()).await {
5656
Ok(instances) => instances,
5757
Err(err) => {
58-
tracing::warn!(%err, "Failed to fetch instances from storage");
58+
tracing::warn!(%err, "Failed to fetch instances from store");
5959
vec![]
6060
}
6161
};

lib/llm/src/http/service/service_v2.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use anyhow::Result;
1919
use axum_server::tls_rustls::RustlsConfig;
2020
use derive_builder::Builder;
2121
use dynamo_runtime::logging::make_request_span;
22-
use dynamo_runtime::storage::key_value_store::EtcdStorage;
22+
use dynamo_runtime::storage::key_value_store::EtcdStore;
2323
use dynamo_runtime::storage::key_value_store::KeyValueStore;
24-
use dynamo_runtime::storage::key_value_store::MemoryStorage;
24+
use dynamo_runtime::storage::key_value_store::MemoryStore;
2525
use dynamo_runtime::transports::etcd;
2626
use std::net::SocketAddr;
2727
use tokio::task::JoinHandle;
@@ -33,7 +33,7 @@ pub struct State {
3333
metrics: Arc<Metrics>,
3434
manager: Arc<ModelManager>,
3535
etcd_client: Option<etcd::Client>,
36-
storage_client: Arc<dyn KeyValueStore>,
36+
store: Arc<dyn KeyValueStore>,
3737
flags: StateFlags,
3838
}
3939

@@ -79,7 +79,7 @@ impl State {
7979
manager,
8080
metrics: Arc::new(Metrics::default()),
8181
etcd_client: None,
82-
storage_client: Arc::new(MemoryStorage::new()),
82+
store: Arc::new(MemoryStore::new()),
8383
flags: StateFlags {
8484
chat_endpoints_enabled: AtomicBool::new(false),
8585
cmpl_endpoints_enabled: AtomicBool::new(false),
@@ -93,7 +93,7 @@ impl State {
9393
Self {
9494
manager,
9595
metrics: Arc::new(Metrics::default()),
96-
storage_client: Arc::new(EtcdStorage::new(etcd_client.clone())),
96+
store: Arc::new(EtcdStore::new(etcd_client.clone())),
9797
etcd_client: Some(etcd_client),
9898
flags: StateFlags {
9999
chat_endpoints_enabled: AtomicBool::new(false),
@@ -120,8 +120,8 @@ impl State {
120120
self.etcd_client.as_ref()
121121
}
122122

123-
pub fn storage_client(&self) -> Arc<dyn KeyValueStore> {
124-
self.storage_client.clone()
123+
pub fn store(&self) -> Arc<dyn KeyValueStore> {
124+
self.store.clone()
125125
}
126126

127127
// TODO

lib/llm/src/local_model.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use dynamo_runtime::storage::key_value_store::Key;
1212
use dynamo_runtime::traits::DistributedRuntimeProvider;
1313
use dynamo_runtime::{
1414
component::Endpoint,
15-
storage::key_value_store::{EtcdStorage, KeyValueStore, KeyValueStoreManager},
15+
storage::key_value_store::{EtcdStore, KeyValueStore, KeyValueStoreManager},
1616
};
1717

1818
use crate::entrypoint::RouterConfig;
@@ -409,7 +409,7 @@ impl LocalModel {
409409
self.card.move_to_nats(nats_client.clone()).await?;
410410

411411
// Publish the Model Deployment Card to KV store
412-
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client.clone()));
412+
let kvstore: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client.clone()));
413413
let card_store = Arc::new(KeyValueStoreManager::new(kvstore));
414414
let lease_id = endpoint.drt().primary_lease().map(|l| l.id()).unwrap_or(0);
415415
let key = Key::from_raw(endpoint.unique_path(lease_id));

lib/llm/src/model_card.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use anyhow::{Context, Result};
2323
use derive_builder::Builder;
2424
use dynamo_runtime::DistributedRuntime;
2525
use dynamo_runtime::storage::key_value_store::{
26-
EtcdStorage, Key, KeyValueStore, KeyValueStoreManager,
26+
EtcdStore, Key, KeyValueStore, KeyValueStoreManager,
2727
};
2828
use dynamo_runtime::{slug::Slug, storage::key_value_store::Versioned, transports::nats};
2929
use serde::{Deserialize, Serialize};
@@ -457,7 +457,7 @@ impl ModelDeploymentCard {
457457
// Should be impossible because we only get here on an etcd event
458458
anyhow::bail!("Missing etcd_client");
459459
};
460-
let store: Box<dyn KeyValueStore> = Box::new(EtcdStorage::new(etcd_client));
460+
let store: Box<dyn KeyValueStore> = Box::new(EtcdStore::new(etcd_client));
461461
let card_store = Arc::new(KeyValueStoreManager::new(store));
462462
let Some(mut card) = card_store
463463
.load::<ModelDeploymentCard>(ROOT_PATH, mdc_key)

lib/runtime/src/component.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pub mod service;
7272

7373
pub use client::{Client, InstanceSource};
7474

75-
/// The root etcd path where each instance registers itself in etcd.
75+
/// The root key-value path where each instance registers itself in.
7676
/// An instance is namespace+component+endpoint+lease_id and must be unique.
7777
pub const INSTANCE_ROOT_PATH: &str = "v1/instances";
7878

@@ -223,8 +223,8 @@ impl MetricsRegistry for Component {
223223
}
224224

225225
impl Component {
226-
/// The component part of an instance path in etcd.
227-
pub fn etcd_root(&self) -> String {
226+
/// The component part of an instance path in key-value store.
227+
pub fn instance_root(&self) -> String {
228228
let ns = self.namespace.name();
229229
let cp = &self.name;
230230
format!("{INSTANCE_ROOT_PATH}/{ns}/{cp}")
@@ -266,8 +266,8 @@ impl Component {
266266
}
267267

268268
pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
269-
let client = self.drt.storage_client();
270-
let Some(bucket) = client.get_bucket(&self.etcd_root()).await? else {
269+
let client = self.drt.store();
270+
let Some(bucket) = client.get_bucket(&self.instance_root()).await? else {
271271
return Ok(vec![]);
272272
};
273273
let entries = bucket.entries().await?;
@@ -467,7 +467,7 @@ impl Endpoint {
467467

468468
/// The endpoint part of an instance path in etcd
469469
pub fn etcd_root(&self) -> String {
470-
let component_path = self.component.etcd_root();
470+
let component_path = self.component.instance_root();
471471
let endpoint_name = &self.name;
472472
format!("{component_path}/{endpoint_name}")
473473
}

lib/runtime/src/distributed.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
pub use crate::component::Component;
5-
use crate::storage::key_value_store::{EtcdStorage, KeyValueStore, MemoryStorage};
5+
use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore};
66
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
77
use crate::{
88
ErrorContext, RuntimeCallback,
@@ -45,15 +45,14 @@ impl DistributedRuntime {
4545

4646
let runtime_clone = runtime.clone();
4747

48-
let (etcd_client, storage_client) = if is_static {
49-
let storage_client: Arc<dyn KeyValueStore> = Arc::new(MemoryStorage::new());
50-
(None, storage_client)
48+
let (etcd_client, store) = if is_static {
49+
let store: Arc<dyn KeyValueStore> = Arc::new(MemoryStore::new());
50+
(None, store)
5151
} else {
5252
let etcd_client = etcd::Client::new(etcd_config.clone(), runtime_clone).await?;
53-
let storage_client: Arc<dyn KeyValueStore> =
54-
Arc::new(EtcdStorage::new(etcd_client.clone()));
53+
let store: Arc<dyn KeyValueStore> = Arc::new(EtcdStore::new(etcd_client.clone()));
5554

56-
(Some(etcd_client), storage_client)
55+
(Some(etcd_client), store)
5756
};
5857

5958
let nats_client = nats_config.clone().connect().await?;
@@ -83,7 +82,7 @@ impl DistributedRuntime {
8382
let distributed_runtime = Self {
8483
runtime,
8584
etcd_client,
86-
storage_client,
85+
store,
8786
nats_client,
8887
tcp_server: Arc::new(OnceCell::new()),
8988
system_status_server: Arc::new(OnceLock::new()),
@@ -278,8 +277,9 @@ impl DistributedRuntime {
278277
}
279278

280279
/// An interface to store things. Will eventually replace `etcd_client`.
281-
pub fn storage_client(&self) -> Arc<dyn KeyValueStore> {
282-
self.storage_client.clone()
280+
/// Currently does key-value, but will grow to include whatever we need to store.
281+
pub fn store(&self) -> Arc<dyn KeyValueStore> {
282+
self.store.clone()
283283
}
284284

285285
pub fn child_token(&self) -> CancellationToken {

lib/runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ pub struct DistributedRuntime {
154154
// we might consider a unifed transport manager here
155155
etcd_client: Option<transports::etcd::Client>,
156156
nats_client: transports::nats::Client,
157-
storage_client: Arc<dyn KeyValueStore>,
157+
store: Arc<dyn KeyValueStore>,
158158
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
159159
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
160160

lib/runtime/src/storage/key_value_store.rs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ use futures::StreamExt;
1717
use serde::{Deserialize, Serialize};
1818

1919
mod mem;
20-
pub use mem::MemoryStorage;
20+
pub use mem::MemoryStore;
2121
mod nats;
22-
pub use nats::NATSStorage;
22+
pub use nats::NATSStore;
2323
mod etcd;
24-
pub use etcd::EtcdStorage;
24+
pub use etcd::EtcdStore;
2525

2626
/// A key that is safe to use directly in the KV store.
2727
#[derive(Debug, Clone, PartialEq)]
@@ -69,12 +69,12 @@ pub trait KeyValueStore: Send + Sync {
6969
bucket_name: &str,
7070
// auto-delete items older than this
7171
ttl: Option<Duration>,
72-
) -> Result<Box<dyn KeyValueBucket>, StorageError>;
72+
) -> Result<Box<dyn KeyValueBucket>, StoreError>;
7373

7474
async fn get_bucket(
7575
&self,
7676
bucket_name: &str,
77-
) -> Result<Option<Box<dyn KeyValueBucket>>, StorageError>;
77+
) -> Result<Option<Box<dyn KeyValueBucket>>, StoreError>;
7878

7979
fn connection_id(&self) -> u64;
8080
}
@@ -90,7 +90,7 @@ impl KeyValueStoreManager {
9090
&self,
9191
bucket: &str,
9292
key: &Key,
93-
) -> Result<Option<T>, StorageError> {
93+
) -> Result<Option<T>, StoreError> {
9494
let Some(bucket) = self.0.get_bucket(bucket).await? else {
9595
// No bucket means no cards
9696
return Ok(None);
@@ -103,7 +103,7 @@ impl KeyValueStoreManager {
103103
Ok(None) => Ok(None),
104104
Err(err) => {
105105
// TODO look at what errors NATS can give us and make more specific wrappers
106-
Err(StorageError::NATSError(err.to_string()))
106+
Err(StoreError::NATSError(err.to_string()))
107107
}
108108
}
109109
}
@@ -116,7 +116,7 @@ impl KeyValueStoreManager {
116116
bucket_name: &str,
117117
bucket_ttl: Option<Duration>,
118118
) -> (
119-
tokio::task::JoinHandle<Result<(), StorageError>>,
119+
tokio::task::JoinHandle<Result<(), StoreError>>,
120120
tokio::sync::mpsc::UnboundedReceiver<T>,
121121
) {
122122
let bucket_name = bucket_name.to_string();
@@ -141,7 +141,7 @@ impl KeyValueStoreManager {
141141
let _ = tx.send(card);
142142
}
143143

144-
Ok::<(), StorageError>(())
144+
Ok::<(), StoreError>(())
145145
});
146146
(watch_task, rx)
147147
}
@@ -152,14 +152,14 @@ impl KeyValueStoreManager {
152152
bucket_ttl: Option<Duration>,
153153
key: &Key,
154154
obj: &mut T,
155-
) -> anyhow::Result<StorageOutcome> {
155+
) -> anyhow::Result<StoreOutcome> {
156156
let obj_json = serde_json::to_string(obj)?;
157157
let bucket = self.0.get_or_create_bucket(bucket_name, bucket_ttl).await?;
158158

159159
let outcome = bucket.insert(key, &obj_json, obj.revision()).await?;
160160

161161
match outcome {
162-
StorageOutcome::Created(revision) | StorageOutcome::Exists(revision) => {
162+
StoreOutcome::Created(revision) | StoreOutcome::Exists(revision) => {
163163
obj.set_revision(revision);
164164
}
165165
}
@@ -178,43 +178,43 @@ pub trait KeyValueBucket: Send {
178178
key: &Key,
179179
value: &str,
180180
revision: u64,
181-
) -> Result<StorageOutcome, StorageError>;
181+
) -> Result<StoreOutcome, StoreError>;
182182

183183
/// Fetch an item from the key-value storage
184-
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StorageError>;
184+
async fn get(&self, key: &Key) -> Result<Option<bytes::Bytes>, StoreError>;
185185

186186
/// Delete an item from the bucket
187-
async fn delete(&self, key: &Key) -> Result<(), StorageError>;
187+
async fn delete(&self, key: &Key) -> Result<(), StoreError>;
188188

189189
/// A stream of items inserted into the bucket.
190190
/// Every time the stream is polled it will either return a newly created entry, or block until
191191
/// such time.
192192
async fn watch(
193193
&self,
194-
) -> Result<Pin<Box<dyn futures::Stream<Item = bytes::Bytes> + Send + 'life0>>, StorageError>;
194+
) -> Result<Pin<Box<dyn futures::Stream<Item = bytes::Bytes> + Send + 'life0>>, StoreError>;
195195

196-
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StorageError>;
196+
async fn entries(&self) -> Result<HashMap<String, bytes::Bytes>, StoreError>;
197197
}
198198

199199
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
200-
pub enum StorageOutcome {
200+
pub enum StoreOutcome {
201201
/// The operation succeeded and created a new entry with this revision.
202202
/// Note that "create" also means update, because each new revision is a "create".
203203
Created(u64),
204204
/// The operation did not do anything, the value was already present, with this revision.
205205
Exists(u64),
206206
}
207-
impl fmt::Display for StorageOutcome {
207+
impl fmt::Display for StoreOutcome {
208208
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209209
match self {
210-
StorageOutcome::Created(revision) => write!(f, "Created at {revision}"),
211-
StorageOutcome::Exists(revision) => write!(f, "Exists at {revision}"),
210+
StoreOutcome::Created(revision) => write!(f, "Created at {revision}"),
211+
StoreOutcome::Exists(revision) => write!(f, "Exists at {revision}"),
212212
}
213213
}
214214
}
215215

216216
#[derive(thiserror::Error, Debug)]
217-
pub enum StorageError {
217+
pub enum StoreError {
218218
#[error("Could not find bucket '{0}'")]
219219
MissingBucket(String),
220220

@@ -293,12 +293,12 @@ mod tests {
293293
async fn test_memory_storage() -> anyhow::Result<()> {
294294
init();
295295

296-
let s = Arc::new(MemoryStorage::new());
296+
let s = Arc::new(MemoryStore::new());
297297
let s2 = Arc::clone(&s);
298298

299299
let bucket = s.get_or_create_bucket(BUCKET_NAME, None).await?;
300300
let res = bucket.insert(&"test1".into(), "value1", 0).await?;
301-
assert_eq!(res, StorageOutcome::Created(0));
301+
assert_eq!(res, StoreOutcome::Created(0));
302302

303303
let (got_first_tx, got_first_rx) = tokio::sync::oneshot::channel();
304304
let ingress = tokio::spawn(async move {
@@ -317,27 +317,27 @@ mod tests {
317317
let v = stream.next().await.unwrap();
318318
assert_eq!(v, "value3".as_bytes());
319319

320-
Ok::<_, StorageError>(())
320+
Ok::<_, StoreError>(())
321321
});
322322

323-
// MemoryStorage uses a HashMap with no inherent ordering, so we must ensure test1 is
323+
// MemoryStore uses a HashMap with no inherent ordering, so we must ensure test1 is
324324
// fetched before test2 is inserted, otherwise they can come out in any order, and we
325325
// wouldn't be testing the watch behavior.
326326
got_first_rx.await?;
327327

328328
let res = bucket.insert(&"test2".into(), "value2", 0).await?;
329-
assert_eq!(res, StorageOutcome::Created(0));
329+
assert_eq!(res, StoreOutcome::Created(0));
330330

331331
// Repeat a key and revision. Ignored.
332332
let res = bucket.insert(&"test2".into(), "value2", 0).await?;
333-
assert_eq!(res, StorageOutcome::Exists(0));
333+
assert_eq!(res, StoreOutcome::Exists(0));
334334

335335
// Increment revision
336336
let res = bucket.insert(&"test2".into(), "value2", 1).await?;
337-
assert_eq!(res, StorageOutcome::Created(1));
337+
assert_eq!(res, StoreOutcome::Created(1));
338338

339339
let res = bucket.insert(&"test3".into(), "value3", 0).await?;
340-
assert_eq!(res, StorageOutcome::Created(0));
340+
assert_eq!(res, StoreOutcome::Created(0));
341341

342342
// ingress exits once it has received all values
343343
let _ = ingress.await?;
@@ -349,12 +349,12 @@ mod tests {
349349
async fn test_broadcast_stream() -> anyhow::Result<()> {
350350
init();
351351

352-
let s: &'static _ = Box::leak(Box::new(MemoryStorage::new()));
352+
let s: &'static _ = Box::leak(Box::new(MemoryStore::new()));
353353
let bucket: &'static _ =
354354
Box::leak(Box::new(s.get_or_create_bucket(BUCKET_NAME, None).await?));
355355

356356
let res = bucket.insert(&"test1".into(), "value1", 0).await?;
357-
assert_eq!(res, StorageOutcome::Created(0));
357+
assert_eq!(res, StoreOutcome::Created(0));
358358

359359
let stream = bucket.watch().await?;
360360
let tap = TappableStream::new(stream, 10).await;

0 commit comments

Comments
 (0)