Skip to content

Commit ceac3c1

Browse files
Refactor blocking loop
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 86a4636 commit ceac3c1

File tree

3 files changed

+62
-15
lines changed

3 files changed

+62
-15
lines changed

examples/broadcast_reflector.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
1-
use futures::{future, pin_mut, stream, StreamExt};
1+
use futures::{future, lock::Mutex, pin_mut, stream, StreamExt};
22
use k8s_openapi::api::{
33
apps::v1::Deployment,
44
core::v1::{ConfigMap, Secret},
55
};
66
use kube::{
77
api::ApiResource,
88
runtime::{
9-
broadcaster, controller::Action, reflector::multi_dispatcher::MultiDispatcher, watcher, Controller,
9+
broadcaster,
10+
controller::Action,
11+
reflector::multi_dispatcher::{BroadcastStream, MultiDispatcher},
12+
watcher, Controller,
1013
},
1114
Api, Client, ResourceExt,
1215
};
1316
use std::{fmt::Debug, pin::pin, sync::Arc, time::Duration};
1417
use thiserror::Error;
15-
use tokio::sync::Mutex;
1618
use tracing::*;
1719

1820
#[derive(Debug, Error)]
@@ -43,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
4345

4446
// multireflector stream
4547
let combo_stream = Arc::new(Mutex::new(stream::select_all(vec![])));
46-
let watcher = broadcaster(writer.clone(), combo_stream.clone());
48+
let watcher = broadcaster(writer.clone(), BroadcastStream::new(combo_stream.clone()));
4749

4850
combo_stream.lock().await.push(
4951
watcher::watcher(

kube-runtime/src/reflector/mod.rs

+7-9
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,14 @@ use futures::{Stream, StreamExt};
1818
#[cfg(feature = "unstable-runtime-subscribe")]
1919
use kube_client::api::DynamicObject;
2020
#[cfg(feature = "unstable-runtime-subscribe")]
21+
use multi_dispatcher::BroadcastStream;
22+
#[cfg(feature = "unstable-runtime-subscribe")]
2123
use multi_dispatcher::MultiDispatcher;
2224
use std::hash::Hash;
2325
#[cfg(feature = "unstable-runtime-subscribe")] use std::pin::Pin;
2426
#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
2527
pub use store::{store, Store};
2628

27-
#[cfg(feature = "unstable-runtime-subscribe")]
28-
/// Type for a shared stream of dynamic objects, which can be provided to [`broadcaster`]
29-
/// wrapped as [`Arc<Mutex<DynamicStream>>`], this type can be stored in context and provided
30-
/// to controllers, which can arbitrary modify existing event streams at runtime.
31-
pub type DynamicStream =
32-
SelectAll<Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>>;
33-
3429
/// Cache objects from a [`watcher()`] stream into a local [`Store`]
3530
///
3631
/// Observes the raw `Stream` of [`watcher::Event`] objects, and modifies the cache.
@@ -152,12 +147,15 @@ where
152147

153148
// broadcaster uses a common stream of DynamicObject events to distribute to any subscribed typed watcher.
154149
#[cfg(feature = "unstable-runtime-subscribe")]
155-
pub fn broadcaster<W>(mut writer: MultiDispatcher, stream: Arc<Mutex<W>>) -> impl Stream<Item = W::Item>
150+
pub fn broadcaster<W>(
151+
mut writer: MultiDispatcher,
152+
mut stream: BroadcastStream<W>,
153+
) -> impl Stream<Item = W::Item>
156154
where
157155
W: Stream<Item = watcher::Result<watcher::Event<DynamicObject>>> + Unpin,
158156
{
159157
stream! {
160-
while let Some(event) = stream.lock().await.next().await {
158+
while let Some(event) = stream.next().await {
161159
match event {
162160
Ok(ev) => {
163161
writer.broadcast_event(&ev).await;

kube-runtime/src/reflector/multi_dispatcher.rs

+49-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
use std::hash::Hash;
1+
use std::{
2+
hash::Hash,
3+
pin::Pin,
4+
sync::Arc,
5+
task::{Context, Poll},
6+
};
27

8+
use futures::{lock::Mutex, FutureExt, Stream, StreamExt as _};
39
use kube_client::{api::DynamicObject, Resource};
410
use serde::de::DeserializeOwned;
511

@@ -43,6 +49,47 @@ impl MultiDispatcher {
4349

4450
/// Broadcast an event to any downstream listeners subscribed on the store
4551
pub(crate) async fn broadcast_event(&mut self, event: &watcher::Event<DynamicObject>) {
46-
self.dispatcher.broadcast(event.clone()).await
52+
match event {
53+
// Broadcast stores are pre-initialized
54+
watcher::Event::InitDone => {}
55+
ev => self.dispatcher.broadcast(ev.clone()).await,
56+
}
57+
}
58+
}
59+
60+
/// See [`Scheduler::hold`]
61+
pub struct BroadcastStream<W> {
62+
pub stream: Arc<Mutex<W>>,
63+
}
64+
65+
impl<W> Clone for BroadcastStream<W> {
66+
fn clone(&self) -> Self {
67+
Self {
68+
stream: self.stream.clone(),
69+
}
70+
}
71+
}
72+
73+
impl<W> BroadcastStream<W>
74+
where
75+
W: Stream<Item = watcher::Result<watcher::Event<DynamicObject>>> + Unpin,
76+
{
77+
pub fn new(stream: Arc<Mutex<W>>) -> Self {
78+
Self { stream }
79+
}
80+
}
81+
82+
impl<W> Stream for BroadcastStream<W>
83+
where
84+
W: Stream<Item = watcher::Result<watcher::Event<DynamicObject>>> + Unpin,
85+
{
86+
type Item = W::Item;
87+
88+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89+
if let Some(mut stream) = self.stream.try_lock() {
90+
return stream.poll_next_unpin(cx);
91+
}
92+
93+
Poll::Pending
4794
}
4895
}

0 commit comments

Comments
 (0)