Skip to content

Commit f172fff

Browse files
authored
feat: Move the finalization handling out of the Controller (#81)
BREAKING CHANGE: This means that adding and removing finalizers is now optional
1 parent 0b454af commit f172fff

File tree

5 files changed

+172
-73
lines changed

5 files changed

+172
-73
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ chrono = "0.4"
1010
const_format = "0.2"
1111
either = "1"
1212
futures = "0.3"
13+
json-patch = "0.2"
1314
k8s-openapi = { version = "0.11", default-features = false }
14-
kube = { version = "0.49", default-features = false }
15+
kube = { version = "0.49", default-features = false, features = ["jsonpatch"] }
1516
kube-runtime = "0.49"
1617
lazy_static = "1"
1718
regex = "1"

src/client.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,20 @@ impl Client {
124124
.await
125125
}
126126

127+
/// Patches a resource using the `JSON` patch strategy described in [JavaScript Object Notation (JSON) Patch](https://tools.ietf.org/html/rfc6902).
128+
pub async fn json_patch<T>(&self, resource: &T, patch: json_patch::Patch) -> OperatorResult<T>
129+
where
130+
T: Clone + DeserializeOwned + Meta,
131+
{
132+
// The `()` type is not used. I need to provide _some_ type just to get it to compile.
133+
// But the type is not used _at all_ for the `Json` variant so I'd argue it's okay to
134+
// provide any type here.
135+
// This is definitely a hack though but there is currently no better way.
136+
// See also: https://github.com/clux/kube-rs/pull/456
137+
let patch = Patch::Json::<()>(patch);
138+
self.patch(resource, patch, &self.patch_params).await
139+
}
140+
127141
async fn patch<T, P>(
128142
&self,
129143
resource: &T,

src/controller.rs

Lines changed: 23 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,14 @@
111111
//! let controller = Controller::new(pods_api);
112112
//!
113113
//! let strategy = FooStrategy {};
114-
//! controller.run(client, strategy).await;
114+
//! controller.run(client, strategy, Duration::from_secs(10)).await;
115115
//! }
116116
//! ```
117117
//!
118118
use crate::client::Client;
119-
use crate::error::{Error, OperatorResult};
119+
use crate::error::Error;
120+
use crate::reconcile;
120121
use crate::reconcile::{ReconcileFunctionAction, ReconciliationContext};
121-
use crate::{finalizer, reconcile};
122122

123123
use async_trait::async_trait;
124124
use futures::StreamExt;
@@ -131,7 +131,7 @@ use std::fmt::{Debug, Display};
131131
use std::future::Future;
132132
use std::pin::Pin;
133133
use std::time::Duration;
134-
use tracing::{debug, error, info, trace, Instrument};
134+
use tracing::{debug, error, trace, Instrument};
135135
use uuid::Uuid;
136136

137137
/// Every operator needs to provide an implementation of this trait as it provides the operator specific business logic.
@@ -239,14 +239,23 @@ where
239239
self
240240
}
241241

242-
/// Call this method once your Controller object is fully configured.
243-
/// It'll start talking to Kubernetes and will call the `Strategy` implementation.
244-
pub async fn run<S>(self, client: Client, strategy: S)
242+
/// Call this method once your Controller object is fully configured to start the reconciliation.
243+
///
244+
/// # Arguments
245+
///
246+
/// * `client` - The Client to access Kubernetes
247+
/// * `strategy` - This implements the domain/business logic and the framework will call its methods for each reconcile operation
248+
/// * `requeue_timeout` - Whenever a `Requeue` is returned this is the timeout/duration after which the same object will be requeued
249+
pub async fn run<S>(self, client: Client, strategy: S, requeue_timeout: Duration)
245250
where
246251
S: ControllerStrategy<Item = T> + Send + Sync + 'static,
247252
S::State: Send,
248253
{
249-
let context = Context::new(ControllerContext { client, strategy });
254+
let context = Context::new(ControllerContext {
255+
client,
256+
strategy,
257+
requeue_timeout,
258+
});
250259

251260
self.kube_controller
252261
.run(reconcile, error_policy, context)
@@ -285,6 +294,7 @@ where
285294
{
286295
client: Client,
287296
strategy: S,
297+
requeue_timeout: Duration,
288298
}
289299

290300
/// This method contains the logic of reconciling an object (the desired state) we received with the actual state.
@@ -307,18 +317,13 @@ where
307317
debug!(?resource, "Beginning reconciliation");
308318
let context = context.get_ref();
309319

310-
let client = &context.client;
311320
let strategy = &context.strategy;
312321

313-
if handle_deletion(&resource, client.clone(), &strategy.finalizer_name()).await?
314-
== ReconcileFunctionAction::Done
315-
{
316-
return Ok(reconcile::create_non_requeuing_reconciler_action());
317-
}
318-
319-
add_finalizer(&resource, client.clone(), &strategy.finalizer_name()).await?;
320-
321-
let rc = ReconciliationContext::new(context.client.clone(), resource.clone());
322+
let rc = ReconciliationContext::new(
323+
context.client.clone(),
324+
resource.clone(),
325+
context.requeue_timeout,
326+
);
322327

323328
let mut state = match strategy.init_reconcile_state(rc).in_current_span().await {
324329
Ok(state) => state,
@@ -373,38 +378,3 @@ where
373378
trace!(%err, "Reconciliation error, calling strategy error_policy");
374379
context.get_ref().strategy.error_policy()
375380
}
376-
377-
async fn handle_deletion<T>(
378-
resource: &T,
379-
client: Client,
380-
finalizer_name: &str,
381-
) -> OperatorResult<ReconcileFunctionAction>
382-
where
383-
T: Clone + DeserializeOwned + Meta + Send + Sync + 'static,
384-
{
385-
trace!("[handle_deletion] Begin");
386-
if !finalizer::has_deletion_stamp(resource) {
387-
debug!("Resource not deleted, continuing",);
388-
return Ok(ReconcileFunctionAction::Continue);
389-
}
390-
391-
info!("Removing finalizer [{}]", finalizer_name,);
392-
finalizer::remove_finalizer(client, resource, finalizer_name).await?;
393-
394-
Ok(ReconcileFunctionAction::Requeue(Duration::from_secs(10)))
395-
}
396-
397-
async fn add_finalizer<T>(resource: &T, client: Client, finalizer_name: &str) -> OperatorResult<()>
398-
where
399-
T: Clone + Debug + DeserializeOwned + Meta + Send + Sync + 'static,
400-
{
401-
trace!("[add_finalizer] Begin");
402-
403-
if finalizer::has_finalizer(resource, finalizer_name) {
404-
debug!("Finalizer already exists, continuing...",);
405-
} else {
406-
debug!("Finalizer missing, adding now and continuing...",);
407-
finalizer::add_finalizer(client, resource, finalizer_name).await?;
408-
}
409-
Ok(())
410-
}

src/finalizer.rs

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
use crate::client::Client;
22
use crate::error::{Error, OperatorResult};
33

4-
use k8s_openapi::Resource;
4+
use json_patch::{PatchOperation, RemoveOperation, TestOperation};
55
use kube::api::Meta;
66
use serde::de::DeserializeOwned;
77
use serde_json::json;
8+
use tracing::debug;
89

910
/// Checks whether our own finalizer is in the list of finalizers for the provided object.
1011
pub fn has_finalizer<T>(resource: &T, finalizer: &str) -> bool
@@ -17,25 +18,46 @@ where
1718
};
1819
}
1920

