Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast shared reflector #1692

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ path = "log_stream.rs"
name = "multi_watcher"
path = "multi_watcher.rs"

[[example]]
name = "broadcast_reflector"
path = "broadcast_reflector.rs"

[[example]]
name = "pod_api"
path = "pod_api.rs"
Expand Down
113 changes: 113 additions & 0 deletions examples/broadcast_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use futures::{future, lock::Mutex, pin_mut, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::ApiResource,
runtime::{
broadcaster,
controller::Action,
reflector::multi_dispatcher::{BroadcastStream, MultiDispatcher},
watcher, Controller,
},
Api, Client, ResourceExt,
};
use std::{fmt::Debug, pin::pin, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::*;

#[derive(Debug, Error)]
enum Infallible {}

// A generic reconciler that can be used with any object whose type is known at
// compile time. Will simply log its kind on reconciliation.
async fn reconcile<K>(_obj: Arc<K>, _ctx: Arc<()>) -> Result<Action, Infallible>
where
K: ResourceExt<DynamicType = ()>,
{
let kind = K::kind(&());
info!("Reconciled {kind}");
Ok(Action::await_change())
}

fn error_policy<K: ResourceExt>(_: Arc<K>, _: &Infallible, _ctx: Arc<()>) -> Action {
info!("error");
Action::requeue(Duration::from_secs(10))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let writer = MultiDispatcher::new(128);

// multireflector stream
let combo_stream = Arc::new(Mutex::new(stream::select_all(vec![])));
let watcher = broadcaster(writer.clone(), BroadcastStream::new(combo_stream.clone()));

combo_stream.lock().await.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
Default::default(),
)
.boxed(),
);

// watching config maps, but ignoring in the final configuration
combo_stream.lock().await.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
Default::default(),
)
.boxed(),
);

// Combine duplicate type streams with narrowed down selection
combo_stream.lock().await.push(
watcher::watcher(
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.lock().await.push(
watcher::watcher(
Api::namespaced_with(client.clone(), "kube-system", &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);

let (sub, reader) = writer.subscribe::<Deployment>();
let deploy = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(()))
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled deployment {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
};
});

let (sub, reader) = writer.subscribe::<Secret>();
let secret = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(()))
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled secret {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
};
});

info!("long watches starting");
tokio::select! {
r = watcher.for_each(|_| future::ready(())) => println!("watcher exit: {r:?}"),
x = deploy => println!("deployments exit: {x:?}"),
x = secret => println!("secrets exit: {x:?}"),
}

Ok(())
}
7 changes: 7 additions & 0 deletions kube-core/src/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
metadata::TypeMeta,
resource::{DynamicResourceScope, Resource},
GroupVersionKind,
};

use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
Expand Down Expand Up @@ -73,6 +74,12 @@
) -> Result<K, ParseDynamicObjectError> {
Ok(serde_json::from_value(serde_json::to_value(self)?)?)
}

/// Returns the group, version, and kind (GVK) of this resource.
pub fn gvk(&self) -> Option<GroupVersionKind> {
let gvk = self.types.clone()?;
gvk.try_into().ok()

Check warning on line 81 in kube-core/src/dynamic.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/dynamic.rs#L79-L81

Added lines #L79 - L81 were not covered by tests
}
}

impl Resource for DynamicObject {
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
16 changes: 16 additions & 0 deletions kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@
kind: K::kind(&()).into(),
}
}

/// Construct a new `TypeMeta` for the object from the list `TypeMeta`.
///
/// ```
/// # use k8s_openapi::api::core::v1::Pod;
/// # use kube_core::TypeMeta;
///
/// let mut type_meta = TypeMeta::resource::<Pod>();
/// type_meta.kind = "PodList".to_string();
/// assert_eq!(type_meta.clone().singular_list().unwrap().kind, "Pod");
/// assert_eq!(type_meta.clone().singular_list().unwrap().api_version, "v1");
/// ```
pub fn singular_list(self) -> Option<Self> {
let kind = self.kind.strip_suffix("List")?.to_string();
(!kind.is_empty()).then_some(Self { kind, ..self })

Check warning on line 68 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L66-L68

Added lines #L66 - L68 were not covered by tests
}
}

/// A generic representation of any object with `ObjectMeta`.
Expand Down
7 changes: 7 additions & 0 deletions kube-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

pub use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope, ResourceScope, SubResourceScope};

use crate::GroupVersionKind;

/// Indicates that a [`Resource`] is of an indeterminate dynamic scope.
pub struct DynamicResourceScope {}
impl ResourceScope for DynamicResourceScope {}
Expand Down Expand Up @@ -54,6 +56,11 @@
/// This is known as the resource in apimachinery, we rename it for disambiguation.
fn plural(dt: &Self::DynamicType) -> Cow<'_, str>;

/// Generates an object reference for the resource
fn gvk(dt: &Self::DynamicType) -> GroupVersionKind {
GroupVersionKind::gvk(&Self::group(dt), &Self::version(dt), &Self::kind(dt))

Check warning on line 61 in kube-core/src/resource.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/resource.rs#L60-L61

Added lines #L60 - L61 were not covered by tests
}

