Skip to content

Commit b052a9b

Browse files
adwk67claude
andcommitted
refactor: extract validate_cluster into controller::validate module
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6fe6753 commit b052a9b

3 files changed

Lines changed: 162 additions & 136 deletions

File tree

rust/operator-binary/src/airflow_controller.rs

Lines changed: 10 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
use std::{
33
collections::{BTreeMap, BTreeSet, HashMap},
44
io::Write,
5-
str::FromStr,
65
sync::Arc,
76
};
87

@@ -61,8 +60,7 @@ use stackable_operator::{
6160
logging::controller::ReconcilerError,
6261
product_config_utils::{
6362
CONFIG_OVERRIDE_FILE_FOOTER_KEY, CONFIG_OVERRIDE_FILE_HEADER_KEY, env_vars_from,
64-
env_vars_from_rolegroup_config, transform_all_roles_to_config,
65-
validate_all_roles_and_groups_config,
63+
env_vars_from_rolegroup_config,
6664
},
6765
product_logging::{
6866
self,
@@ -77,7 +75,7 @@ use stackable_operator::{
7775
},
7876
utils::COMMON_BASH_TRAP_FUNCTIONS,
7977
};
80-
use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};
78+
use strum::{EnumDiscriminants, IntoStaticStr};
8179

8280
use crate::{
8381
config::{self, PYTHON_IMPORTS},
@@ -119,33 +117,6 @@ pub struct Ctx {
119117
pub operator_environment: OperatorEnvironmentOptions,
120118
}
121119

122-
/// Per-role configuration extracted during validation.
123-
#[derive(Clone, Debug)]
124-
pub struct ValidatedRoleConfig {
125-
pub pdb: Option<stackable_operator::commons::pdb::PdbConfig>,
126-
pub listener_class: Option<String>,
127-
pub group_listener_name: Option<String>,
128-
}
129-
130-
/// Per-rolegroup configuration: the merged CRD config plus the product-config properties.
131-
#[derive(Clone, Debug)]
132-
pub struct ValidatedRoleGroupConfig {
133-
pub merged_config: AirflowConfig,
134-
pub product_config_properties: HashMap<PropertyNameKind, BTreeMap<String, String>>,
135-
}
136-
137-
pub use crate::controller::dereference::DereferencedObjects;
138-
139-
/// The validated cluster: proves that product-config validation and config merging
140-
/// succeeded for every role and role group before any resources are created.
141-
#[derive(Clone, Debug)]
142-
pub struct ValidatedAirflowCluster {
143-
pub image: ResolvedProductImage,
144-
pub role_groups: BTreeMap<AirflowRole, BTreeMap<String, ValidatedRoleGroupConfig>>,
145-
pub role_configs: BTreeMap<AirflowRole, ValidatedRoleConfig>,
146-
pub executor: AirflowExecutor,
147-
}
148-
149120
#[derive(Snafu, Debug, EnumDiscriminants)]
150121
#[strum_discriminants(derive(IntoStaticStr))]
151122
pub enum Error {
@@ -170,21 +141,11 @@ pub enum Error {
170141
rolegroup: RoleGroupRef<v1alpha2::AirflowCluster>,
171142
},
172143

173-
#[snafu(display("invalid product config"))]
174-
InvalidProductConfig {
175-
source: stackable_operator::product_config_utils::Error,
176-
},
177-
178144
#[snafu(display("object is missing metadata to build owner reference"))]
179145
ObjectMissingMetadataForOwnerRef {
180146
source: stackable_operator::builder::meta::Error,
181147
},
182148

183-
#[snafu(display("Failed to transform configs"))]
184-
ProductConfigTransform {
185-
source: stackable_operator::product_config_utils::Error,
186-
},
187-
188149
#[snafu(display("failed to patch service account"))]
189150
ApplyServiceAccount {
190151
source: stackable_operator::cluster_resources::Error,
@@ -215,12 +176,6 @@ pub enum Error {
215176
#[snafu(display("failed to resolve and merge config for role and role group"))]
216177
FailedToResolveConfig { source: crd::Error },
217178

218-
#[snafu(display("could not parse Airflow role [{role}]"))]
219-
UnidentifiedAirflowRole {
220-
source: strum::ParseError,
221-
role: String,
222-
},
223-
224179
#[snafu(display("invalid container name"))]
225180
InvalidContainerName {
226181
source: stackable_operator::builder::pod::container::Error,
@@ -258,6 +213,11 @@ pub enum Error {
258213
source: crate::controller::dereference::Error,
259214
},
260215

216+
#[snafu(display("failed to validate cluster configuration"))]
217+
Validate {
218+
source: crate::controller::validate::Error,
219+
},
220+
261221
#[snafu(display("pod template serialization"))]
262222
PodTemplateSerde { source: serde_yaml::Error },
263223

@@ -351,94 +311,6 @@ impl ReconcilerError for Error {
351311
}
352312
}
353313

354-
fn validate_cluster(
355-
airflow: &v1alpha2::AirflowCluster,
356-
dereferenced: &DereferencedObjects,
357-
product_config_manager: &ProductConfigManager,
358-
) -> Result<ValidatedAirflowCluster> {
359-
let mut roles = HashMap::new();
360-
361-
// if the kubernetes executor is specified there will be no worker role as the pods
362-
// are provisioned by airflow as defined by the task (default: one pod per task)
363-
for role in AirflowRole::iter() {
364-
if let Some(resolved_role) = airflow.get_role(&role) {
365-
roles.insert(
366-
role.to_string(),
367-
(
368-
vec![
369-
PropertyNameKind::Env,
370-
PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()),
371-
],
372-
resolved_role.clone(),
373-
),
374-
);
375-
}
376-
}
377-
378-
let role_config = transform_all_roles_to_config(airflow, &roles);
379-
let validated_role_config = validate_all_roles_and_groups_config(
380-
&dereferenced.resolved_product_image.product_version,
381-
&role_config.context(ProductConfigTransformSnafu)?,
382-
product_config_manager,
383-
false,
384-
false,
385-
)
386-
.context(InvalidProductConfigSnafu)?;
387-
388-
let mut role_groups = BTreeMap::new();
389-
let mut role_configs = BTreeMap::new();
390-
391-
for (role_name, rolegroup_configs) in validated_role_config.iter() {
392-
let airflow_role =
393-
AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu {
394-
role: role_name.to_string(),
395-
})?;
396-
397-
role_configs.insert(
398-
airflow_role.clone(),
399-
ValidatedRoleConfig {
400-
pdb: airflow
401-
.role_config(&airflow_role)
402-
.map(|rc| rc.pod_disruption_budget),
403-
listener_class: airflow_role
404-
.listener_class_name(airflow)
405-
.map(|s| s.to_string()),
406-
group_listener_name: airflow.group_listener_name(&airflow_role),
407-
},
408-
);
409-
410-
let mut group_configs = BTreeMap::new();
411-
for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() {
412-
let rolegroup_ref = RoleGroupRef {
413-
cluster: ObjectRef::from_obj(airflow),
414-
role: role_name.into(),
415-
role_group: rolegroup_name.into(),
416-
};
417-
418-
let merged_config = airflow
419-
.merged_config(&airflow_role, &rolegroup_ref)
420-
.context(FailedToResolveConfigSnafu)?;
421-
422-
group_configs.insert(
423-
rolegroup_name.clone(),
424-
ValidatedRoleGroupConfig {
425-
merged_config,
426-
product_config_properties: rolegroup_config.clone(),
427-
},
428-
);
429-
}
430-
431-
role_groups.insert(airflow_role, group_configs);
432-
}
433-
434-
Ok(ValidatedAirflowCluster {
435-
image: dereferenced.resolved_product_image.clone(),
436-
role_groups,
437-
role_configs,
438-
executor: airflow.spec.executor.clone(),
439-
})
440-
}
441-
442314
pub async fn reconcile_airflow(
443315
airflow: Arc<DeserializeGuard<v1alpha2::AirflowCluster>>,
444316
ctx: Arc<Ctx>,
@@ -503,7 +375,9 @@ pub async fn reconcile_airflow(
503375
None
504376
};
505377

506-
let validated = validate_cluster(airflow, &dereferenced, &ctx.product_config)?;
378+
let validated =
379+
crate::controller::validate::validate_cluster(airflow, &dereferenced, &ctx.product_config)
380+
.context(ValidateSnafu)?;
507381

508382
let mut cluster_resources = ClusterResources::new(
509383
APP_NAME,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod dereference;
2+
pub mod validate;
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use std::{
2+
collections::{BTreeMap, HashMap},
3+
str::FromStr,
4+
};
5+
6+
use product_config::{ProductConfigManager, types::PropertyNameKind};
7+
use snafu::{ResultExt, Snafu};
8+
use stackable_operator::{
9+
commons::product_image_selection::ResolvedProductImage,
10+
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
11+
role_utils::RoleGroupRef,
12+
};
13+
use strum::IntoEnumIterator;
14+
15+
use super::dereference::DereferencedObjects;
16+
use crate::crd::{AIRFLOW_CONFIG_FILENAME, AirflowConfig, AirflowExecutor, AirflowRole, v1alpha2};
17+
18+
#[derive(Snafu, Debug)]
19+
pub enum Error {
20+
#[snafu(display("invalid product config"))]
21+
InvalidProductConfig {
22+
source: stackable_operator::product_config_utils::Error,
23+
},
24+
25+
#[snafu(display("Failed to transform configs"))]
26+
ProductConfigTransform {
27+
source: stackable_operator::product_config_utils::Error,
28+
},
29+
30+
#[snafu(display("failed to resolve and merge config for role and role group"))]
31+
FailedToResolveConfig { source: crate::crd::Error },
32+
33+
#[snafu(display("could not parse Airflow role [{role}]"))]
34+
UnidentifiedAirflowRole {
35+
source: strum::ParseError,
36+
role: String,
37+
},
38+
}
39+
40+
/// Per-role configuration extracted during validation.
41+
#[derive(Clone, Debug)]
42+
pub struct ValidatedRoleConfig {
43+
pub pdb: Option<stackable_operator::commons::pdb::PdbConfig>,
44+
pub listener_class: Option<String>,
45+
pub group_listener_name: Option<String>,
46+
}
47+
48+
/// Per-rolegroup configuration: the merged CRD config plus the product-config properties.
49+
#[derive(Clone, Debug)]
50+
pub struct ValidatedRoleGroupConfig {
51+
pub merged_config: AirflowConfig,
52+
pub product_config_properties: HashMap<PropertyNameKind, BTreeMap<String, String>>,
53+
}
54+
55+
/// The validated cluster: proves that product-config validation and config merging
56+
/// succeeded for every role and role group before any resources are created.
57+
#[derive(Clone, Debug)]
58+
pub struct ValidatedAirflowCluster {
59+
pub image: ResolvedProductImage,
60+
pub role_groups: BTreeMap<AirflowRole, BTreeMap<String, ValidatedRoleGroupConfig>>,
61+
pub role_configs: BTreeMap<AirflowRole, ValidatedRoleConfig>,
62+
pub executor: AirflowExecutor,
63+
}
64+
65+
pub fn validate_cluster(
66+
airflow: &v1alpha2::AirflowCluster,
67+
dereferenced: &DereferencedObjects,
68+
product_config_manager: &ProductConfigManager,
69+
) -> Result<ValidatedAirflowCluster, Error> {
70+
let mut roles = HashMap::new();
71+
72+
// if the kubernetes executor is specified there will be no worker role as the pods
73+
// are provisioned by airflow as defined by the task (default: one pod per task)
74+
for role in AirflowRole::iter() {
75+
if let Some(resolved_role) = airflow.get_role(&role) {
76+
roles.insert(
77+
role.to_string(),
78+
(
79+
vec![
80+
PropertyNameKind::Env,
81+
PropertyNameKind::File(AIRFLOW_CONFIG_FILENAME.into()),
82+
],
83+
resolved_role.clone(),
84+
),
85+
);
86+
}
87+
}
88+
89+
let role_config = transform_all_roles_to_config(airflow, &roles);
90+
let validated_role_config = validate_all_roles_and_groups_config(
91+
&dereferenced.resolved_product_image.product_version,
92+
&role_config.context(ProductConfigTransformSnafu)?,
93+
product_config_manager,
94+
false,
95+
false,
96+
)
97+
.context(InvalidProductConfigSnafu)?;
98+
99+
let mut role_groups = BTreeMap::new();
100+
let mut role_configs = BTreeMap::new();
101+
102+
for (role_name, rolegroup_configs) in validated_role_config.iter() {
103+
let airflow_role =
104+
AirflowRole::from_str(role_name).context(UnidentifiedAirflowRoleSnafu {
105+
role: role_name.to_string(),
106+
})?;
107+
108+
role_configs.insert(
109+
airflow_role.clone(),
110+
ValidatedRoleConfig {
111+
pdb: airflow
112+
.role_config(&airflow_role)
113+
.map(|rc| rc.pod_disruption_budget),
114+
listener_class: airflow_role
115+
.listener_class_name(airflow)
116+
.map(|s| s.to_string()),
117+
group_listener_name: airflow.group_listener_name(&airflow_role),
118+
},
119+
);
120+
121+
let mut group_configs = BTreeMap::new();
122+
for (rolegroup_name, rolegroup_config) in rolegroup_configs.iter() {
123+
let rolegroup_ref = RoleGroupRef {
124+
cluster: stackable_operator::kube::runtime::reflector::ObjectRef::from_obj(airflow),
125+
role: role_name.into(),
126+
role_group: rolegroup_name.into(),
127+
};
128+
129+
let merged_config = airflow
130+
.merged_config(&airflow_role, &rolegroup_ref)
131+
.context(FailedToResolveConfigSnafu)?;
132+
133+
group_configs.insert(
134+
rolegroup_name.clone(),
135+
ValidatedRoleGroupConfig {
136+
merged_config,
137+
product_config_properties: rolegroup_config.clone(),
138+
},
139+
);
140+
}
141+
142+
role_groups.insert(airflow_role, group_configs);
143+
}
144+
145+
Ok(ValidatedAirflowCluster {
146+
image: dereferenced.resolved_product_image.clone(),
147+
role_groups,
148+
role_configs,
149+
executor: airflow.spec.executor.clone(),
150+
})
151+
}

0 commit comments

Comments
 (0)