@@ -124,39 +124,54 @@ pub fn build_airflow_statefulset_envs(
124124 }
125125
126126 let dags_folder = get_dags_folder ( git_sync_resources) ;
127- env. insert ( AIRFLOW_CORE_DAGS_FOLDER . into ( ) , EnvVar {
128- name : AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
129- value : Some ( dags_folder) ,
130- ..Default :: default ( )
131- } ) ;
127+ env. insert (
128+ AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
129+ EnvVar {
130+ name : AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
131+ value : Some ( dags_folder) ,
132+ ..Default :: default ( )
133+ } ,
134+ ) ;
132135
133136 if airflow. spec . cluster_config . load_examples {
134- env. insert ( AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) , EnvVar {
135- name : AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
136- value : Some ( "True" . into ( ) ) ,
137- ..Default :: default ( )
138- } ) ;
137+ env. insert (
138+ AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
139+ EnvVar {
140+ name : AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
141+ value : Some ( "True" . into ( ) ) ,
142+ ..Default :: default ( )
143+ } ,
144+ ) ;
139145 } else {
140- env. insert ( AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) , EnvVar {
141- name : AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
142- value : Some ( "False" . into ( ) ) ,
143- ..Default :: default ( )
144- } ) ;
146+ env. insert (
147+ AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
148+ EnvVar {
149+ name : AIRFLOW_CORE_LOAD_EXAMPLES . into ( ) ,
150+ value : Some ( "False" . into ( ) ) ,
151+ ..Default :: default ( )
152+ } ,
153+ ) ;
145154 }
146155
147156 if airflow. spec . cluster_config . expose_config {
148- env. insert ( AIRFLOW_WEBSERVER_EXPOSE_CONFIG . into ( ) , EnvVar {
149- name : AIRFLOW_WEBSERVER_EXPOSE_CONFIG . into ( ) ,
150- value : Some ( "True" . into ( ) ) ,
151- ..Default :: default ( )
152- } ) ;
157+ env. insert (
158+ AIRFLOW_WEBSERVER_EXPOSE_CONFIG . into ( ) ,
159+ EnvVar {
160+ name : AIRFLOW_WEBSERVER_EXPOSE_CONFIG . into ( ) ,
161+ value : Some ( "True" . into ( ) ) ,
162+ ..Default :: default ( )
163+ } ,
164+ ) ;
153165 }
154166
155- env. insert ( AIRFLOW_CORE_EXECUTOR . into ( ) , EnvVar {
156- name : AIRFLOW_CORE_EXECUTOR . into ( ) ,
157- value : Some ( executor. to_string ( ) ) ,
158- ..Default :: default ( )
159- } ) ;
167+ env. insert (
168+ AIRFLOW_CORE_EXECUTOR . into ( ) ,
169+ EnvVar {
170+ name : AIRFLOW_CORE_EXECUTOR . into ( ) ,
171+ value : Some ( executor. to_string ( ) ) ,
172+ ..Default :: default ( )
173+ } ,
174+ ) ;
160175
161176 if let AirflowExecutor :: KubernetesExecutor { .. } = executor {
162177 env. insert (
@@ -167,11 +182,14 @@ pub fn build_airflow_statefulset_envs(
167182 ..Default :: default ( )
168183 } ,
169184 ) ;
170- env. insert ( AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) , EnvVar {
171- name : AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
172- value : airflow. namespace ( ) ,
173- ..Default :: default ( )
174- } ) ;
185+ env. insert (
186+ AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
187+ EnvVar {
188+ name : AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
189+ value : airflow. namespace ( ) ,
190+ ..Default :: default ( )
191+ } ,
192+ ) ;
175193 }
176194
177195 match airflow_role {
@@ -210,20 +228,26 @@ pub fn build_airflow_statefulset_envs(
210228 // apply overrides last of all with a fixed ordering
211229 if let Some ( env_vars) = env_vars {
212230 for ( k, v) in env_vars. iter ( ) . collect :: < BTreeMap < _ , _ > > ( ) {
213- env. insert ( k. into ( ) , EnvVar {
214- name : k. to_string ( ) ,
215- value : Some ( v. to_string ( ) ) ,
216- ..Default :: default ( )
217- } ) ;
231+ env. insert (
232+ k. into ( ) ,
233+ EnvVar {
234+ name : k. to_string ( ) ,
235+ value : Some ( v. to_string ( ) ) ,
236+ ..Default :: default ( )
237+ } ,
238+ ) ;
218239 }
219240 }
220241
221242 // Needed for the `containerdebug` process to log it's tracing information to.
222- env. insert ( "CONTAINERDEBUG_LOG_DIRECTORY" . to_string ( ) , EnvVar {
223- name : "CONTAINERDEBUG_LOG_DIRECTORY" . to_string ( ) ,
224- value : Some ( format ! ( "{STACKABLE_LOG_DIR}/containerdebug" ) ) ,
225- value_from : None ,
226- } ) ;
243+ env. insert (
244+ "CONTAINERDEBUG_LOG_DIRECTORY" . to_string ( ) ,
245+ EnvVar {
246+ name : "CONTAINERDEBUG_LOG_DIRECTORY" . to_string ( ) ,
247+ value : Some ( format ! ( "{STACKABLE_LOG_DIR}/containerdebug" ) ) ,
248+ value_from : None ,
249+ } ,
250+ ) ;
227251
228252 tracing:: debug!( "Env-var set [{:?}]" , env) ;
229253 Ok ( transform_map_to_vec ( env) )
@@ -257,37 +281,52 @@ fn static_envs(
257281
258282 let dags_folder = get_dags_folder ( git_sync_resources) ;
259283
260- env. insert ( PYTHONPATH . into ( ) , EnvVar {
261- // PYTHONPATH must be extended to include the dags folder so that dag
262- // dependencies can be found: this must be the actual path and not a variable.
263- // Also include the airflow site-packages by default (for airflow and kubernetes classes etc.)
264- name : PYTHONPATH . into ( ) ,
265- value : Some ( format ! ( "{LOG_CONFIG_DIR}:{dags_folder}" ) ) ,
266- ..Default :: default ( )
267- } ) ;
268- env. insert ( AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS . into ( ) , EnvVar {
269- name : AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS . into ( ) ,
270- value : Some ( "log_config.LOGGING_CONFIG" . into ( ) ) ,
271- ..Default :: default ( )
272- } ) ;
273-
274- env. insert ( AIRFLOW_METRICS_STATSD_ON . into ( ) , EnvVar {
275- name : AIRFLOW_METRICS_STATSD_ON . into ( ) ,
276- value : Some ( "True" . into ( ) ) ,
277- ..Default :: default ( )
278- } ) ;
279-
280- env. insert ( AIRFLOW_METRICS_STATSD_HOST . into ( ) , EnvVar {
281- name : AIRFLOW_METRICS_STATSD_HOST . into ( ) ,
282- value : Some ( "0.0.0.0" . into ( ) ) ,
283- ..Default :: default ( )
284- } ) ;
285-
286- env. insert ( AIRFLOW_METRICS_STATSD_PORT . into ( ) , EnvVar {
287- name : AIRFLOW_METRICS_STATSD_PORT . into ( ) ,
288- value : Some ( "9125" . into ( ) ) ,
289- ..Default :: default ( )
290- } ) ;
284+ env. insert (
285+ PYTHONPATH . into ( ) ,
286+ EnvVar {
287+ // PYTHONPATH must be extended to include the dags folder so that dag
288+ // dependencies can be found: this must be the actual path and not a variable.
289+ // Also include the airflow site-packages by default (for airflow and kubernetes classes etc.)
290+ name : PYTHONPATH . into ( ) ,
291+ value : Some ( format ! ( "{LOG_CONFIG_DIR}:{dags_folder}" ) ) ,
292+ ..Default :: default ( )
293+ } ,
294+ ) ;
295+ env. insert (
296+ AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS . into ( ) ,
297+ EnvVar {
298+ name : AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS . into ( ) ,
299+ value : Some ( "log_config.LOGGING_CONFIG" . into ( ) ) ,
300+ ..Default :: default ( )
301+ } ,
302+ ) ;
303+
304+ env. insert (
305+ AIRFLOW_METRICS_STATSD_ON . into ( ) ,
306+ EnvVar {
307+ name : AIRFLOW_METRICS_STATSD_ON . into ( ) ,
308+ value : Some ( "True" . into ( ) ) ,
309+ ..Default :: default ( )
310+ } ,
311+ ) ;
312+
313+ env. insert (
314+ AIRFLOW_METRICS_STATSD_HOST . into ( ) ,
315+ EnvVar {
316+ name : AIRFLOW_METRICS_STATSD_HOST . into ( ) ,
317+ value : Some ( "0.0.0.0" . into ( ) ) ,
318+ ..Default :: default ( )
319+ } ,
320+ ) ;
321+
322+ env. insert (
323+ AIRFLOW_METRICS_STATSD_PORT . into ( ) ,
324+ EnvVar {
325+ name : AIRFLOW_METRICS_STATSD_PORT . into ( ) ,
326+ value : Some ( "9125" . into ( ) ) ,
327+ ..Default :: default ( )
328+ } ,
329+ ) ;
291330
292331 env. insert (
293332 AIRFLOW_API_AUTH_BACKEND . into ( ) ,
@@ -324,54 +363,69 @@ pub fn build_airflow_template_envs(
324363 ) ,
325364 ) ;
326365
327- env. insert ( AIRFLOW_CORE_EXECUTOR . into ( ) , EnvVar {
328- name : AIRFLOW_CORE_EXECUTOR . into ( ) ,
329- value : Some ( "LocalExecutor" . to_string ( ) ) ,
330- ..Default :: default ( )
331- } ) ;
366+ env. insert (
367+ AIRFLOW_CORE_EXECUTOR . into ( ) ,
368+ EnvVar {
369+ name : AIRFLOW_CORE_EXECUTOR . into ( ) ,
370+ value : Some ( "LocalExecutor" . to_string ( ) ) ,
371+ ..Default :: default ( )
372+ } ,
373+ ) ;
332374
333- env. insert ( AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) , EnvVar {
334- name : AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
335- value : airflow. namespace ( ) ,
336- ..Default :: default ( )
337- } ) ;
375+ env. insert (
376+ AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
377+ EnvVar {
378+ name : AIRFLOW_KUBERNETES_EXECUTOR_NAMESPACE . into ( ) ,
379+ value : airflow. namespace ( ) ,
380+ ..Default :: default ( )
381+ } ,
382+ ) ;
338383
339384 // the config map also requires the dag-folder location as this will be passed on
340385 // to the pods started by airflow.
341386 let dags_folder = get_dags_folder ( git_sync_resources) ;
342- env. insert ( AIRFLOW_CORE_DAGS_FOLDER . into ( ) , EnvVar {
343- name : AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
344- value : Some ( dags_folder) ,
345- ..Default :: default ( )
346- } ) ;
387+ env. insert (
388+ AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
389+ EnvVar {
390+ name : AIRFLOW_CORE_DAGS_FOLDER . into ( ) ,
391+ value : Some ( dags_folder) ,
392+ ..Default :: default ( )
393+ } ,
394+ ) ;
347395
348396 env. extend ( static_envs ( git_sync_resources) ) ;
349397
350398 // _STACKABLE_POST_HOOK will contain a command to create a shutdown hook that will be
351399 // evaluated in the wrapper for each stackable spark container: this is necessary for pods
352400 // that are created and then terminated (we do a similar thing for spark-k8s).
353401 if config. logging . enable_vector_agent {
354- env. insert ( "_STACKABLE_POST_HOOK" . into ( ) , EnvVar {
355- name : "_STACKABLE_POST_HOOK" . into ( ) ,
356- value : Some (
357- [
358- // Wait for Vector to gather the logs.
359- "sleep 10" ,
360- & create_vector_shutdown_file_command ( STACKABLE_LOG_DIR ) ,
361- ]
362- . join ( "; " ) ,
363- ) ,
364- ..Default :: default ( )
365- } ) ;
402+ env. insert (
403+ "_STACKABLE_POST_HOOK" . into ( ) ,
404+ EnvVar {
405+ name : "_STACKABLE_POST_HOOK" . into ( ) ,
406+ value : Some (
407+ [
408+ // Wait for Vector to gather the logs.
409+ "sleep 10" ,
410+ & create_vector_shutdown_file_command ( STACKABLE_LOG_DIR ) ,
411+ ]
412+ . join ( "; " ) ,
413+ ) ,
414+ ..Default :: default ( )
415+ } ,
416+ ) ;
366417 }
367418
368419 // iterate over a BTreeMap to ensure the vars are written in a predictable order
369420 for ( k, v) in env_overrides. iter ( ) . collect :: < BTreeMap < _ , _ > > ( ) {
370- env. insert ( k. to_string ( ) , EnvVar {
371- name : k. to_string ( ) ,
372- value : Some ( v. to_string ( ) ) ,
373- ..Default :: default ( )
374- } ) ;
421+ env. insert (
422+ k. to_string ( ) ,
423+ EnvVar {
424+ name : k. to_string ( ) ,
425+ value : Some ( v. to_string ( ) ) ,
426+ ..Default :: default ( )
427+ } ,
428+ ) ;
375429 }
376430
377431 tracing:: debug!( "Env-var set [{:?}]" , env) ;
0 commit comments