@@ -41,8 +41,8 @@ use stackable_operator::{
4141 api:: {
4242 apps:: v1:: { StatefulSet , StatefulSetSpec } ,
4343 core:: v1:: {
44- ConfigMap , PersistentVolumeClaim , PodTemplateSpec , Probe , Service , ServiceAccount ,
45- ServicePort , ServiceSpec , TCPSocketAction ,
44+ ConfigMap , PersistentVolumeClaim , PodTemplateSpec , Probe , ServiceAccount ,
45+ TCPSocketAction ,
4646 } ,
4747 } ,
4848 apimachinery:: pkg:: { apis:: meta:: v1:: LabelSelector , util:: intstr:: IntOrString } ,
@@ -101,6 +101,10 @@ use crate::{
101101 pdb:: add_pdbs,
102102 } ,
103103 product_logging:: extend_config_map_with_log_config,
104+ service:: {
105+ build_rolegroup_headless_service, build_rolegroup_metrics_service,
106+ stateful_set_service_name,
107+ } ,
104108} ;
105109
106110pub const AIRFLOW_CONTROLLER_NAME : & str = "airflowcluster" ;
@@ -339,6 +343,9 @@ pub enum Error {
339343 ApplyGroupListener {
340344 source : stackable_operator:: cluster_resources:: Error ,
341345 } ,
346+
347+ #[ snafu( display( "failed to configure service" ) ) ]
348+ ServiceConfiguration { source : crate :: service:: Error } ,
342349}
343350
344351type Result < T , E = Error > = std:: result:: Result < T , E > ;
@@ -491,12 +498,53 @@ pub async fn reconcile_airflow(
491498 )
492499 . context ( InvalidGitSyncSpecSnafu ) ?;
493500
494- let rg_service = build_rolegroup_service ( airflow, & resolved_product_image, & rolegroup) ?;
495- cluster_resources. add ( client, rg_service) . await . context (
496- ApplyRoleGroupServiceSnafu {
501+ let role_group_service_recommended_labels = build_recommended_labels (
502+ airflow,
503+ AIRFLOW_CONTROLLER_NAME ,
504+ & resolved_product_image. app_version_label ,
505+ & rolegroup. role ,
506+ & rolegroup. role_group ,
507+ ) ;
508+
509+ let role_group_service_selector = Labels :: role_group_selector (
510+ airflow,
511+ APP_NAME ,
512+ & rolegroup. role ,
513+ & rolegroup. role_group ,
514+ )
515+ . context ( LabelBuildSnafu ) ?;
516+
517+ // Only apply headless service for something exposing an HTTP port
518+ if airflow_role. get_http_port ( ) . is_some ( ) {
519+ let rg_headless_service = build_rolegroup_headless_service (
520+ airflow,
521+ & rolegroup,
522+ role_group_service_recommended_labels. clone ( ) ,
523+ role_group_service_selector. clone ( ) . into ( ) ,
524+ )
525+ . context ( ServiceConfigurationSnafu ) ?;
526+
527+ cluster_resources
528+ . add ( client, rg_headless_service)
529+ . await
530+ . context ( ApplyRoleGroupServiceSnafu {
531+ rolegroup : rolegroup. clone ( ) ,
532+ } ) ?;
533+ }
534+
535+ let rg_metrics_service = build_rolegroup_metrics_service (
536+ airflow,
537+ & rolegroup,
538+ role_group_service_recommended_labels,
539+ role_group_service_selector. into ( ) ,
540+ )
541+ . context ( ServiceConfigurationSnafu ) ?;
542+ cluster_resources
543+ . add ( client, rg_metrics_service)
544+ . await
545+ . context ( ApplyRoleGroupServiceSnafu {
497546 rolegroup : rolegroup. clone ( ) ,
498- } ,
499- ) ?;
547+ } ) ?;
500548
501549 let rg_statefulset = build_server_rolegroup_statefulset (
502550 airflow,
@@ -767,53 +815,6 @@ fn build_rolegroup_config_map(
767815 } )
768816}
769817
770- /// The rolegroup [`Service`] is a headless service that allows direct access to the instances of a certain rolegroup
771- ///
772- /// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
773- fn build_rolegroup_service (
774- airflow : & v1alpha1:: AirflowCluster ,
775- resolved_product_image : & ResolvedProductImage ,
776- rolegroup : & RoleGroupRef < v1alpha1:: AirflowCluster > ,
777- ) -> Result < Service > {
778- let ports = vec ! [ ServicePort {
779- name: Some ( METRICS_PORT_NAME . into( ) ) ,
780- port: METRICS_PORT . into( ) ,
781- protocol: Some ( "TCP" . to_string( ) ) ,
782- ..Default :: default ( )
783- } ] ;
784-
785- let prometheus_label =
786- Label :: try_from ( ( "prometheus.io/scrape" , "true" ) ) . context ( BuildLabelSnafu ) ?;
787-
788- let metadata = build_rolegroup_metadata (
789- airflow,
790- & resolved_product_image,
791- & rolegroup,
792- prometheus_label,
793- format ! ( "{name}-metrics" , name = rolegroup. object_name( ) ) ,
794- ) ?;
795-
796- let service_selector_labels =
797- Labels :: role_group_selector ( airflow, APP_NAME , & rolegroup. role , & rolegroup. role_group )
798- . context ( BuildLabelSnafu ) ?;
799-
800- let service_spec = ServiceSpec {
801- // Internal communication does not need to be exposed
802- type_ : Some ( "ClusterIP" . to_string ( ) ) ,
803- cluster_ip : Some ( "None" . to_string ( ) ) ,
804- ports : Some ( ports) ,
805- selector : Some ( service_selector_labels. into ( ) ) ,
806- publish_not_ready_addresses : Some ( true ) ,
807- ..ServiceSpec :: default ( )
808- } ;
809-
810- Ok ( Service {
811- metadata,
812- spec : Some ( service_spec) ,
813- status : None ,
814- } )
815- }
816-
817818fn build_rolegroup_metadata (
818819 airflow : & v1alpha1:: AirflowCluster ,
819820 resolved_product_image : & & ResolvedProductImage ,
@@ -1160,10 +1161,7 @@ fn build_server_rolegroup_statefulset(
11601161 match_labels : Some ( statefulset_match_labels. into ( ) ) ,
11611162 ..LabelSelector :: default ( )
11621163 } ,
1163- service_name : Some ( format ! (
1164- "{name}-metrics" ,
1165- name = rolegroup_ref. object_name( )
1166- ) ) ,
1164+ service_name : stateful_set_service_name ( airflow_role, rolegroup_ref) ,
11671165 template : pod_template,
11681166 volume_claim_templates : pvcs,
11691167 ..StatefulSetSpec :: default ( )
@@ -1349,7 +1347,6 @@ pub fn error_policy(
13491347 _ => Action :: requeue ( * Duration :: from_secs ( 10 ) ) ,
13501348 }
13511349}
1352- // I want to add secret volumes right here
13531350
13541351fn add_authentication_volumes_and_volume_mounts (
13551352 authentication_config : & AirflowClientAuthenticationDetailsResolved ,
0 commit comments