20-
/// Adds our finalizer to the list of finalizers.
21-
pub async fn add_finalizer<T>(client: Client, resource: &T, finalizer: &str) -> OperatorResult<T>
21+
/// This will add the passed finalizer to the list of finalizers for the resource if it doesn't exist yet
22+
/// and will update the resource in Kubernetes.
23+
///
24+
/// It'll return `true` if we changed the object in Kubernetes and `false` if no modification was needed.
25+
/// If the object is currently being deleted this _will_ return an Error!
26+
pub async fn add_finalizer<T>(
27+
client: &Client,
28+
resource: &T,
29+
finalizer: &str,
30+
) -> OperatorResult<bool>
2231
where
23-
T: Resource + Clone + Meta + DeserializeOwned,
32+
T: Clone + Meta + DeserializeOwned,
2433
{
34+
if has_finalizer(resource, finalizer) {
35+
debug!("Finalizer [{}] already exists, continuing...", finalizer);
36+
37+
return Ok(false);
38+
}
39+
2540
let new_metadata = json!({
2641
"metadata": {
2742
"finalizers": [finalizer.to_string()]
2843
}
2944
});
30-
client.merge_patch(resource, new_metadata).await
45+
client.merge_patch(resource, new_metadata).await?;
46+
Ok(true)
3147
}
3248

