Skip to content

Commit 1e105ee

Browse files
committed
make env-vars version-specific
1 parent 5b772cc commit 1e105ee

4 files changed

Lines changed: 103 additions & 46 deletions

File tree

rust/operator-binary/src/airflow_controller.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ fn build_server_rolegroup_statefulset(
982982
authorization_config,
983983
git_sync_resources,
984984
&rolegroup_ref.role_group,
985+
resolved_product_image,
985986
)
986987
.context(BuildStatefulsetEnvVarsSnafu)?,
987988
);
@@ -1268,6 +1269,7 @@ fn build_executor_template_config_map(
12681269
env_overrides,
12691270
merged_executor_config,
12701271
git_sync_resources,
1272+
resolved_product_image,
12711273
))
12721274
.add_volume_mounts(airflow.volume_mounts())
12731275
.context(AddVolumeMountSnafu)?

rust/operator-binary/src/env_vars.rs

Lines changed: 78 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use product_config::types::PropertyNameKind;
99
use rand::Rng;
1010
use snafu::Snafu;
1111
use stackable_operator::{
12+
commons::product_image_selection::ResolvedProductImage,
1213
crd::{authentication::oidc, git_sync},
1314
k8s_openapi::api::core::v1::EnvVar,
1415
kube::ResourceExt,
@@ -85,11 +86,45 @@ pub fn build_airflow_statefulset_envs(
8586
authorization_config: &AirflowAuthorizationResolved,
8687
git_sync_resources: &git_sync::v1alpha1::GitSyncResources,
8788
rolegroup: &String,
89+
resolved_product_image: &ResolvedProductImage,
8890
) -> Result<Vec<EnvVar>, Error> {
8991
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
9092

9193
env.extend(static_envs(git_sync_resources));
92-
env.extend(execution_server_env_vars(airflow, rolegroup));
94+
95+
if resolved_product_image.product_version.starts_with("3.") {
96+
env.extend(execution_server_env_vars(airflow, rolegroup));
97+
env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
98+
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
99+
value: Some(
100+
"airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(),
101+
),
102+
..Default::default()
103+
});
104+
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
105+
name: AIRFLOW_API_AUTH_BACKENDS.into(),
106+
value: Some("airflow.api.auth.backend.session".into()),
107+
..Default::default()
108+
});
109+
// As of 3.x a JWT key is required.
110+
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
111+
// This must be random, but must also be consistent across api-services.
112+
// The key will be consistent for all clusters started by this
113+
// operator instance. TODO: Make this cluster specific.
114+
env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
115+
name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
116+
value: Some(JWT_KEY.clone()),
117+
..Default::default()
118+
});
119+
} else {
120+
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
121+
name: AIRFLOW_API_AUTH_BACKENDS.into(),
122+
value: Some(
123+
"airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(),
124+
),
125+
..Default::default()
126+
});
127+
}
93128

94129
// environment variables
95130
let env_vars = rolegroup_config.get(&PropertyNameKind::Env);
@@ -306,33 +341,6 @@ fn static_envs(
306341
..Default::default()
307342
});
308343

309-
env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
310-
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
311-
value: Some(
312-
"airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(),
313-
),
314-
..Default::default()
315-
});
316-
317-
// Basic auth is only relevant to 2.x and can be removed once
318-
// that version is no longer supported.
319-
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
320-
name: AIRFLOW_API_AUTH_BACKENDS.into(),
321-
value: Some("airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into()),
322-
..Default::default()
323-
});
324-
325-
// As of 3.x a JWT key is required.
326-
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
327-
// This must be random, but must also be consistent across api-services.
328-
// The key will be consistent for all clusters started by this
329-
// operator instance. TODO: Make this cluster specific.
330-
env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
331-
name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
332-
value: Some(JWT_KEY.clone()),
333-
..Default::default()
334-
});
335-
336344
env
337345
}
338346

