Skip to content

Commit c01f5fe

Browse files
Implement MultiDispatcher for a DynamicObject steam
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent a4035ee commit c01f5fe

File tree

14 files changed

+443
-330
lines changed

14 files changed

+443
-330
lines changed

examples/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ name = "multi_watcher"
132132
path = "multi_watcher.rs"
133133

134134
[[example]]
135-
name = "multi_reflector"
136-
path = "multi_reflector.rs"
135+
name = "broadcast_reflector"
136+
path = "broadcast_reflector.rs"
137137

138138
[[example]]
139139
name = "pod_api"

examples/broadcast_reflector.rs

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use futures::{future, stream, StreamExt};
2+
use k8s_openapi::api::{
3+
apps::v1::Deployment,
4+
core::v1::{ConfigMap, Secret},
5+
};
6+
use kube::{
7+
api::ApiResource,
8+
runtime::{
9+
controller::Action,
10+
reflector::multi_dispatcher::MultiDispatcher,
11+
watcher, Controller, WatchStreamExt,
12+
},
13+
Api, Client, ResourceExt,
14+
};
15+
use std::{fmt::Debug, sync::Arc, time::Duration};
16+
use thiserror::Error;
17+
use tracing::*;
18+
19+
#[derive(Debug, Error)]
20+
enum Infallible {}
21+
22+
// A generic reconciler that can be used with any object whose type is known at
23+
// compile time. Will simply log its kind on reconciliation.
24+
async fn reconcile<K>(_obj: Arc<K>, _ctx: Arc<()>) -> Result<Action, Infallible>
25+
where
26+
K: ResourceExt<DynamicType = ()>,
27+
{
28+
let kind = K::kind(&());
29+
info!("Reconciled {kind}");
30+
Ok(Action::await_change())
31+
}
32+
33+
fn error_policy<K: ResourceExt>(_: Arc<K>, _: &Infallible, _ctx: Arc<()>) -> Action {
34+
info!("error");
35+
Action::requeue(Duration::from_secs(10))
36+
}
37+
38+
#[tokio::main]
39+
async fn main() -> anyhow::Result<()> {
40+
tracing_subscriber::fmt::init();
41+
let client = Client::try_default().await?;
42+
43+
let writer = MultiDispatcher::new(128);
44+
45+
// multireflector stream
46+
let mut combo_stream = stream::select_all(vec![]);
47+
combo_stream.push(
48+
watcher::watcher(
49+
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
50+
Default::default(),
51+
)
52+
.boxed(),
53+
);
54+
55+
// watching config maps, but ignoring in the final configuration
56+
combo_stream.push(
57+
watcher::watcher(
58+
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
59+
Default::default(),
60+
)
61+
.boxed(),
62+
);
63+
64+
// Combine duplicate type streams with narrowed down selection
65+
combo_stream.push(
66+
watcher::watcher(
67+
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
68+
Default::default(),
69+
)
70+
.boxed(),
71+
);
72+
combo_stream.push(
73+
watcher::watcher(
74+
Api::namespaced_with(client.clone(), "kube-system", &ApiResource::erase::<Secret>(&())),
75+
Default::default(),
76+
)
77+
.boxed(),
78+
);
79+
80+
let watcher = combo_stream.broadcast_shared(writer.clone());
81+
82+
let (sub, reader) = writer.subscribe::<Deployment>();
83+
let deploy = Controller::for_shared_stream(sub, reader)
84+
.shutdown_on_signal()
85+
.run(reconcile, error_policy, Arc::new(()))
86+
.for_each(|res| async move {
87+
match res {
88+
Ok(v) => info!("Reconciled deployment {v:?}"),
89+
Err(error) => warn!(%error, "Failed to reconcile metadata"),
90+
};
91+
});
92+
93+
let (sub, reader) = writer.subscribe::<Secret>();
94+
let secret = Controller::for_shared_stream(sub, reader)
95+
.shutdown_on_signal()
96+
.run(reconcile, error_policy, Arc::new(()))
97+
.for_each(|res| async move {
98+
match res {
99+
Ok(v) => info!("Reconciled secret {v:?}"),
100+
Err(error) => warn!(%error, "Failed to reconcile metadata"),
101+
};
102+
});
103+
104+
info!("long watches starting");
105+
tokio::select! {
106+
r = watcher.for_each(|_| future::ready(())) => println!("watcher exit: {r:?}"),
107+
x = deploy => println!("deployments exit: {x:?}"),
108+
x = secret => println!("secrets exit: {x:?}"),
109+
}
110+
111+
Ok(())
112+
}

examples/multi_reflector.rs

-145
This file was deleted.

kube-core/src/dynamic.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
pub use crate::discovery::ApiResource;
55
use crate::{
66
metadata::TypeMeta,
7-
resource::{DynamicResourceScope, Resource},
7+
resource::{DynamicResourceScope, Resource}, GroupVersionKind,
88
};
99

1010
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
@@ -73,6 +73,12 @@ impl DynamicObject {
7373
) -> Result<K, ParseDynamicObjectError> {
7474
Ok(serde_json::from_value(serde_json::to_value(self)?)?)
7575
}
76+
77+
/// Returns the group, version, and kind (GVK) of this resource.
78+
pub fn gvk(&self) -> Option<GroupVersionKind> {
79+
let gvk = self.types.clone()?;
80+
gvk.try_into().ok()
81+
}
7682
}
7783

7884
impl Resource for DynamicObject {

kube-core/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub mod gvk;
3535
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};
3636

3737
pub mod metadata;
38-
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};
38+
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
3939

4040
pub mod labels;
4141

0 commit comments

Comments
 (0)