3349
/// Removes our finalizer from a resource object.
3450
///
3551
/// # Arguments
36-
/// `name` - is the name of the resource we want to patch
37-
/// `namespace` is the namespace of where the resource to patch lives
38-
pub async fn remove_finalizer<T>(client: Client, resource: &T, finalizer: &str) -> OperatorResult<T>
52+
///
53+
/// * `client` - The Client to access Kubernetes
54+
/// * `resource` - is the resource we want to remove the finalizer from
55+
/// * `finalizer` - this is the actual finalizer string that we want to remove
56+
pub async fn remove_finalizer<T>(
57+
client: &Client,
58+
resource: &T,
59+
finalizer: &str,
60+
) -> OperatorResult<T>
3961
where
4062
T: Clone + DeserializeOwned + Meta,
4163
{
@@ -51,7 +73,7 @@ where
5173
None => Err(Error::MissingObjectKey {
5274
key: ".metadata.finalizers",
5375
}),
54-
Some(mut finalizers) => {
76+
Some(finalizers) => {
5577
let index = finalizers
5678
.iter()
5779
.position(|cur_finalizer| cur_finalizer == finalizer);
@@ -60,14 +82,18 @@ where
6082
// We found our finalizer which means that we now need to handle our deletion logic
6183
// And then remove the finalizer from the list.
6284

63-
finalizers.swap_remove(index);
64-
let new_metadata = json!({
65-
"metadata": {
66-
"finalizers": finalizers
67-
}
68-
});
85+
let finalizer_path = format!("/metadata/finalizers/{}", index);
86+
let patch = json_patch::Patch(vec![
87+
PatchOperation::Test(TestOperation {
88+
path: finalizer_path.clone(),
89+
value: finalizer.into(),
90+
}),
91+
PatchOperation::Remove(RemoveOperation {
92+
path: finalizer_path,
93+
}),
94+
]);
6995

70-
client.merge_patch(resource, new_metadata).await
96+
client.json_patch(resource, patch).await
7197
} else {
7298
Err(Error::MissingObjectKey {
7399
key: ".metadata.finalizers",

src/reconcile.rs

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::client::Client;
22
use crate::error::{Error, OperatorResult};
3-
use crate::{conditions, controller_ref, podutils};
3+
use crate::{conditions, controller_ref, finalizer, podutils};
44

55
use crate::conditions::ConditionStatus;
66
use k8s_openapi::api::core::v1::Pod;
@@ -9,7 +9,9 @@ use kube::api::{ListParams, Meta, ObjectMeta};
99
use kube_runtime::controller::ReconcilerAction;
1010
use serde::de::DeserializeOwned;
1111
use std::future::Future;
12+
use std::pin::Pin;
1213
use std::time::Duration;
14+
use tracing::{debug, info};
1315

1416
pub type ReconcileResult<E> = std::result::Result<ReconcileFunctionAction, E>;
1517

@@ -58,11 +60,20 @@ pub fn create_requeuing_reconcile_function_action(secs: u64) -> ReconcileFunctio
5860
pub struct ReconciliationContext<T> {
5961
pub client: Client,
6062
pub resource: T,
63+
pub requeue_timeout: Duration,
6164
}
6265

6366
impl<T> ReconciliationContext<T> {
64-
pub fn new(client: Client, resource: T) -> Self {
65-
ReconciliationContext { client, resource }
67+
pub fn new(client: Client, resource: T, requeue_timeout: Duration) -> Self {
68+
ReconciliationContext {
69+
client,
70+
resource,
71+
requeue_timeout,
72+
}
73+
}
74+
75+
fn requeue(&self) -> ReconcileFunctionAction {
76+
ReconcileFunctionAction::Requeue(self.requeue_timeout)
6677
}
6778
}
6879

@@ -122,6 +133,63 @@ where
122133
})
123134
}
124135

136+
/// This reconcile function can be added to the chain to automatically handle deleted objects
137+
/// using finalizers.
138+
///
139+
/// It'll add a finalizer to the object if it's not there yet, if the `deletion_timestamp` is set
140+
/// it'll call the provided handler function and it'll remove the finalizer if the handler completes
141+
/// with a `Done` result.
142+
///
143+
/// If the object is not deleted this function will return a `Continue` event.
144+
///
145+
/// # Arguments
146+
///
147+
/// * `handler` - This future will be completed if the object has been marked for deletion
148+
/// * `finalizer` - The finalizer to add and/or check for
149+
/// * `requeue_if_changed` - If this is `true` we'll return a `Requeue` immediately if we had to
150+
/// change the resource due to the addition of the finalizer
151+
pub async fn handle_deletion(
152+
&self,
153+
handler: Pin<Box<dyn Future<Output = Result<ReconcileFunctionAction, Error>> + Send + '_>>,
154+
finalizer: &str,
155+
requeue_if_changed: bool,
156+
) -> ReconcileResult<Error>
157+
where
158+
T: Clone + DeserializeOwned + Meta + Send + Sync + 'static,
159+
{
160+
let being_deleted = finalizer::has_deletion_stamp(&self.resource);
161+
162+
// Try to add a finalizer but only if the deletion_timestamp is not already set
163+
// Kubernetes forbids setting new finalizers on objects under deletion and will return this error:
164+
// Forbidden: no new finalizers can be added if the object is being deleted, found new finalizers []string{\"foo\"}
165+
if !being_deleted
166+
&& finalizer::add_finalizer(&self.client, &self.resource, finalizer).await?
167+
&& requeue_if_changed
168+
{
169+
return Ok(self.requeue());
170+
}
171+
172+
if !being_deleted {
173+
debug!("Resource not deleted, continuing",);
174+
return Ok(ReconcileFunctionAction::Continue);
175+
}
176+
177+
if !finalizer::has_finalizer(&self.resource, finalizer) {
178+
debug!("Resource being deleted but our finalizer is already gone, there might be others but we're done here!");
179+
return Ok(ReconcileFunctionAction::Done);
180+
}
181+
182+
match handler.await? {
183+
ReconcileFunctionAction::Continue => Ok(ReconcileFunctionAction::Continue),
184+
ReconcileFunctionAction::Done => {
185+
info!("Removing finalizer [{}]", finalizer,);
186+
finalizer::remove_finalizer(&self.client, &self.resource, finalizer).await?;
187+
Ok(ReconcileFunctionAction::Done)
188+
}
189+
ReconcileFunctionAction::Requeue(_) => Ok(self.requeue()),
190+
}
191+
}
192+
125193
/// Creates a new [`Condition`] for the `resource` this context contains.
126194
///
127195
/// It's a convenience function that passes through all parameters and builds a `Condition`
@@ -173,6 +241,26 @@ where
173241
);
174242
self.set_condition(condition).await
175243
}
244+
245+
/// A reconciler function to add to our finalizer to the list of finalizers.
246+
/// It is a wrapper around [`finalizer::add_finalizer`].
247+
///
248+
/// It can return `Continue` or `Requeue` depending on the `requeue` argument and the state of the resource.
249+
/// If the finalizer already exists it'll _always_ return `Continue`.
250+
///
251+
/// There is a more full-featured alternative to this function ([`handle_deletion`]).
252+
///
253+
/// # Arguments
254+
///
255+
/// * `finalizer` - The finalizer to add
256+
/// * `requeue` - If `true` this function will return `Requeue` if the object was changed (i.e. the finalizer was added) otherwise it'll return `Continue`
257+
pub async fn add_finalizer(&self, finalizer: &str, requeue: bool) -> ReconcileResult<Error> {
258+
if finalizer::add_finalizer(&self.client, &self.resource, finalizer).await? && requeue {
259+
Ok(self.requeue())
260+
} else {
261+
Ok(ReconcileFunctionAction::Continue)
262+
}
263+
}
176264
}
177265

178266
/// This returns `false` for Pods that have no OwnerReference (with a Controller flag)

0 commit comments

Comments
 (0)