@@ -343,6 +351,7 @@ pub fn build_airflow_template_envs(
343351
env_overrides: &HashMap<String, String>,
344352
config: &ExecutorConfig,
345353
git_sync_resources: &git_sync::v1alpha1::GitSyncResources,
354+
resolved_product_image: &ResolvedProductImage,
346355
) -> Vec<EnvVar> {
347356
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
348357
let secret = airflow.spec.cluster_config.credentials_secret.as_str();
@@ -379,14 +388,47 @@ pub fn build_airflow_template_envs(
379388

380389
env.extend(static_envs(git_sync_resources));
381390

382-
// It does not appear to be possible for kubernetesExecutors to work with
383-
// multiple webserver rolegroups. For the celery case, each executor sets
384-
// the execution server from the associated rolegroup, but for kubernetes
385-
// workers this is not possible.
386-
if let Some(webserver_role) = airflow.spec.webservers.as_ref() {
387-
if let Some(rolegroup) = webserver_role.role_groups.iter().next() {
388-
env.extend(execution_server_env_vars(airflow, rolegroup.0));
391+
if resolved_product_image.product_version.starts_with("3.") {
392+
// It does not appear to be possible for kubernetesExecutors to work with
393+
// multiple webserver rolegroups. For the celery case, each executor sets
394+
// the execution server from the associated rolegroup, but for kubernetes
395+
// workers this is not possible.
396+
if let Some(webserver_role) = airflow.spec.webservers.as_ref() {
397+
if let Some(rolegroup) = webserver_role.role_groups.iter().next() {
398+
env.extend(execution_server_env_vars(airflow, rolegroup.0));
399+
}
400+
env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
401+
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
402+
value: Some(
403+
"airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
404+
.to_string(),
405+
),
406+
..Default::default()
407+
});
408+
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
409+
name: AIRFLOW_API_AUTH_BACKENDS.into(),
410+
value: Some("airflow.api.auth.backend.session".into()),
411+
..Default::default()
412+
});
413+
// As of 3.x a JWT key is required.
414+
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
415+
// This must be random, but must also be consistent across api-services.
416+
// The key will be consistent for all clusters started by this
417+
// operator instance. TODO: Make this cluster specific.
418+
env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
419+
name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
420+
value: Some(JWT_KEY.clone()),
421+
..Default::default()
422+
});
389423
}
424+
} else {
425+
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
426+
name: AIRFLOW_API_AUTH_BACKENDS.into(),
427+
value: Some(
428+
"airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(),
429+
),
430+
..Default::default()
431+
});
390432
}
391433

392434
// _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be
@@ -473,7 +515,7 @@ fn execution_server_env_vars(
473515
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
474516

475517
if let Some(name) = airflow.metadata.name.as_ref() {
476-
let webserver = format!("{name}-webserver-{rolegroup}-metrics",);
518+
let webserver = format!("{name}-webserver-{rolegroup}",);
477519
tracing::debug!("Webserver set [{webserver}]");
478520

479521
// These settings are new in 3.x and will have no affect with earlier versions.

tests/templates/kuttl/external-access/40-assert.yaml.j2

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ spec:
2222
spec:
2323
terminationGracePeriodSeconds: 120
2424
status:
25-
readyReplicas: 2
26-
replicas: 2
25+
readyReplicas: 1
26+
replicas: 1
2727
---
2828
apiVersion: apps/v1
2929
kind: StatefulSet
@@ -58,8 +58,8 @@ kind: PodDisruptionBudget
5858
metadata:
5959
name: airflow-webserver
6060
status:
61-
expectedPods: 4
62-
currentHealthy: 4
61+
expectedPods: 3
62+
currentHealthy: 3
6363
disruptionsAllowed: 1
6464
---
6565
apiVersion: policy/v1
@@ -103,15 +103,15 @@ spec:
103103
spec:
104104
terminationGracePeriodSeconds: 300
105105
status:
106-
readyReplicas: 2
107-
replicas: 2
106+
readyReplicas: 1
107+
replicas: 1
108108
---
109109
apiVersion: policy/v1
110110
kind: PodDisruptionBudget
111111
metadata:
112112
name: airflow-worker
113113
status:
114-
expectedPods: 2
115-
currentHealthy: 2
114+
expectedPods: 1
115+
currentHealthy: 1
116116
disruptionsAllowed: 1
117117
{% endif %}

tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@ spec:
3434
webservers:
3535
config:
3636
listenerClass: test-external-stable-$NAMESPACE
37+
resources:
38+
cpu:
39+
min: 1000m
40+
max: 2000m
41+
memory:
42+
limit: 2Gi
3743
roleGroups:
3844
default:
39-
replicas: 2
45+
replicas: 1
4046
external-unstable:
4147
replicas: 1
4248
config:
@@ -49,7 +55,7 @@ spec:
4955
celeryExecutors:
5056
roleGroups:
5157
default:
52-
replicas: 2
58+
replicas: 1
5359
{% elif test_scenario['values']['executor'] == 'kubernetes' %}
5460
kubernetesExecutors:
5561
config:
@@ -61,6 +67,13 @@ spec:
6167
limit: 1Gi
6268
{% endif %}
6369
schedulers:
70+
config:
71+
resources:
72+
cpu:
73+
min: 1000m
74+
max: 2000m
75+
memory:
76+
limit: 1Gi
6477
roleGroups:
6578
default:
6679
replicas: 1

0 commit comments

Comments
 (0)