Skip to content

Commit f26a217

Browse files
Exploring shared cache based on multi reflectors
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 550e50f commit f26a217

File tree

14 files changed

+430
-188
lines changed

14 files changed

+430
-188
lines changed

examples/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ ws = ["kube/ws"]
2222
latest = ["k8s-openapi/latest"]
2323

2424
[dev-dependencies]
25+
parking_lot.workspace = true
2526
tokio-util.workspace = true
2627
assert-json-diff.workspace = true
2728
garde = { version = "0.22.0", default-features = false, features = ["derive"] }

examples/multi_reflector.rs

+100-92
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,80 @@
1-
use futures::{future, StreamExt};
1+
use futures::{future, stream, StreamExt};
22
use k8s_openapi::api::{
33
apps::v1::Deployment,
44
core::v1::{ConfigMap, Secret},
55
};
66
use kube::{
7-
runtime::{
8-
reflector,
9-
reflector::{ObjectRef, Store},
10-
watcher, WatchStreamExt,
11-
},
12-
Api, Client,
7+
api::{ApiResource, DynamicObject, GroupVersionKind},
8+
core::TypedResource,
9+
runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt},
10+
Api, Client, Resource,
1311
};
12+
use parking_lot::RwLock;
13+
use serde::de::DeserializeOwned;
1414
use std::sync::Arc;
1515
use tracing::*;
1616

17-
// This does not work because Resource trait is not dyn safe.
18-
/*
19-
use std::any::TypeId;
2017
use std::collections::HashMap;
21-
use k8s_openapi::NamespaceResourceScope;
22-
use kube::api::{Resource, ResourceExt};
23-
struct MultiStore {
24-
stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
25-
}
26-
impl MultiStore {
27-
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
28-
let oref = ObjectRef::<K>::new(name).within(ns);
29-
if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
30-
store.get(oref)
31-
} else {
32-
None
33-
}
34-
}
35-
}*/
3618

37-
// explicit store can work
38-
struct MultiStore {
39-
deploys: Store<Deployment>,
40-
cms: Store<ConfigMap>,
41-
secs: Store<Secret>,
19+
type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;
20+
21+
#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
22+
struct LookupKey {
23+
gvk: GroupVersionKind,
24+
name: Option<String>,
25+
namespace: Option<String>,
4226
}
43-
// but using generics to help out won't because the K needs to be concretised
44-
/*
45-
impl MultiStore {
46-
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
47-
let oref = ObjectRef::<K>::new(name).within(ns);
48-
let kind = K::kind(&()).to_owned();
49-
match kind.as_ref() {
50-
"Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
51-
"ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
52-
"Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
53-
_ => None,
27+
28+
impl LookupKey {
29+
fn new<R: TypedResource>(resource: &R) -> LookupKey {
30+
let meta = resource.meta();
31+
LookupKey {
32+
gvk: resource.gvk(),
33+
name: meta.name.clone(),
34+
namespace: meta.namespace.clone(),
5435
}
55-
None
5636
}
5737
}
58-
*/
59-
// so left with this
6038

61-
impl MultiStore {
62-
fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
63-
self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
64-
}
39+
#[derive(Default, Clone)]
40+
struct MultiCache {
41+
store: Cache,
42+
}
6543

66-
fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
67-
self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
44+
impl MultiCache {
45+
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
46+
&self,
47+
name: &str,
48+
ns: &str,
49+
) -> Option<Arc<K>> {
50+
let obj = self
51+
.store
52+
.read()
53+
.get(&LookupKey {
54+
gvk: K::gvk(&Default::default()),
55+
name: Some(name.into()),
56+
namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
57+
})?
58+
.as_ref()
59+
.clone();
60+
obj.try_parse().ok().map(Arc::new)
6861
}
62+
}
6963

70-
fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
71-
self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
64+
impl CacheWriter<DynamicObject> for MultiCache {
65+
/// Applies a single watcher event to the store
66+
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
67+
match event {
68+
watcher::Event::Init | watcher::Event::InitDone => {}
69+
watcher::Event::Delete(obj) => {
70+
self.store.write().remove(&LookupKey::new(obj));
71+
}
72+
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
73+
self.store
74+
.write()
75+
.insert(LookupKey::new(obj), Arc::new(obj.clone()));
76+
}
77+
}
7278
}
7379
}
7480

