Skip to content

Commit 450df09

Browse files
Exploring shared cache based on multi reflectors
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 550e50f commit 450df09

File tree

15 files changed

+500
-208
lines changed

15 files changed

+500
-208
lines changed

examples/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ ws = ["kube/ws"]
2222
latest = ["k8s-openapi/latest"]
2323

2424
[dev-dependencies]
25+
parking_lot.workspace = true
2526
tokio-util.workspace = true
2627
assert-json-diff.workspace = true
2728
garde = { version = "0.22.0", default-features = false, features = ["derive"] }

examples/multi_reflector.rs

+97-89
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,99 @@
1-
use futures::{future, StreamExt};
1+
use futures::{future, stream, StreamExt};
22
use k8s_openapi::api::{
33
apps::v1::Deployment,
44
core::v1::{ConfigMap, Secret},
55
};
66
use kube::{
7+
api::{ApiResource, DynamicObject, GroupVersionKind},
8+
core::TypedResource,
79
runtime::{
8-
reflector,
9-
reflector::{ObjectRef, Store},
10-
watcher, WatchStreamExt,
10+
reflector::{
11+
store::{CacheWriter, Writer},
12+
ObjectRef, Store,
13+
},
14+
watcher::{self, dynamic_watcher},
15+
WatchStreamExt,
1116
},
12-
Api, Client,
17+
Api, Client, Resource,
1318
};
19+
use parking_lot::RwLock;
20+
use serde::de::DeserializeOwned;
1421
use std::sync::Arc;
1522
use tracing::*;
1623

17-
// This does not work because Resource trait is not dyn safe.
18-
/*
19-
use std::any::TypeId;
2024
use std::collections::HashMap;
21-
use k8s_openapi::NamespaceResourceScope;
22-
use kube::api::{Resource, ResourceExt};
23-
struct MultiStore {
24-
stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
25-
}
26-
impl MultiStore {
27-
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
28-
let oref = ObjectRef::<K>::new(name).within(ns);
29-
if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
30-
store.get(oref)
31-
} else {
32-
None
33-
}
34-
}
35-
}*/
3625

37-
// explicit store can work
26+
type Cache = Arc<RwLock<HashMap<GroupVersionKind, Writer<DynamicObject>>>>;
27+
28+
#[derive(Default, Clone, Copy)]
29+
struct MultiWriter {
30+
store: Cache,
31+
buffer: Cache,
32+
}
33+
34+
#[derive(Default, Clone)]
3835
struct MultiStore {
39-
deploys: Store<Deployment>,
40-
cms: Store<ConfigMap>,
41-
secs: Store<Secret>,
36+
store: Cache,
4237
}
43-
// but using generics to help out won't because the K needs to be concretised
44-
/*
45-
impl MultiStore {
46-
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
47-
let oref = ObjectRef::<K>::new(name).within(ns);
48-
let kind = K::kind(&()).to_owned();
49-
match kind.as_ref() {
50-
"Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
51-
"ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
52-
"Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
53-
_ => None,
54-
}
55-
None
38+
39+
impl MultiWriter {
40+
fn as_reader(&self) -> MultiStore {
41+
MultiStore { store: self.store }
5642
}
5743
}
58-
*/
59-
// so left with this
6044

6145
impl MultiStore {
62-
fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
63-
self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
46+
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
47+
&self,
48+
name: &str,
49+
ns: &str,
50+
) -> Option<Arc<K>> {
51+
let oref = ObjectRef::<K>::new(name).within(ns).erase();
52+
let store = self.get_store::<K>()?;
53+
let obj = store.get(&oref)?.as_ref().clone();
54+
obj.try_parse().ok().map(Arc::new)
6455
}
6556

66-
fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
67-
self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
57+
fn get_store<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
58+
&self,
59+
) -> Option<Store<DynamicObject>> {
60+
Some(self.store.read().get(&K::gvk(&Default::default()))?.as_reader())
6861
}
62+
}
6963

