Skip to content

Commit 137aa41

Browse files
Perform typed event filtering inside stream impl
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 8fa7434 commit 137aa41

File tree

2 files changed

+36
-44
lines changed

2 files changed

+36
-44
lines changed

kube-runtime/src/controller/mod.rs

+9-24
Original file line numberDiff line numberDiff line change
@@ -851,10 +851,7 @@ where
851851
/// }
852852
/// # }
853853
#[cfg(feature = "unstable-runtime-subscribe")]
854-
pub fn for_shared_stream(
855-
trigger: impl Stream<Item = impl Into<Option<Arc<K>>> + Send + 'static> + Send + 'static,
856-
reader: Store<K>,
857-
) -> Self
854+
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
858855
where
859856
K::DynamicType: Default,
860857
{
@@ -881,16 +878,12 @@ where
881878
/// [`dynamic`]: kube_client::core::dynamic
882879
#[cfg(feature = "unstable-runtime-subscribe")]
883880
pub fn for_shared_stream_with(
884-
trigger: impl Stream<Item = impl Into<Option<Arc<K>>> + Send + 'static> + Send + 'static,
881+
trigger: impl Stream<Item = Arc<K>> + Send + 'static,
885882
reader: Store<K>,
886883
dyntype: K::DynamicType,
887884
) -> Self {
888885
let mut trigger_selector = stream::SelectAll::new();
889-
let self_watcher = trigger_self_shared(
890-
trigger.filter_map(|r| async move { r.into() }).map(Ok),
891-
dyntype.clone(),
892-
)
893-
.boxed();
886+
let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
894887
trigger_selector.push(self_watcher);
895888
Self {
896889
trigger_selector,
@@ -1119,7 +1112,7 @@ where
11191112
#[must_use]
11201113
pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
11211114
self,
1122-
trigger: impl Stream<Item = impl Into<Option<Arc<Child>>> + Send + 'static> + Send + 'static,
1115+
trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
11231116
) -> Self {
11241117
self.owns_shared_stream_with(trigger, ())
11251118
}
@@ -1137,17 +1130,13 @@ where
11371130
#[must_use]
11381131
pub fn owns_shared_stream_with<Child: Resource<DynamicType = ()> + Send + 'static>(
11391132
mut self,
1140-
trigger: impl Stream<Item = impl Into<Option<Arc<Child>>> + Send + 'static> + Send + 'static,
1133+
trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
11411134
dyntype: Child::DynamicType,
11421135
) -> Self
11431136
where
11441137
Child::DynamicType: Debug + Eq + Hash + Clone,
11451138
{
1146-
let child_watcher = trigger_owners_shared(
1147-
trigger.filter_map(|r| async move { r.into() }).map(Ok),
1148-
self.dyntype.clone(),
1149-
dyntype,
1150-
);
1139+
let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
11511140
self.trigger_selector.push(child_watcher.boxed());
11521141
self
11531142
}
@@ -1394,7 +1383,7 @@ where
13941383
#[must_use]
13951384
pub fn watches_shared_stream<Other, I>(
13961385
self,
1397-
trigger: impl Stream<Item = impl Into<Option<Arc<Other>>> + Send + 'static> + Send + 'static,
1386+
trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
13981387
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
13991388
) -> Self
14001389
where
@@ -1419,7 +1408,7 @@ where
14191408
#[must_use]
14201409
pub fn watches_shared_stream_with<Other, I>(
14211410
mut self,
1422-
trigger: impl Stream<Item = impl Into<Option<Arc<Other>>> + Send + 'static> + Send + 'static,
1411+
trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
14231412
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
14241413
dyntype: Other::DynamicType,
14251414
) -> Self
@@ -1429,11 +1418,7 @@ where
14291418
I: 'static + IntoIterator<Item = ObjectRef<K>>,
14301419
I::IntoIter: Send,
14311420
{
1432-
let other_watcher = trigger_others_shared(
1433-
trigger.filter_map(|r| async move { r.into() }).map(Ok),
1434-
mapper,
1435-
dyntype,
1436-
);
1421+
let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
14371422
self.trigger_selector.push(other_watcher.boxed());
14381423
self
14391424
}

kube-runtime/src/reflector/dispatcher.rs

+27-20
Original file line numberDiff line numberDiff line change
@@ -256,31 +256,38 @@ where
256256
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
257257
K: DeserializeOwned,
258258
{
259-
type Item = Option<Arc<K>>;
259+
type Item = Arc<K>;
260260

261261
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
262262
let mut this = self.project();
263-
match ready!(this.rx.as_mut().poll_next(cx)) {
264-
Some(event) => {
265-
let obj = match event {
266-
Event::InitApply(obj) | Event::Apply(obj)
267-
if obj.gvk() == Some(K::gvk(&Default::default())) =>
268-
{
269-
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
270-
this.store.apply_shared_watcher_event(&Event::Apply(o.clone()));
271-
})
263+
loop {
264+
return match ready!(this.rx.as_mut().poll_next(cx)) {
265+
Some(event) => {
266+
let obj = match event {
267+
Event::InitApply(obj) | Event::Apply(obj)
268+
if obj.gvk() == Some(K::gvk(&Default::default())) =>
269+
{
270+
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
271+
this.store.apply_shared_watcher_event(&Event::Apply(o.clone()));
272+
})
273+
}
274+
Event::Delete(obj) if obj.gvk() == Some(K::gvk(&Default::default())) => {
275+
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
276+
this.store.apply_shared_watcher_event(&Event::Delete(o.clone()));
277+
})
278+
}
279+
_ => None,
280+
};
281+
282+
// Skip propagating all objects which do not belong to the cache
283+
if obj.is_none() {
284+
continue;
272285
}
273-
Event::Delete(obj) if obj.gvk() == Some(K::gvk(&Default::default())) => {
274-
obj.try_parse::<K>().ok().map(Arc::new).inspect(|o| {
275-
this.store.apply_shared_watcher_event(&Event::Delete(o.clone()));
276-
})
277-
}
278-
_ => None,
279-
};
280286

281-
Poll::Ready(Some(obj))
282-
}
283-
None => Poll::Ready(None),
287+
Poll::Ready(obj)
288+
}
289+
None => Poll::Ready(None),
290+
};
284291
}
285292
}
286293
}

0 commit comments

Comments
 (0)