diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index 493908d35..2eca8fe2f 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -39,18 +39,25 @@ async fn main() -> anyhow::Result<()> { let lp = ListParams::default().timeout(20); // low timeout in this example let rf = reflector(writer, watcher(foos, lp)); + let mut rfa = rf.applied_objects().boxed(); + // keep polling the reflector stream in a background task tokio::spawn(async move { loop { - // Periodically read our state - // while this runs you can kubectl apply -f crd-baz.yaml or crd-qux.yaml and see it works - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let crds = reader.state().iter().map(|r| r.name_any()).collect::<Vec<_>>(); - info!("Current crds: {:?}", crds); + match rfa.try_next().await { + Ok(Some(event)) => info!("saw {}", event.name_any()), + Ok(_) => {} + Err(e) => { + error!("Kubernetes stream failure: {}", e); + panic!("Kubernetes stream exited"); + } + } } }); - let mut rfa = rf.applied_objects().boxed(); - while let Some(event) = rfa.try_next().await? { - info!("saw {}", event.name_any()); + + // Periodically dump state + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let foos = reader.state().iter().map(|r| r.name_any()).collect::<Vec<_>>(); + info!("Current {} foos: {:?}", foos.len(), foos); } - Ok(()) } diff --git a/kube-core/src/dynamic.rs b/kube-core/src/dynamic.rs index ebaa7e3a6..8364192f0 100644 --- a/kube-core/src/dynamic.rs +++ b/kube-core/src/dynamic.rs @@ -106,6 +106,10 @@ impl Resource for DynamicObject { fn meta_mut(&mut self) -> &mut ObjectMeta { &mut self.metadata } + + fn typemeta() -> Option<TypeMeta> { + None + } } #[cfg(test)] diff --git a/kube-core/src/lib.rs b/kube-core/src/lib.rs index 71b81b917..cb5f02815 100644 --- a/kube-core/src/lib.rs +++ b/kube-core/src/lib.rs @@ -28,7 +28,7 @@ pub mod gvk; pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource}; pub mod metadata; -pub use metadata::{ListMeta, ObjectMeta, TypeMeta}; +pub use metadata::{Inspect, ListMeta, ObjectMeta, TypeMeta}; pub mod object; pub use object::{NotUsed, Object, ObjectList}; diff --git a/kube-core/src/metadata.rs b/kube-core/src/metadata.rs index aa7becbcd..57113a8c4 100644 --- a/kube-core/src/metadata.rs +++ b/kube-core/src/metadata.rs @@ -1,10 +1,8 @@ //! Metadata structs used in traits, lists, and dynamic objects. -use std::borrow::Cow; - +use crate::{ApiResource, DynamicObject, DynamicResourceScope, Object, Resource}; pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta}; use serde::{Deserialize, Serialize}; - -use crate::{ApiResource, DynamicResourceScope, Resource}; +use std::borrow::Cow; /// Type information that is flattened into every kubernetes object #[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)] @@ -65,4 +63,127 @@ impl Resource for PartialObjectMeta { fn meta_mut(&mut self) -> &mut ObjectMeta { &mut self.metadata } + + fn typemeta() -> Option<TypeMeta> { + None + } +} + + +/// A runtime accessor trait for `TypeMeta` +/// +/// This trait is a runtime subset of the `Resource` trait that can read the object directly. +/// It **cannot** retrieve the plural, **nor** the scope of a resource (which requires an `ApiResource`). +pub trait Inspect { + /// Get the `TypeMeta` of an object + /// + /// This is a safe `TypeMeta` getter for all object types + /// While it is generally safe to unwrap this option, do note that a few endpoints can elide it. + fn types(&self) -> Option<TypeMeta>; + + /// Get the `TypeMeta` of an object that is guaranteed to have it + /// + /// Returns `TypeMeta` when exists, panics otherwise + fn types_unchecked(&self) -> TypeMeta; + + /// Get the `kind` of an object + fn kind(&self) -> Option<Cow<'_, str>>; + /// Get the `apiVersion` of any object + fn api_version(&self) -> Option<Cow<'_, str>>; + /// Get a reference to the `ObjectMeta` of an object + fn meta(&self) -> &ObjectMeta; + /// Get a mutable reference to the `ObjectMeta` of an Object + fn meta_mut(&mut self) -> &mut ObjectMeta; +} + +// lean on static info on k8s_openapi generated types (safer than runtime lookups) +impl<K> Inspect for K +where + K: k8s_openapi::Resource, + K: k8s_openapi::Metadata<Ty = ObjectMeta>, +{ + fn types(&self) -> Option<TypeMeta> { + Some(self.types_unchecked()) + } + + fn types_unchecked(&self) -> TypeMeta { + TypeMeta { + api_version: K::API_VERSION.into(), + kind: K::KIND.into(), + } + } + + fn kind(&self) -> Option<Cow<'_, str>> { + Some(K::KIND.into()) + } + + fn api_version(&self) -> Option<Cow<'_, str>> { + Some(K::API_VERSION.into()) + } + + fn meta(&self) -> &ObjectMeta { + self.metadata() + } + + fn meta_mut(&mut self) -> &mut ObjectMeta { + self.metadata_mut() + } +} + +// always lookup from object in dynamic cases +impl<P, U> Inspect for Object<P, U> +where + P: Clone, + U: Clone, +{ + fn types(&self) -> Option<TypeMeta> { + self.types.clone() + } + + fn types_unchecked(&self) -> TypeMeta { + self.types.clone().unwrap() + } + + fn kind(&self) -> Option<Cow<'_, str>> { + self.types.as_ref().map(|t| Cow::Borrowed(t.kind.as_ref())) + } + + fn api_version(&self) -> Option<Cow<'_, str>> { + self.types.as_ref().map(|t| Cow::Borrowed(t.api_version.as_ref())) + } + + fn meta(&self) -> &ObjectMeta { + &self.metadata + } + + fn meta_mut(&mut self) -> &mut ObjectMeta { + &mut self.metadata + } +} + +// always lookup from object in dynamic cases +impl Inspect for DynamicObject { + fn types(&self) -> Option<TypeMeta> { + self.types.clone() + } + + fn types_unchecked(&self) -> TypeMeta { + self.types.clone().unwrap() + } + + fn kind(&self) -> Option<Cow<'_, str>> { + self.types.as_ref().map(|t| Cow::Borrowed(t.kind.as_ref())) + } + + fn api_version(&self) -> Option<Cow<'_, str>> { + self.types.as_ref().map(|t| Cow::Borrowed(t.api_version.as_ref())) + } + + fn meta(&self) -> &ObjectMeta { + &self.metadata + } + + fn meta_mut(&mut self) -> &mut ObjectMeta { + &mut self.metadata + } } diff --git a/kube-core/src/object.rs b/kube-core/src/object.rs index ab75b1335..e44a37423 100644 --- a/kube-core/src/object.rs +++ b/kube-core/src/object.rs @@ -245,6 +245,10 @@ where fn meta_mut(&mut self) -> &mut ObjectMeta { &mut self.metadata } + + fn typemeta() -> Option<TypeMeta> { + None + } } impl<P, U> HasSpec for Object<P, U> diff --git a/kube-core/src/resource.rs b/kube-core/src/resource.rs index 6ae3500ed..67fe7aee5 100644 --- a/kube-core/src/resource.rs +++ b/kube-core/src/resource.rs @@ -1,3 +1,4 @@ +use crate::TypeMeta; pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::{ api::core::v1::ObjectReference, @@ -39,13 +40,13 @@ pub trait Resource { /// Dynamic types should select `Scope = DynamicResourceScope` type Scope; - /// Returns kind of this object + /// Returns the static kind of this Resource fn kind(dt: &Self::DynamicType) -> Cow<'_, str>; - /// Returns group of this object + /// Returns the static group of this Resource fn group(dt: &Self::DynamicType) -> Cow<'_, str>; - /// Returns version of this object + /// Returns the static version of this Resource fn version(dt: &Self::DynamicType) -> Cow<'_, str>; - /// Returns apiVersion of this object + /// Returns the static apiVersion of this Resource fn api_version(dt: &Self::DynamicType) -> Cow<'_, str> { let group = Self::group(dt); if group.is_empty() { @@ -56,7 +57,7 @@ pub trait Resource { group.push_str(&Self::version(dt)); group.into() } - /// Returns the plural name of the kind + /// Returns the static plural name of this Resource /// /// This is known as the resource in apimachinery, we rename it for disambiguation. fn plural(dt: &Self::DynamicType) -> Cow<'_, str>; @@ -113,6 +114,12 @@ pub trait Resource { ..OwnerReference::default() }) } + + /// Return `TypeMeta` of a Resource where it can be statically determined + /// + /// This is only possible on static types. + /// Dynamic types need to find these via discovery or through the `Inspect` trait. + fn typemeta() -> Option<TypeMeta>; } /// Implement accessor trait for any ObjectMeta-using Kubernetes Resource @@ -151,6 +158,13 @@ where fn meta_mut(&mut self) -> &mut ObjectMeta { self.metadata_mut() } + + fn typemeta() -> Option<TypeMeta> { + Some(TypeMeta { + api_version: K::API_VERSION.into(), + kind: K::KIND.into(), + }) + } } /// Helper methods for resources. diff --git a/kube-derive/src/custom_resource.rs b/kube-derive/src/custom_resource.rs index 78e78510d..1d069bc22 100644 --- a/kube-derive/src/custom_resource.rs +++ b/kube-derive/src/custom_resource.rs @@ -320,10 +320,50 @@ pub(crate) fn derive(input: proc_macro2::TokenStream) -> proc_macro2::TokenStrea fn meta_mut(&mut self) -> &mut #k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta { &mut self.metadata } + + fn typemeta() -> Option<#kube_core::TypeMeta> { + Some(#kube_core::TypeMeta { + api_version: #api_ver.into(), + kind: #kind.into(), + }) + } + } + }; + + // 3. implement typeinfo trait + let impl_typeinfo = quote! { + impl #kube_core::Inspect for #rootident { + + fn types(&self) -> Option<#kube_core::TypeMeta> { + Some(#kube_core::TypeMeta { + api_version: #api_ver.into(), + kind: #kind.into(), + }) + } + + fn types_unchecked(&self) -> #kube_core::TypeMeta { + self.types().unwrap() + } + + fn kind(&self) -> Option<std::borrow::Cow<'_, str>> { + Some(#kind.into()) + } + + fn api_version(&self) -> Option<std::borrow::Cow<'_, str>> { + Some(#api_ver.into()) + } + + fn meta(&self) -> &#k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta { + &self.metadata + } + + fn meta_mut(&mut self) -> &mut #k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta { + &mut self.metadata + } } }; - // 3. Implement Default if requested + // 4. Implement Default if requested let impl_default = if has_default { quote! { impl Default for #rootident { @@ -340,7 +380,7 @@ pub(crate) fn derive(input: proc_macro2::TokenStream) -> proc_macro2::TokenStrea quote! {} }; - // 4. Implement CustomResource + // 5. Implement CustomResource // Compute a bunch of crd props let printers = format!("[ {} ]", printcolums.join(",")); // hacksss @@ -468,6 +508,7 @@ pub(crate) fn derive(input: proc_macro2::TokenStream) -> proc_macro2::TokenStrea quote! { #root_obj #impl_resource + #impl_typeinfo #impl_default #impl_crd #impl_hasspec diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 21ec5f9d9..49bbe4b62 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -18,7 +18,10 @@ use futures::{ future::{self, BoxFuture}, ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, }; -use kube_client::api::{Api, DynamicObject, ListParams, Resource}; +use kube_client::{ + api::{Api, ListParams, Resource}, + core::Inspect, +}; use pin_project::pin_project; use serde::de::DeserializeOwned; use std::{ @@ -39,9 +42,9 @@ mod runner; #[derive(Debug, Error)] pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> { #[error("tried to reconcile object {0} that was not found in local store")] - ObjectNotFound(ObjectRef<DynamicObject>), + ObjectNotFound(ObjectRef), #[error("reconciler for object {1} failed")] - ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>), + ReconcilerFailed(#[source] ReconcilerErr, ObjectRef), #[error("event queue error")] QueueError(#[source] QueueErr), } @@ -85,15 +88,14 @@ impl Action { } /// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples. -pub fn trigger_with<T, K, I, S>( +pub fn trigger_with<T, I, S>( stream: S, mapper: impl Fn(T) -> I, -) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>> +) -> impl Stream<Item = Result<ReconcileRequest, S::Error>> where S: TryStream<Ok = T>, I: IntoIterator, - I::Item: Into<ReconcileRequest<K>>, - K: Resource, + I::Item: Into<ReconcileRequest>, { stream .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok))) @@ -101,18 +103,14 @@ where } /// Enqueues the object itself for reconciliation -pub fn trigger_self<K, S>( - stream: S, - dyntype: K::DynamicType, -) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>> +pub fn trigger_self<K, S>(stream: S) -> impl Stream<Item = Result<ReconcileRequest, S::Error>> where S: TryStream<Ok = K>, - K: Resource, - K::DynamicType: Clone, + K: Inspect, { trigger_with(stream, move |obj| { Some(ReconcileRequest { - obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()), + obj_ref: ObjectRef::from_obj(&obj), reason: ReconcileReason::ObjectUpdated, }) }) @@ -122,24 +120,27 @@ where pub fn trigger_owners<KOwner, S>( stream: S, owner_type: KOwner::DynamicType, - child_type: <S::Ok as Resource>::DynamicType, -) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>> +) -> impl Stream<Item = Result<ReconcileRequest, S::Error>> where S: TryStream, - S::Ok: Resource, - <S::Ok as Resource>::DynamicType: Clone, + S::Ok: Resource + Inspect, KOwner: Resource, KOwner::DynamicType: Clone, { trigger_with(stream, move |obj| { - let meta = obj.meta().clone(); + let meta = Inspect::meta(&obj).clone(); let ns = meta.namespace; let owner_type = owner_type.clone(); - let child_ref = ObjectRef::from_obj_with(&obj, child_type.clone()).erase(); + let child_ref = ObjectRef::from_obj(&obj); meta.owner_references .into_iter() .flatten() - .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone())) + // Filter out ownerrefs not matching the owning GVK + .filter(move |owner| { + owner.api_version == <KOwner as Resource>::api_version(&owner_type) + && owner.kind == <KOwner as Resource>::kind(&owner_type) + }) + .map(move |owner| ObjectRef::from_owner(&owner).with_namespace(ns.as_deref())) .map(move |owner_ref| ReconcileRequest { obj_ref: owner_ref, reason: ReconcileReason::RelatedObjectUpdated { @@ -155,21 +156,16 @@ where /// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons. /// In this case, only *the first* reason is stored. #[derive(Derivative)] -#[derivative( - Debug(bound = "K::DynamicType: Debug"), - Clone(bound = "K::DynamicType: Clone"), - PartialEq(bound = "K::DynamicType: PartialEq"), - Eq(bound = "K::DynamicType: Eq"), - Hash(bound = "K::DynamicType: Hash") -)] -pub struct ReconcileRequest<K: Resource> { - pub obj_ref: ObjectRef<K>, +#[derivative(Clone, PartialEq, Debug, Hash, Eq)] +pub struct ReconcileRequest { + pub obj_ref: ObjectRef, #[derivative(PartialEq = "ignore", Hash = "ignore")] pub reason: ReconcileReason, } -impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> { - fn from(obj_ref: ObjectRef<K>) -> Self { +// ..that means this would have to get extra data somehow +impl From<ObjectRef> for ReconcileRequest { + fn from(obj_ref: ObjectRef) -> Self { ReconcileRequest { obj_ref, reason: ReconcileReason::Unknown, @@ -181,7 +177,7 @@ impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> { pub enum ReconcileReason { Unknown, ObjectUpdated, - RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> }, + RelatedObjectUpdated { obj_ref: Box<ObjectRef> }, ReconcilerRequestedRetry, ErrorPolicyRequestedRetry, BulkReconcile, @@ -223,19 +219,19 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>( context: Arc<Ctx>, store: Store<K>, queue: QueueStream, -) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>> +) -> impl Stream<Item = Result<(ObjectRef, Action), Error<ReconcilerFut::Error, QueueStream::Error>>> where - K: Clone + Resource + 'static, + K: Clone + Resource + Inspect + 'static, K::DynamicType: Debug + Eq + Hash + Clone + Unpin, ReconcilerFut: TryFuture<Ok = Action> + Unpin, ReconcilerFut::Error: std::error::Error + 'static, QueueStream: TryStream, - QueueStream::Ok: Into<ReconcileRequest<K>>, + QueueStream::Ok: Into<ReconcileRequest>, QueueStream::Error: std::error::Error + 'static, { let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel(); let (scheduler_tx, scheduler_rx) = - channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE); + channel::mpsc::channel::<ScheduleRequest<ReconcileRequest>>(APPLIER_REQUEUE_BUF_SIZE); let error_policy = Arc::new(error_policy); // Create a stream of ObjectRefs that need to be reconciled trystream_try_via( @@ -291,7 +287,7 @@ where .instrument(reconciler_span) .left_future() } - None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(), + None => future::err(Error::ObjectNotFound(request.obj_ref)).right_future(), } }) .on_complete(async { tracing::debug!("applier runner terminated") }) @@ -302,7 +298,7 @@ where .and_then(move |(obj_ref, reconciler_result)| async move { match reconciler_result { Ok(action) => Ok((obj_ref, action)), - Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())), + Err(err) => Err(Error::ReconcilerFailed(err, obj_ref)), } }) .on_complete(async { tracing::debug!("applier terminated") }) @@ -313,22 +309,19 @@ where /// This could be an `async fn`, but isn't because we want it to be [`Unpin`] #[pin_project] #[must_use] -struct RescheduleReconciliation<K: Resource, ReconcilerErr> { - reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>, +struct RescheduleReconciliation<ReconcilerErr> { + reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest>>, - reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>, + reschedule_request: Option<ScheduleRequest<ReconcileRequest>>, result: Option<Result<Action, ReconcilerErr>>, } -impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr> -where - K: Resource, -{ +impl<ReconcilerErr> RescheduleReconciliation<ReconcilerErr> { fn new( result: Result<Action, ReconcilerErr>, error_policy: impl FnOnce(&ReconcilerErr) -> Action, - obj_ref: ObjectRef<K>, - reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>, + obj_ref: ObjectRef, + reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest>>, ) -> Self { let reconciler_finished_at = Instant::now(); @@ -351,10 +344,7 @@ where } } -impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr> -where - K: Resource, -{ +impl<ReconcilerErr> Future for RescheduleReconciliation<ReconcilerErr> { type Output = Result<Action, ReconcilerErr>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { @@ -449,11 +439,11 @@ where /// ``` pub struct Controller<K> where - K: Clone + Resource + Debug + 'static, + K: Clone + Resource + Inspect + Debug + 'static, K::DynamicType: Eq + Hash, { // NB: Need to Unpin for stream::select_all - trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>, + trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest, watcher::Error>>>, trigger_backoff: Box<dyn Backoff + Send>, /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete, /// refusing to start any new reconciliations but letting any existing ones finish. @@ -469,7 +459,7 @@ where impl<K> Controller<K> where - K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static, + K: Clone + Resource + Inspect + DeserializeOwned + Debug + Send + Sync + 'static, K::DynamicType: Eq + Hash + Clone, { /// Create a Controller on a type `K` @@ -502,14 +492,10 @@ where /// [`dynamic`]: kube_client::core::dynamic /// [`ListParams::default`]: kube_client::api::ListParams::default pub fn new_with(owned_api: Api<K>, lp: ListParams, dyntype: K::DynamicType) -> Self { - let writer = Writer::<K>::new(dyntype.clone()); + let writer = Writer::<K>::default(); let reader = writer.as_reader(); let mut trigger_selector = stream::SelectAll::new(); - let self_watcher = trigger_self( - reflector(writer, watcher(owned_api, lp)).applied_objects(), - dyntype.clone(), - ) - .boxed(); + let self_watcher = trigger_self(reflector(writer, watcher(owned_api, lp)).applied_objects()).boxed(); trigger_selector.push(self_watcher); Self { trigger_selector, @@ -555,28 +541,12 @@ where /// /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference #[must_use] - pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>( - self, - api: Api<Child>, - lp: ListParams, - ) -> Self { - self.owns_with(api, (), lp) - } - - /// Specify `Child` objects which `K` owns and should be watched - /// - /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources. - #[must_use] - pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>( + pub fn owns<Child: Clone + Resource + Inspect + DeserializeOwned + Debug + Send + 'static>( mut self, api: Api<Child>, - dyntype: Child::DynamicType, lp: ListParams, - ) -> Self - where - Child::DynamicType: Debug + Eq + Hash + Clone, - { - let child_watcher = trigger_owners(watcher(api, lp).touched_objects(), self.dyntype.clone(), dyntype); + ) -> Self { + let child_watcher = trigger_owners::<K, _>(watcher(api, lp).touched_objects(), self.dyntype.clone()); self.trigger_selector.push(child_watcher.boxed()); self } @@ -584,7 +554,7 @@ where /// Specify `Watched` object which `K` has a custom relation to and should be watched /// /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which, - /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile. + /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef` to reconcile. /// /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`]. /// @@ -634,8 +604,8 @@ where /// .annotations() /// .get("operator-sdk/primary-resource")? /// .split_once('/')?; - /// - /// Some(ObjectRef::new(name).within(namespace)) + /// + /// Some(ObjectRef::from_resource::<WatchedResource>(name).within(namespace)) /// } /// ) /// .run(reconcile, error_policy, context) @@ -648,40 +618,19 @@ where /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/ #[must_use] pub fn watches< - Other: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static, - I: 'static + IntoIterator<Item = ObjectRef<K>>, - >( - self, - api: Api<Other>, - lp: ListParams, - mapper: impl Fn(Other) -> I + Sync + Send + 'static, - ) -> Self - where - I::IntoIter: Send, - { - self.watches_with(api, (), lp, mapper) - } - - /// Specify `Watched` object which `K` has a custom relation to and should be watched - /// - /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources. - #[must_use] - pub fn watches_with< - Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, - I: 'static + IntoIterator<Item = ObjectRef<K>>, + Other: Clone + Resource + Inspect + DeserializeOwned + Debug + Send + 'static, + I: 'static + IntoIterator<Item = ObjectRef>, >( mut self, api: Api<Other>, - dyntype: Other::DynamicType, lp: ListParams, mapper: impl Fn(Other) -> I + Sync + Send + 'static, ) -> Self where I::IntoIter: Send, - Other::DynamicType: Clone, { let other_watcher = trigger_with(watcher(api, lp).touched_objects(), move |obj| { - let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); + let watched_obj_ref = ObjectRef::from_obj(&obj); mapper(obj) .into_iter() .map(move |mapped_obj_ref| ReconcileRequest { @@ -741,14 +690,12 @@ where #[must_use] pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self { let store = self.store(); - let dyntype = self.dyntype.clone(); self.trigger_selector.push( trigger .flat_map(move |()| { - let dyntype = dyntype.clone(); stream::iter(store.state().into_iter().map(move |obj| { Ok(ReconcileRequest { - obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()), + obj_ref: ObjectRef::from_obj(&*obj), reason: ReconcileReason::BulkReconcile, }) })) @@ -864,7 +811,7 @@ where mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut, error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action, context: Arc<Ctx>, - ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>> + ) -> impl Stream<Item = Result<(ObjectRef, Action), Error<ReconcilerFut::Error, watcher::Error>>> where K::DynamicType: Debug + Unpin, ReconcilerFut: TryFuture<Ok = Action> + Send + 'static, @@ -899,7 +846,10 @@ mod tests { }; use futures::{pin_mut, StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; - use kube_client::{core::ObjectMeta, Api}; + use kube_client::{ + core::{ObjectMeta, ResourceExt}, + Api, + }; use tokio::time::timeout; fn assert_send<T: Send>(x: T) -> T { @@ -935,13 +885,13 @@ mod tests { // Assume that everything's OK if we can reconcile every object 3 times on average let reconciles = items * 3; - let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>(); + let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef>(); let (store_rx, mut store_tx) = reflector::store(); let applier = applier( - |obj, _| { + |obj: Arc<ConfigMap>, _| { Box::pin(async move { // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately - println!("reconciling {:?}", obj.metadata.name); + println!("reconciling {:?}", obj.name_any()); Ok(Action::requeue(Duration::ZERO)) }) }, diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 8932456c5..66600f200 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -6,7 +6,7 @@ pub mod store; pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef}; use crate::watcher; use futures::{Stream, TryStreamExt}; -use kube_client::Resource; +use kube_client::core::{Inspect, Resource}; use std::hash::Hash; pub use store::{store, Store}; @@ -18,7 +18,7 @@ pub use store::{store, Store}; /// the whole `Store` will be cleared whenever any of them emits a `Restarted` event. pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item> where - K: Resource + Clone, + K: Resource + Inspect + Clone, K::DynamicType: Eq + Hash + Clone, W: Stream<Item = watcher::Result<watcher::Event<K>>>, { diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index cc2049ca1..3dcf1e403 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -1,54 +1,56 @@ use derivative::Derivative; use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference}; -use kube_client::{ - api::{DynamicObject, Resource}, - core::ObjectMeta, - ResourceExt, -}; -use std::{ - fmt::{Debug, Display}, - hash::Hash, -}; +use kube_client::core::{Inspect, ObjectMeta, Resource, TypeMeta}; +use std::fmt::{Debug, Display}; +use thiserror::Error; #[derive(Derivative)] -#[derivative( - Debug(bound = "K::DynamicType: Debug"), - PartialEq(bound = "K::DynamicType: PartialEq"), - Eq(bound = "K::DynamicType: Eq"), - Hash(bound = "K::DynamicType: Hash"), - Clone(bound = "K::DynamicType: Clone") -)] -/// A typed and namedspaced (if relevant) reference to a Kubernetes object +#[derivative(Debug, PartialEq, Eq, Hash, Clone)] + +/// A dynamically typed reference to an object along with its namespace /// -/// `K` may be either the object type or `DynamicObject`, in which case the -/// type is stored at runtime. Erased `ObjectRef`s pointing to different types -/// are still considered different. +/// Intended to be constructed from one of three sources: +/// 1. an object returned by the apiserver through the `Inspect` trait +/// 2. an `OwnerReference` found on an object returned by the apiserver +/// 3. a type implementing `Inspect` but with only a `name` pointing to the type /// /// ``` -/// use kube_runtime::reflector::ObjectRef; -/// use k8s_openapi::api::core::v1::{ConfigMap, Secret}; -/// assert_ne!( -/// ObjectRef::<ConfigMap>::new("a").erase(), -/// ObjectRef::<Secret>::new("a").erase(), +/// use kube_client::core::Resource; +/// use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; +/// # use k8s_openapi::api::core::v1::Pod; +/// # use kube_runtime::reflector::ObjectRef; +/// let mut pod = Pod::default(); +/// pod.meta_mut().name = Some("foo".into()); +/// let oref = OwnerReference { +/// api_version: "v1".into(), +/// kind: "Pod".into(), +/// name: "foo".into(), +/// ..OwnerReference::default() +/// }; +/// assert_eq!( +/// ObjectRef::from_obj(&pod).within("ns"), +/// ObjectRef::from_owner(&oref).within("ns"), +/// ); +/// assert_eq!( +/// ObjectRef::from_obj(&pod).within("ns"), +/// ObjectRef::from_resource::<Pod>("foo").within("ns") /// ); /// ``` #[non_exhaustive] -pub struct ObjectRef<K: Resource> { - pub dyntype: K::DynamicType, +pub struct ObjectRef { /// The name of the object pub name: String, /// The namespace of the object /// /// May only be `None` if the kind is cluster-scoped (not located in a namespace). - /// Note that it *is* acceptable for an `ObjectRef` to a cluster-scoped resource to - /// have a namespace. These are, however, not considered equal: /// - /// ``` - /// # use kube_runtime::reflector::ObjectRef; - /// # use k8s_openapi::api::core::v1::ConfigMap; - /// assert_ne!(ObjectRef::<ConfigMap>::new("foo"), ObjectRef::new("foo").within("bar")); - /// ``` + /// When constructing an `ObjectRef` be sure to either: + /// + /// 1. supply a **known** namespace using `ObjectRef::within` + /// 2. supply an **optional** namespace using `ObjectRef::with_namespace` (when being generic over kinds) pub namespace: Option<String>, + /// The TypeMeta of the object + pub types: Option<TypeMeta>, /// Extra information about the object being referred to /// /// This is *not* considered when comparing objects, but may be used when converting to and from other representations, @@ -69,31 +71,43 @@ pub struct Extra { pub uid: Option<String>, } -impl<K: Resource> ObjectRef<K> -where - K::DynamicType: Default, -{ +impl ObjectRef { + /// Creates an `ObjectRef` from an object implementing `Inspect` #[must_use] - pub fn new(name: &str) -> Self { - Self::new_with(name, Default::default()) + pub fn from_obj<K: Inspect>(obj: &K) -> Self { + let meta = obj.meta(); + Self { + name: meta.name.clone().unwrap_or_default(), + namespace: meta.namespace.clone(), + types: obj.types(), + extra: Extra::from_objectmeta(meta), + } } + /// Creates an `ObjectRef` from an `OwnerReference` #[must_use] - pub fn from_obj(obj: &K) -> Self - where - K: Resource, - { - Self::from_obj_with(obj, Default::default()) + pub fn from_owner(owner: &OwnerReference) -> Self { + Self { + name: owner.name.clone(), + namespace: None, + types: Some(TypeMeta { + api_version: owner.api_version.clone(), + kind: owner.kind.clone(), + }), + extra: Extra { + resource_version: None, + uid: Some(owner.uid.clone()), + }, + } } -} -impl<K: Resource> ObjectRef<K> { + /// Creates an `ObjectRef` from a `Resource` along with name #[must_use] - pub fn new_with(name: &str, dyntype: K::DynamicType) -> Self { + pub fn from_resource<K: Resource>(name: &str) -> Self { Self { - dyntype, - name: name.into(), + name: name.to_string(), namespace: None, + types: K::typemeta(), extra: Extra::default(), } } @@ -104,102 +118,62 @@ impl<K: Resource> ObjectRef<K> { self } - /// Creates `ObjectRef` from the resource and dynamic type. - #[must_use] - pub fn from_obj_with(obj: &K, dyntype: K::DynamicType) -> Self - where - K: Resource, - { - let meta = obj.meta(); - Self { - dyntype, - name: obj.name_unchecked(), - namespace: meta.namespace.clone(), - extra: Extra::from_obj_meta(meta), - } - } - - /// Create an `ObjectRef` from an `OwnerReference` - /// - /// Returns `None` if the types do not match. #[must_use] - pub fn from_owner_ref( - namespace: Option<&str>, - owner: &OwnerReference, - dyntype: K::DynamicType, - ) -> Option<Self> { - if owner.api_version == K::api_version(&dyntype) && owner.kind == K::kind(&dyntype) { - Some(Self { - dyntype, - name: owner.name.clone(), - namespace: namespace.map(String::from), - extra: Extra { - resource_version: None, - uid: Some(owner.uid.clone()), - }, - }) - } else { - None - } + pub fn with_namespace(mut self, namespace: Option<&str>) -> Self { + self.namespace = namespace.map(String::from); + self } - /// Convert into a reference to `K2` - /// - /// Note that no checking is done on whether this conversion makes sense. For example, every `Service` - /// has a corresponding `Endpoints`, but it wouldn't make sense to convert a `Pod` into a `Deployment`. #[must_use] - pub fn into_kind_unchecked<K2: Resource>(self, dt2: K2::DynamicType) -> ObjectRef<K2> { - ObjectRef { - dyntype: dt2, - name: self.name, - namespace: self.namespace, - extra: self.extra, - } - } - - pub fn erase(self) -> ObjectRef<DynamicObject> { - ObjectRef { - dyntype: kube_client::api::ApiResource::erase::<K>(&self.dyntype), - name: self.name, - namespace: self.namespace, - extra: self.extra, - } + pub fn with_types(mut self, types: &TypeMeta) -> Self { + self.types = Some(types.clone()); + self } } -impl<K: Resource> From<ObjectRef<K>> for ObjectReference { - fn from(val: ObjectRef<K>) -> Self { - let ObjectRef { - dyntype: dt, - name, - namespace, - extra: Extra { +#[derive(Debug, Error)] +#[error("missing type information from ObjectRef")] +/// Source does not have `TypeMeta` +pub struct MissingTypeInfo; + + +impl TryFrom<ObjectRef> for ObjectReference { + type Error = MissingTypeInfo; + + fn try_from(val: ObjectRef) -> Result<Self, Self::Error> { + if let Some(t) = &val.types { + let ObjectRef { + name, + namespace, + extra: + Extra { + resource_version, + uid, + }, + .. + } = val; + Ok(ObjectReference { + api_version: Some(t.api_version.clone()), + kind: Some(t.kind.clone()), + field_path: None, + name: Some(name), + namespace, resource_version, uid, - }, - } = val; - ObjectReference { - api_version: Some(K::api_version(&dt).into_owned()), - kind: Some(K::kind(&dt).into_owned()), - field_path: None, - name: Some(name), - namespace, - resource_version, - uid, + }) + } else { + Err(MissingTypeInfo) } } } -impl<K: Resource> Display for ObjectRef<K> { +impl Display for ObjectRef { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}.{}.{}/{}", - K::kind(&self.dyntype), - K::version(&self.dyntype), - K::group(&self.dyntype), - self.name - )?; + if let Some(tm) = &self.types { + write!(f, "{}.{}/{}", tm.kind, tm.api_version, self.name)?; + } else { + write!(f, "unknown/{}", self.name)?; + } if let Some(namespace) = &self.namespace { write!(f, ".{namespace}")?; } @@ -208,10 +182,10 @@ impl<K: Resource> Display for ObjectRef<K> { } impl Extra { - fn from_obj_meta(obj_meta: &ObjectMeta) -> Self { + fn from_objectmeta(meta: &ObjectMeta) -> Self { Self { - resource_version: obj_meta.resource_version.clone(), - uid: obj_meta.uid.clone(), + resource_version: meta.resource_version.clone(), + uid: meta.uid.clone(), } } } @@ -224,43 +198,23 @@ mod tests { }; use super::{Extra, ObjectRef}; - use k8s_openapi::api::{ - apps::v1::Deployment, - core::v1::{Node, Pod}, - }; + use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod}; + use kube_client::Resource; #[test] fn display_should_follow_expected_format() { - assert_eq!( - format!("{}", ObjectRef::<Pod>::new("my-pod").within("my-namespace")), - "Pod.v1./my-pod.my-namespace" - ); - assert_eq!( - format!( - "{}", - ObjectRef::<Deployment>::new("my-deploy").within("my-namespace") - ), - "Deployment.v1.apps/my-deploy.my-namespace" - ); - assert_eq!( - format!("{}", ObjectRef::<Node>::new("my-node")), - "Node.v1./my-node" - ); - } - - #[test] - fn display_should_be_transparent_to_representation() { - let pod_ref = ObjectRef::<Pod>::new("my-pod").within("my-namespace"); - assert_eq!(format!("{pod_ref}"), format!("{}", pod_ref.erase())); - let deploy_ref = ObjectRef::<Deployment>::new("my-deploy").within("my-namespace"); - assert_eq!(format!("{deploy_ref}"), format!("{}", deploy_ref.erase())); - let node_ref = ObjectRef::<Node>::new("my-node"); - assert_eq!(format!("{node_ref}"), format!("{}", node_ref.erase())); + let pod = ObjectRef::from_resource::<Pod>("my-pod").within("my-ns"); + assert_eq!(format!("{}", pod), "Pod.v1/my-pod.my-ns"); + let deploy = ObjectRef::from_resource::<Deployment>("my-dep").within("my-ns"); + assert_eq!(format!("{}", deploy), "Deployment.apps/v1/my-dep.my-ns"); } #[test] fn comparison_should_ignore_extra() { - let minimal = ObjectRef::<Pod>::new("my-pod").within("my-namespace"); + let mut pod = Pod::default(); + pod.meta_mut().name = Some("my-pod".into()); + pod.meta_mut().namespace = Some("my-namespace".into()); + let minimal = ObjectRef::from_obj(&pod); let with_extra = ObjectRef { extra: Extra { resource_version: Some("123".to_string()), @@ -273,7 +227,7 @@ mod tests { assert_eq!(minimal, with_extra); // Hash should be unaffected by the contents of `extra` - let hash_value = |value: &ObjectRef<Pod>| { + let hash_value = |value: &ObjectRef| { let mut hasher = DefaultHasher::new(); value.hash(&mut hasher); hasher.finish() diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 49032934c..fdc617060 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -2,38 +2,28 @@ use super::ObjectRef; use crate::watcher; use ahash::AHashMap; use derivative::Derivative; -use kube_client::Resource; +use kube_client::{core::Inspect, Resource}; use parking_lot::RwLock; use std::{fmt::Debug, hash::Hash, sync::Arc}; -type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>; +type Cache<K> = Arc<RwLock<AHashMap<ObjectRef, Arc<K>>>>; /// A writable Store handle /// /// This is exclusive since it's not safe to share a single `Store` between multiple reflectors. /// In particular, `Restarted` events will clobber the state of other connected reflectors. #[derive(Debug, Derivative)] -#[derivative(Default(bound = "K::DynamicType: Default"))] -pub struct Writer<K: 'static + Resource> -where - K::DynamicType: Eq + Hash, -{ +#[derivative(Default(bound = ""))] +pub struct Writer<K: 'static + Resource> { store: Cache<K>, - dyntype: K::DynamicType, } -impl<K: 'static + Resource + Clone> Writer<K> -where - K::DynamicType: Eq + Hash + Clone, -{ +impl<K: 'static + Resource + Inspect + Clone> Writer<K> { /// Creates a new Writer with the specified dynamic type. - /// - /// If the dynamic type is default-able (for example when writer is used with - /// `k8s_openapi` types) you can use `Default` instead. - pub fn new(dyntype: K::DynamicType) -> Self { + #[must_use] + pub fn new() -> Self { Writer { store: Default::default(), - dyntype, } } @@ -52,23 +42,18 @@ where pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) { match event { watcher::Event::Applied(obj) => { - let key = ObjectRef::from_obj_with(obj, self.dyntype.clone()); + let key = ObjectRef::from_obj(obj); let obj = Arc::new(obj.clone()); self.store.write().insert(key, obj); } watcher::Event::Deleted(obj) => { - let key = ObjectRef::from_obj_with(obj, self.dyntype.clone()); + let key = ObjectRef::from_obj(obj); self.store.write().remove(&key); } watcher::Event::Restarted(new_objs) => { let new_objs = new_objs .iter() - .map(|obj| { - ( - ObjectRef::from_obj_with(obj, self.dyntype.clone()), - Arc::new(obj.clone()), - ) - }) + .map(|obj| (ObjectRef::from_obj(obj), Arc::new(obj.clone()))) .collect::<AHashMap<_, _>>(); *self.store.write() = new_objs; } @@ -82,16 +67,14 @@ where /// /// Cannot be constructed directly since one writer handle is required, /// use `Writer::as_reader()` instead. + #[derive(Derivative)] -#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)] -pub struct Store<K: 'static + Resource> -where - K::DynamicType: Hash + Eq, -{ +#[derivative(Debug(bound = "K: Debug"), Clone)] +pub struct Store<K: 'static + Resource> { store: Cache<K>, } -impl<K: 'static + Clone + Resource> Store<K> +impl<K: 'static + Clone + Resource + Inspect> Store<K> where K::DynamicType: Eq + Hash + Clone, { @@ -105,7 +88,7 @@ where /// If you use `kube_rt::controller` then you can do this by returning an error and specifying a /// reasonable `error_policy`. #[must_use] - pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> { + pub fn get(&self, key: &ObjectRef) -> Option<Arc<K>> { let store = self.store.read(); store .get(key) @@ -162,8 +145,7 @@ where #[must_use] pub fn store<K>() -> (Store<K>, Writer<K>) where - K: Resource + Clone + 'static, - K::DynamicType: Eq + Hash + Clone + Default, + K: Resource + Inspect + Clone + 'static, { let w = Writer::<K>::default(); let r = w.as_reader();