diff --git a/CHANGELOG.md b/CHANGELOG.md index 673e69c5..f51667c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. - Add support for airflow `2.10.5` ([#625]). +- Add experimental support for airflow `3.0.1` ([#630]). ### Changed @@ -41,6 +42,7 @@ [#623]: https://github.com/stackabletech/airflow-operator/pull/623 [#624]: https://github.com/stackabletech/airflow-operator/pull/624 [#625]: https://github.com/stackabletech/airflow-operator/pull/625 +[#630]: https://github.com/stackabletech/airflow-operator/pull/630 ## [25.3.0] - 2025-03-21 diff --git a/Cargo.lock b/Cargo.lock index d8aff1e5..051ac671 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2639,13 +2639,16 @@ name = "stackable-airflow-operator" version = "0.0.0-dev" dependencies = [ "anyhow", + "base64 0.22.1", "built", "clap", "const_format", "fnv", "futures 0.3.31", "indoc", + "lazy_static", "product-config", + "rand 0.8.5", "rstest", "serde", "serde_json", diff --git a/Cargo.nix b/Cargo.nix index bb106b6c..9bf40d5f 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -8583,6 +8583,10 @@ rec { name = "anyhow"; packageId = "anyhow"; } + { + name = "base64"; + packageId = "base64 0.22.1"; + } { name = "clap"; packageId = "clap"; @@ -8604,10 +8608,18 @@ rec { name = "indoc"; packageId = "indoc"; } + { + name = "lazy_static"; + packageId = "lazy_static"; + } { name = "product-config"; packageId = "product-config"; } + { + name = "rand"; + packageId = "rand 0.8.5"; + } { name = "serde"; packageId = "serde"; diff --git a/Cargo.toml b/Cargo.toml index 810a3117..9d11efaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,15 @@ product-config = { git = "https://github.com/stackabletech/product-config.git", stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", features = 
["telemetry", "versioned"], tag = "stackable-operator-0.93.1" } anyhow = "1.0" +base64 = "0.22.1" built = { version = "0.7", features = ["chrono", "git2"] } clap = "4.5" const_format = "0.2" fnv = "1.0" futures = { version = "0.3", features = ["compat"] } indoc = "2.0" +lazy_static = "1.4" +rand = "0.8.5" rstest = "0.25" semver = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 3edfae3e..cbcebd69 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -13,10 +13,13 @@ product-config.workspace = true stackable-operator.workspace = true anyhow.workspace = true +base64.workspace = true clap.workspace = true const_format.workspace = true fnv.workspace = true futures.workspace = true +lazy_static.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index 569995cb..98aa557d 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -957,7 +957,8 @@ fn build_server_rolegroup_statefulset( .context(GracefulShutdownSnafu)?; let mut airflow_container_args = Vec::new(); - airflow_container_args.extend(airflow_role.get_commands(authentication_config)); + airflow_container_args + .extend(airflow_role.get_commands(authentication_config, resolved_product_image)); airflow_container .image_from_product_image(resolved_product_image) @@ -980,6 +981,7 @@ fn build_server_rolegroup_statefulset( authentication_config, authorization_config, git_sync_resources, + resolved_product_image, ) .context(BuildStatefulsetEnvVarsSnafu)?, ); @@ -1170,7 +1172,10 @@ fn build_server_rolegroup_statefulset( match_labels: Some(statefulset_match_labels.into()), ..LabelSelector::default() }, - service_name: Some(rolegroup_ref.object_name()), + service_name: 
Some(format!( + "{name}-metrics", + name = rolegroup_ref.object_name() + )), template: pod_template, volume_claim_templates: pvcs, ..StatefulSetSpec::default() @@ -1263,6 +1268,7 @@ fn build_executor_template_config_map( env_overrides, merged_executor_config, git_sync_resources, + resolved_product_image, )) .add_volume_mounts(airflow.volume_mounts()) .context(AddVolumeMountSnafu)? diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 4b74eb04..4a5e19a1 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -9,7 +9,7 @@ use stackable_operator::{ cache::UserInformationCache, cluster_operation::ClusterOperation, opa::OpaConfig, - product_image_selection::ProductImage, + product_image_selection::{ProductImage, ResolvedProductImage}, resources::{ CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment, Resources, ResourcesFragment, @@ -324,11 +324,6 @@ impl v1alpha1::AirflowCluster { self.spec.cluster_config.volume_mounts.clone() } - /// The name of the role-level load-balanced Kubernetes `Service` - pub fn node_role_service_name(&self) -> Option { - self.metadata.name.clone() - } - /// Retrieve and merge resource configs for role and role groups pub fn merged_config( &self, @@ -548,6 +543,7 @@ impl AirflowRole { pub fn get_commands( &self, auth_config: &AirflowClientAuthenticationDetailsResolved, + resolved_product_image: &ResolvedProductImage, ) -> Vec { let mut command = vec![ format!( @@ -558,43 +554,87 @@ impl AirflowRole { remove_vector_shutdown_file_command(STACKABLE_LOG_DIR), ]; - match &self { - AirflowRole::Webserver => { - // Getting auth commands for AuthClass - command.extend(Self::authentication_start_commands(auth_config)); - command.extend(vec![ + if resolved_product_image.product_version.starts_with("3.") { + // Start-up commands have changed in 3.x. 
+ // See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and + https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database. + // `airflow db migrate` is not run for each role so there may be + // re-starts of webserver and/or workers (which require the DB). + // DB-migrations should eventually be optional: + // See https://github.com/stackabletech/airflow-operator/issues/589. + match &self { + AirflowRole::Webserver => { + command.extend(Self::authentication_start_commands(auth_config)); + command.extend(vec![ + "prepare_signal_handlers".to_string(), + format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), + "airflow api-server &".to_string(), + ]); + } + AirflowRole::Scheduler => command.extend(vec![ + "airflow db migrate".to_string(), + "airflow users create \ + --username \"$ADMIN_USERNAME\" \ + --firstname \"$ADMIN_FIRSTNAME\" \ + --lastname \"$ADMIN_LASTNAME\" \ + --email \"$ADMIN_EMAIL\" \ + --password \"$ADMIN_PASSWORD\" \ + --role \"Admin\"" + .to_string(), + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow dag-processor &".to_string(), + "airflow scheduler &".to_string(), + ]), + AirflowRole::Worker => command.extend(vec![ + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow celery worker &".to_string(), + ]), + } + } else { + match &self { + AirflowRole::Webserver => { + // Getting auth commands for AuthClass + command.extend(Self::authentication_start_commands(auth_config)); + command.extend(vec![ + "prepare_signal_handlers".to_string(), + 
format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"), + "airflow webserver &".to_string(), + ]); + } + AirflowRole::Scheduler => command.extend(vec![ + // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259 + "airflow db init".to_string(), + "airflow db upgrade".to_string(), + "airflow users create \ + --username \"$ADMIN_USERNAME\" \ + --firstname \"$ADMIN_FIRSTNAME\" \ + --lastname \"$ADMIN_LASTNAME\" \ + --email \"$ADMIN_EMAIL\" \ + --password \"$ADMIN_PASSWORD\" \ + --role \"Admin\"" + .to_string(), + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow scheduler &".to_string(), + ]), + AirflowRole::Worker => command.extend(vec![ + "prepare_signal_handlers".to_string(), + format!( + "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" + ), + "airflow celery worker &".to_string(), + ]), } - - AirflowRole::Scheduler => command.extend(vec![ - // Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259 - "airflow db init".to_string(), - "airflow db upgrade".to_string(), - "airflow users create \ - --username \"$ADMIN_USERNAME\" \ - --firstname \"$ADMIN_FIRSTNAME\" \ - --lastname \"$ADMIN_LASTNAME\" \ - --email \"$ADMIN_EMAIL\" \ - --password \"$ADMIN_PASSWORD\" \ - --role \"Admin\"" - .to_string(), - "prepare_signal_handlers".to_string(), - format!( - "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - ), - "airflow scheduler &".to_string(), - ]), - AirflowRole::Worker => command.extend(vec![ - "prepare_signal_handlers".to_string(), - format!( - "containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &" - ), - "airflow celery worker &".to_string(), - ]), } + // graceful shutdown part command.extend(vec![ "wait_for_termination 
$!".to_string(), @@ -852,17 +892,17 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment ( CpuLimitsFragment { - min: Some(Quantity("500m".into())), + min: Some(Quantity("1".into())), max: Some(Quantity("2".into())), }, MemoryLimitsFragment { - limit: Some(Quantity("2Gi".into())), + limit: Some(Quantity("3Gi".into())), runtime_limits: NoRuntimeLimitsFragment {}, }, ), AirflowRole::Webserver => ( CpuLimitsFragment { - min: Some(Quantity("500m".into())), + min: Some(Quantity("1".into())), max: Some(Quantity("2".into())), }, MemoryLimitsFragment { @@ -872,11 +912,11 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment ( CpuLimitsFragment { - min: Some(Quantity("500m".to_owned())), + min: Some(Quantity("1".to_owned())), max: Some(Quantity("2".to_owned())), }, MemoryLimitsFragment { - limit: Some(Quantity("512Mi".to_owned())), + limit: Some(Quantity("1Gi".to_owned())), runtime_limits: NoRuntimeLimitsFragment {}, }, ), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index bc3bbd69..4d96c898 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -3,9 +3,13 @@ use std::{ path::PathBuf, }; +use base64::{Engine, engine::general_purpose::STANDARD}; +use lazy_static::lazy_static; use product_config::types::PropertyNameKind; +use rand::Rng; use snafu::Snafu; use stackable_operator::{ + commons::product_image_selection::ResolvedProductImage, crd::{authentication::oidc, git_sync}, k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt, @@ -30,13 +34,14 @@ const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CO const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON"; const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST"; const AIRFLOW_METRICS_STATSD_PORT: &str = "AIRFLOW__METRICS__STATSD_PORT"; -const AIRFLOW_API_AUTH_BACKEND: &str = "AIRFLOW__API__AUTH_BACKEND"; const AIRFLOW_WEBSERVER_SECRET_KEY: &str = 
"AIRFLOW__WEBSERVER__SECRET_KEY"; -const AIRFLOW_CORE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__CORE__SQL_ALCHEMY_CONN"; const AIRFLOW_CELERY_RESULT_BACKEND: &str = "AIRFLOW__CELERY__RESULT_BACKEND"; const AIRFLOW_CELERY_BROKER_URL: &str = "AIRFLOW__CELERY__BROKER_URL"; const AIRFLOW_CORE_DAGS_FOLDER: &str = "AIRFLOW__CORE__DAGS_FOLDER"; const AIRFLOW_CORE_LOAD_EXAMPLES: &str = "AIRFLOW__CORE__LOAD_EXAMPLES"; +const AIRFLOW_API_AUTH_BACKENDS: &str = "AIRFLOW__API__AUTH_BACKENDS"; +const AIRFLOW_DATABASE_SQL_ALCHEMY_CONN: &str = "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"; + const AIRFLOW_WEBSERVER_EXPOSE_CONFIG: &str = "AIRFLOW__WEBSERVER__EXPOSE_CONFIG"; const AIRFLOW_CORE_EXECUTOR: &str = "AIRFLOW__CORE__EXECUTOR"; const AIRFLOW_KUBERNETES_EXECUTOR_POD_TEMPLATE_FILE: &str = @@ -51,6 +56,15 @@ const ADMIN_EMAIL: &str = "ADMIN_EMAIL"; const PYTHONPATH: &str = "PYTHONPATH"; +lazy_static! { + pub static ref JWT_KEY: String = { + let mut rng = rand::thread_rng(); + // Generate 16 random bytes and encode to base64 string + let random_bytes: [u8; 16] = rng.gen(); + STANDARD.encode(random_bytes) + }; +} + #[derive(Snafu, Debug)] pub enum Error { #[snafu(display( @@ -62,6 +76,7 @@ pub enum Error { /// Return environment variables to be applied to the statefulsets for the scheduler, webserver (and worker, /// for clusters utilizing `celeryExecutor`: for clusters using `kubernetesExecutor` a different set will be /// used which is defined in [`build_airflow_template_envs`]). 
+#[allow(clippy::too_many_arguments)] pub fn build_airflow_statefulset_envs( airflow: &v1alpha1::AirflowCluster, airflow_role: &AirflowRole, @@ -70,11 +85,14 @@ pub fn build_airflow_statefulset_envs( auth_config: &AirflowClientAuthenticationDetailsResolved, authorization_config: &AirflowAuthorizationResolved, git_sync_resources: &git_sync::v1alpha1::GitSyncResources, + resolved_product_image: &ResolvedProductImage, ) -> Result, Error> { let mut env: BTreeMap = BTreeMap::new(); env.extend(static_envs(git_sync_resources)); + add_version_specific_env_vars(airflow, resolved_product_image, &mut env); + // environment variables let env_vars = rolegroup_config.get(&PropertyNameKind::Env); @@ -93,9 +111,9 @@ pub fn build_airflow_statefulset_envs( ), ); env.insert( - AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), env_var_from_secret( - AIRFLOW_CORE_SQL_ALCHEMY_CONN, + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, secret, "connections.sqlalchemyDatabaseUri", ), @@ -207,6 +225,7 @@ pub fn build_airflow_statefulset_envs( } _ => {} } + // apply overrides last of all with a fixed ordering if let Some(env_vars) = env_vars { for (k, v) in env_vars.iter().collect::>() { @@ -289,18 +308,6 @@ fn static_envs( ..Default::default() }); - env.insert( - AIRFLOW_API_AUTH_BACKEND.into(), - // Authentication for the API is handled separately to the Web Authentication. - // Basic authentication is used by the integration tests. - // The default is to deny all requests to the API. 
- EnvVar { - name: AIRFLOW_API_AUTH_BACKEND.into(), - value: Some("airflow.api.auth.backend.basic_auth".into()), - ..Default::default() - }, - ); - env } @@ -311,14 +318,15 @@ pub fn build_airflow_template_envs( env_overrides: &HashMap, config: &ExecutorConfig, git_sync_resources: &git_sync::v1alpha1::GitSyncResources, + resolved_product_image: &ResolvedProductImage, ) -> Vec { let mut env: BTreeMap = BTreeMap::new(); let secret = airflow.spec.cluster_config.credentials_secret.as_str(); env.insert( - AIRFLOW_CORE_SQL_ALCHEMY_CONN.into(), + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), env_var_from_secret( - AIRFLOW_CORE_SQL_ALCHEMY_CONN, + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, secret, "connections.sqlalchemyDatabaseUri", ), @@ -347,6 +355,8 @@ pub fn build_airflow_template_envs( env.extend(static_envs(git_sync_resources)); + add_version_specific_env_vars(airflow, resolved_product_image, &mut env); + // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be // evaluated in the wrapper for each stackable spark container: this is necessary for pods // that are created and then terminated (we do a similar thing for spark-k8s). @@ -378,6 +388,46 @@ pub fn build_airflow_template_envs( transform_map_to_vec(env) } +fn add_version_specific_env_vars( + airflow: &v1alpha1::AirflowCluster, + resolved_product_image: &ResolvedProductImage, + env: &mut BTreeMap, +) { + if resolved_product_image.product_version.starts_with("3.") { + env.extend(execution_server_env_vars(airflow)); + env.insert(AIRFLOW_CORE_AUTH_MANAGER.into(), EnvVar { + name: AIRFLOW_CORE_AUTH_MANAGER.into(), + value: Some( + "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager".to_string(), + ), + ..Default::default() + }); + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some("airflow.api.auth.backend.session".into()), + ..Default::default() + }); + // As of 3.x a JWT key is required. 
+ // See https://airflow.apache.org/docs/apache-airflow/3.0.1/configurations-ref.html#jwt-secret + // This must be random, but must also be consistent across api-services. + // The key will be consistent for all clusters started by this + // operator instance. TODO: Make this cluster specific. + env.insert("AIRFLOW__API_AUTH__JWT_SECRET".into(), EnvVar { + name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + value: Some(JWT_KEY.clone()), + ..Default::default() + }); + } else { + env.insert(AIRFLOW_API_AUTH_BACKENDS.into(), EnvVar { + name: AIRFLOW_API_AUTH_BACKENDS.into(), + value: Some( + "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session".into(), + ), + ..Default::default() + }); + } +} + // Internally the environment variable collection uses a map so that overrides can actually // override existing keys. The returned collection will be a vector. fn transform_map_to_vec(env_map: BTreeMap) -> Vec { @@ -423,3 +473,41 @@ fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) - env } + +fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap { + let mut env: BTreeMap = BTreeMap::new(); + + if let Some(name) = airflow.metadata.name.as_ref() { + // The execution API server URL can be any webserver (if there + // are multiple ones). Parse the list of webservers in a deterministic + // way by iterating over a BTree map rather than the HashMap. + if let Some(webserver_role) = airflow.spec.webservers.as_ref() { + if let Some(rolegroup) = webserver_role + .role_groups + .iter() + .collect::>() + .first_entry() + { + let webserver = format!( + "{name}-webserver-{rolegroup}", + name = name, + rolegroup = rolegroup.key() + ); + tracing::debug!("Webserver set [{webserver}]"); + // These settings are new in 3.x and will have no effect with earlier versions. 
+ env.insert("AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), EnvVar { + name: "AIRFLOW__CORE__EXECUTION_API_SERVER_URL".into(), + value: Some(format!("http://{webserver}:8080/execution/")), + ..Default::default() + }); + env.insert("AIRFLOW__CORE__BASE_URL".into(), EnvVar { + name: "AIRFLOW__CORE__BASE_URL".into(), + value: Some(format!("http://{webserver}:8080/")), + ..Default::default() + }); + } + } + } + + env +} diff --git a/rust/operator-binary/src/product_logging.rs b/rust/operator-binary/src/product_logging.rs index 9aca3ec3..8d7e4eea 100644 --- a/rust/operator-binary/src/product_logging.rs +++ b/rust/operator-binary/src/product_logging.rs @@ -105,6 +105,8 @@ os.makedirs('{log_dir}', exist_ok=True) LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) +REMOTE_TASK_LOG = None + LOGGING_CONFIG.setdefault('loggers', {{}}) for logger_name, logger_config in LOGGING_CONFIG['loggers'].items(): logger_config['level'] = logging.NOTSET diff --git a/tests/templates/kuttl/commons/health.py b/tests/templates/kuttl/commons/health.py index ef1fc56b..385e3a29 100755 --- a/tests/templates/kuttl/commons/health.py +++ b/tests/templates/kuttl/commons/health.py @@ -3,6 +3,7 @@ import requests import sys import time +import argparse if __name__ == "__main__": log_level = "DEBUG" @@ -12,12 +13,15 @@ stream=sys.stdout, ) - try: - role_group = sys.argv[1] - except IndexError: - role_group = "default" + parser = argparse.ArgumentParser(description="Health check script") + parser.add_argument("--role-group", type=str, default="default", help="Role group to check") + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + url = f"http://airflow-webserver-{opts.role_group}:8080/api/v1/health" + if opts.airflow_version and opts.airflow_version.startswith("3"): + url = f"http://airflow-webserver-{opts.role_group}:8080/api/v2/monitor/health" - url = f"http://airflow-webserver-{role_group}:8080/api/v1/health" count = 0 while True: diff 
--git a/tests/templates/kuttl/commons/metrics.py b/tests/templates/kuttl/commons/metrics.py index 866535f9..e14efb59 100755 --- a/tests/templates/kuttl/commons/metrics.py +++ b/tests/templates/kuttl/commons/metrics.py @@ -3,6 +3,9 @@ import requests import time import sys +from datetime import datetime, timezone +import argparse +import logging def exception_handler(exception_type, exception, traceback): @@ -22,55 +25,154 @@ def assert_metric(role, role_group, metric): return metric in metric_response.text -try: - role_group = sys.argv[1] -except IndexError: - role_group = "default" - -# Trigger a DAG run to create metrics -dag_id = "example_trigger_target_dag" -dag_conf = {"message": "Hello World"} - -rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" -auth = ("airflow", "airflow") - -# allow a few moments for the DAGs to be registered to all roles -time.sleep(10) - -response = requests.patch( - f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} -) -response = requests.post( - f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} -) - -# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid -# or minimize this by looping over the check instead. -iterations = 4 -loop = 0 -while True: - assert response.status_code == 200, "DAG run could not be triggered." 
- # Wait for the metrics to be consumed by the statsd-exporter - time.sleep(5) - # (disable line-break flake checks) - if ( - (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) - and ( - assert_metric( - "webserver", role_group, "airflow_task_instance_created_BashOperator" - ) - ) # noqa: W503, W504 - and ( - assert_metric( - "scheduler", - role_group, - "airflow_dagrun_duration_success_example_trigger_target_dag_count", - ) - ) - ): # noqa: W503, W504 - break +def metrics_v3(role_group: str) -> None: + now = datetime.now(timezone.utc) + ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + + # Trigger a DAG run to create metrics + dag_id = "example_trigger_target_dag" + dag_data = {"logical_date": f"{ts}", "conf": {"message": "Hello World"}} + + print(f"DAG-Data: {dag_data}") + + # allow a few moments for the DAGs to be registered to all roles time.sleep(10) - loop += 1 - if loop == iterations: - # force re-try of script + + rest_url = f"http://airflow-webserver-{role_group}:8080/api/v2" + token_url = f"http://airflow-webserver-{role_group}:8080/auth/token" + + data = {"username": "airflow", "password": "airflow"} + + headers = {"Content-Type": "application/json"} + + response = requests.post(token_url, headers=headers, json=data) + + if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") + else: + print( + f"Failed to obtain access token: {response.status_code} - {response.text}" + ) sys.exit(1) + + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # activate DAG + response = requests.patch( + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} + ) + # trigger DAG + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data + ) + + # Test the DAG in a loop. 
Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." + # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_BashOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +def metrics_v2(role_group: str) -> None: + # Trigger a DAG run to create metrics + dag_id = "example_trigger_target_dag" + dag_conf = {"message": "Hello World"} + + rest_url = f"http://airflow-webserver-{role_group}:8080/api/v1" + auth = ("airflow", "airflow") + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + response = requests.patch( + f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + ) + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": dag_conf} + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_BashOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_example_trigger_target_dag_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +if __name__ == "__main__": + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + parser = argparse.ArgumentParser(description="Airflow metrics script") + parser.add_argument( + "--role-group", type=str, default="default", help="Role group to check" + ) + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + if opts.airflow_version and opts.airflow_version.startswith("3"): + metrics_v3(opts.role_group) + else: + metrics_v2(opts.role_group) diff --git a/tests/templates/kuttl/external-access/40-assert.yaml b/tests/templates/kuttl/external-access/40-assert.yaml.j2 similarity index 90% rename from tests/templates/kuttl/external-access/40-assert.yaml rename to tests/templates/kuttl/external-access/40-assert.yaml.j2 index b7a9f06f..dd58e251 100644 --- a/tests/templates/kuttl/external-access/40-assert.yaml +++ b/tests/templates/kuttl/external-access/40-assert.yaml.j2 @@ -22,8 +22,8 @@ spec: spec: terminationGracePeriodSeconds: 120 status: - readyReplicas: 2 - replicas: 2 + readyReplicas: 1 + replicas: 1 --- apiVersion: apps/v1 kind: StatefulSet @@ -43,18 +43,6 @@ status: --- apiVersion: apps/v1 kind: StatefulSet -metadata: - name: airflow-worker-default -spec: - template: - spec: - terminationGracePeriodSeconds: 300 -status: - readyReplicas: 2 - 
replicas: 2 ---- -apiVersion: apps/v1 -kind: StatefulSet metadata: name: airflow-scheduler-default spec: @@ -70,17 +58,8 @@ kind: PodDisruptionBudget metadata: name: airflow-webserver status: - expectedPods: 4 - currentHealthy: 4 - disruptionsAllowed: 1 ---- -apiVersion: policy/v1 -kind: PodDisruptionBudget -metadata: - name: airflow-worker -status: - expectedPods: 2 - currentHealthy: 2 + expectedPods: 3 + currentHealthy: 3 disruptionsAllowed: 1 --- apiVersion: policy/v1 @@ -112,3 +91,27 @@ metadata: name: airflow-webserver-external-unstable spec: type: NodePort # external-unstable + +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +spec: + template: + spec: + terminationGracePeriodSeconds: 300 +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-worker +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +{% endif %} diff --git a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 index d18d6c57..da0a6ff6 100644 --- a/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/external-access/install-airflow-cluster.yaml.j2 @@ -21,22 +21,28 @@ metadata: name: airflow spec: image: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - custom: "{{ test_scenario['values']['airflow'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} pullPolicy: 
IfNotPresent clusterConfig: - loadExamples: true + loadExamples: false credentialsSecret: test-airflow-credentials webservers: config: listenerClass: test-external-stable-$NAMESPACE + resources: + cpu: + min: 1000m + max: 2000m + memory: + limit: 2Gi roleGroups: default: - replicas: 2 + replicas: 1 external-unstable: replicas: 1 config: @@ -45,11 +51,29 @@ spec: replicas: 1 config: listenerClass: test-cluster-internal-$NAMESPACE +{% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: roleGroups: default: - replicas: 2 + replicas: 1 +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + config: + resources: + cpu: + min: 100m + max: 500m + memory: + limit: 1Gi +{% endif %} schedulers: + config: + resources: + cpu: + min: 1000m + max: 2000m + memory: + limit: 1Gi roleGroups: default: replicas: 1 diff --git a/tests/templates/kuttl/ldap/70-install-airflow-python.yaml b/tests/templates/kuttl/ldap/70-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/ldap/70-install-airflow-python.yaml +++ b/tests/templates/kuttl/ldap/70-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/ldap/80-assert.yaml b/tests/templates/kuttl/ldap/80-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/ldap/80-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/ldap/80-assert.yaml.j2 b/tests/templates/kuttl/ldap/80-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/ldap/80-assert.yaml.j2 
@@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/ldap/90-assert.yaml b/tests/templates/kuttl/ldap/90-assert.yaml deleted file mode 100644 index 2ea7c1ca..00000000 --- a/tests/templates/kuttl/ldap/90-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py diff --git a/tests/templates/kuttl/ldap/90-assert.yaml.j2 b/tests/templates/kuttl/ldap/90-assert.yaml.j2 new file mode 100644 index 00000000..7f43f061 --- /dev/null +++ b/tests/templates/kuttl/ldap/90-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 index dcc2956b..35ebf3a6 100644 --- a/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 +++ 
b/tests/templates/kuttl/logging/41-install-airflow-cluster.yaml.j2 @@ -37,6 +37,8 @@ data: LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG) + REMOTE_TASK_LOG = None + LOGGING_CONFIG['formatters']['json'] = { '()': 'airflow.utils.log.json_formatter.JSONFormatter', 'json_fields': ['asctime', 'levelname', 'message', 'name'] @@ -78,10 +80,10 @@ spec: listenerClass: external-unstable resources: cpu: - min: 200m - max: 1000m + min: 1000m + max: 2000m memory: - limit: 1Gi + limit: 3Gi roleGroups: automatic-log-config: replicas: 1 @@ -138,8 +140,8 @@ spec: config: resources: cpu: - min: 50m - max: 250m + min: 1000m + max: 2000m memory: limit: 3Gi roleGroups: @@ -226,8 +228,8 @@ spec: config: resources: cpu: - min: 100m - max: 500m + min: 1000m + max: 2000m memory: limit: 1Gi roleGroups: diff --git a/tests/templates/kuttl/logging/50-install-airflow-python.yaml b/tests/templates/kuttl/logging/50-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/logging/50-install-airflow-python.yaml +++ b/tests/templates/kuttl/logging/50-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/logging/51-assert.yaml b/tests/templates/kuttl/logging/51-assert.yaml deleted file mode 100644 index 7514d353..00000000 --- a/tests/templates/kuttl/logging/51-assert.yaml +++ /dev/null @@ -1,10 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: | - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py automatic-log-config - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py custom-log-config diff --git a/tests/templates/kuttl/logging/51-assert.yaml.j2 b/tests/templates/kuttl/logging/51-assert.yaml.j2 new file mode 100644 index 
00000000..e2971a52 --- /dev/null +++ b/tests/templates/kuttl/logging/51-assert.yaml.j2 @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/logging/52-assert.yaml b/tests/templates/kuttl/logging/52-assert.yaml deleted file mode 100644 index c4f01749..00000000 --- a/tests/templates/kuttl/logging/52-assert.yaml +++ /dev/null @@ -1,10 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 600 -commands: - - script: | - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py automatic-log-config - kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py custom-log-config diff --git a/tests/templates/kuttl/logging/52-assert.yaml.j2 b/tests/templates/kuttl/logging/52-assert.yaml.j2 new file mode 100644 index 00000000..3e093e4f --- /dev/null +++ b/tests/templates/kuttl/logging/52-assert.yaml.j2 @@ -0,0 +1,17 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 600 +commands: +{% if 
test_scenario['values']['airflow'].find(",") > 0 %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: | + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group automatic-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" + kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --role-group custom-log-config --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index b39e2bf8..1707cd26 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -47,7 +47,7 @@ data: dag_id="example_trigger_target_dag", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - schedule_interval=None, + schedule=None, tags=['example'], ) as dag: run_this = run_this_func() diff --git a/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml b/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml +++ b/tests/templates/kuttl/mount-dags-configmap/40-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git 
a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 new file mode 100644 index 00000000..91c14877 --- /dev/null +++ b/tests/templates/kuttl/mount-dags-configmap/50-assert.yaml.j2 @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + diff --git a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml deleted file mode 100644 index 2ea7c1ca..00000000 --- a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py diff --git a/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 new file mode 100644 index 00000000..7f43f061 --- /dev/null +++ 
b/tests/templates/kuttl/mount-dags-configmap/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 index c4c7cf86..6aa3f43c 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 @@ -71,7 +71,9 @@ spec: gitSyncConf: # supply some config to check that safe.directory is correctly set --git-config: http.sslVerify:false - gitFolder: "mount-dags-gitsync/dags" + # N.B. 
dags definitions changed from 2.x to 3.x: + # this test assumes airflow-latest > 2 + gitFolder: "mount-dags-gitsync/dags_airflow3" volumeMounts: - name: test-cm-gitsync mountPath: /tmp/test.txt diff --git a/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml b/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml index ba4d16a8..c3f865a0 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml +++ b/tests/templates/kuttl/mount-dags-gitsync/40-install-airflow-python.yaml @@ -18,5 +18,6 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true diff --git a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/mount-dags-gitsync/50-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ 
test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py b/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py index 670bbd53..acb38502 100755 --- a/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py +++ b/tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py @@ -4,6 +4,7 @@ import time import sys import logging +from datetime import datetime, timezone def assert_metric(role, metric): @@ -16,19 +17,48 @@ def assert_metric(role, metric): return metric in metric_response.text +now = datetime.now(timezone.utc) +ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + # Trigger a DAG run to create metrics dag_id = "sparkapp_dag" +dag_data = {"logical_date": f"{ts}"} -rest_url = "http://airflow-webserver-default:8080/api/v1" -auth = ("airflow", "airflow") +print(f"DAG-Data: {dag_data}") # allow a few moments for the DAGs to be registered to all roles time.sleep(10) +rest_url = "http://airflow-webserver-default:8080/api/v2" +token_url = "http://airflow-webserver-default:8080/auth/token" + +data = {"username": "airflow", "password": "airflow"} + +headers = {"Content-Type": "application/json"} + +response = requests.post(token_url, headers=headers, json=data) + +if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") +else: + print(f"Failed to obtain access token: {response.status_code} - {response.text}") + sys.exit(1) + +headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", +} + +# activate DAG response = requests.patch( - f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} +) +# trigger DAG +response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data ) -response = 
requests.post(f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={}) # Wait for the metrics to be consumed by the statsd-exporter time.sleep(5) diff --git a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 index 5119ca9b..9329f4d5 100644 --- a/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 +++ b/tests/templates/kuttl/oidc/50-install-test-container.yaml.j2 @@ -56,6 +56,7 @@ spec: containers: - name: python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: @@ -71,6 +72,17 @@ spec: env: - name: REQUESTS_CA_BUNDLE value: /stackable/tls/ca.crt + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AIRFLOW_VERSION +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + value: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + value: "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + volumes: - name: tls csi: diff --git a/tests/templates/kuttl/oidc/60-login.yaml b/tests/templates/kuttl/oidc/60-login.yaml index 0745bc4b..34c10ae7 100644 --- a/tests/templates/kuttl/oidc/60-login.yaml +++ b/tests/templates/kuttl/oidc/60-login.yaml @@ -4,6 +4,5 @@ kind: TestStep metadata: name: login commands: - - script: > - envsubst '$NAMESPACE' < login.py | - kubectl exec -n $NAMESPACE -i python-0 -- tee /stackable/login.py > /dev/null + - script: | + kubectl cp -n $NAMESPACE login.py python-0:/stackable/login.py diff --git a/tests/templates/kuttl/oidc/install-airflow.yaml.j2 b/tests/templates/kuttl/oidc/install-airflow.yaml.j2 index b0bbd5cc..f2e5c941 100644 --- a/tests/templates/kuttl/oidc/install-airflow.yaml.j2 +++ b/tests/templates/kuttl/oidc/install-airflow.yaml.j2 @@ -36,11 +36,11 @@ metadata: name: airflow spec: image: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - custom: "{{ 
test_scenario['values']['airflow'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} pullPolicy: IfNotPresent clusterConfig: diff --git a/tests/templates/kuttl/oidc/login.py b/tests/templates/kuttl/oidc/login.py index aa859803..69d3ce51 100644 --- a/tests/templates/kuttl/oidc/login.py +++ b/tests/templates/kuttl/oidc/login.py @@ -3,22 +3,53 @@ import logging import requests import sys +import os from bs4 import BeautifulSoup logging.basicConfig( level="DEBUG", format="%(asctime)s %(levelname)s: %(message)s", stream=sys.stdout ) +log = logging.getLogger(__name__) + + +def assert_equal(a, b, msg): + if a != b: + raise AssertionError(f"{msg}\n\tleft: {a}\n\tright: {b}") + + +def assert_startwith(a, b, msg): + if not a.startswith(b): + raise AssertionError(f"{msg}\n\tleft: {a}\n\tright: {b}") + + +def login_page(base_url: str, airflow_version: str) -> str: + if airflow_version.startswith("3"): + return f"{base_url}/auth/login/keycloak?next=" + else: + return f"{base_url}/login/keycloak?next=" + + +def userinfo_page(base_url: str, airflow_version: str) -> str: + if airflow_version.startswith("3"): + return f"{base_url}/auth/users/userinfo/" + else: + return f"{base_url}/users/userinfo/" + + session = requests.Session() url = "http://airflow-webserver-default:8080" # Click on "Sign In with keycloak" in Airflow -login_page = session.get(f"{url}/login/keycloak?next=") +login_page = session.get(login_page(url, os.environ["AIRFLOW_VERSION"])) assert login_page.ok, "Redirection from Airflow to Keycloak failed" -assert login_page.url.startswith( - 
"https://keycloak1.$NAMESPACE.svc.cluster.local:8443/realms/test1/protocol/openid-connect/auth?response_type=code&client_id=airflow1" -), "Redirection to the Keycloak login page expected" + +assert_startwith( + login_page.url, + f"https://keycloak1.{os.environ['NAMESPACE']}.svc.cluster.local:8443/realms/test1/protocol/openid-connect/auth?response_type=code&client_id=airflow1", + "Redirection to the Keycloak login page expected", +) # Enter username and password into the Keycloak login page and click on "Sign In" login_page_html = BeautifulSoup(login_page.text, "html.parser") @@ -28,16 +59,19 @@ ) assert welcome_page.ok, "Login failed" -assert welcome_page.url == f"{url}/home", ( - "Redirection to the Airflow home page expected" +assert_equal( + welcome_page.url, f"{url}/", "Redirection to the Airflow home page expected" ) # Open the user information page in Airflow -userinfo_page = session.get(f"{url}/users/userinfo/") +userinfo_url = userinfo_page(url, os.environ["AIRFLOW_VERSION"]) +userinfo_page = session.get(userinfo_url) assert userinfo_page.ok, "Retrieving user information failed" -assert userinfo_page.url == f"{url}/users/userinfo/", ( - "Redirection to the Airflow user info page expected" +assert_equal( + userinfo_page.url, + userinfo_url, + "Redirection to the Airflow user info page expected", ) # Expect the user data provided by Keycloak in Airflow @@ -45,6 +79,8 @@ table_rows = userinfo_page_html.find_all("tr") user_data = {tr.find("th").text: tr.find("td").text for tr in table_rows} +log.debug(f"{user_data=}") + assert user_data["First Name"] == "Jane", ( "The first name of the user in Airflow should match the one provided by Keycloak" ) @@ -55,6 +91,8 @@ "The email of the user in Airflow should match the one provided by Keycloak" ) +log.info("OIDC login test passed") + # Later this can be extended to use different OIDC providers (currently only Keycloak is # supported) # diff --git a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 
b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 index fe2d6786..21eacae2 100644 --- a/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/opa/30-install-airflow.yaml.j2 @@ -24,11 +24,11 @@ metadata: name: airflow spec: image: -{% if test_scenario['values']['airflow'].find(",") > 0 %} - custom: "{{ test_scenario['values']['airflow'].split(',')[1] }}" - productVersion: "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% if test_scenario['values']['airflow-non-experimental'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'].split(',')[0] }}" {% else %} - productVersion: "{{ test_scenario['values']['airflow'] }}" + productVersion: "{{ test_scenario['values']['airflow-non-experimental'] }}" {% endif %} pullPolicy: IfNotPresent clusterConfig: diff --git a/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 b/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 index 7feb7246..939fb1cf 100644 --- a/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 +++ b/tests/templates/kuttl/opa/40-install-test-container.yaml.j2 @@ -56,6 +56,7 @@ spec: containers: - name: test-runner image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: diff --git a/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 b/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 index 018a7a74..9d25d48a 100644 --- a/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 +++ b/tests/templates/kuttl/overrides/10-install-airflow.yaml.j2 @@ -43,6 +43,7 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} + pullPolicy: IfNotPresent clusterConfig: loadExamples: true exposeConfig: false diff --git a/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 
b/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 index ff9a631c..1a905de6 100644 --- a/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 +++ b/tests/templates/kuttl/overrides/20-install-airflow2.yaml.j2 @@ -11,6 +11,7 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['airflow-latest'] }}" {% endif %} + pullPolicy: IfNotPresent clusterConfig: loadExamples: true exposeConfig: false diff --git a/tests/templates/kuttl/smoke/00-range-limit.yaml b/tests/templates/kuttl/smoke/00-range-limit.yaml.j2 similarity index 70% rename from tests/templates/kuttl/smoke/00-range-limit.yaml rename to tests/templates/kuttl/smoke/00-range-limit.yaml.j2 index 8fd02210..b35b57a4 100644 --- a/tests/templates/kuttl/smoke/00-range-limit.yaml +++ b/tests/templates/kuttl/smoke/00-range-limit.yaml.j2 @@ -1,3 +1,4 @@ +{% if test_scenario['values']['executor'] == 'celery' %} --- apiVersion: v1 kind: LimitRange @@ -9,3 +10,4 @@ spec: maxLimitRequestRatio: cpu: 5 memory: 1 +{% endif %} diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index 03b2e66b..38e96a72 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -53,26 +53,6 @@ spec: ROLE_HEADER_VAR = "role-value" EXPERIMENTAL_FILE_FOOTER: | ROLE_FOOTER_VAR = "role-value" - WTF_CSRF_ENABLED: "False" - AUTH_ROLES_SYNC_AT_LOGIN: "true" - AUTH_USER_REGISTRATION: "false" - AUTH_USER_REGISTRATION_ROLE: "Role" - OAUTH_PROVIDERS: | - [ - { 'name': 'azure', - 'icon': 'fa-windows', - 'token_key': 'access_token', - 'remote_app': { - 'client_id': os.environ.get('OIDC_XXX_CLIENT_ID'), - 'client_secret': os.environ.get('OIDC_XXX_CLIENT_SECRET'), - 'client_kwargs': { - 'scope': 'openid profile' - }, - 'api_base_url': 'https://keycloak/realms/sdp/protocol/test-url', - 'server_metadata_url': 
'https://keycloak/realms/sdp/.well-known/openid-configuration-test', - }, - } - ] roleGroups: default: replicas: 1 @@ -80,7 +60,6 @@ spec: webserver_config.py: EXPERIMENTAL_FILE_HEADER: | COMMON_HEADER_VAR = "group-value" - AUTH_USER_REGISTRATION_ROLE: "Rolegroup" {% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: config: diff --git a/tests/templates/kuttl/smoke/41-assert.yaml b/tests/templates/kuttl/smoke/41-assert.yaml index de4c98ff..6169fd93 100644 --- a/tests/templates/kuttl/smoke/41-assert.yaml +++ b/tests/templates/kuttl/smoke/41-assert.yaml @@ -19,7 +19,3 @@ commands: echo "$AIRFLOW_CONFIG" | grep 'COMMON_HEADER_VAR = "group-value"' echo "$AIRFLOW_CONFIG" | grep 'ROLE_FOOTER_VAR = "role-value"' echo "$AIRFLOW_CONFIG" | grep -v 'ROLE_HEADER_VAR = "role-value"' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_ROLES_SYNC_AT_LOGIN = True' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_USER_REGISTRATION = False' - echo "$AIRFLOW_CONFIG" | grep 'AUTH_USER_REGISTRATION_ROLE = "Rolegroup"' - echo "$AIRFLOW_CONFIG" | grep 'OAUTH_PROVIDERS' diff --git a/tests/templates/kuttl/smoke/42-assert.yaml b/tests/templates/kuttl/smoke/42-assert.yaml deleted file mode 100644 index 46ced0d2..00000000 --- a/tests/templates/kuttl/smoke/42-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -# This test checks if the containerdebug-state.json file is present and valid -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -timeout: 600 -commands: - - script: kubectl exec -n $NAMESPACE --container airflow airflow-scheduler-default-0 -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"' - - script: kubectl exec -n $NAMESPACE --container airflow airflow-webserver-default-0 -- cat /stackable/log/containerdebug-state.json | jq --exit-status '"valid JSON"' diff --git a/tests/templates/kuttl/smoke/50-install-airflow-python.yaml b/tests/templates/kuttl/smoke/50-install-airflow-python.yaml index e087a848..7be1fbf8 100644 --- 
a/tests/templates/kuttl/smoke/50-install-airflow-python.yaml +++ b/tests/templates/kuttl/smoke/50-install-airflow-python.yaml @@ -18,6 +18,7 @@ spec: containers: - name: test-airflow-python image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent stdin: true tty: true resources: diff --git a/tests/templates/kuttl/smoke/60-assert.yaml b/tests/templates/kuttl/smoke/60-assert.yaml deleted file mode 100644 index c37ea4f1..00000000 --- a/tests/templates/kuttl/smoke/60-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: test-airflow-webserver-health-check -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py diff --git a/tests/templates/kuttl/smoke/60-assert.yaml.j2 b/tests/templates/kuttl/smoke/60-assert.yaml.j2 new file mode 100644 index 00000000..8b1f71bf --- /dev/null +++ b/tests/templates/kuttl/smoke/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/templates/kuttl/smoke/70-assert.yaml b/tests/templates/kuttl/smoke/70-assert.yaml deleted file mode 100644 index 2ea7c1ca..00000000 --- a/tests/templates/kuttl/smoke/70-assert.yaml +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -metadata: - name: metrics -timeout: 480 -commands: - - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py diff --git 
a/tests/templates/kuttl/smoke/70-assert.yaml.j2 b/tests/templates/kuttl/smoke/70-assert.yaml.j2 new file mode 100644 index 00000000..a811cb8e --- /dev/null +++ b/tests/templates/kuttl/smoke/70-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/metrics.py --airflow-version "{{ test_scenario['values']['airflow'] }}" +{% endif %} diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 30405033..f23caa0d 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -10,9 +10,15 @@ dimensions: - 2.9.3 - 2.10.4 - 2.10.5 + - 3.0.1 # To use a custom image, add a comma and the full name after the product version # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev - name: airflow-latest + values: + - 3.0.1 + # To use a custom image, add a comma and the full name after the product version + # - 2.9.3,oci.stackable.tech/sandbox/airflow:2.9.3-stackable0.0.0-dev + - name: airflow-non-experimental values: - 2.10.5 # To use a custom image, add a comma and the full name after the product version @@ -56,11 +62,11 @@ tests: - executor - name: oidc dimensions: - - airflow + - airflow-latest - openshift - name: opa dimensions: - - airflow + - airflow-non-experimental - opa-latest - openshift - name: resources @@ -86,8 +92,9 @@ tests: - openshift - name: external-access dimensions: - - airflow + - airflow-latest - openshift + - executor suites: - name: nightly # Run nightly with the latest airflow