From f57633a5847ad6d2287cab3a19ff820294d510fe Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 5 May 2025 14:26:24 +0200 Subject: [PATCH 01/17] wip: working tests (except auth: opa, oidc) --- Cargo.lock | 3 + Cargo.nix | 12 ++ Cargo.toml | 3 + rust/operator-binary/Cargo.toml | 3 + rust/operator-binary/src/crd/mod.rs | 40 +++--- rust/operator-binary/src/env_vars.rs | 131 +++++++++++++++--- rust/operator-binary/src/product_logging.rs | 2 + tests/release.yaml | 4 +- tests/templates/kuttl/commons/health.py | 2 +- tests/templates/kuttl/commons/metrics.py | 43 ++++-- .../41-install-airflow-cluster.yaml.j2 | 16 ++- .../30-install-airflow-cluster.yaml.j2 | 2 +- .../30-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/mount-dags-gitsync/dag_metrics.py | 44 +++++- .../overrides/10-install-airflow.yaml.j2 | 1 + .../overrides/20-install-airflow2.yaml.j2 | 1 + ...ange-limit.yaml => 00-range-limit.yaml.j2} | 2 + .../smoke/40-install-airflow-cluster.yaml.j2 | 21 --- tests/templates/kuttl/smoke/41-assert.yaml | 4 - tests/templates/kuttl/smoke/42-assert.yaml | 8 -- tests/test-definition.yaml | 4 +- 21 files changed, 249 insertions(+), 99 deletions(-) rename tests/templates/kuttl/smoke/{00-range-limit.yaml => 00-range-limit.yaml.j2} (70%) delete mode 100644 tests/templates/kuttl/smoke/42-assert.yaml diff --git a/Cargo.lock b/Cargo.lock index 4127359d..c9c396a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2646,13 +2646,16 @@ name = "stackable-airflow-operator" version = "0.0.0-dev" dependencies = [ "anyhow", + "base64 0.22.1", "built", "clap", "const_format", "fnv", "futures 0.3.31", "indoc", + "lazy_static", "product-config", + "rand", "rstest", "serde", "serde_json", diff --git a/Cargo.nix b/Cargo.nix index 0b47c582..be7646db 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -8601,6 +8601,10 @@ rec { name = "anyhow"; packageId = "anyhow"; } + { + name = "base64"; + packageId = "base64 0.22.1"; + } { name = "clap"; packageId = "clap"; @@ -8622,10 +8626,18 @@ rec { name = "indoc"; packageId = "indoc"; } + { + name = "lazy_static"; + packageId = "lazy_static"; + } { name = "product-config"; packageId = "product-config"; } + { + name = "rand"; + packageId = "rand"; + } { name = "serde"; packageId = "serde"; diff --git a/Cargo.toml b/Cargo.toml index ab2e7d98..a3b0e6bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,15 @@ product-config = { git = "https://github.com/stackabletech/product-config.git", stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", features = ["telemetry", "versioned"], tag = "stackable-operator-0.92.0" } anyhow = "1.0" +base64 = "0.22.1" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" const_format = "0.2" fnv = "1.0" futures = { version = "0.3", features = ["compat"] } indoc = "2.0" +lazy_static = "1.4" +rand = "0.8.5" rstest = "0.25" semver = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 3edfae3e..cbcebd69 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -13,10 +13,13 @@ product-config.workspace = true stackable-operator.workspace = true anyhow.workspace = true +base64.workspace = true clap.workspace = true const_format.workspace = true fnv.workspace = true futures.workspace = true +lazy_static.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs 
index e435a812..6607dca3 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -329,11 +329,6 @@ impl v1alpha1::AirflowCluster { dags_git_sync.first() } - /// The name of the role-level load-balanced Kubernetes `Service` - pub fn node_role_service_name(&self) -> Option<String> { - self.metadata.name.clone() - } - /// Retrieve and merge resource configs for role and role groups pub fn merged_config( &self, @@ -534,15 +529,16 @@ impl AirflowRole { command.extend(Self::authentication_start_commands(auth_config)); command.extend(vec![ "prepare_signal_handlers".to_string(), - format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), - "airflow webserver &".to_string(), + // format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), + //"airflow webserver &".to_string(), + "airflow db migrate".to_string(), + "airflow api-server &".to_string(), ]); } AirflowRole::Scheduler => command.extend(vec![ // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259 - "airflow db init".to_string(), - "airflow db upgrade".to_string(), + "airflow db migrate".to_string(), "airflow users create \ --username \"$ADMIN_USERNAME\" \ --firstname \"$ADMIN_FIRSTNAME\" \ --lastname \"$ADMIN_LASTNAME\" \ --email \"$ADMIN_EMAIL\" \ --password \"$ADMIN_PASSWORD\" \ --role \"Admin\"" .to_string(), "prepare_signal_handlers".to_string(), - format!( - "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - ), + // format!( + // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - // ), + "airflow dag-processor &".to_string(), "airflow scheduler &".to_string(), ]), AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), - format!( - "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - ), + // format!( + // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + // ), + "airflow db migrate".to_string(), "airflow celery worker &".to_string(), ]), } @@ -798,31 +796,31 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConfig, NoRuntimeLimits> { let (cpu, memory) = match role { AirflowRole::Worker => ( CpuLimitsFragment { - min: Some(Quantity("500m".into())), + min: Some(Quantity("1".into())), max: Some(Quantity("2".into())), }, MemoryLimitsFragment { - limit: Some(Quantity("2Gi".into())), + limit: Some(Quantity("4Gi".into())), runtime_limits: NoRuntimeLimitsFragment {}, }, ), AirflowRole::Webserver => ( CpuLimitsFragment { - min: Some(Quantity("500m".into())), + min: Some(Quantity("1".into())), max: Some(Quantity("2".into())), }, MemoryLimitsFragment { - limit: Some(Quantity("3Gi".into())), + limit: Some(Quantity("4Gi".into())), runtime_limits: NoRuntimeLimitsFragment {}, }, ), AirflowRole::Scheduler => ( CpuLimitsFragment { - min: Some(Quantity("500m".to_owned())), + min: Some(Quantity("1".to_owned())), max: Some(Quantity("2".to_owned())), }, MemoryLimitsFragment { - limit: Some(Quantity("512Mi".to_owned())), + limit: Some(Quantity("4Gi".to_owned())), runtime_limits: NoRuntimeLimitsFragment {}, }, ), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 1da5bd3d..ac045529 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -3,7 +3,10 @@ use std::{ path::PathBuf, }; +use base64::{Engine, engine::general_purpose::STANDARD}; +use lazy_static::lazy_static; use product_config::types::PropertyNameKind; +use rand::Rng; use snafu::{OptionExt, Snafu}; use stackable_operator::{
commons::authentication::oidc, k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt, @@ -29,13 +32,14 @@ const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CO const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON"; const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST"; const AIRFLOW_METRICS_STATSD_PORT: &str = "AIRFLOW__METRICS__STATSD_PORT"; -const AIRFLOW_API_AUTH_BACKEND: &str = "AIRFLOW__API__AUTH_BACKEND"; +//const AIRFLOW_API_AUTH_BACKEND: &str = "AIRFLOW__API__AUTH_BACKEND"; const AIRFLOW_WEBSERVER_SECRET_KEY: &str = "AIRFLOW__WEBSERVER__SECRET_KEY"; -const AIRFLOW_CORE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__CORE__SQL_ALCHEMY_CONN"; +//const AIRFLOW_CORE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__CORE__SQL_ALCHEMY_CONN"; const AIRFLOW_CELERY_RESULT_BACKEND: &str = "AIRFLOW__CELERY__RESULT_BACKEND"; const AIRFLOW_CELERY_BROKER_URL: &str = "AIRFLOW__CELERY__BROKER_URL"; const AIRFLOW_CORE_DAGS_FOLDER: &str = "AIRFLOW__CORE__DAGS_FOLDER"; const AIRFLOW_CORE_LOAD_EXAMPLES: &str = "AIRFLOW__CORE__LOAD_EXAMPLES"; + const AIRFLOW_WEBSERVER_EXPOSE_CONFIG: &str = "AIRFLOW__WEBSERVER__EXPOSE_CONFIG"; const AIRFLOW_CORE_EXECUTOR: &str = "AIRFLOW__CORE__EXECUTOR"; const AIRFLOW_KUBERNETES_EXECUTOR_POD_TEMPLATE_FILE: &str = @@ -53,12 +57,27 @@ const GITSYNC_PASSWORD: &str = "GITSYNC_PASSWORD"; const PYTHONPATH: &str = "PYTHONPATH"; +lazy_static! { + pub static ref JWT_KEY: String = { + let mut rng = rand::thread_rng(); + // Generate 16 random bytes and encode to base64 string + let random_bytes: [u8; 16] = rng.gen(); + STANDARD.encode(random_bytes) + }; +} + #[derive(Snafu, Debug)] pub enum Error { #[snafu(display( "failed to construct Git DAG folder - Is the git folder a valid path?: {dag_folder:?}" ))] ConstructGitDagFolder { dag_folder: PathBuf }, + + #[snafu(display("object is missing metadata"))] + NoMetadata, + + #[snafu(display("cluster is missing webservers role"))] + NoWebserver, } /// Return environment variables to be applied to the statefulsets for the scheduler, webserver (and worker, @@ -93,10 +112,18 @@ pub fn build_airflow_statefulset_envs( "connections.secretKey", ), ); + // env.insert( + // AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + // env_var_from_secret( + // AIRFLOW_CORE_SQL_ALCHEMY_CONN, + // secret, + // "connections.sqlalchemyDatabaseUri", + // ), + // ); env.insert( - AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), env_var_from_secret( - AIRFLOW_CORE_SQL_ALCHEMY_CONN, + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", secret, "connections.sqlalchemyDatabaseUri", ), @@ -294,20 +321,84 @@ fn static_envs(airflow: &v1alpha1::AirflowCluster) -> Result String { +// let mut rng = rand::thread_rng(); +// // Generate 16 random bytes and encode to base64 string +// let random_bytes: [u8; 16] = rng.gen(); +// STANDARD.encode(random_bytes) +// } + /// Return environment variables to be applied to the gitsync container in the statefulset for the scheduler, /// webserver (and worker, for clusters utilizing `celeryExecutor`). N.B. 
the git credentials-secret is passed /// explicitly here: it is no longer added to the role config (lib/compute_env) as the kubernetes executor wraps /// @@ -356,10 +447,18 @@ pub fn build_airflow_template_envs( let mut env: BTreeMap<String, EnvVar> = BTreeMap::new(); let secret = airflow.spec.cluster_config.credentials_secret.as_str(); + // env.insert( + // AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + // env_var_from_secret( + // AIRFLOW_CORE_SQL_ALCHEMY_CONN, + // secret, + // "connections.sqlalchemyDatabaseUri", + // ), + // ); env.insert( - AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), env_var_from_secret( - AIRFLOW_CORE_SQL_ALCHEMY_CONN, + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", secret, "connections.sqlalchemyDatabaseUri", ), diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 9aca3ec3..8d7e4eea 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -105,6 +105,8 @@ os.makedirs('{log_dir}', exist_ok=True) LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) +REMOTE_TASK_LOG = None + LOGGING_CONFIG.setdefault('loggers', {{}}) for logger_name, logger_config in LOGGING_CONFIG['loggers'].items(): logger_config['level'] = logging.NOTSET diff --git a/tests/release.yaml b/tests/release.yaml index c94e7bcf..4729522e 100644 --- a/tests/release.yaml +++ b/tests/release.yaml @@ -12,8 +12,8 @@ releases: operatorVersion: 0.0.0-dev listener: operatorVersion: 0.0.0-dev - airflow: - operatorVersion: 0.0.0-dev + #airflow: + # operatorVersion: 0.0.0-dev opa: operatorVersion: 0.0.0-dev spark-k8s: diff --git a/tests/templates/kuttl/commons/health.py b/tests/templates/kuttl/commons/health.py index ef1fc56b..958131f1 100755 --- a/tests/templates/kuttl/commons/health.py +++ b/tests/templates/kuttl/commons/health.py @@ -17,7 +17,7 @@ except IndexError: role_group = "default" - url = f"http://airflow-webserver-{role_group}:8080/api/v1/health" + url = f"http://airflow-webserver-{role_group}:8080/api/v2/monitor/health" count = 0 while True: diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index e0fb5eff..97c662f9 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -3,6 +3,7 @@ import requests import time import sys +from datetime import datetime, timezone def exception_handler(exception_type, exception, traceback): @@ -14,9 +15,9 @@ def exception_handler(exception_type, exception, traceback): def assert_metric(role, role_group, metric): metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") - assert ( - metric_response.status_code == 200 - ), f"Metrics could not be retrieved from the {role}-{role_group}." + assert metric_response.status_code == 200, ( + f"Metrics could not be retrieved from the {role}-{role_group}." 
+ ) return metric in metric_response.text @@ -25,21 +26,47 @@ def assert_metric(role, role_group, metric): except IndexError: role_group = "default" +now = datetime.now(timezone.utc) +ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + # Trigger a DAG run to create metrics dag_id = "example_trigger_target_dag" -dag_conf = {"message": "Hello World"} +dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} -rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" -auth = ("airflow", "airflow") +print(f"DAG-Data: {dag_data}") # allow a few moments for the DAGs to be registered to all roles time.sleep(10) +rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" +token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" + +data = {"username": "airflow", "password": "airflow"} + +headers = {"Content-Type": "application/json"} + +response = requests.post(token_url, headers=headers, json=data) + +if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") +else: + print(f"Failed to obtain access token: {response.status_code} - {response.text}") + sys.exit(1) + +headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", +} + +# activate DAG response = requests.patch( - f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} ) +# trigger DAG response = requests.post( - f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data ) # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index a4594b4e..4bcb2bce 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -37,6 +37,8 @@ data: LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) + REMOTE_TASK_LOG = None + LOGGING_CONFIG['formatters']['json'] = { '()': 'airflow.utils.log.json_formatter.JSONFormatter', 'json_fields': ['asctime', 'levelname', 'message', 'name'] @@ -74,10 +76,10 @@ spec: config: resources: cpu: - min: 200m - max: 1000m + min: 1000m + max: 2000m memory: - limit: 1Gi + limit: 3Gi roleGroups: automatic-log-config: replicas: 1 @@ -126,8 +128,8 @@ spec: config: resources: cpu: - min: 50m - max: 250m + min: 500m + max: 1250m roleGroups: automatic-log-config: replicas: 1 @@ -196,8 +198,8 @@ spec: config: resources: cpu: - min: 100m - max: 500m + min: 500m + max: 1500m roleGroups: automatic-log-config: replicas: 1 diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index 4a3e9225..fef9c279 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -47,7 +47,7 @@ data: dag_id="example_trigger_target_dag", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - schedule_interval=None, + schedule=None, tags=['example'], ) as dag: run_this = run_this_func() diff --git a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 
b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 index 52fd40ee..5b31b18e 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 @@ -71,7 +71,7 @@ spec: gitSyncConf: # supply some config to check that safe.directory is correctly set --git-config: http.sslVerify:false - gitFolder: "mount-dags-gitsync/dags" + gitFolder: "mount-dags-gitsync/dags_airflow3" volumeMounts: - name: test-cm-gitsync mountPath: /tmp/test.txt diff --git a/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py b/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py index bc5ff92a..20294828 100755 --- a/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py +++ b/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py @@ -4,29 +4,59 @@ import time import sys import logging +from datetime import datetime, timezone def assert_metric(role, metric): metric_response = requests.get(f"http://airflow-{role}-default:9102/metrics") - assert ( - metric_response.status_code == 200 - ), f"Metrics could not be retrieved from the {role}." + assert metric_response.status_code == 200, ( + f"Metrics could not be retrieved from the {role}." + ) return metric in metric_response.text +now = datetime.now(timezone.utc) +ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + # Trigger a DAG run to create metrics dag_id = "sparkapp_dag" +dag_data = {"logical_date": f"{ts}"} -rest_url = "http://airflow-webserver-default:8080/api/v1" -auth = ("airflow", "airflow") +print(f"DAG-Data: {dag_data}") # allow a few moments for the DAGs to be registered to all roles time.sleep(10) +rest_url = "http://airflow-webserver-default:8080/api/v2" +token_url = "http://airflow-webserver-default:8080/auth/token" + +data = {"username": "airflow", "password": "airflow"} + +headers = {"Content-Type": "application/json"} + +response = requests.post(token_url, headers=headers, json=data) + +if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") +else: + print(f"Failed to obtain access token: {response.status_code} - {response.text}") + sys.exit(1) + +headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", +} + +# activate DAG response = requests.patch( - f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} +) +# trigger DAG +response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data ) -response = requests.post(f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={}) # Wait for the metrics to be consumed by the statsd-exporter time.sleep(5) diff --git a/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 b/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 index 8fe1d7f3..c371c13b 100644 --- a/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 @@ -43,6 +43,7 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} + pullPolicy: IfNotPresent clusterConfig: loadExamples: true exposeConfig: false diff --git a/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 b/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 index c4a96ec0..bbfff497 100644 --- 
a/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 +++ b/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 @@ -11,6 +11,7 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} + pullPolicy: IfNotPresent clusterConfig: loadExamples: true exposeConfig: false diff --git a/tests/templates/kuttl/smoke/00-range-limit.yaml b/tests/templates/kuttl/smoke/00-range-limit.yaml.j2 similarity index 70% rename from tests/templates/kuttl/smoke/00-range-limit.yaml rename to tests/templates/kuttl/smoke/00-range-limit.yaml.j2 index 8fd02210..b35b57a4 100644 --- a/tests/templates/kuttl/smoke/00-range-limit.yaml +++ b/tests/templates/kuttl/smoke/00-range-limit.yaml.j2 @@ -1,3 +1,4 @@ +{% if test_scenario['values']['executor'] == 'celery' %} --- apiVersion: v1 kind: LimitRange @@ -9,3 +10,4 @@ spec: maxLimitRequestRatio: cpu: 5 memory: 1 +{% endif %} diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index b1cb7d07..f9dccdbb 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -52,26 +52,6 @@ spec: ROLE_HEADER_VAR = "role-value" EXPERIMENTAL_FILE_FOOTER: | ROLE_FOOTER_VAR = "role-value" - WTF_CSRF_ENABLED: "False" - AUTH_ROLES_SYNC_AT_LOGIN: "true" - AUTH_USER_REGISTRATION: "false" - AUTH_USER_REGISTRATION_ROLE: "Role" - OAUTH_PROVIDERS: | - [ - { 'name': 'azure', - 'icon': 'fa-windows', - 'token_key': 'access_token', - 'remote_app': { - 'client_id': os.environ.get('OIDC_XXX_CLIENT_ID'), - 'client_secret': os.environ.get('OIDC_XXX_CLIENT_SECRET'), - 'client_kwargs': { - 'scope': 'openid profile' - }, - 'api_base_url': 'https://keycloak/realms/sdp/protocol/test-url', - 'server_metadata_url': 'https://keycloak/realms/sdp/.well-known/openid-configuration-test', - }, - } - ] roleGroups: default: replicas: 1 @@ -79,7 +59,6 @@ spec: webserver_config.py: EXPERIMENTAL_FILE_HEADER: | COMMON_HEADER_VAR = "group-value" - AUTH_USER_REGISTRATION_ROLE: "Rolegroup" {% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: config: diff --git a/tests/templates/kuttl/smoke/41-assert.yaml b/tests/templates/kuttl/smoke/41-assert.yaml index de4c98ff..6169fd93 100644 --- a/tests/templates/kuttl/smoke/41-assert.yaml +++ b/tests/templates/kuttl/smoke/41-assert.yaml @@ -19,7 +19,3 @@ commands: echo "$AIRFLOW_CONFIG" | grep 'COMMON_HEADER_VAR = "group-value"' echo "$AIRFLOW_CONFIG" | grep 'ROLE_FOOTER_VAR = "role-value"' echo "$AIRFLOW_CONFIG" | grep -v 'ROLE_HEADER_VAR = "role-value"' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_ROLES_SYNC_AT_LOGIN = True' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_USER_REGISTRATION = False' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_USER_REGISTRATION_ROLE = "Rolegroup"' - echo "$AIRFLOW_CONFIG" | grep 'OAUTH_PROVIDERS' diff --git a/tests/templates/kuttl/smoke/42-assert.yaml b/tests/templates/kuttl/smoke/42-assert.yaml deleted file mode 100644 index 46ced0d2..00000000 --- a/tests/templates/kuttl/smoke/42-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -# This test checks if the containerdebug-state.json file is present and valid -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -timeout: 600 -commands: - - script: kubectl exec -n $NAMESPACE --container airflow airflow-scheduler-default-0 -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"' - - script: kubectl exec -n $NAMESPACE --container airflow airflow-webserver-default-0 -- cat 
/stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"' diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 82f11035..f9d5ddca 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -7,13 +7,13 @@ dimensions: - name: airflow values: - - 2.9.3 + - 3.0.1 #2.9.3 - 2.10.4 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: airflow-latest values: - - 2.10.4 + - 3.0.1 #2.10.4 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: opa-latest From 80041fc936eaa57c39f223e91584428a0595fc8d Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 20 May 2025 10:17:30 +0200 Subject: [PATCH 02/17] wip: changing python test files --- .../operator-binary/src/airflow_controller.rs | 3 +- rust/operator-binary/src/crd/mod.rs | 112 ++++++++++++------ rust/operator-binary/src/env_vars.rs | 50 +------- tests/templates/kuttl/commons/health_v2.py | 47 ++++++++ tests/templates/kuttl/commons/health_v3.py | 47 ++++++++ tests/templates/kuttl/commons/metrics_v2.py | 75 ++++++++++++ tests/templates/kuttl/commons/metrics_v3.py | 101 ++++++++++++++++ .../{80-assert.yaml => 80-assert.yaml.j2} | 2 +- .../{90-assert.yaml => 90-assert.yaml.j2} | 2 +- 9 files changed, 352 insertions(+), 87 deletions(-) create mode 100755 tests/templates/kuttl/commons/health_v2.py create mode 100755 tests/templates/kuttl/commons/health_v3.py create mode 100755 tests/templates/kuttl/commons/metrics_v2.py create mode 100755 tests/templates/kuttl/commons/metrics_v3.py rename tests/templates/kuttl/ldap/{80-assert.yaml => 80-assert.yaml.j2} (75%) rename tests/templates/kuttl/ldap/{90-assert.yaml => 90-assert.yaml.j2} (71%) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 7ce18f92..cb387745 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -906,7 +906,8 @@ fn build_server_rolegroup_statefulset( .context(GracefulShutdownSnafu)?; let mut airflow_container_args = Vec::new(); - airflow_container_args.extend(airflow_role.get_commands(authentication_config)); + airflow_container_args + .extend(airflow_role.get_commands(authentication_config, resolved_product_image)); airflow_container .image_from_product_image(resolved_product_image) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 6607dca3..07f8cefd 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -9,7 +9,7 @@ use stackable_operator::{ cache::UserInformationCache, cluster_operation::ClusterOperation, opa::OpaConfig, - product_image_selection::ProductImage, + product_image_selection::{ProductImage, ResolvedProductImage}, resources::{ CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment, Resources, ResourcesFragment, @@ -513,6 +513,7 @@ impl AirflowRole { pub fn get_commands( &self, auth_config: &AirflowClientAuthenticationDetailsResolved, + resolved_product_image: &ResolvedProductImage, ) -> Vec<String> { let mut command = vec![ format!( @@ -523,46 +524,83 @@ impl AirflowRole { remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), ]; - match &self { - AirflowRole::Webserver => { - // Getting auth commands for AuthClass - command.extend(Self::authentication_start_commands(auth_config)); - + if resolved_product_image.product_version.starts_with("3.") { + match &self { + AirflowRole::Webserver => { + command.extend(Self::authentication_start_commands(auth_config)); + 
command.extend(vec![ + if resolved_product_image.product_version.starts_with("3.") { + match &self { + AirflowRole::Webserver => { + command.extend(Self::authentication_start_commands(auth_config)); + command.extend(vec![ + "prepare_signal_handlers".to_string(), + // format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), + //"airflow webserver &".to_string(), + "airflow db migrate".to_string(), + "airflow api-server &".to_string(), + ]); + } + AirflowRole::Scheduler => command.extend(vec![ + "airflow db migrate".to_string(), + "airflow users create \ + --username \"$ADMIN_USERNAME\" \ + --firstname \"$ADMIN_FIRSTNAME\" \ + --lastname \"$ADMIN_LASTNAME\" \ + --email \"$ADMIN_EMAIL\" \ + --password \"$ADMIN_PASSWORD\" \ + --role \"Admin\"" + .to_string(), + "prepare_signal_handlers".to_string(), + // format!( + // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + // ), + "airflow dag-processor &".to_string(), + "airflow scheduler &".to_string(), + ]), + AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), - // format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), - //"airflow webserver &".to_string(), + // format!( + // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + // ), "airflow db migrate".to_string(), - "airflow api-server &".to_string(), - ]); + "airflow celery worker &".to_string(), + ]), + } + } else { + match &self { + AirflowRole::Webserver => { + // Getting auth commands for AuthClass + command.extend(Self::authentication_start_commands(auth_config)); + command.extend(vec![ + "prepare_signal_handlers".to_string(), + format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), + "airflow webserver &".to_string(), + ]); + } + AirflowRole::Scheduler => command.extend(vec![ + // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259 + "airflow db init".to_string(), + "airflow db upgrade".to_string(), + "airflow users create \ + --username \"$ADMIN_USERNAME\" \ + --firstname \"$ADMIN_FIRSTNAME\" \ + --lastname \"$ADMIN_LASTNAME\" \ + --email \"$ADMIN_EMAIL\" \ + --password \"$ADMIN_PASSWORD\" \ + --role \"Admin\"" + .to_string(), + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow scheduler &".to_string(), + ]), + AirflowRole::Worker => command.extend(vec![ + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow celery worker &".to_string(), + ]), } - - AirflowRole::Scheduler => command.extend(vec![ - // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259 - "airflow db migrate".to_string(), - "airflow users create \ - --username \"$ADMIN_USERNAME\" \ - --firstname \"$ADMIN_FIRSTNAME\" \ - --lastname \"$ADMIN_LASTNAME\" \ - --email \"$ADMIN_EMAIL\" \ - --password \"$ADMIN_PASSWORD\" \ - --role \"Admin\"" - .to_string(), - "prepare_signal_handlers".to_string(), - // format!( - // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - // ), - "airflow dag-processor &".to_string(), - "airflow scheduler &".to_string(), - ]), - AirflowRole::Worker => command.extend(vec![ - "prepare_signal_handlers".to_string(), - // format!( - // "containerdebug 
--output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - // ), - "airflow db migrate".to_string(), - "airflow celery worker &".to_string(), - ]), } + // graceful shutdown part command.extend(vec![ "wait_for_termination $!".to_string(), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index ac045529..81eabf02 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -32,9 +32,7 @@ const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CO const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON"; const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST"; const AIRFLOW_METRICS_STATSD_PORT: &str = "AIRFLOW__METRICS__STATSD_PORT"; -//const AIRFLOW_API_AUTH_BACKEND: &str = "AIRFLOW__API__AUTH_BACKEND"; const AIRFLOW_WEBSERVER_SECRET_KEY: &str = "AIRFLOW__WEBSERVER__SECRET_KEY"; -//const AIRFLOW_CORE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__CORE__SQL_ALCHEMY_CONN"; const AIRFLOW_CELERY_RESULT_BACKEND: &str = "AIRFLOW__CELERY__RESULT_BACKEND"; const AIRFLOW_CELERY_BROKER_URL: &str = "AIRFLOW__CELERY__BROKER_URL"; const AIRFLOW_CORE_DAGS_FOLDER: &str = "AIRFLOW__CORE__DAGS_FOLDER"; @@ -112,14 +110,6 @@ "connections.secretKey", ), ); - // env.insert( - // AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), - // env_var_from_secret( - // AIRFLOW_CORE_SQL_ALCHEMY_CONN, - // secret, - // "connections.sqlalchemyDatabaseUri", - // ), - // ); env.insert( "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), env_var_from_secret( @@ -329,27 +319,11 @@ fn static_envs(airflow: &v1alpha1::AirflowCluster) -> Result Result String { -// let mut rng = rand::thread_rng(); -// // Generate 16 random bytes and encode to base64 string -// let random_bytes: [u8; 16] = rng.gen(); -// STANDARD.encode(random_bytes) -// } - /// Return environment variables to be applied to the gitsync container in the statefulset for the scheduler, /// webserver (and worker, for clusters utilizing `celeryExecutor`). N.B. the git credentials-secret is passed /// explicitly here: it is no longer added to the role config (lib/compute_env) as the kubernetes executor wraps /// @@ -447,14 +411,6 @@ pub fn build_airflow_template_envs( let mut env: BTreeMap<String, EnvVar> = BTreeMap::new(); let secret = airflow.spec.cluster_config.credentials_secret.as_str(); - // env.insert( - // AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), - // env_var_from_secret( - // AIRFLOW_CORE_SQL_ALCHEMY_CONN, - // secret, - // "connections.sqlalchemyDatabaseUri", - // ), - // ); env.insert( "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), env_var_from_secret( diff --git a/tests/templates/kuttl/commons/health_v2.py b/tests/templates/kuttl/commons/health_v2.py new file mode 100755 index 00000000..ef1fc56b --- /dev/null +++ b/tests/templates/kuttl/commons/health_v2.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +import logging +import requests +import sys +import time + +if __name__ == "__main__": + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + try: + role_group = sys.argv[1] + except IndexError: + role_group = "default" + + url = f"http://airflow-webserver-{role_group}:8080/api/v1/health" + count = 0 + + while True: + try: + count = count + 1 + res = requests.get(url, timeout=5) + code = res.status_code + if code == 200: + break + else: + print( + f"Got non 200 status code [{code}], retrying attempt no [{count}] ...." 
+ ) + except requests.exceptions.Timeout: + print(f"Connection timed out, retrying attempt no [{count}] ....") + except requests.ConnectionError as e: + print(f"Connection Error: {str(e)}") + except requests.RequestException as e: + print(f"General Error: {str(e)}") + except Exception as e: + print( + f"General error occurred {str(e)}, retrying attempt no [{count}] ...." + ) + + # Wait a little bit before retrying + time.sleep(1) + sys.exit(0) diff --git a/tests/templates/kuttl/commons/health_v3.py b/tests/templates/kuttl/commons/health_v3.py new file mode 100755 index 00000000..958131f1 --- /dev/null +++ b/tests/templates/kuttl/commons/health_v3.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +import logging +import requests +import sys +import time + +if __name__ == "__main__": + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + try: + role_group = sys.argv[1] + except IndexError: + role_group = "default" + + url = f"http://airflow-webserver-{role_group}:8080/api/v2/monitor/health" + count = 0 + + while True: + try: + count = count + 1 + res = requests.get(url, timeout=5) + code = res.status_code + if code == 200: + break + else: + print( + f"Got non 200 status code [{code}], retrying attempt no [{count}] ...." + ) + except requests.exceptions.Timeout: + print(f"Connection timed out, retrying attempt no [{count}] ....") + except requests.ConnectionError as e: + print(f"Connection Error: {str(e)}") + except requests.RequestException as e: + print(f"General Error: {str(e)}") + except Exception as e: + print( + f"General error occurred {str(e)}, retrying attempt no [{count}] ...." + ) + + # Wait a little bit before retrying + time.sleep(1) + sys.exit(0) diff --git a/tests/templates/kuttl/commons/metrics_v2.py b/tests/templates/kuttl/commons/metrics_v2.py new file mode 100755 index 00000000..7734f18b --- /dev/null +++ b/tests/templates/kuttl/commons/metrics_v2.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python + +import requests +import time +import sys + + +def exception_handler(exception_type, exception, traceback): + print(f"{exception_type.__name__}: {exception.args}") + + +sys.excepthook = exception_handler + + +def assert_metric(role, role_group, metric): + metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") + assert metric_response.status_code == 200, ( + f"Metrics could not be retrieved from the {role}-{role_group}." + ) + return metric in metric_response.text + + +try: + role_group = sys.argv[1] +except IndexError: + role_group = "default" + + +# Trigger a DAG run to create metrics +dag_id = "example_trigger_target_dag" +dag_conf = {"message": "Hello World"} + +rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" +auth = ("airflow", "airflow") + +# allow a few moments for the DAGs to be registered to all roles +time.sleep(10) + +response = requests.patch( + f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} +) +response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} +) + +# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid +# or minimize this by looping over the check instead. +iterations = 4 +loop = 0 +while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", role_group, "airflow_task_instance_created_BashOperator" + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) diff --git a/tests/templates/kuttl/commons/metrics_v3.py b/tests/templates/kuttl/commons/metrics_v3.py new file mode 100755 index 00000000..97c662f9 --- /dev/null +++ b/tests/templates/kuttl/commons/metrics_v3.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +import requests +import time +import sys +from datetime import datetime, timezone + + +def exception_handler(exception_type, exception, traceback): + print(f"{exception_type.__name__}: {exception.args}") + + +sys.excepthook = exception_handler + + +def assert_metric(role, role_group, metric): + metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") + assert metric_response.status_code == 200, ( + f"Metrics could not be retrieved from the {role}-{role_group}." + ) + return metric in metric_response.text + + +try: + role_group = sys.argv[1] +except IndexError: + role_group = "default" + +now = datetime.now(timezone.utc) +ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + +# Trigger a DAG run to create metrics +dag_id = "example_trigger_target_dag" +dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} + +print(f"DAG-Data: {dag_data}") + +# allow a few moments for the DAGs to be registered to all roles +time.sleep(10) + +rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" +token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" + +data = {"username": "airflow", "password": "airflow"} + +headers = {"Content-Type": "application/json"} + +response = requests.post(token_url, headers=headers, json=data) + +if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") +else: + print(f"Failed to obtain access token: {response.status_code} - {response.text}") + sys.exit(1) + +headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", +} + +# activate DAG +response = requests.patch( + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} +) +# trigger DAG +response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data +) + +# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid +# or minimize this by looping over the check instead. +iterations = 4 +loop = 0 +while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", role_group, "airflow_task_instance_created_BashOperator" + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) diff --git a/tests/templates/kuttl/ldap/80-assert.yaml b/tests/templates/kuttl/ldap/80-assert.yaml.j2 similarity index 75% rename from tests/templates/kuttl/ldap/80-assert.yaml rename to tests/templates/kuttl/ldap/80-assert.yaml.j2 index c37ea4f1..8c3ab208 100644 --- a/tests/templates/kuttl/ldap/80-assert.yaml +++ b/tests/templates/kuttl/ldap/80-assert.yaml.j2 @@ -5,4 +5,4 @@ metadata: name: test-airflow-webserver-health-check timeout: 480 commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py {{ test_scenario['values']['airflow'] }} diff --git a/tests/templates/kuttl/ldap/90-assert.yaml b/tests/templates/kuttl/ldap/90-assert.yaml.j2 similarity index 71% rename from tests/templates/kuttl/ldap/90-assert.yaml rename to tests/templates/kuttl/ldap/90-assert.yaml.j2 index 2ea7c1ca..00687e27 100644 --- a/tests/templates/kuttl/ldap/90-assert.yaml +++ b/tests/templates/kuttl/ldap/90-assert.yaml.j2 @@ -5,4 +5,4 @@ metadata: name: metrics timeout: 480 commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py {{ test_scenario['values']['airflow'] }} From 626d49649e834e7a0826f159cedd77dd1e9b3817 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 20 May 2025 10:34:03 +0200 Subject: [PATCH 03/17] single health.py for all airflow versions --- tests/templates/kuttl/commons/health.py | 14 ++++--- tests/templates/kuttl/commons/health_v2.py | 47 ---------------------- tests/templates/kuttl/commons/health_v3.py | 47 ---------------------- 3 files changed, 9 insertions(+), 99 deletions(-) delete mode 100755 tests/templates/kuttl/commons/health_v2.py delete mode 100755 tests/templates/kuttl/commons/health_v3.py diff --git a/tests/templates/kuttl/commons/health.py b/tests/templates/kuttl/commons/health.py index 958131f1..385e3a29 100755 --- a/tests/templates/kuttl/commons/health.py +++ b/tests/templates/kuttl/commons/health.py @@ -3,6 +3,7 @@ import requests import sys import time +import argparse if __name__ == "__main__": log_level = "DEBUG" @@ -12,12 +13,15 @@ stream=sys.stdout, ) - try: - role_group = sys.argv[1] - except IndexError: - role_group = "default" + parser = argparse.ArgumentParser(description="Health check script") + parser.add_argument("--role-group", type=str, default="default", help="Role group to check") + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + url = f"http://airflow-webserver-{opts.role_group}:8080/api/v1/health" + if opts.airflow_version and opts.airflow_version.startswith("3"): + url = f"http://airflow-webserver-{opts.role_group}:8080/api/v2/monitor/health" - url = f"http://airflow-webserver-{role_group}:8080/api/v2/monitor/health" count = 0 while True: 
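The version switch above is the pattern the whole test suite relies on: Airflow 2.x serves an unauthenticated health check under /api/v1/health, while Airflow 3.x moves it to /api/v2/monitor/health and guards the other /api/v2 endpoints (used by the metrics scripts) with a JWT obtained from /auth/token. A minimal Python sketch of both pieces, assuming the in-cluster webserver service naming and the airflow/airflow test credentials these tests already use:

import requests


def health_url(role_group: str, airflow_version: str) -> str:
    # Airflow 3.x moved the (unauthenticated) health check to /api/v2/monitor/health
    base = f"http://airflow-webserver-{role_group}:8080"
    if airflow_version.startswith("3"):
        return f"{base}/api/v2/monitor/health"
    return f"{base}/api/v1/health"


def bearer_headers(role_group: str) -> dict:
    # Airflow 3.x only: exchange the test credentials for a JWT before calling
    # /api/v2 endpoints such as /dags/{dag_id}/dagRuns
    resp = requests.post(
        f"http://airflow-webserver-{role_group}:8080/auth/token",
        headers={"Content-Type": "application/json"},
        json={"username": "airflow", "password": "airflow"},
        timeout=5,
    )
    resp.raise_for_status()
    return {
        "Authorization": f"Bearer {resp.json()['access_token']}",
        "Content-Type": "application/json",
    }


# e.g. requests.get(health_url("default", "3.0.1"), timeout=5)

The unified health.py and metrics.py choose between these two paths via the --airflow-version argument that the kuttl asserts pass in.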
diff --git a/tests/templates/kuttl/commons/health_v2.py b/tests/templates/kuttl/commons/health_v2.py deleted file mode 100755 index ef1fc56b..00000000 --- a/tests/templates/kuttl/commons/health_v2.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -import logging -import requests -import sys -import time - -if __name__ == "__main__": - log_level = "DEBUG" - logging.basicConfig( - level=log_level, - format="%(asctime)s %(levelname)s: %(message)s", - stream=sys.stdout, - ) - - try: - role_group = sys.argv[1] - except IndexError: - role_group = "default" - - url = f"http://airflow-webserver-{role_group}:8080/api/v1/health" - count = 0 - - while True: - try: - count = count + 1 - res = requests.get(url, timeout=5) - code = res.status_code - if code == 200: - break - else: - print( - f"Got non 200 status code [{code}], retrying attempt no [{count}] ...." - ) - except requests.exceptions.Timeout: - print(f"Connection timed out, retrying attempt no [{count}] ....") - except requests.ConnectionError as e: - print(f"Connection Error: {str(e)}") - except requests.RequestException as e: - print(f"General Error: {str(e)}") - except Exception as e: - print( - f"General error occurred {str(e)}, retrying attempt no [{count}] ...." - ) - - # Wait a little bit before retrying - time.sleep(1) - sys.exit(0) diff --git a/tests/templates/kuttl/commons/health_v3.py b/tests/templates/kuttl/commons/health_v3.py deleted file mode 100755 index 958131f1..00000000 --- a/tests/templates/kuttl/commons/health_v3.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -import logging -import requests -import sys -import time - -if __name__ == "__main__": - log_level = "DEBUG" - logging.basicConfig( - level=log_level, - format="%(asctime)s %(levelname)s: %(message)s", - stream=sys.stdout, - ) - - try: - role_group = sys.argv[1] - except IndexError: - role_group = "default" - - url = f"http://airflow-webserver-{role_group}:8080/api/v2/monitor/health" - count = 0 - - while True: - try: - count = count + 1 - res = requests.get(url, timeout=5) - code = res.status_code - if code == 200: - break - else: - print( - f"Got non 200 status code [{code}], retrying attempt no [{count}] ...." - ) - except requests.exceptions.Timeout: - print(f"Connection timed out, retrying attempt no [{count}] ....") - except requests.ConnectionError as e: - print(f"Connection Error: {str(e)}") - except requests.RequestException as e: - print(f"General Error: {str(e)}") - except Exception as e: - print( - f"General error occurred {str(e)}, retrying attempt no [{count}] ...." 
- ) - - # Wait a little bit before retrying - time.sleep(1) - sys.exit(0) From 1c06eb2d25148b4e594477fadf8d5b17d9d74897 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 20 May 2025 11:24:39 +0200 Subject: [PATCH 04/17] single metrics.py for all airflow versions --- tests/templates/kuttl/commons/metrics.py | 222 +++++++++++++------- tests/templates/kuttl/commons/metrics_v2.py | 75 ------- tests/templates/kuttl/commons/metrics_v3.py | 101 --------- 3 files changed, 143 insertions(+), 255 deletions(-) delete mode 100755 tests/templates/kuttl/commons/metrics_v2.py delete mode 100755 tests/templates/kuttl/commons/metrics_v3.py diff --git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index 97c662f9..c484e840 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -4,15 +4,14 @@ import time import sys from datetime import datetime, timezone - +import argparse +import logging def exception_handler(exception_type, exception, traceback): print(f"{exception_type.__name__}: {exception.args}") - sys.excepthook = exception_handler - def assert_metric(role, role_group, metric): metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") assert metric_response.status_code == 200, ( @@ -20,82 +19,147 @@ def assert_metric(role, role_group, metric): ) return metric in metric_response.text +def metrics_v3(role_group: str) -> None: -try: - role_group = sys.argv[1] -except IndexError: - role_group = "default" - -now = datetime.now(timezone.utc) -ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") - -# Trigger a DAG run to create metrics -dag_id = "example_trigger_target_dag" -dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} - -print(f"DAG-Data: {dag_data}") - -# allow a few moments for the DAGs to be registered to all roles -time.sleep(10) - -rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" -token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" - -data = {"username": "airflow", "password": "airflow"} - -headers = {"Content-Type": "application/json"} - -response = requests.post(token_url, headers=headers, json=data) - -if response.status_code == 200 or response.status_code == 201: - token_data = response.json() - access_token = token_data["access_token"] - print(f"Access Token: {access_token}") -else: - print(f"Failed to obtain access token: {response.status_code} - {response.text}") - sys.exit(1) - -headers = { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json", -} - -# activate DAG -response = requests.patch( - f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} -) -# trigger DAG -response = requests.post( - f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data -) - -# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid -# or minimize this by looping over the check instead. -iterations = 4 -loop = 0 -while True: - assert response.status_code == 200, "DAG run could not be triggered." 
- # Wait for the metrics to be consumed by the statsd-exporter - time.sleep(5) - # (disable line-break flake checks) - if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", role_group, "airflow_task_instance_created_BashOperator" - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_example_trigger_target_dag_count", - ) - ) - ): # noqa: W503, W504 - break + now = datetime.now(timezone.utc) + ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + + # Trigger a DAG run to create metrics + dag_id = "example_trigger_target_dag" + dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} + + print(f"DAG-Data: {dag_data}") + + # allow a few moments for the DAGs to be registered to all roles time.sleep(10) - loop += 1 - if loop == iterations: - # force re-try of script + + rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" + token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" + + data = {"username": "airflow", "password": "airflow"} + + headers = {"Content-Type": "application/json"} + + response = requests.post(token_url, headers=headers, json=data) + + if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") + else: + print(f"Failed to obtain access token: {response.status_code} - {response.text}") sys.exit(1) + + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # activate DAG + response = requests.patch( + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} + ) + # trigger DAG + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." + # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", role_group, "airflow_task_instance_created_BashOperator" + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + +def metrics_v2(role_group: str) -> None: + + # Trigger a DAG run to create metrics + dag_id = "example_trigger_target_dag" + dag_conf = {"message": "Hello World"} + + rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" + auth = ("airflow", "airflow") + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + response = requests.patch( + f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + ) + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. 
+ iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." + # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", role_group, "airflow_task_instance_created_BashOperator" + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + +if __name__ == "__main__": + + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + parser = argparse.ArgumentParser(description="Airflow metrics script") + parser.add_argument("--role-group", type=str, default="default", help="Role group to check") + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + if opts.airflow_version and opts.airflow_version.startswith("3"): + metrics_v3(opts.role_group) + else: + metrics_v2(opts.role_group) diff --git a/tests/templates/kuttl/commons/metrics_v2.py b/tests/templates/kuttl/commons/metrics_v2.py deleted file mode 100755 index 7734f18b..00000000 --- a/tests/templates/kuttl/commons/metrics_v2.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python - -import requests -import time -import sys - - -def exception_handler(exception_type, exception, traceback): - print(f"{exception_type.__name__}: {exception.args}") - - -sys.excepthook = exception_handler - - -def assert_metric(role, role_group, metric): - metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") - assert metric_response.status_code == 200, ( - f"Metrics could not be retrieved from the {role}-{role_group}." - ) - return metric in metric_response.text - - -try: - role_group = sys.argv[1] -except IndexError: - role_group = "default" - - -# Trigger a DAG run to create metrics -dag_id = "example_trigger_target_dag" -dag_conf = {"message": "Hello World"} - -rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" -auth = ("airflow", "airflow") - -# allow a few moments for the DAGs to be registered to all roles -time.sleep(10) - -response = requests.patch( - f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} -) -response = requests.post( - f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} -) - -# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid -# or minimize this by looping over the check instead. -iterations = 4 -loop = 0 -while True: - assert response.status_code == 200, "DAG run could not be triggered." 
- # Wait for the metrics to be consumed by the statsd-exporter - time.sleep(5) - # (disable line-break flake checks) - if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", role_group, "airflow_task_instance_created_BashOperator" - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_example_trigger_target_dag_count", - ) - ) - ): # noqa: W503, W504 - break - time.sleep(10) - loop += 1 - if loop == iterations: - # force re-try of script - sys.exit(1) diff --git a/tests/templates/kuttl/commons/metrics_v3.py b/tests/templates/kuttl/commons/metrics_v3.py deleted file mode 100755 index 97c662f9..00000000 --- a/tests/templates/kuttl/commons/metrics_v3.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python - -import requests -import time -import sys -from datetime import datetime, timezone - - -def exception_handler(exception_type, exception, traceback): - print(f"{exception_type.__name__}: {exception.args}") - - -sys.excepthook = exception_handler - - -def assert_metric(role, role_group, metric): - metric_response = requests.get(f"http://airflow-{role}-{role_group}:9102/metrics") - assert metric_response.status_code == 200, ( - f"Metrics could not be retrieved from the {role}-{role_group}." - ) - return metric in metric_response.text - - -try: - role_group = sys.argv[1] -except IndexError: - role_group = "default" - -now = datetime.now(timezone.utc) -ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") - -# Trigger a DAG run to create metrics -dag_id = "example_trigger_target_dag" -dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} - -print(f"DAG-Data: {dag_data}") - -# allow a few moments for the DAGs to be registered to all roles -time.sleep(10) - -rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" -token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" - -data = {"username": "airflow", "password": "airflow"} - -headers = {"Content-Type": "application/json"} - -response = requests.post(token_url, headers=headers, json=data) - -if response.status_code == 200 or response.status_code == 201: - token_data = response.json() - access_token = token_data["access_token"] - print(f"Access Token: {access_token}") -else: - print(f"Failed to obtain access token: {response.status_code} - {response.text}") - sys.exit(1) - -headers = { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json", -} - -# activate DAG -response = requests.patch( - f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} -) -# trigger DAG -response = requests.post( - f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data -) - -# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid -# or minimize this by looping over the check instead. -iterations = 4 -loop = 0 -while True: - assert response.status_code == 200, "DAG run could not be triggered." 
- # Wait for the metrics to be consumed by the statsd-exporter - time.sleep(5) - # (disable line-break flake checks) - if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", role_group, "airflow_task_instance_created_BashOperator" - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_example_trigger_target_dag_count", - ) - ) - ): # noqa: W503, W504 - break - time.sleep(10) - loop += 1 - if loop == iterations: - # force re-try of script - sys.exit(1) From de63b7134f0f1fe6cf916a09279e29861364367c Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 20 May 2025 12:19:03 +0200 Subject: [PATCH 05/17] update tests with new commons scripts --- tests/templates/kuttl/ldap/80-assert.yaml.j2 | 6 +++++- tests/templates/kuttl/ldap/90-assert.yaml.j2 | 6 +++++- tests/templates/kuttl/logging/51-assert.yaml | 10 ---------- tests/templates/kuttl/logging/51-assert.yaml.j2 | 17 +++++++++++++++++ tests/templates/kuttl/logging/52-assert.yaml | 10 ---------- tests/templates/kuttl/logging/52-assert.yaml.j2 | 17 +++++++++++++++++ .../kuttl/mount-dags-configmap/50-assert.yaml | 8 -------- .../mount-dags-configmap/50-assert.yaml.j2 | 13 +++++++++++++ .../kuttl/mount-dags-configmap/60-assert.yaml | 6 +++++- .../kuttl/mount-dags-gitsync/50-assert.yaml | 8 -------- .../kuttl/mount-dags-gitsync/50-assert.yaml.j2 | 12 ++++++++++++ tests/templates/kuttl/smoke/60-assert.yaml | 8 -------- tests/templates/kuttl/smoke/60-assert.yaml.j2 | 12 ++++++++++++ tests/templates/kuttl/smoke/70-assert.yaml | 8 -------- tests/templates/kuttl/smoke/70-assert.yaml.j2 | 12 ++++++++++++ 15 files changed, 98 insertions(+), 55 deletions(-) delete mode 100644 tests/templates/kuttl/logging/51-assert.yaml create mode 100644 tests/templates/kuttl/logging/51-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/logging/52-assert.yaml create mode 100644 tests/templates/kuttl/logging/52-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/mount-dags-configmap/50-assert.yaml create mode 100644 tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml create mode 100644 tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/smoke/60-assert.yaml create mode 100644 tests/templates/kuttl/smoke/60-assert.yaml.j2 delete mode 100644 tests/templates/kuttl/smoke/70-assert.yaml create mode 100644 tests/templates/kuttl/smoke/70-assert.yaml.j2 diff --git a/tests/templates/kuttl/ldap/80-assert.yaml.j2 b/tests/templates/kuttl/ldap/80-assert.yaml.j2 index 8c3ab208..8b1f71bf 100644 --- a/tests/templates/kuttl/ldap/80-assert.yaml.j2 +++ b/tests/templates/kuttl/ldap/80-assert.yaml.j2 @@ -5,4 +5,8 @@ metadata: name: test-airflow-webserver-health-check timeout: 480 commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py {{ test_scenario['values']['airflow'] }} +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/ldap/90-assert.yaml.j2 
b/tests/templates/kuttl/ldap/90-assert.yaml.j2 index 00687e27..a811cb8e 100644 --- a/tests/templates/kuttl/ldap/90-assert.yaml.j2 +++ b/tests/templates/kuttl/ldap/90-assert.yaml.j2 @@ -5,4 +5,8 @@ metadata: name: metrics timeout: 480 commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py {{ test_scenario['values']['airflow'] }} +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/logging/51-assert.yaml b/tests/templates/kuttl/logging/51-assert.yaml deleted file mode 100644 index 7514d353..00000000 --- a/tests/templates/kuttl/logging/51-assert.yaml +++ /dev/null @@ -1,10 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: | - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py automatic-log-config - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py custom-log-config diff --git a/tests/templates/kuttl/logging/51-assert.yaml.j2 b/tests/templates/kuttl/logging/51-assert.yaml.j2 new file mode 100644 index 00000000..e2971a52 --- /dev/null +++ b/tests/templates/kuttl/logging/51-assert.yaml.j2 @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/logging/52-assert.yaml b/tests/templates/kuttl/logging/52-assert.yaml deleted file mode 100644 index c4f01749..00000000 --- a/tests/templates/kuttl/logging/52-assert.yaml +++ /dev/null @@ -1,10 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 600 -commands: - - script: | - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py automatic-log-config - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py custom-log-config diff --git a/tests/templates/kuttl/logging/52-assert.yaml.j2 b/tests/templates/kuttl/logging/52-assert.yaml.j2 new file mode 100644 index 00000000..3e093e4f --- /dev/null +++ b/tests/templates/kuttl/logging/52-assert.yaml.j2 @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 600 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: | + kubectl exec -n $NAMESPACE 
test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 new file mode 100644 index 00000000..9c2e8a22 --- /dev/null +++ b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml index 2ea7c1ca..a811cb8e 100644 --- a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml +++ b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml @@ -5,4 +5,8 @@ metadata: name: metrics timeout: 480 commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 new file mode 100644 index 00000000..8b1f71bf --- /dev/null +++ 
b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/smoke/60-assert.yaml b/tests/templates/kuttl/smoke/60-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/smoke/60-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/smoke/60-assert.yaml.j2 b/tests/templates/kuttl/smoke/60-assert.yaml.j2 new file mode 100644 index 00000000..8b1f71bf --- /dev/null +++ b/tests/templates/kuttl/smoke/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/smoke/70-assert.yaml b/tests/templates/kuttl/smoke/70-assert.yaml deleted file mode 100644 index 2ea7c1ca..00000000 --- a/tests/templates/kuttl/smoke/70-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py diff --git a/tests/templates/kuttl/smoke/70-assert.yaml.j2 b/tests/templates/kuttl/smoke/70-assert.yaml.j2 new file mode 100644 index 00000000..a811cb8e --- /dev/null +++ b/tests/templates/kuttl/smoke/70-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} From 5e195cc44075156545f1c82ec9ba53e46b0d7bbc Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Tue, 20 May 2025 12:29:09 +0200 Subject: [PATCH 06/17] tests: use "airflow-latest" instead of "airflow" --- tests/templates/kuttl/ldap/80-assert.yaml.j2 | 6 +++--- tests/templates/kuttl/ldap/90-assert.yaml.j2 | 6 +++--- .../templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 | 6 +++--- .../{60-assert.yaml => 60-assert.yaml.j2} | 6 +++--- tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 | 6 +++--- 5 files changed, 15 
insertions(+), 15 deletions(-) rename tests/templates/kuttl/mount-dags-configmap/{60-assert.yaml => 60-assert.yaml.j2} (74%) diff --git a/tests/templates/kuttl/ldap/80-assert.yaml.j2 b/tests/templates/kuttl/ldap/80-assert.yaml.j2 index 8b1f71bf..b85052aa 100644 --- a/tests/templates/kuttl/ldap/80-assert.yaml.j2 +++ b/tests/templates/kuttl/ldap/80-assert.yaml.j2 @@ -5,8 +5,8 @@ metadata: name: test-airflow-webserver-health-check timeout: 480 commands: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} diff --git a/tests/templates/kuttl/ldap/90-assert.yaml.j2 b/tests/templates/kuttl/ldap/90-assert.yaml.j2 index a811cb8e..7f43f061 100644 --- a/tests/templates/kuttl/ldap/90-assert.yaml.j2 +++ b/tests/templates/kuttl/ldap/90-assert.yaml.j2 @@ -5,8 +5,8 @@ metadata: name: metrics timeout: 480 commands: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 index 9c2e8a22..91c14877 100644 --- a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 @@ -5,9 +5,9 @@ metadata: name: test-airflow-webserver-health-check timeout: 480 commands: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} diff --git 
a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 similarity index 74% rename from tests/templates/kuttl/mount-dags-configmap/60-assert.yaml rename to tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 index a811cb8e..7f43f061 100644 --- a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml +++ b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 @@ -5,8 +5,8 @@ metadata: name: metrics timeout: 480 commands: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} diff --git a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 index 8b1f71bf..b85052aa 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 @@ -5,8 +5,8 @@ metadata: name: test-airflow-webserver-health-check timeout: 480 commands: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} From 999dca01c257ead1ba8394491b1157ed8c967899 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 20 May 2025 16:55:16 +0200 Subject: [PATCH 07/17] cleanup and code comments --- rust/operator-binary/src/crd/mod.rs | 22 ++++--- rust/operator-binary/src/env_vars.rs | 62 +++++++++---------- .../kuttl/oidc/install-airflow.yaml.j2 | 8 +-- .../kuttl/opa/30-install-airflow.yaml.j2 | 8 +-- tests/test-definition.yaml | 11 +++- 5 files changed, 61 insertions(+), 50 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 07f8cefd..91444c03 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -525,13 +525,19 @@ impl AirflowRole { ]; if resolved_product_image.product_version.starts_with("3.") { + // Start-up commands have changed in 3.x. 
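+ // (In 3.x the webserver is replaced by `airflow api-server`, and DAG parsing + // moves into a dedicated `airflow dag-processor` process, as below.)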
+ // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and + // https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database. + // If `airflow db migrate` is not run for each role there may be + // timing issues (services which require the db may start before the + // migration is complete). DB-migrations should eventually be + // optional. See https://github.com/stackabletech/airflow-operator/issues/589. match &self { AirflowRole::Webserver => { command.extend(Self::authentication_start_commands(auth_config)); command.extend(vec![ "prepare_signal_handlers".to_string(), - // format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), - //"airflow webserver &".to_string(), + format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), "airflow db migrate".to_string(), "airflow api-server &".to_string(), ]); @@ -547,17 +553,17 @@ impl AirflowRole { --role \"Admin\"" .to_string(), "prepare_signal_handlers".to_string(), - // format!( - // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - // ), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), "airflow dag-processor &".to_string(), "airflow scheduler &".to_string(), ]), AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), - // format!( - // "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - // ), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), "airflow db migrate".to_string(), "airflow celery worker &".to_string(), ]), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 81eabf02..dc4e1016 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -73,9 +73,6 @@ pub enum Error { #[snafu(display("object is missing metadata"))] NoMetadata, - - #[snafu(display("cluster is missing webservers role"))] - NoWebserver, } /// Return environment variables to be applied to the statefulsets for the scheduler, webserver (and worker, @@ -325,36 +322,39 @@ fn static_envs(airflow: &v1alpha1::AirflowCluster) -> Result 0 %} - custom: "{{ test_scenario['values']['airflow'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-non-experimental'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'] }}" {% endif %} pullPolicy: IfNotPresent clusterConfig: diff --git a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 index 6867c5dc..52f7a7bf 100644 --- a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 @@ -24,11 +24,11 @@ metadata: name: airflow spec: image: -{% if
test_scenario['values']['airflow-non-experimental'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'] }}" {% endif %} pullPolicy: IfNotPresent clusterConfig: diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f9d5ddca..571abaea 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -7,8 +7,8 @@ dimensions: - name: airflow values: - - 3.0.1 #2.9.3 - 2.10.4 + - 3.0.1 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: airflow-latest @@ -16,6 +16,11 @@ dimensions: - 3.0.1 #2.10.4 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev + - name: airflow-non-experimental + values: + - 2.10.4 + # To use a custom image, add a comma and the full name after the product version + # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: opa-latest values: - 1.0.1 @@ -55,11 +60,11 @@ tests: - executor - name: oidc dimensions: - - airflow + - airflow-non-experimental - openshift - name: opa dimensions: - - airflow + - airflow-non-experimental - opa-latest - openshift - name: resources From 83c03947d72f9ed62c99000ee4763ffa1f6b2091 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 21 May 2025 13:13:36 +0200 Subject: [PATCH 08/17] restore deleted env --- rust/operator-binary/src/env_vars.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 3aa130a1..b76405d9 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -38,6 +38,8 @@ const AIRFLOW_CELERY_RESULT_BACKEND: &str = "AIRFLOW__CELERY__RESULT_BACKEND"; const AIRFLOW_CELERY_BROKER_URL: &str = "AIRFLOW__CELERY__BROKER_URL"; const AIRFLOW_CORE_DAGS_FOLDER: &str = "AIRFLOW__CORE__DAGS_FOLDER"; const AIRFLOW_CORE_LOAD_EXAMPLES: &str = "AIRFLOW__CORE__LOAD_EXAMPLES"; +const AIRFLOW_API_AUTH_BACKENDS: &str = "AIRFLOW__API__AUTH_BACKENDS"; +const AIRFLOW_DATABASE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"; const AIRFLOW_WEBSERVER_EXPOSE_CONFIG: &str = "AIRFLOW__WEBSERVER__EXPOSE_CONFIG"; const AIRFLOW_CORE_EXECUTOR: &str = "AIRFLOW__CORE__EXECUTOR"; @@ -105,9 +107,9 @@ pub fn build_airflow_statefulset_envs( ), ); env.insert( - "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), env_var_from_secret( - "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, secret, "connections.sqlalchemyDatabaseUri", ), @@ -310,12 +312,23 @@ fn static_envs( ..Default::default() }); - env.insert("AIRFLOW__API__AUTH_BACKENDS".into(), EnvVar { - name: "AIRFLOW__API__AUTH_BACKENDS".into(), + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), value: Some("airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into()), ..Default::default() }); + // As of 3.x a JWT key is required. 
+ // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret + // This must be random, but must also be consistent across api-services. + // The key will be consistent for all clusters started by this + // operator instance. TODO: Make this cluster specific. + env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar { + name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + value: Some(JWT_KEY.clone()), + ..Default::default() + }); + env } @@ -331,9 +344,9 @@ pub fn build_airflow_template_envs( let secret = airflow.spec.cluster_config.credentials_secret.as_str(); env.insert( - "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN".into(), + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), env_var_from_secret( - "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, secret, "connections.sqlalchemyDatabaseUri", ), @@ -453,6 +466,7 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap Date: Wed, 21 May 2025 15:36:09 +0200 Subject: [PATCH 09/17] use correct webserver service --- rust/operator-binary/src/env_vars.rs | 7 ++----- .../kuttl/external-access/install-airflow-cluster.yaml.j2 | 8 ++++---- .../kuttl/logging/41-install-airflow-cluster.yaml.j2 | 8 ++++---- .../mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 | 2 ++ tests/test-definition.yaml | 2 +- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index b76405d9..6b8bb6f5 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -466,17 +466,14 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap 0 %} - custom: "{{ test_scenario['values']['airflow'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} pullPolicy: IfNotPresent clusterConfig: diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index 48d6197b..b00021bc 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -139,8 +139,8 @@ spec: config: resources: cpu: - min: 50m - max: 250m + min: 1000m + max: 2000m memory: limit: 3Gi roleGroups: @@ -227,8 +227,8 @@ spec: config: resources: cpu: - min: 100m - max: 500m + min: 1000m + max: 2000m memory: limit: 1Gi roleGroups: diff --git a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 index 5b31b18e..709b3433 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 @@ -71,6 +71,8 @@ spec: gitSyncConf: # supply some config to check that safe.directory is correctly set --git-config: http.sslVerify:false + # N.B. 
dags definitions changed from 2.x to 3.x: + # this test assumes airflow-latest > 2 gitFolder: "mount-dags-gitsync/dags_airflow3" volumeMounts: - name: test-cm-gitsync diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 83aa6bc6..519364d4 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -92,7 +92,7 @@ tests: - openshift - name: external-access dimensions: - - airflow + - airflow-latest - openshift suites: - name: nightly From b97aaca25652ab8b2183d257b1d6dcf9927a45c1 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Wed, 21 May 2025 15:58:03 +0200 Subject: [PATCH 10/17] restore operator to release list --- tests/release.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/release.yaml b/tests/release.yaml index 4729522e..c94e7bcf 100644 --- a/tests/release.yaml +++ b/tests/release.yaml @@ -12,8 +12,8 @@ releases: operatorVersion: 0.0.0-dev listener: operatorVersion: 0.0.0-dev - #airflow: - # operatorVersion: 0.0.0-dev + airflow: + operatorVersion: 0.0.0-dev opa: operatorVersion: 0.0.0-dev spark-k8s: From 31e8c23b1ce03c8df122d25262e90a14e9017983 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 21 May 2025 17:57:59 +0200 Subject: [PATCH 11/17] test: fix oidc --- .../oidc/50-install-test-container.yaml.j2 | 11 ++++ tests/templates/kuttl/oidc/60-login.yaml | 5 +- tests/templates/kuttl/oidc/login.py | 56 ++++++++++++++++--- tests/test-definition.yaml | 2 +- 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 index 5119ca9b..788d079e 100644 --- a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 +++ b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 @@ -71,6 +71,17 @@ spec: env: - name: REQUESTS_CA_BUNDLE value: /stackable/tls/ca.crt + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AIRFLOW_VERSION +{% if test_scenario['values']['airflow-non-experimental'].find(",") > 0 %} + value: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}" +{% else %} + value: "{{ test_scenario['values']['airflow-non-experimental'] }}" +{% endif %} + volumes: - name: tls csi: diff --git a/tests/templates/kuttl/oidc/60-login.yaml b/tests/templates/kuttl/oidc/60-login.yaml index 0745bc4b..34c10ae7 100644 --- a/tests/templates/kuttl/oidc/60-login.yaml +++ b/tests/templates/kuttl/oidc/60-login.yaml @@ -4,6 +4,5 @@ kind: TestStep metadata: name: login commands: - - script: > - envsubst '$NAMESPACE' < login.py | - kubectl exec -n $NAMESPACE -i python-0 -- tee /stackable/login.py > /dev/null + - script: | + kubectl cp -n $NAMESPACE login.py python-0:/stackable/login.py diff --git a/tests/templates/kuttl/oidc/login.py b/tests/templates/kuttl/oidc/login.py index aa859803..69d3ce51 100644 --- a/tests/templates/kuttl/oidc/login.py +++ b/tests/templates/kuttl/oidc/login.py @@ -3,22 +3,53 @@ import logging import requests import sys +import os from bs4 import BeautifulSoup logging.basicConfig( level="DEBUG", format="%(asctime)s %(levelname)s: %(message)s", stream=sys.stdout ) +log = logging.getLogger(__name__) + + +def assert_equal(a, b, msg): + if a != b: + raise AssertionError(f"{msg}\n\tleft: {a}\n\tright: {b}") + + +def assert_startwith(a, b, msg): + if not a.startswith(b): + raise AssertionError(f"{msg}\n\tleft: {a}\n\tright: {b}") + + +def login_page(base_url: str, airflow_version: 
str) -> str: + if airflow_version.startswith("3"): + return f"{base_url}/auth/login/keycloak?next=" + else: + return f"{base_url}/login/keycloak?next=" + + +def userinfo_page(base_url: str, airflow_version: str) -> str: + if airflow_version.startswith("3"): + return f"{base_url}/auth/users/userinfo/" + else: + return f"{base_url}/users/userinfo/" + + session = requests.Session() url = "http://airflow-webserver-default:8080" # Click on "Sign In with keycloak" in Airflow -login_page = session.get(f"{url}/login/keycloak?next=") +login_page = session.get(login_page(url, os.environ["AIRFLOW_VERSION"])) assert login_page.ok, "Redirection from Airflow to Keycloak failed" -assert login_page.url.startswith( - "https://keycloak1.$NAMESPACE.svc.cluster.local:8443/realms/test1/protocol/openid-connect/auth?response_type=code&client_id=airflow1" -), "Redirection to the Keycloak login page expected" + +assert_startwith( + login_page.url, + f"https://keycloak1.{os.environ['NAMESPACE']}.svc.cluster.local:8443/realms/test1/protocol/openid-connect/auth?response_type=code&client_id=airflow1", + "Redirection to the Keycloak login page expected", +) # Enter username and password into the Keycloak login page and click on "Sign In" login_page_html = BeautifulSoup(login_page.text, "html.parser") @@ -28,16 +59,19 @@ ) assert welcome_page.ok, "Login failed" -assert welcome_page.url == f"{url}/home", ( - "Redirection to the Airflow home page expected" +assert_equal( + welcome_page.url, f"{url}/", "Redirection to the Airflow home page expected" ) # Open the user information page in Airflow -userinfo_page = session.get(f"{url}/users/userinfo/") +userinfo_url = userinfo_page(url, os.environ["AIRFLOW_VERSION"]) +userinfo_page = session.get(userinfo_url) assert userinfo_page.ok, "Retrieving user information failed" -assert userinfo_page.url == f"{url}/users/userinfo/", ( - "Redirection to the Airflow user info page expected" +assert_equal( + userinfo_page.url, + userinfo_url, + "Redirection to the Airflow user info page expected", ) # Expect the user data provided by Keycloak in Airflow @@ -45,6 +79,8 @@ table_rows = userinfo_page_html.find_all("tr") user_data = {tr.find("th").text: tr.find("td").text for tr in table_rows} +log.debug(f"{user_data=}") + assert user_data["First Name"] == "Jane", ( "The first name of the user in Airflow should match the one provided by Keycloak" ) @@ -55,6 +91,8 @@ "The email of the user in Airflow should match the one provided by Keycloak" ) +log.info("OIDC login test passed") + # Later this can be extended to use different OIDC providers (currently only Keycloak is # supported) # diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 519364d4..f3c53689 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -20,7 +20,7 @@ dimensions: # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: airflow-non-experimental values: - - 2.10.5 + - 3.0.1 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: opa-latest From 5b772cccef8fa3e6ef16e0c45dfeebceac15d625 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Thu, 22 May 2025 13:44:21 +0200 Subject: [PATCH 12/17] wip: get logging tests to work post-merge --- .../operator-binary/src/airflow_controller.rs | 6 +- rust/operator-binary/src/crd/mod.rs | 1 - rust/operator-binary/src/env_vars.rs | 60 +++++++++++-------- .../{40-assert.yaml => 40-assert.yaml.j2} | 45 +++++++------- 
.../install-airflow-cluster.yaml.j2 | 11 ++++ .../kuttl/ldap/70-install-airflow-python.yaml | 1 + .../logging/50-install-airflow-python.yaml | 1 + .../40-install-airflow-python.yaml | 1 + .../40-install-airflow-python.yaml | 1 + .../oidc/50-install-test-container.yaml.j2 | 1 + .../opa/40-install-test-container.yaml.j2 | 1 + .../smoke/50-install-airflow-python.yaml | 1 + tests/test-definition.yaml | 1 + 13 files changed, 83 insertions(+), 48 deletions(-) rename tests/templates/kuttl/external-access/{40-assert.yaml => 40-assert.yaml.j2} (96%) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 57f6e9c0..0523cac3 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -981,6 +981,7 @@ fn build_server_rolegroup_statefulset( authentication_config, authorization_config, git_sync_resources, + &rolegroup_ref.role_group, ) .context(BuildStatefulsetEnvVarsSnafu)?, ); @@ -1171,7 +1172,10 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + service_name: Some(format!( + "{name}-metrics", + name = rolegroup_ref.object_name() + )), template: pod_template, volume_claim_templates: pvcs, ..StatefulSetSpec::default() diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 24c41c63..6d4d008c 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -568,7 +568,6 @@ impl AirflowRole { command.extend(vec![ "prepare_signal_handlers".to_string(), format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), - "airflow db migrate".to_string(), "airflow api-server &".to_string(), ]); } diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 6b8bb6f5..029218b1 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -75,6 +75,7 @@ pub enum Error { /// Return environment variables to be applied to the statefulsets for the scheduler, webserver (and worker, /// for clusters utilizing `celeryExecutor`: for clusters using `kubernetesExecutor` a different set will be /// used which is defined in [`build_airflow_template_envs`]). +#[allow(clippy::too_many_arguments)] pub fn build_airflow_statefulset_envs( airflow: &v1alpha1::AirflowCluster, airflow_role: &AirflowRole, @@ -83,11 +84,12 @@ pub fn build_airflow_statefulset_envs( auth_config: &AirflowClientAuthenticationDetailsResolved, authorization_config: &AirflowAuthorizationResolved, git_sync_resources: &git_sync::v1alpha1::GitSyncResources, + rolegroup: &String, ) -> Result, Error> { let mut env: BTreeMap = BTreeMap::new(); env.extend(static_envs(git_sync_resources)); - env.extend(execution_server_env_vars(airflow)); + env.extend(execution_server_env_vars(airflow, rolegroup)); // environment variables let env_vars = rolegroup_config.get(&PropertyNameKind::Env); @@ -312,6 +314,8 @@ fn static_envs( ..Default::default() }); + // Basic auth is only relevant to 2.x and can be removed once + // that version is no longer supported. 
env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { name: AIRFLOW_API_AUTH_BACKENDS.into(), value: Some("airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into()), @@ -374,7 +378,16 @@ pub fn build_airflow_template_envs( }); env.extend(static_envs(git_sync_resources)); - env.extend(execution_server_env_vars(airflow)); + + // It does not appear to be possible for kubernetesExecutors to work with + // multiple webserver rolegroups. For the celery case, each executor sets + // the execution server from the associated rolegroup, but for kubernetes + // workers this is not possible. + if let Some(webserver_role) = airflow.spec.webservers.as_ref() { + if let Some(rolegroup) = webserver_role.role_groups.iter().next() { + env.extend(execution_server_env_vars(airflow, rolegroup.0)); + } + } // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be // evaluated in the wrapper for each stackable spark container: this is necessary for pods @@ -453,31 +466,28 @@ fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) - env } -fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap<String, EnvVar> { +fn execution_server_env_vars( + airflow: &v1alpha1::AirflowCluster, + rolegroup: &String, +) -> BTreeMap<String, EnvVar> { let mut env: BTreeMap<String, EnvVar> = BTreeMap::new(); - if let Some(webserver_role) = airflow.spec.webservers.as_ref() { - if let Some(rolegroup) = webserver_role.role_groups.iter().next() { - if let Some(name) = airflow.metadata.name.as_ref() { - let webserver = format!( - "{name}-webserver-{rolegroup}", - name = name, - rolegroup = rolegroup.0 - ); - tracing::info!("Webserver set [{webserver}]"); - - env.insert("AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), EnvVar { - name: "AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), - value: Some(format!("http://{webserver}:8080/execution/")), - ..Default::default() - }); - env.insert("AIRFLOW__CORE__BASE_URL".into(), EnvVar { - name: "AIRFLOW__CORE__BASE_URL".into(), - value: Some(format!("http://{webserver}:8080/")), - ..Default::default() - }); - } - } + if let Some(name) = airflow.metadata.name.as_ref() { + let webserver = format!("{name}-webserver-{rolegroup}-metrics",); + tracing::debug!("Webserver set [{webserver}]"); + + // These settings are new in 3.x and will have no effect with earlier versions.
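+ // e.g. a cluster named `airflow` with webserver rolegroup `default` yields + // http://airflow-webserver-default-metrics:8080/execution/ as the execution + // API server URL below.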
+ env.insert("AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), EnvVar { + name: "AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), + value: Some(format!("http://{webserver}:8080/execution/")), + ..Default::default() + }); + env.insert("AIRFLOW__CORE__BASE_URL".into(), EnvVar { + name: "AIRFLOW__CORE__BASE_URL".into(), + value: Some(format!("http://{webserver}:8080/")), + ..Default::default() + }); } + env } diff --git a/tests/templates/kuttl/external-access/40-assert.yaml b/tests/templates/kuttl/external-access/40-assert.yaml.j2 similarity index 96% rename from tests/templates/kuttl/external-access/40-assert.yaml rename to tests/templates/kuttl/external-access/40-assert.yaml.j2 index b7a9f06f..5c305154 100644 --- a/tests/templates/kuttl/external-access/40-assert.yaml +++ b/tests/templates/kuttl/external-access/40-assert.yaml.j2 @@ -43,18 +43,6 @@ status: --- apiVersion: apps/v1 kind: StatefulSet -metadata: - name: airflow-worker-default -spec: - template: - spec: - terminationGracePeriodSeconds: 300 -status: - readyReplicas: 2 - replicas: 2 ---- -apiVersion: apps/v1 -kind: StatefulSet metadata: name: airflow-scheduler-default spec: @@ -76,15 +64,6 @@ status: --- apiVersion: policy/v1 kind: PodDisruptionBudget -metadata: - name: airflow-worker -status: - expectedPods: 2 - currentHealthy: 2 - disruptionsAllowed: 1 ---- -apiVersion: policy/v1 -kind: PodDisruptionBudget metadata: name: airflow-scheduler status: @@ -112,3 +91,27 @@ metadata: name: airflow-webserver-external-unstable spec: type: NodePort # external-unstable + +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +spec: + template: + spec: + terminationGracePeriodSeconds: 300 +status: + readyReplicas: 2 + replicas: 2 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-worker +status: + expectedPods: 2 + currentHealthy: 2 + disruptionsAllowed: 1 +{% endif %} diff --git a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 index 23b43519..935037a2 100644 --- a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 @@ -45,10 +45,21 @@ spec: replicas: 1 config: listenerClass: test-cluster-internal-$NAMESPACE +{% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: roleGroups: default: replicas: 2 +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + config: + resources: + cpu: + min: 100m + max: 500m + memory: + limit: 1Gi +{% endif %} schedulers: roleGroups: default: diff --git a/tests/templates/kuttl/ldap/70-install-airflow-python.yaml b/tests/templates/kuttl/ldap/70-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/ldap/70-install-airflow-python.yaml +++ b/tests/templates/kuttl/ldap/70-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/logging/50-install-airflow-python.yaml b/tests/templates/kuttl/logging/50-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/logging/50-install-airflow-python.yaml +++ b/tests/templates/kuttl/logging/50-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: 
test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml b/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml +++ b/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml b/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml +++ b/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 index 788d079e..c5369e43 100644 --- a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 +++ b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 @@ -56,6 +56,7 @@ spec: containers: - name: python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: diff --git a/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 b/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 index 7feb7246..939fb1cf 100644 --- a/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 +++ b/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 @@ -56,6 +56,7 @@ spec: containers: - name: test-runner image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: diff --git a/tests/templates/kuttl/smoke/50-install-airflow-python.yaml b/tests/templates/kuttl/smoke/50-install-airflow-python.yaml index e087a848..7be1fbf8 100644 --- a/tests/templates/kuttl/smoke/50-install-airflow-python.yaml +++ b/tests/templates/kuttl/smoke/50-install-airflow-python.yaml @@ -18,6 +18,7 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f3c53689..da5104cf 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -94,6 +94,7 @@ tests: dimensions: - airflow-latest - openshift + - executor suites: - name: nightly # Run nightly with the latest airflow From 1e105ee5d6c83a885c50ea7fcb2d23aeb450df6c Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 23 May 2025 08:11:48 +0200 Subject: [PATCH 13/17] make env-vars version-specific --- .../operator-binary/src/airflow_controller.rs | 2 + rust/operator-binary/src/env_vars.rs | 114 ++++++++++++------ .../kuttl/external-access/40-assert.yaml.j2 | 16 +-- .../install-airflow-cluster.yaml.j2 | 17 ++- 4 files changed, 103 insertions(+), 46 deletions(-) diff --git a/rust/operator-binary/src/airflow_controller.rs 
b/rust/operator-binary/src/airflow_controller.rs index 0523cac3..e4c7bbc2 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -982,6 +982,7 @@ fn build_server_rolegroup_statefulset( authorization_config, git_sync_resources, &rolegroup_ref.role_group, + resolved_product_image, ) .context(BuildStatefulsetEnvVarsSnafu)?, ); @@ -1268,6 +1269,7 @@ fn build_executor_template_config_map( env_overrides, merged_executor_config, git_sync_resources, + resolved_product_image, )) .add_volume_mounts(airflow.volume_mounts()) .context(AddVolumeMountSnafu)? diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 029218b1..f3cbe6ec 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -9,6 +9,7 @@ use product_config::types::PropertyNameKind; use rand::Rng; use snafu::Snafu; use stackable_operator::{ + commons::product_image_selection::ResolvedProductImage, crd::{authentication::oidc, git_sync}, k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt, @@ -85,11 +86,45 @@ pub fn build_airflow_statefulset_envs( authorization_config: &AirflowAuthorizationResolved, git_sync_resources: &git_sync::v1alpha1::GitSyncResources, rolegroup: &String, + resolved_product_image: &ResolvedProductImage, ) -> Result, Error> { let mut env: BTreeMap = BTreeMap::new(); env.extend(static_envs(git_sync_resources)); - env.extend(execution_server_env_vars(airflow, rolegroup)); + + if resolved_product_image.product_version.starts_with("3.") { + env.extend(execution_server_env_vars(airflow, rolegroup)); + env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar { + name: AIRFLOW_CORE_AUTH_MANAGER.into(), + value: Some( + "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(), + ), + ..Default::default() + }); + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some("airflow.api.auth.backend.session".into()), + ..Default::default() + }); + // As of 3.x a JWT key is required. + // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret + // This must be random, but must also be consistent across api-services. + // The key will be consistent for all clusters started by this + // operator instance. TODO: Make this cluster specific. + env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar { + name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + value: Some(JWT_KEY.clone()), + ..Default::default() + }); + } else { + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some( + "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(), + ), + ..Default::default() + }); + } // environment variables let env_vars = rolegroup_config.get(&PropertyNameKind::Env); @@ -306,33 +341,6 @@ fn static_envs( ..Default::default() }); - env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar { - name: AIRFLOW_CORE_AUTH_MANAGER.into(), - value: Some( - "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(), - ), - ..Default::default() - }); - - // Basic auth is only relevant to 2.x and can be removed once - // that version is no longer supported. - env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { - name: AIRFLOW_API_AUTH_BACKENDS.into(), - value: Some("airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into()), - ..Default::default() - }); - - // As of 3.x a JWT key is required. 
- // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret - // This must be random, but must also be consistent across api-services. - // The key will be consistent for all clusters started by this - // operator instance. TODO: Make this cluster specific. - env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar { - name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), - value: Some(JWT_KEY.clone()), - ..Default::default() - }); - env } @@ -343,6 +351,7 @@ pub fn build_airflow_template_envs( env_overrides: &HashMap, config: &ExecutorConfig, git_sync_resources: &git_sync::v1alpha1::GitSyncResources, + resolved_product_image: &ResolvedProductImage, ) -> Vec { let mut env: BTreeMap = BTreeMap::new(); let secret = airflow.spec.cluster_config.credentials_secret.as_str(); @@ -379,14 +388,47 @@ pub fn build_airflow_template_envs( env.extend(static_envs(git_sync_resources)); - // It does not appear to be possible for kubernetesExecutors to work with - // multiple webserver rolegroups. For the celery case, each executor sets - // the execution server from the associated rolegroup, but for kubernetes - // workers this is not possible. - if let Some(webserver_role) = airflow.spec.webservers.as_ref() { - if let Some(rolegroup) = webserver_role.role_groups.iter().next() { - env.extend(execution_server_env_vars(airflow, rolegroup.0)); + if resolved_product_image.product_version.starts_with("3.") { + // It does not appear to be possible for kubernetesExecutors to work with + // multiple webserver rolegroups. For the celery case, each executor sets + // the execution server from the associated rolegroup, but for kubernetes + // workers this is not possible. + if let Some(webserver_role) = airflow.spec.webservers.as_ref() { + if let Some(rolegroup) = webserver_role.role_groups.iter().next() { + env.extend(execution_server_env_vars(airflow, rolegroup.0)); + } + env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar { + name: AIRFLOW_CORE_AUTH_MANAGER.into(), + value: Some( + "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager" + .to_string(), + ), + ..Default::default() + }); + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some("airflow.api.auth.backend.session".into()), + ..Default::default() + }); + // As of 3.x a JWT key is required. + // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret + // This must be random, but must also be consistent across api-services. + // The key will be consistent for all clusters started by this + // operator instance. TODO: Make this cluster specific. 
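+ // A minimal sketch of what the JWT_KEY referenced below might look like + // (its definition is not shown in this excerpt; this assumes the + // lazy_static, rand and base64 dependencies added in the first patch): + // lazy_static! { + // static ref JWT_KEY: String = { + // let mut bytes = [0u8; 32]; // 32 random bytes per operator process + // rand::thread_rng().fill(&mut bytes); + // STANDARD.encode(bytes) // base64-encode for env-var transport + // }; + // }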
+ env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar { + name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + value: Some(JWT_KEY.clone()), + ..Default::default() + }); } + } else { + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some( + "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(), + ), + ..Default::default() + }); } // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be @@ -473,7 +515,7 @@ fn execution_server_env_vars( let mut env: BTreeMap = BTreeMap::new(); if let Some(name) = airflow.metadata.name.as_ref() { - let webserver = format!("{name}-webserver-{rolegroup}-metrics",); + let webserver = format!("{name}-webserver-{rolegroup}",); tracing::debug!("Webserver set [{webserver}]"); // These settings are new in 3.x and will have no affect with earlier versions. diff --git a/tests/templates/kuttl/external-access/40-assert.yaml.j2 b/tests/templates/kuttl/external-access/40-assert.yaml.j2 index 5c305154..dd58e251 100644 --- a/tests/templates/kuttl/external-access/40-assert.yaml.j2 +++ b/tests/templates/kuttl/external-access/40-assert.yaml.j2 @@ -22,8 +22,8 @@ spec: spec: terminationGracePeriodSeconds: 120 status: - readyReplicas: 2 - replicas: 2 + readyReplicas: 1 + replicas: 1 --- apiVersion: apps/v1 kind: StatefulSet @@ -58,8 +58,8 @@ kind: PodDisruptionBudget metadata: name: airflow-webserver status: - expectedPods: 4 - currentHealthy: 4 + expectedPods: 3 + currentHealthy: 3 disruptionsAllowed: 1 --- apiVersion: policy/v1 @@ -103,15 +103,15 @@ spec: spec: terminationGracePeriodSeconds: 300 status: - readyReplicas: 2 - replicas: 2 + readyReplicas: 1 + replicas: 1 --- apiVersion: policy/v1 kind: PodDisruptionBudget metadata: name: airflow-worker status: - expectedPods: 2 - currentHealthy: 2 + expectedPods: 1 + currentHealthy: 1 disruptionsAllowed: 1 {% endif %} diff --git a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 index 935037a2..b66bff08 100644 --- a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 @@ -34,9 +34,15 @@ spec: webservers: config: listenerClass: test-external-stable-$NAMESPACE + resources: + cpu: + min: 1000m + max: 2000m + memory: + limit: 2Gi roleGroups: default: - replicas: 2 + replicas: 1 external-unstable: replicas: 1 config: @@ -49,7 +55,7 @@ spec: celeryExecutors: roleGroups: default: - replicas: 2 + replicas: 1 {% elif test_scenario['values']['executor'] == 'kubernetes' %} kubernetesExecutors: config: @@ -61,6 +67,13 @@ spec: limit: 1Gi {% endif %} schedulers: + config: + resources: + cpu: + min: 1000m + max: 2000m + memory: + limit: 1Gi roleGroups: default: replicas: 1 From 714afb6c3b51024433a7b7ead8b0a5b5570ccc84 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 23 May 2025 15:52:10 +0200 Subject: [PATCH 14/17] fixed resolution of webserver url for execution api --- .../operator-binary/src/airflow_controller.rs | 1 - rust/operator-binary/src/env_vars.rs | 166 ++++++++---------- .../install-airflow-cluster.yaml.j2 | 2 +- 3 files changed, 73 insertions(+), 96 deletions(-) diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index e4c7bbc2..98aa557d 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -981,7 +981,6 @@ fn 
                 authentication_config,
                 authorization_config,
                 git_sync_resources,
-                &rolegroup_ref.role_group,
                 resolved_product_image,
             )
             .context(BuildStatefulsetEnvVarsSnafu)?,
diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs
index f3cbe6ec..4d96c898 100644
--- a/rust/operator-binary/src/env_vars.rs
+++ b/rust/operator-binary/src/env_vars.rs
@@ -85,46 +85,13 @@ pub fn build_airflow_statefulset_envs(
     auth_config: &AirflowClientAuthenticationDetailsResolved,
     authorization_config: &AirflowAuthorizationResolved,
     git_sync_resources: &git_sync::v1alpha1::GitSyncResources,
-    rolegroup: &String,
     resolved_product_image: &ResolvedProductImage,
 ) -> Result<BTreeMap<String, EnvVar>, Error> {
     let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
 
     env.extend(static_envs(git_sync_resources));
 
-    if resolved_product_image.product_version.starts_with("3.") {
-        env.extend(execution_server_env_vars(airflow, rolegroup));
-        env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
-            name: AIRFLOW_CORE_AUTH_MANAGER.into(),
-            value: Some(
-                "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(),
-            ),
-            ..Default::default()
-        });
-        env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
-            name: AIRFLOW_API_AUTH_BACKENDS.into(),
-            value: Some("airflow.api.auth.backend.session".into()),
-            ..Default::default()
-        });
-        // As of 3.x a JWT key is required.
-        // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
-        // This must be random, but must also be consistent across api-services.
-        // The key will be consistent for all clusters started by this
-        // operator instance. TODO: Make this cluster specific.
-        env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
-            name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
-            value: Some(JWT_KEY.clone()),
-            ..Default::default()
-        });
-    } else {
-        env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
-            name: AIRFLOW_API_AUTH_BACKENDS.into(),
-            value: Some(
-                "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(),
-            ),
-            ..Default::default()
-        });
-    }
+    add_version_specific_env_vars(airflow, resolved_product_image, &mut env);
 
     // environment variables
     let env_vars = rolegroup_config.get(&PropertyNameKind::Env);
@@ -388,48 +355,7 @@ pub fn build_airflow_template_envs(
 
     env.extend(static_envs(git_sync_resources));
 
-    if resolved_product_image.product_version.starts_with("3.") {
-        // It does not appear to be possible for kubernetesExecutors to work with
-        // multiple webserver rolegroups. For the celery case, each executor sets
-        // the execution server from the associated rolegroup, but for kubernetes
-        // workers this is not possible.
-        if let Some(webserver_role) = airflow.spec.webservers.as_ref() {
-            if let Some(rolegroup) = webserver_role.role_groups.iter().next() {
-                env.extend(execution_server_env_vars(airflow, rolegroup.0));
-            }
-            env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
-                name: AIRFLOW_CORE_AUTH_MANAGER.into(),
-                value: Some(
-                    "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
-                        .to_string(),
-                ),
-                ..Default::default()
-            });
-            env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
-                name: AIRFLOW_API_AUTH_BACKENDS.into(),
-                value: Some("airflow.api.auth.backend.session".into()),
-                ..Default::default()
-            });
-            // As of 3.x a JWT key is required.
-            // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
-            // This must be random, but must also be consistent across api-services.
-            // The key will be consistent for all clusters started by this
-            // operator instance. TODO: Make this cluster specific.
-            env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
-                name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
-                value: Some(JWT_KEY.clone()),
-                ..Default::default()
-            });
-        }
-    } else {
-        env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
-            name: AIRFLOW_API_AUTH_BACKENDS.into(),
-            value: Some(
-                "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(),
-            ),
-            ..Default::default()
-        });
-    }
+    add_version_specific_env_vars(airflow, resolved_product_image, &mut env);
 
     // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be
     // evaluated in the wrapper for each stackable spark container: this is necessary for pods
@@ -462,6 +388,46 @@ pub fn build_airflow_template_envs(
     transform_map_to_vec(env)
 }
 
+fn add_version_specific_env_vars(
+    airflow: &v1alpha1::AirflowCluster,
+    resolved_product_image: &ResolvedProductImage,
+    env: &mut BTreeMap<String, EnvVar>,
+) {
+    if resolved_product_image.product_version.starts_with("3.") {
+        env.extend(execution_server_env_vars(airflow));
+        env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar {
+            name: AIRFLOW_CORE_AUTH_MANAGER.into(),
+            value: Some(
+                "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(),
+            ),
+            ..Default::default()
+        });
+        env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
+            name: AIRFLOW_API_AUTH_BACKENDS.into(),
+            value: Some("airflow.api.auth.backend.session".into()),
+            ..Default::default()
+        });
+        // As of 3.x a JWT key is required.
+        // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret
+        // This must be random, but must also be consistent across api-services.
+        // The key will be consistent for all clusters started by this
+        // operator instance. TODO: Make this cluster specific.
+        env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar {
+            name: "AIRFLOW__API_AUTH__JWT_SECRET".into(),
+            value: Some(JWT_KEY.clone()),
+            ..Default::default()
+        });
+    } else {
+        env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar {
+            name: AIRFLOW_API_AUTH_BACKENDS.into(),
+            value: Some(
+                "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(),
+            ),
+            ..Default::default()
+        });
+    }
+}
+
 // Internally the environment variable collection uses a map so that overrides can actually
 // override existing keys. The returned collection will be a vector.
 fn transform_map_to_vec(env_map: BTreeMap<String, EnvVar>) -> Vec<EnvVar> {
@@ -508,27 +474,39 @@ fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved)
     env
 }
 
-fn execution_server_env_vars(
-    airflow: &v1alpha1::AirflowCluster,
-    rolegroup: &String,
-) -> BTreeMap<String, EnvVar> {
+fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap<String, EnvVar> {
     let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
 
     if let Some(name) = airflow.metadata.name.as_ref() {
-        let webserver = format!("{name}-webserver-{rolegroup}",);
-        tracing::debug!("Webserver set [{webserver}]");
-
-        // These settings are new in 3.x and will have no affect with earlier versions.
- env.insert("AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), EnvVar { - name: "AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), - value: Some(format!("http://{webserver}:8080/execution/")), - ..Default::default() - }); - env.insert("AIRFLOW__CORE__BASE_URL".into(), EnvVar { - name: "AIRFLOW__CORE__BASE_URL".into(), - value: Some(format!("http://{webserver}:8080/")), - ..Default::default() - }); + // The execution API server URL can be any webserver (if there + // are multiple ones). Parse the list of webservers in a deterministic + // way by iterating over a BTree map rather than the HashMap. + if let Some(webserver_role) = airflow.spec.webservers.as_ref() { + if let Some(rolegroup) = webserver_role + .role_groups + .iter() + .collect::>() + .first_entry() + { + let webserver = format!( + "{name}-webserver-{rolegroup}", + name = name, + rolegroup = rolegroup.key() + ); + tracing::debug!("Webserver set [{webserver}]"); + // These settings are new in 3.x and will have no affect with earlier versions. + env.insert("AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), EnvVar { + name: "AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), + value: Some(format!("http://{webserver}:8080/execution/")), + ..Default::default() + }); + env.insert("AIRFLOW__CORE__BASE_URL".into(), EnvVar { + name: "AIRFLOW__CORE__BASE_URL".into(), + value: Some(format!("http://{webserver}:8080/")), + ..Default::default() + }); + } + } } env diff --git a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 index b66bff08..da0a6ff6 100644 --- a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 @@ -29,7 +29,7 @@ spec: {% endif %} pullPolicy: IfNotPresent clusterConfig: - loadExamples: true + loadExamples: false credentialsSecret: test-airflow-credentials webservers: config: From 98cc63a8fb2083bf50fd4e941061f09c5b498164 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 23 May 2025 17:10:17 +0200 Subject: [PATCH 15/17] relaxed default resources --- rust/operator-binary/src/crd/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 6d4d008c..3276afa2 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -897,7 +897,7 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment ResourcesFragment ResourcesFragment Date: Tue, 27 May 2025 10:13:14 +0200 Subject: [PATCH 16/17] update test defs for oidc/opa --- rust/operator-binary/src/crd/mod.rs | 9 ++++----- .../kuttl/oidc/50-install-test-container.yaml.j2 | 8 ++++---- tests/templates/kuttl/oidc/install-airflow.yaml.j2 | 8 ++++---- tests/test-definition.yaml | 4 ++-- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 3276afa2..4a5e19a1 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -558,10 +558,10 @@ impl AirflowRole { // Start-up commands have changed in 3.x. // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and // https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database. 
-        // If `airflow db migrate` is not run for each role there may be
-        // timing issues (services which require the db start before the
-        // migration is complete). DB-migrations should be eventually be
-        // optional. See https://github.com/stackabletech/airflow-operator/issues/589.
+        // `airflow db migrate` is not run for each role, so there may be
+        // restarts of the webserver and/or workers (which require the DB).
+        // DB migrations should eventually be optional:
+        // See https://github.com/stackabletech/airflow-operator/issues/589.
         match &self {
             AirflowRole::Webserver => {
                 command.extend(Self::authentication_start_commands(auth_config));
@@ -593,7 +593,6 @@ impl AirflowRole {
                 format!(
                     "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
                 ),
-                "airflow db migrate".to_string(),
                 "airflow celery worker &".to_string(),
             ]),
         }
diff --git a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2
index c5369e43..9329f4d5 100644
--- a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2
+++ b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2
@@ -77,12 +77,12 @@ spec:
               fieldRef:
                 fieldPath: metadata.namespace
           - name: AIRFLOW_VERSION
-{% if test_scenario['values']['airflow-non-experimental'].find(",") > 0 %}
-            value: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}"
+{% if test_scenario['values']['airflow-latest'].find(",") > 0 %}
+            value: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}"
 {% else %}
-            value: "{{ test_scenario['values']['airflow-non-experimental'] }}"
+            value: "{{ test_scenario['values']['airflow-latest'] }}"
 {% endif %}
-
+
       volumes:
         - name: tls
           csi:
diff --git a/tests/templates/kuttl/oidc/install-airflow.yaml.j2 b/tests/templates/kuttl/oidc/install-airflow.yaml.j2
index 61863633..56f36651 100644
--- a/tests/templates/kuttl/oidc/install-airflow.yaml.j2
+++ b/tests/templates/kuttl/oidc/install-airflow.yaml.j2
@@ -36,11 +36,11 @@ metadata:
   name: airflow
 spec:
   image:
-{% if test_scenario['values']['airflow-non-experimental'].find(",") > 0 %}
-    custom: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[1] }}"
-    productVersion: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}"
+{% if test_scenario['values']['airflow-latest'].find(",") > 0 %}
+    custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}"
+    productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}"
 {% else %}
-    productVersion: "{{ test_scenario['values']['airflow-non-experimental'] }}"
+    productVersion: "{{ test_scenario['values']['airflow-latest'] }}"
 {% endif %}
     pullPolicy: IfNotPresent
   clusterConfig:
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index da5104cf..f23caa0d 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -20,7 +20,7 @@ dimensions:
 #      - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev
   - name: airflow-non-experimental
     values:
-      - 3.0.1
+      - 2.10.5
 # To use a custom image, add a comma and the full name after the product version
 #      - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev
   - name: opa-latest
@@ -62,7 +62,7 @@ tests:
       - executor
   - name: oidc
     dimensions:
-      - airflow-non-experimental
+      - airflow-latest
       - openshift
   - name: opa
     dimensions:

From 4f578562dbf51b352e9bac9df3d863b684bd8f3e Mon Sep 17 00:00:00 2001
From: Andrew Kenworthy
Date: Tue, 27 May 2025 13:43:12 +0200
Subject: [PATCH 17/17] changelog

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 673e69c5..f51667c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
   - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
   - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
 - Add support for airflow `2.10.5` ([#625]).
+- Add experimental support for airflow `3.0.1` ([#630]).
 
 ### Changed
 
@@ -41,6 +42,7 @@
 [#623]: https://github.com/stackabletech/airflow-operator/pull/623
 [#624]: https://github.com/stackabletech/airflow-operator/pull/624
 [#625]: https://github.com/stackabletech/airflow-operator/pull/625
+[#630]: https://github.com/stackabletech/airflow-operator/pull/630
 
 ## [25.3.0] - 2025-03-21
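
Editor's note: the `AIRFLOW__API_AUTH__JWT_SECRET` wiring above reads from a `JWT_KEY`
static whose definition is not included in this series. Going only by the in-code comments
("must be random, but must also be consistent across api-services"; "consistent for all
clusters started by this operator instance") and the `lazy_static`, `rand` and `base64`
dependencies the series adds, a minimal sketch of such a static could look like the
following. The generation details (32 random bytes, base64-encoded) are an assumption,
not the patch's actual code:

    // Hypothetical sketch, not taken from the patch: a random secret generated
    // once per operator process, so every api-server container launched by this
    // operator instance validates JWTs minted by any other.
    use base64::{Engine, engine::general_purpose::STANDARD};
    use lazy_static::lazy_static;
    use rand::Rng;

    lazy_static! {
        pub static ref JWT_KEY: String = {
            // 32 random bytes, base64-encoded (assumed length and encoding).
            let bytes: Vec<u8> = rand::thread_rng()
                .sample_iter(rand::distributions::Standard)
                .take(32)
                .collect();
            STANDARD.encode(bytes)
        };
    }

Because the key lives in operator memory, it is stable across restarts of the managed
pods but shared by all clusters the operator instance manages — exactly the limitation
the in-code TODO ("Make this cluster specific") calls out.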