70-
fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
71-
self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
64+
impl CacheWriter<DynamicObject> for MultiWriter {
65+
/// Applies a single watcher event to the store
66+
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
67+
match event {
68+
watcher::Event::Init | watcher::Event::InitDone(None) => {}
69+
watcher::Event::Apply(obj) | watcher::Event::Delete(obj) => {
70+
let mut stores = self.store.write();
71+
if stores.get(&obj.gvk()).is_none() {
72+
let store = Writer::new(ApiResource::from_gvk(&obj.gvk()));
73+
stores.insert(obj.gvk(), store);
74+
};
75+
if let Some(store) = stores.get_mut(&obj.gvk()) {
76+
store.apply_watcher_event(event);
77+
};
78+
}
79+
watcher::Event::InitApply(obj) => {
80+
let mut buffer = self.buffer.write();
81+
if buffer.get(&obj.gvk()).is_none() {
82+
let store = Writer::new(ApiResource::from_gvk(&obj.gvk()));
83+
buffer.insert(obj.gvk(), store);
84+
};
85+
if let Some(store) = buffer.get_mut(&obj.gvk()) {
86+
store.apply_watcher_event(event);
87+
};
88+
}
89+
watcher::Event::InitDone(Some(obj)) => {
90+
let mut buffer = self.buffer.write();
91+
if let Some(mut store) = buffer.remove(&obj.gvk()) {
92+
store.apply_watcher_event(event);
93+
self.store.write().insert(obj.gvk(), store);
94+
}
95+
}
96+
}
7297
}
7398
}
7499

@@ -77,60 +102,43 @@ async fn main() -> anyhow::Result<()> {
77102
tracing_subscriber::fmt::init();
78103
let client = Client::try_default().await?;
79104

80-
let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
81-
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
82-
let secret: Api<Secret> = Api::default_namespaced(client.clone());
83-
84-
let (dep_reader, dep_writer) = reflector::store::<Deployment>();
85-
let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
86-
let (sec_reader, sec_writer) = reflector::store::<Secret>();
105+
// multistore
106+
let combo_stream = stream::select_all(vec![
107+
dynamic_watcher(Api::<Deployment>::all(client.clone()), Default::default()).boxed(),
108+
dynamic_watcher(Api::<ConfigMap>::all(client.clone()), Default::default()).boxed(),
109+
dynamic_watcher(Api::<Secret>::all(client.clone()), Default::default()).boxed(),
110+
]);
87111

88-
let cfg = watcher::Config::default();
89-
let dep_watcher = watcher(deploys, cfg.clone())
90-
.reflect(dep_writer)
91-
.applied_objects()
92-
.for_each(|_| future::ready(()));
93-
let cm_watcher = watcher(cms, cfg.clone())
94-
.reflect(cm_writer)
95-
.applied_objects()
96-
.for_each(|_| future::ready(()));
97-
let sec_watcher = watcher(secret, cfg)
98-
.reflect(sec_writer)
112+
let multi_writer = MultiWriter::default();
113+
let watcher = combo_stream
114+
.reflect(multi_writer)
99115
.applied_objects()
100116
.for_each(|_| future::ready(()));
101-
// poll these forever
102-
103-
// multistore
104-
let stores = MultiStore {
105-
deploys: dep_reader,
106-
cms: cm_reader,
107-
secs: sec_reader,
108-
};
109117

110118
// simulate doing stuff with the stores from some other thread
111119
tokio::spawn(async move {
112-
// Show state every 5 seconds of watching
113-
info!("waiting for them to be ready");
114-
stores.deploys.wait_until_ready().await.unwrap();
115-
stores.cms.wait_until_ready().await.unwrap();
116-
stores.secs.wait_until_ready().await.unwrap();
117-
info!("stores initialised");
118120
// can use helper accessors
119-
info!(
120-
"common cm: {:?}",
121-
stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
122-
);
123121
loop {
124122
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
123+
info!(
124+
"cache content: {:?}",
125+
multi_writer.as_reader().store.read().keys()
126+
);
127+
info!(
128+
"common cm: {:?}",
129+
multi_writer
130+
.as_reader()
131+
.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
132+
);
125133
// access individual sub stores
126-
info!("Current deploys count: {}", stores.deploys.state().len());
134+
if let Some(deploys) = multi_writer.as_reader().get_store::<Deployment>() {
135+
info!("Current deploys count: {}", deploys.state().len());
136+
}
127137
}
128138
});
129-
// info!("long watches starting");
139+
info!("long watches starting");
130140
tokio::select! {
131-
r = dep_watcher => println!("dep watcher exit: {r:?}"),
132-
r = cm_watcher => println!("cm watcher exit: {r:?}"),
133-
r = sec_watcher => println!("sec watcher exit: {r:?}"),
141+
r = watcher => println!("watcher exit: {r:?}"),
134142
}
135143

136144
Ok(())

kube-core/src/gvk.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use thiserror::Error;
1212
pub struct ParseGroupVersionError(pub String);
1313

1414
/// Core information about an API Resource.
15-
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
15+
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
1616
pub struct GroupVersionKind {
1717
/// API group
1818
pub group: String,

kube-core/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub mod gvk;
3535
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};
3636

3737
pub mod metadata;
38-
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
38+
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};
3939

4040
pub mod labels;
4141

kube-core/src/metadata.rs

