Skip to content

feat: Airflow 3.0.1 (experimental) #630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ product-config = { git = "https://github.com/stackabletech/product-config.git",
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", features = ["telemetry", "versioned"], tag = "stackable-operator-0.93.1" }

anyhow = "1.0"
base64 = "0.22.1"
built = { version = "0.7", features = ["chrono", "git2"] }
clap = "4.5"
const_format = "0.2"
fnv = "1.0"
futures = { version = "0.3", features = ["compat"] }
indoc = "2.0"
lazy_static = "1.4"
rand = "0.8.5"
rstest = "0.25"
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
3 changes: 3 additions & 0 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ product-config.workspace = true
stackable-operator.workspace = true

anyhow.workspace = true
base64.workspace = true
clap.workspace = true
const_format.workspace = true
fnv.workspace = true
futures.workspace = true
lazy_static.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
Expand Down
10 changes: 8 additions & 2 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,8 @@ fn build_server_rolegroup_statefulset(
.context(GracefulShutdownSnafu)?;

let mut airflow_container_args = Vec::new();
airflow_container_args.extend(airflow_role.get_commands(authentication_config));
airflow_container_args
.extend(airflow_role.get_commands(authentication_config, resolved_product_image));

airflow_container
.image_from_product_image(resolved_product_image)
Expand All @@ -980,6 +981,7 @@ fn build_server_rolegroup_statefulset(
authentication_config,
authorization_config,
git_sync_resources,
resolved_product_image,
)
.context(BuildStatefulsetEnvVarsSnafu)?,
);
Expand Down Expand Up @@ -1170,7 +1172,10 @@ fn build_server_rolegroup_statefulset(
match_labels: Some(statefulset_match_labels.into()),
..LabelSelector::default()
},
service_name: Some(rolegroup_ref.object_name()),
service_name: Some(format!(
"{name}-metrics",
name = rolegroup_ref.object_name()
)),
template: pod_template,
volume_claim_templates: pvcs,
..StatefulSetSpec::default()
Expand Down Expand Up @@ -1263,6 +1268,7 @@ fn build_executor_template_config_map(
env_overrides,
merged_executor_config,
git_sync_resources,
resolved_product_image,
))
.add_volume_mounts(airflow.volume_mounts())
.context(AddVolumeMountSnafu)?
Expand Down
131 changes: 86 additions & 45 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use stackable_operator::{
cache::UserInformationCache,
cluster_operation::ClusterOperation,
opa::OpaConfig,
product_image_selection::ProductImage,
product_image_selection::{ProductImage, ResolvedProductImage},
resources::{
CpuLimitsFragment, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment,
Resources, ResourcesFragment,
Expand Down Expand Up @@ -324,11 +324,6 @@ impl v1alpha1::AirflowCluster {
self.spec.cluster_config.volume_mounts.clone()
}

/// The name of the role-level load-balanced Kubernetes `Service`
pub fn node_role_service_name(&self) -> Option<String> {
self.metadata.name.clone()
}

/// Retrieve and merge resource configs for role and role groups
pub fn merged_config(
&self,
Expand Down Expand Up @@ -548,6 +543,7 @@ impl AirflowRole {
pub fn get_commands(
&self,
auth_config: &AirflowClientAuthenticationDetailsResolved,
resolved_product_image: &ResolvedProductImage,
) -> Vec<String> {
let mut command = vec![
format!(
Expand All @@ -558,43 +554,88 @@ impl AirflowRole {
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
];

match &self {
AirflowRole::Webserver => {
// Getting auth commands for AuthClass
command.extend(Self::authentication_start_commands(auth_config));
command.extend(vec![
if resolved_product_image.product_version.starts_with("3.") {
// Start-up commands have changed in 3.x.
// See https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/upgrading_to_airflow3.html#step-6-changes-to-your-startup-scripts and
// https://airflow.apache.org/docs/apache-airflow/3.0.1/installation/setting-up-the-database.html#setting-up-the-database.
// If `airflow db migrate` is not run for each role there may be
// timing issues (services which require the db start before the
// migration is complete). DB-migrations should eventually be
// optional. See https://github.com/stackabletech/airflow-operator/issues/589.
match &self {
AirflowRole::Webserver => {
command.extend(Self::authentication_start_commands(auth_config));
command.extend(vec![
"prepare_signal_handlers".to_string(),
format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"),
"airflow api-server &".to_string(),
]);
}
AirflowRole::Scheduler => command.extend(vec![
"airflow db migrate".to_string(),
"airflow users create \
--username \"$ADMIN_USERNAME\" \
--firstname \"$ADMIN_FIRSTNAME\" \
--lastname \"$ADMIN_LASTNAME\" \
--email \"$ADMIN_EMAIL\" \
--password \"$ADMIN_PASSWORD\" \
--role \"Admin\""
.to_string(),
"prepare_signal_handlers".to_string(),
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow dag-processor &".to_string(),
"airflow scheduler &".to_string(),
]),
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"),
"airflow webserver &".to_string(),
]);
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow db migrate".to_string(),
"airflow celery worker &".to_string(),
]),
}
} else {
match &self {
AirflowRole::Webserver => {
// Getting auth commands for AuthClass
command.extend(Self::authentication_start_commands(auth_config));
command.extend(vec![
"prepare_signal_handlers".to_string(),
format!("containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"),
"airflow webserver &".to_string(),
]);
}
AirflowRole::Scheduler => command.extend(vec![
// Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
"airflow db init".to_string(),
"airflow db upgrade".to_string(),
"airflow users create \
--username \"$ADMIN_USERNAME\" \
--firstname \"$ADMIN_FIRSTNAME\" \
--lastname \"$ADMIN_LASTNAME\" \
--email \"$ADMIN_EMAIL\" \
--password \"$ADMIN_PASSWORD\" \
--role \"Admin\""
.to_string(),
"prepare_signal_handlers".to_string(),
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow scheduler &".to_string(),
]),
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow celery worker &".to_string(),
]),
}

AirflowRole::Scheduler => command.extend(vec![
// Database initialization is limited to the scheduler, see https://github.com/stackabletech/airflow-operator/issues/259
"airflow db init".to_string(),
"airflow db upgrade".to_string(),
"airflow users create \
--username \"$ADMIN_USERNAME\" \
--firstname \"$ADMIN_FIRSTNAME\" \
--lastname \"$ADMIN_LASTNAME\" \
--email \"$ADMIN_EMAIL\" \
--password \"$ADMIN_PASSWORD\" \
--role \"Admin\""
.to_string(),
"prepare_signal_handlers".to_string(),
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow scheduler &".to_string(),
]),
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
format!(
"containerdebug --output={STACKABLE_LOG_DIR}/containerdebug-state.json --loop &"
),
"airflow celery worker &".to_string(),
]),
}

// graceful shutdown part
command.extend(vec![
"wait_for_termination $!".to_string(),
Expand Down Expand Up @@ -852,17 +893,17 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
let (cpu, memory) = match role {
AirflowRole::Worker => (
CpuLimitsFragment {
min: Some(Quantity("500m".into())),
min: Some(Quantity("1".into())),
max: Some(Quantity("2".into())),
},
MemoryLimitsFragment {
limit: Some(Quantity("2Gi".into())),
limit: Some(Quantity("3Gi".into())),
runtime_limits: NoRuntimeLimitsFragment {},
},
),
AirflowRole::Webserver => (
CpuLimitsFragment {
min: Some(Quantity("500m".into())),
min: Some(Quantity("1".into())),
max: Some(Quantity("2".into())),
},
MemoryLimitsFragment {
Expand All @@ -872,11 +913,11 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
),
AirflowRole::Scheduler => (
CpuLimitsFragment {
min: Some(Quantity("500m".to_owned())),
min: Some(Quantity("1".to_owned())),
max: Some(Quantity("2".to_owned())),
},
MemoryLimitsFragment {
limit: Some(Quantity("512Mi".to_owned())),
limit: Some(Quantity("1Gi".to_owned())),
runtime_limits: NoRuntimeLimitsFragment {},
},
),
Expand Down
Loading
Loading