@@ -75,6 +75,7 @@ pub enum Error {
7575/// Return environment variables to be applied to the statefulsets for the scheduler, webserver (and worker,
7676/// for clusters utilizing `celeryExecutor`: for clusters using `kubernetesExecutor` a different set will be
7777/// used which is defined in [`build_airflow_template_envs`]).
78+ #[ allow( clippy:: too_many_arguments) ]
7879pub fn build_airflow_statefulset_envs (
7980 airflow : & v1alpha1:: AirflowCluster ,
8081 airflow_role : & AirflowRole ,
@@ -83,11 +84,12 @@ pub fn build_airflow_statefulset_envs(
8384 auth_config : & AirflowClientAuthenticationDetailsResolved ,
8485 authorization_config : & AirflowAuthorizationResolved ,
8586 git_sync_resources : & git_sync:: v1alpha1:: GitSyncResources ,
87+ rolegroup : & String ,
8688) -> Result < Vec < EnvVar > , Error > {
8789 let mut env: BTreeMap < String , EnvVar > = BTreeMap :: new ( ) ;
8890
8991 env. extend ( static_envs ( git_sync_resources) ) ;
90- env. extend ( execution_server_env_vars ( airflow) ) ;
92+ env. extend ( execution_server_env_vars ( airflow, rolegroup ) ) ;
9193
9294 // environment variables
9395 let env_vars = rolegroup_config. get ( & PropertyNameKind :: Env ) ;
@@ -312,6 +314,8 @@ fn static_envs(
312314 ..Default :: default ( )
313315 } ) ;
314316
317+ // Basic auth is only relevant to 2.x and can be removed once
318+ // that version is no longer supported.
315319 env. insert ( AIRFLOW_API_AUTH_BACKENDS . into ( ) , EnvVar {
316320 name : AIRFLOW_API_AUTH_BACKENDS . into ( ) ,
317321 value : Some ( "airflow.api.auth.backend.basic_auth, airflow.api.auth.backend.session" . into ( ) ) ,
@@ -374,7 +378,16 @@ pub fn build_airflow_template_envs(
374378 } ) ;
375379
376380 env. extend ( static_envs ( git_sync_resources) ) ;
377- env. extend ( execution_server_env_vars ( airflow) ) ;
381+
382+ // It does not appear to be possible for kubernetesExecutors to work with
383+ // multiple webserver rolegroups. For the celery case, each executor sets
384+ // the execution server from the associated rolegroup, but for kubernetes
385+ // workers this is not possible.
386+ if let Some ( webserver_role) = airflow. spec . webservers . as_ref ( ) {
387+ if let Some ( rolegroup) = webserver_role. role_groups . iter ( ) . next ( ) {
388+ env. extend ( execution_server_env_vars ( airflow, rolegroup. 0 ) ) ;
389+ }
390+ }
378391
379392 // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be
380393 // evaluated in the wrapper for each stackable spark container: this is necessary for pods
@@ -453,31 +466,28 @@ fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) -
453466 env
454467}
455468
456- fn execution_server_env_vars ( airflow : & v1alpha1:: AirflowCluster ) -> BTreeMap < String , EnvVar > {
469+ fn execution_server_env_vars (
470+ airflow : & v1alpha1:: AirflowCluster ,
471+ rolegroup : & String ,
472+ ) -> BTreeMap < String , EnvVar > {
457473 let mut env: BTreeMap < String , EnvVar > = BTreeMap :: new ( ) ;
458474
459- if let Some ( webserver_role) = airflow. spec . webservers . as_ref ( ) {
460- if let Some ( rolegroup) = webserver_role. role_groups . iter ( ) . next ( ) {
461- if let Some ( name) = airflow. metadata . name . as_ref ( ) {
462- let webserver = format ! (
463- "{name}-webserver-{rolegroup}" ,
464- name = name,
465- rolegroup = rolegroup. 0
466- ) ;
467- tracing:: info!( "Webserver set [{webserver}]" ) ;
468-
469- env. insert ( "AIRFLOW__CORE__EXECUTION_API_SERVER_URL" . into ( ) , EnvVar {
470- name : "AIRFLOW__CORE__EXECUTION_API_SERVER_URL" . into ( ) ,
471- value : Some ( format ! ( "http://{webserver}:8080/execution/" ) ) ,
472- ..Default :: default ( )
473- } ) ;
474- env. insert ( "AIRFLOW__CORE__BASE_URL" . into ( ) , EnvVar {
475- name : "AIRFLOW__CORE__BASE_URL" . into ( ) ,
476- value : Some ( format ! ( "http://{webserver}:8080/" ) ) ,
477- ..Default :: default ( )
478- } ) ;
479- }
480- }
475+ if let Some ( name) = airflow. metadata . name . as_ref ( ) {
476+ let webserver = format ! ( "{name}-webserver-{rolegroup}-metrics" , ) ;
477+ tracing:: debug!( "Webserver set [{webserver}]" ) ;
478+
479+ // These settings are new in 3.x and will have no affect with earlier versions.
480+ env. insert ( "AIRFLOW__CORE__EXECUTION_API_SERVER_URL" . into ( ) , EnvVar {
481+ name : "AIRFLOW__CORE__EXECUTION_API_SERVER_URL" . into ( ) ,
482+ value : Some ( format ! ( "http://{webserver}:8080/execution/" ) ) ,
483+ ..Default :: default ( )
484+ } ) ;
485+ env. insert ( "AIRFLOW__CORE__BASE_URL" . into ( ) , EnvVar {
486+ name : "AIRFLOW__CORE__BASE_URL" . into ( ) ,
487+ value : Some ( format ! ( "http://{webserver}:8080/" ) ) ,
488+ ..Default :: default ( )
489+ } ) ;
481490 }
491+
482492 env
483493}
0 commit comments