/// Creates a url path for http requests for this resource
fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String {
let n = if let Some(ns) = namespace {
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod watcher;

pub use controller::{applier, Config, Controller};
pub use finalizer::finalizer;
#[cfg(feature = "unstable-runtime-subscribe")] pub use reflector::broadcaster;
pub use reflector::reflector;
pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
Expand Down
98 changes: 97 additions & 1 deletion kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@

use educe::Educe;
use futures::Stream;
#[cfg(feature = "unstable-runtime-subscribe")]
use kube_client::{api::DynamicObject, Resource};
use pin_project::pin_project;
#[cfg(feature = "unstable-runtime-subscribe")] use serde::de::DeserializeOwned;
use std::task::ready;

use crate::reflector::{ObjectRef, Store};
#[cfg(feature = "unstable-runtime-subscribe")] use crate::watcher::Event;
use async_broadcast::{InactiveReceiver, Receiver, Sender};

#[cfg(feature = "unstable-runtime-subscribe")] use super::store::Writer;
use super::Lookup;

#[derive(Educe)]
Expand Down Expand Up @@ -125,7 +130,7 @@
impl<K> Stream for ReflectHandle<K>
where
K: Lookup + Clone,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
type Item = Arc<K>;

Expand All @@ -141,6 +146,97 @@
}
}

/// A handle to a shared dynamic object stream
///
/// [`TypedReflectHandle`]s are created by calling [`subscribe()`] on a [`TypedDispatcher`],
/// Each shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`TypedDispatcher`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`TypedReflectHandle`]s
/// subscribed to the shared stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`TypedReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
#[cfg(feature = "unstable-runtime-subscribe")]
#[pin_project]
pub struct TypedReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
K: DeserializeOwned,
{
#[pin]
rx: Receiver<Event<DynamicObject>>,
store: Writer<K>,
}

#[cfg(feature = "unstable-runtime-subscribe")]
impl<K> TypedReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
K: DeserializeOwned,
{
pub(super) fn new(rx: Receiver<Event<DynamicObject>>) -> TypedReflectHandle<K> {

Check warning on line 180 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L180

Added line #L180 was not covered by tests
Self {
rx,
// Initialize a ready store by default
store: {

Check warning on line 184 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L184

Added line #L184 was not covered by tests
let mut store: Writer<K> = Default::default();
store.apply_shared_watcher_event(&Event::InitDone);
store
},
}
}

pub fn reader(&self) -> Store<K> {
self.store.as_reader()

Check warning on line 193 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L192-L193

Added lines #L192 - L193 were not covered by tests
}
}

#[cfg(feature = "unstable-runtime-subscribe")]
impl<K> Stream for TypedReflectHandle<K>
where
K: Resource + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
K: DeserializeOwned,
{
type Item = Arc<K>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
return match ready!(this.rx.as_mut().poll_next(cx)) {
Some(event) => {
let obj = match event {
Event::InitApply(obj) | Event::Apply(obj)
if obj.gvk() == Some(K::gvk(&Default::default())) =>

Check warning on line 213 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L206-L213

Added lines #L206 - L213 were not covered by tests
{
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
this.store.apply_shared_watcher_event(&Event::Apply(o.clone()));

Check warning on line 216 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L215-L216

Added lines #L215 - L216 were not covered by tests
})
}
Event::Delete(obj) if obj.gvk() == Some(K::gvk(&Default::default())) => {
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
this.store.apply_shared_watcher_event(&Event::Delete(o.clone()));

Check warning on line 221 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L219-L221

Added lines #L219 - L221 were not covered by tests
})
}
_ => None,

Check warning on line 224 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L224

Added line #L224 was not covered by tests
};

// Skip propagating all objects which do not belong to the cache
if obj.is_none() {
continue;

Check warning on line 229 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L228-L229

Added lines #L228 - L229 were not covered by tests
}

Poll::Ready(obj)

Check warning on line 232 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L232

Added line #L232 was not covered by tests
}
None => Poll::Ready(None),

Check warning on line 234 in kube-runtime/src/reflector/dispatcher.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/dispatcher.rs#L234

Added line #L234 was not covered by tests
};
}
}
}

#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
Expand Down
29 changes: 29 additions & 0 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Caches objects in memory

mod dispatcher;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod multi_dispatcher;
mod object_ref;
pub mod store;

Expand All @@ -11,6 +12,12 @@
use crate::watcher;
use async_stream::stream;
use futures::{Stream, StreamExt};
#[cfg(feature = "unstable-runtime-subscribe")]
use kube_client::api::DynamicObject;
#[cfg(feature = "unstable-runtime-subscribe")]
use multi_dispatcher::BroadcastStream;
#[cfg(feature = "unstable-runtime-subscribe")]
use multi_dispatcher::MultiDispatcher;
use std::hash::Hash;
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
pub use store::{store, Store};
Expand Down Expand Up @@ -134,6 +141,28 @@
}
}

// broadcaster uses a common stream of DynamicObject events to distribute to any subscribed typed watcher.
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn broadcaster<W>(
mut writer: MultiDispatcher,
mut stream: BroadcastStream<W>,
) -> impl Stream<Item = W::Item>
where
W: Stream<Item = watcher::Result<watcher::Event<DynamicObject>>> + Unpin,
{
stream! {
while let Some(event) = stream.next().await {
match event {
Ok(ev) => {
writer.broadcast_event(&ev).await;
yield Ok(ev);

Check warning on line 158 in kube-runtime/src/reflector/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/mod.rs#L153-L158

Added lines #L153 - L158 were not covered by tests
},
Err(ev) => yield Err(ev)

Check warning on line 160 in kube-runtime/src/reflector/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/reflector/mod.rs#L160

Added line #L160 was not covered by tests
}
}
}
}

#[cfg(test)]
mod tests {
use super::{reflector, store, ObjectRef};
Expand Down
Loading