+118-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{borrow::Cow, marker::PhantomData};
44
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
55
use serde::{Deserialize, Serialize};
66

7-
use crate::{DynamicObject, Resource};
7+
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};
88

99
/// Type information that is flattened into every kubernetes object
1010
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
@@ -175,6 +175,123 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
175175
}
176176
}
177177

178+
///
179+
pub trait TypedResource: Resource + Sized {
180+
///
181+
fn type_meta(&self) -> TypeMeta;
182+
///
183+
fn gvk(&self) -> GroupVersionKind;
184+
///
185+
fn kind(&self) -> Cow<'_, str>;
186+
///
187+
fn group(&self) -> Cow<'_, str>;
188+
///
189+
fn version(&self) -> Cow<'_, str>;
190+
///
191+
fn plural(&self) -> Cow<'_, str>;
192+
}
193+
194+
impl<K> TypedResource for K
195+
where
196+
K: Resource,
197+
(K, K::DynamicType): TypedResourceImpl<Resource = K>,
198+
{
199+
fn type_meta(&self) -> TypeMeta {
200+
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)
201+
}
202+
203+
fn gvk(&self) -> GroupVersionKind {
204+
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self)
205+
}
206+
207+
fn kind(&self) -> Cow<'_, str> {
208+
<(K, K::DynamicType) as TypedResourceImpl>::kind(self)
209+
}
210+
///
211+
fn group(&self) -> Cow<'_, str> {
212+
<(K, K::DynamicType) as TypedResourceImpl>::group(self)
213+
}
214+
///
215+
fn version(&self) -> Cow<'_, str> {
216+
<(K, K::DynamicType) as TypedResourceImpl>::version(self)
217+
}
218+
///
219+
fn plural(&self) -> Cow<'_, str> {
220+
<(K, K::DynamicType) as TypedResourceImpl>::plural(self)
221+
}
222+
}
223+
224+
#[doc(hidden)]
225+
// Workaround for https://github.com/rust-lang/rust/issues/20400
226+
pub trait TypedResourceImpl {
227+
type Resource: Resource;
228+
fn type_meta(res: &Self::Resource) -> TypeMeta;
229+
fn gvk(res: &Self::Resource) -> GroupVersionKind;
230+
fn kind(res: &Self::Resource) -> Cow<'_, str>;
231+
fn group(res: &Self::Resource) -> Cow<'_, str>;
232+
fn version(res: &Self::Resource) -> Cow<'_, str>;
233+
fn plural(res: &Self::Resource) -> Cow<'_, str>;
234+
}
235+
236+
impl<K> TypedResourceImpl for (K, ())
237+
where
238+
K: Resource<DynamicType = ()>,
239+
{
240+
type Resource = K;
241+
242+
fn type_meta(_: &Self::Resource) -> TypeMeta {
243+
TypeMeta::resource::<K>()
244+
}
245+
246+
fn gvk(res: &Self::Resource) -> GroupVersionKind {
247+
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())
248+
}
249+
250+
fn kind(_: &Self::Resource) -> Cow<'_, str> {
251+
K::kind(&())
252+
}
253+
254+
fn group(_: &Self::Resource) -> Cow<'_, str> {
255+
K::group(&())
256+
}
257+
258+
fn version(_: &Self::Resource) -> Cow<'_, str> {
259+
K::version(&())
260+
}
261+
262+
fn plural(_: &Self::Resource) -> Cow<'_, str> {
263+
K::plural(&())
264+
}
265+
}
266+
267+
impl TypedResourceImpl for (DynamicObject, ApiResource) {
268+
type Resource = DynamicObject;
269+
270+
fn type_meta(obj: &Self::Resource) -> TypeMeta {
271+
obj.types.clone().unwrap_or_default()
272+
}
273+
274+
fn gvk(res: &Self::Resource) -> GroupVersionKind {
275+
res.type_meta().try_into().unwrap_or_default()
276+
}
277+
278+
fn kind(res: &Self::Resource) -> Cow<'_, str> {
279+
Cow::from(res.type_meta().kind)
280+
}
281+
282+
fn group(res: &Self::Resource) -> Cow<'_, str> {
283+
Cow::from(res.gvk().group)
284+
}
285+
286+
fn version(res: &Self::Resource) -> Cow<'_, str> {
287+
Cow::from(res.gvk().version)
288+
}
289+
290+
fn plural(res: &Self::Resource) -> Cow<'_, str> {
291+
Cow::from(ApiResource::from_gvk(&res.gvk()).plural)
292+
}
293+
}
294+
178295
#[cfg(test)]
179296
mod test {
180297
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};

0 commit comments

Comments
 (0)