@@ -77,60 +83,62 @@ async fn main() -> anyhow::Result<()> {
7783
tracing_subscriber::fmt::init();
7884
let client = Client::try_default().await?;
7985

80-
let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
81-
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
82-
let secret: Api<Secret> = Api::default_namespaced(client.clone());
86+
// multistore
87+
let mut combo_stream = stream::select_all(vec![]);
88+
combo_stream.push(
89+
watcher::watcher(
90+
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
91+
Default::default(),
92+
)
93+
.boxed(),
94+
);
95+
combo_stream.push(
96+
watcher::watcher(
97+
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
98+
Default::default(),
99+
)
100+
.boxed(),
101+
);
83102

84-
let (dep_reader, dep_writer) = reflector::store::<Deployment>();
85-
let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
86-
let (sec_reader, sec_writer) = reflector::store::<Secret>();
103+
// // Duplicate streams with narrowed down selection
104+
combo_stream.push(
105+
watcher::watcher(
106+
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
107+
Default::default(),
108+
)
109+
.boxed(),
110+
);
111+
combo_stream.push(
112+
watcher::watcher(
113+
Api::all_with(client.clone(), &ApiResource::erase::<Secret>(&())),
114+
Default::default(),
115+
)
116+
.boxed(),
117+
);
87118

88-
let cfg = watcher::Config::default();
89-
let dep_watcher = watcher(deploys, cfg.clone())
90-
.reflect(dep_writer)
91-
.applied_objects()
92-
.for_each(|_| future::ready(()));
93-
let cm_watcher = watcher(cms, cfg.clone())
94-
.reflect(cm_writer)
119+
let multi_writer = MultiCache::default();
120+
let watcher = combo_stream
121+
.reflect(multi_writer.clone())
95122
.applied_objects()
96123
.for_each(|_| future::ready(()));
97-
let sec_watcher = watcher(secret, cfg)
98-
.reflect(sec_writer)
99-
.applied_objects()
100-
.for_each(|_| future::ready(()));
101-
// poll these forever
102-
103-
// multistore
104-
let stores = MultiStore {
105-
deploys: dep_reader,
106-
cms: cm_reader,
107-
secs: sec_reader,
108-
};
109124

110125
// simulate doing stuff with the stores from some other thread
111126
tokio::spawn(async move {
112-
// Show state every 5 seconds of watching
113-
info!("waiting for them to be ready");
114-
stores.deploys.wait_until_ready().await.unwrap();
115-
stores.cms.wait_until_ready().await.unwrap();
116-
stores.secs.wait_until_ready().await.unwrap();
117-
info!("stores initialised");
118127
// can use helper accessors
119-
info!(
120-
"common cm: {:?}",
121-
stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
122-
);
123128
loop {
124129
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
130+
info!("cache content: {:?}", multi_writer.store.read().keys());
131+
info!(
132+
"common cm: {:?}",
133+
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
134+
);
125135
// access individual sub stores
126-
info!("Current deploys count: {}", stores.deploys.state().len());
136+
info!("Current objects count: {}", multi_writer.store.read().len());
127137
}
128138
});
129-
// info!("long watches starting");
139+
info!("long watches starting");
130140
tokio::select! {
131-
r = dep_watcher => println!("dep watcher exit: {r:?}"),
132-
r = cm_watcher => println!("cm watcher exit: {r:?}"),
133-
r = sec_watcher => println!("sec watcher exit: {r:?}"),
141+
r = watcher => println!("watcher exit: {r:?}"),
134142
}
135143

136144
Ok(())

kube-core/src/gvk.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use thiserror::Error;
1212
pub struct ParseGroupVersionError(pub String);
1313

1414
/// Core information about an API Resource.
15-
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
15+
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
1616
pub struct GroupVersionKind {
1717
/// API group
1818
pub group: String,

kube-core/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub mod gvk;
3535
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};
3636

3737
pub mod metadata;
38-
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
38+
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};
3939

4040
pub mod labels;
4141

0 commit comments

Comments
 (0)