Skip to content

Commit cf1e646

Browse files
Implement MultiDispatcher for a DynamicObject steam
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent a4035ee commit cf1e646

15 files changed

+447
-348
lines changed

examples/Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ name = "multi_watcher"
132132
path = "multi_watcher.rs"
133133

134134
[[example]]
135-
name = "multi_reflector"
136-
path = "multi_reflector.rs"
135+
name = "broadcast_reflector"
136+
path = "broadcast_reflector.rs"
137137

138138
[[example]]
139139
name = "pod_api"

examples/broadcast_reflector.rs

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use futures::{future, stream, StreamExt};
2+
use k8s_openapi::api::{
3+
apps::v1::Deployment,
4+
core::v1::{ConfigMap, Secret},
5+
};
6+
use kube::{
7+
api::ApiResource,
8+
runtime::{controller::Action, reflector::multi_dispatcher::MultiDispatcher, watcher, Controller, WatchStreamExt as _},
9+
Api, Client, ResourceExt,
10+
};
11+
use std::{fmt::Debug, sync::Arc, time::Duration};
12+
use thiserror::Error;
13+
use tracing::*;
14+
15+
#[derive(Debug, Error)]
16+
enum Infallible {}
17+
18+
// A generic reconciler that can be used with any object whose type is known at
19+
// compile time. Will simply log its kind on reconciliation.
20+
async fn reconcile<K>(_obj: Arc<K>, _ctx: Arc<()>) -> Result<Action, Infallible>
21+
where
22+
K: ResourceExt<DynamicType = ()>,
23+
{
24+
let kind = K::kind(&());
25+
info!("Reconciled {kind}");
26+
Ok(Action::await_change())
27+
}
28+
29+
fn error_policy<K: ResourceExt>(_: Arc<K>, _: &Infallible, _ctx: Arc<()>) -> Action {
30+
info!("error");
31+
Action::requeue(Duration::from_secs(10))
32+
}
33+
34+
#[tokio::main]
35+
async fn main() -> anyhow::Result<()> {
36+
tracing_subscriber::fmt::init();
37+
let client = Client::try_default().await?;
38+
39+
let writer = MultiDispatcher::new(128);
40+
41+
// multireflector stream
42+
let mut combo_stream = stream::select_all(vec![]);
43+
combo_stream.push(
44+
watcher::watcher(
45+
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
46+
Default::default(),
47+
)
48+
.boxed(),
49+
);
50+
51+
// watching config maps, but ignoring in the final configuration
52+
combo_stream.push(
53+
watcher::watcher(
54+
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
55+
Default::default(),
56+
)
57+
.boxed(),
58+
);
59+
60+
// Combine duplicate type streams with narrowed down selection
61+
combo_stream.push(
62+
watcher::watcher(
63+
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
64+
Default::default(),
65+
)
66+
.boxed(),
67+
);
68+
combo_stream.push(
69+
watcher::watcher(
70+
Api::namespaced_with(client.clone(), "kube-system", &ApiResource::erase::<Secret>(&())),
71+
Default::default(),
72+
)
73+
.boxed(),
74+
);
75+
76+
let watcher = combo_stream.broadcast_shared(writer.clone());
77+
78+
let (sub, reader) = writer.subscribe::<Deployment>();
79+
let deploy = Controller::for_shared_stream(sub, reader)
80+
.shutdown_on_signal()
81+
.run(reconcile, error_policy, Arc::new(()))
82+
.for_each(|res| async move {
83+
match res {
84+
Ok(v) => info!("Reconciled deployment {v:?}"),
85+
Err(error) => warn!(%error, "Failed to reconcile metadata"),
86+
};
87+
});
88+
89+
let (sub, reader) = writer.subscribe::<Secret>();
90+
let secret = Controller::for_shared_stream(sub, reader)
91+
.shutdown_on_signal()
92+
.run(reconcile, error_policy, Arc::new(()))
93+
.for_each(|res| async move {
94+
match res {
95+
Ok(v) => info!("Reconciled secret {v:?}"),
96+
Err(error) => warn!(%error, "Failed to reconcile metadata"),
97+
};
98+
});
99+
100+
info!("long watches starting");
101+
tokio::select! {
102+
r = watcher.for_each(|_| future::ready(())) => println!("watcher exit: {r:?}"),
103+
x = deploy => println!("deployments exit: {x:?}"),
104+
x = secret => println!("secrets exit: {x:?}"),
105+
}
106+
107+
Ok(())
108+
}

examples/multi_reflector.rs

-145
This file was deleted.

kube-core/src/dynamic.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
pub use crate::discovery::ApiResource;
55
use crate::{
66
metadata::TypeMeta,
7-
resource::{DynamicResourceScope, Resource},
7+
resource::{DynamicResourceScope, Resource}, GroupVersionKind,
88
};
99

1010
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
@@ -73,6 +73,12 @@ impl DynamicObject {
7373
) -> Result<K, ParseDynamicObjectError> {
7474
Ok(serde_json::from_value(serde_json::to_value(self)?)?)
7575
}
76+
77+
/// Returns the group, version, and kind (GVK) of this resource.
78+
pub fn gvk(&self) -> Option<GroupVersionKind> {
79+
let gvk = self.types.clone()?;
80+
gvk.try_into().ok()
81+
}
7682
}
7783

7884
impl Resource for DynamicObject {

kube-core/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub mod gvk;
3535
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};
3636

3737
pub mod metadata;
38-
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};
38+
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
3939

4040
pub mod labels;
4141

0 commit comments

Comments
 (0)