Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.cleanup.Cleanup;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.loadindices.LoadIndices;
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
Expand Down Expand Up @@ -86,6 +87,10 @@ private static final class Args {
@Named("reindexDebug")
private ReindexDebug reindexDebug;

@Autowired(required = false)
@Named("cleanup")
private Cleanup cleanup;

@Override
public void run(String... cmdLineArgs) {
// Register upgrades with null checks and warnings
Expand Down Expand Up @@ -149,6 +154,12 @@ public void run(String... cmdLineArgs) {
log.info("ReindexDebug upgrade not available - bean not found");
}

if (cleanup != null) {
_upgradeManager.register(cleanup);
} else {
log.info("Cleanup upgrade not available - bean not found");
}

final Args args = new Args();
new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.cleanup.CleanupCondition;
import com.linkedin.datahub.upgrade.cleanup.CleanupUpgradeConfig;
import com.linkedin.datahub.upgrade.conditions.GeneralUpgradeCondition;
import com.linkedin.datahub.upgrade.conditions.LoadIndicesCondition;
import com.linkedin.datahub.upgrade.conditions.SqlSetupCondition;
Expand Down Expand Up @@ -34,4 +36,10 @@ public static class SqlSetupConfiguration {}
@Conditional(GeneralUpgradeCondition.class)
@Import(GeneralUpgradeConfiguration.class)
public static class GeneralConfiguration {}

/**
 * Configuration for the Cleanup upgrade — teardown of all DataHub infrastructure resources
 * (Elasticsearch/OpenSearch state, Kafka topics, SQL database).
 *
 * <p>Only activated when the CLI's non-option arguments select the cleanup path (see
 * {@link CleanupCondition}); the actual beans are supplied by the imported
 * {@link CleanupUpgradeConfig}, so none of the cleanup machinery is loaded for other upgrades.
 */
@Configuration
@Conditional(CleanupCondition.class)
@Import(CleanupUpgradeConfig.class)
public static class CleanupConfiguration {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.datahub.upgrade.cleanup;

import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import java.util.List;

/**
 * Upgrade that tears down all infrastructure resources created by DataHub setup jobs. Intended to
 * run as a Helm pre-delete hook so that {@code helm uninstall} leaves no DataHub-specific state in
 * shared infrastructure (Elasticsearch, Kafka, SQL).
 *
 * <p>Execution order: Elasticsearch → Kafka → SQL. Elasticsearch is cleaned first so that indices
 * are not queried while the database is being dropped.
 */
public class Cleanup implements Upgrade {

  // Immutable, ordered list of teardown steps to execute.
  private final List<UpgradeStep> steps;

  /**
   * @param steps teardown steps in execution order; may be empty when all subsystem cleanups are
   *     disabled or unavailable, but must not be null or contain null elements
   * @throws NullPointerException if {@code steps} is null or contains a null element
   */
  public Cleanup(List<UpgradeStep> steps) {
    // Defensive immutable snapshot: later mutation of the caller's list cannot change the
    // registered steps, and steps() can safely expose the list without a copy.
    this.steps = List.copyOf(steps);
  }

  @Override
  public String id() {
    return "Cleanup";
  }

  /** Returns the unmodifiable list of steps, in execution order. */
  @Override
  public List<UpgradeStep> steps() {
    return steps;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.datahub.upgrade.cleanup;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

/**
 * Spring {@link Condition} that matches only when the application's non-option CLI arguments
 * include {@code Cleanup}, so the cleanup-specific Spring configuration is loaded solely for the
 * cleanup upgrade path.
 */
public class CleanupCondition implements Condition {
  public static final String CLEANUP_ARG = "Cleanup";
  public static final Set<String> CLEANUP_ARGS = Set.of(CLEANUP_ARG);

  @Override
  public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
    ApplicationArguments appArgs =
        context.getBeanFactory().getBean(ApplicationArguments.class);
    List<String> positionalArgs = appArgs.getNonOptionArgs();
    if (positionalArgs == null) {
      return false;
    }
    // Match when any non-null positional argument names the cleanup upgrade.
    for (String arg : positionalArgs) {
      if (arg != null && CLEANUP_ARGS.contains(arg)) {
        return true;
      }
    }
    return false;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.linkedin.datahub.upgrade.cleanup;

import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.config.OpenTelemetryConfig;
import com.linkedin.datahub.upgrade.sqlsetup.SqlSetupArgs;
import com.linkedin.datahub.upgrade.sqlsetup.config.SqlSetupConfig;
import com.linkedin.datahub.upgrade.sqlsetup.config.SqlSetupEbeanFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.utils.EnvironmentUtils;
import io.ebean.Database;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.autoconfigure.metrics.MetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Import;

/**
 * Spring configuration for the Cleanup upgrade. Loads the minimal set of beans needed to tear down
 * Elasticsearch indices, Kafka topics, and the SQL database.
 *
 * <p>Each subsystem's cleanup can be switched off via an environment variable
 * ({@code CLEANUP_ELASTICSEARCH_ENABLED}, {@code CLEANUP_KAFKA_ENABLED},
 * {@code CLEANUP_SQL_ENABLED}; all default to {@code true}). All collaborator beans are optional
 * ({@code required = false}) so a partially-wired context still produces a {@link Cleanup} bean
 * containing only the steps whose dependencies are available.
 */
@Slf4j
@Configuration
@Import({
  MetricsAutoConfiguration.class,
  OpenTelemetryConfig.class,
  SqlSetupConfig.class,
  SqlSetupEbeanFactory.class
})
@ComponentScan(
    basePackages = {
      "com.linkedin.gms.factory.config",
      "com.linkedin.gms.factory.common",
      "com.linkedin.gms.factory.entity",
      "com.linkedin.gms.factory.entityclient",
      "com.linkedin.gms.factory.plugins",
      "com.linkedin.gms.factory.entityregistry",
      "com.linkedin.gms.factory.search",
      "com.linkedin.gms.factory.timeseries",
      "com.linkedin.gms.factory.context",
      "com.linkedin.gms.factory.system_telemetry"
    })
public class CleanupUpgradeConfig {

  @Autowired(required = false)
  private BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents;

  @Autowired(required = false)
  private ConfigurationProvider configurationProvider;

  @Autowired(required = false)
  private KafkaProperties kafkaProperties;

  @Autowired(required = false)
  @Qualifier("ebeanServer")
  private Database ebeanServer;

  @Autowired(required = false)
  @Qualifier("sqlSetupArgs")
  private SqlSetupArgs sqlSetupArgs;

  /**
   * Builds the Cleanup upgrade from whichever subsystem steps are enabled and have their
   * dependencies wired.
   *
   * @return a Cleanup whose step list may be empty if everything is disabled or unavailable
   */
  @Bean(name = "cleanup")
  @Nonnull
  public Cleanup createCleanup() {
    List<UpgradeStep> steps = new ArrayList<>();

    // Order: ES first (so indices aren't queried during DB drop), then Kafka, then SQL
    addElasticsearchStep(steps);
    addKafkaStep(steps);
    addSqlStep(steps);

    return new Cleanup(steps);
  }

  /** Adds the Elasticsearch teardown step when enabled and its collaborators are wired. */
  private void addElasticsearchStep(List<UpgradeStep> steps) {
    if (!EnvironmentUtils.getBoolean("CLEANUP_ELASTICSEARCH_ENABLED", true)) {
      return;
    }
    if (esComponents != null && configurationProvider != null) {
      steps.add(new DeleteElasticsearchIndicesStep(esComponents, configurationProvider));
      log.info("Elasticsearch cleanup step enabled");
    } else {
      log.warn("Elasticsearch cleanup requested but ES components not available — skipping");
    }
  }

  /** Adds the Kafka topic teardown step when enabled and its collaborators are wired. */
  private void addKafkaStep(List<UpgradeStep> steps) {
    if (!EnvironmentUtils.getBoolean("CLEANUP_KAFKA_ENABLED", true)) {
      return;
    }
    if (configurationProvider != null && kafkaProperties != null) {
      KafkaConfiguration kafkaConfig = configurationProvider.getKafka();
      steps.add(new DeleteKafkaTopicsStep(kafkaConfig, kafkaProperties));
      log.info("Kafka cleanup step enabled");
    } else {
      log.warn("Kafka cleanup requested but Kafka config not available — skipping");
    }
  }

  /** Adds the SQL database teardown step when enabled and its collaborators are wired. */
  private void addSqlStep(List<UpgradeStep> steps) {
    if (!EnvironmentUtils.getBoolean("CLEANUP_SQL_ENABLED", true)) {
      return;
    }
    if (ebeanServer != null && sqlSetupArgs != null) {
      steps.add(new DropDatabaseStep(ebeanServer, sqlSetupArgs));
      log.info("SQL cleanup step enabled");
    } else {
      log.warn("SQL cleanup requested but database not available — skipping");
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.linkedin.datahub.upgrade.cleanup;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.utils.EnvironmentUtils;
import com.linkedin.metadata.utils.elasticsearch.responses.RawResponse;
import com.linkedin.upgrade.DataHubUpgradeState;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;

/**
 * Deletes all Elasticsearch/OpenSearch resources created by DataHub: indices matching the
 * configured prefix, usage event data streams / aliases, index templates, ILM/ISM policies, and
 * security roles/users.
 *
 * <p>All individual deletions are best-effort ({@link #safeDelete}): a 404 means the resource is
 * already absent and is logged at INFO; other failures are logged at WARN and do not abort the
 * step. Only an unexpected exception in the overall flow marks the step FAILED.
 */
@Slf4j
public class DeleteElasticsearchIndicesStep implements UpgradeStep {

  private final BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents;
  private final ConfigurationProvider configurationProvider;

  public DeleteElasticsearchIndicesStep(
      BaseElasticSearchComponentsFactory.BaseElasticSearchComponents esComponents,
      ConfigurationProvider configurationProvider) {
    this.esComponents = esComponents;
    this.configurationProvider = configurationProvider;
  }

  @Override
  public String id() {
    return "DeleteElasticsearchIndicesStep";
  }

  @Override
  public int retryCount() {
    // Deletions are idempotent (404s are tolerated), so a retry after a transient
    // failure is safe.
    return 2;
  }

  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {
      try {
        String indexPrefix = configurationProvider.getElasticSearch().getIndex().getFinalPrefix();
        boolean isOpenSearch = esComponents.getSearchClient().getEngineType().isOpenSearch();

        // Usage-event resources first (data stream/alias, template, lifecycle policy),
        // then all prefixed indices, then security roles/users.
        deleteUsageEventResources(indexPrefix, isOpenSearch);
        deleteAllIndices(indexPrefix);
        deleteSecurityResources(indexPrefix, isOpenSearch);

        log.info("Elasticsearch cleanup completed successfully");
        return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
      } catch (Exception e) {
        log.error("DeleteElasticsearchIndicesStep failed.", e);
        return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
      }
    };
  }

  /** Delete all indices matching {prefix}* */
  private void deleteAllIndices(String prefix) {
    String pattern = prefix + "*";
    try {
      log.info("Deleting all indices matching pattern: {}", pattern);
      // ignore_unavailable avoids a 404 when nothing matches the pattern.
      performDelete("/" + pattern + "?ignore_unavailable=true");
      log.info("Successfully deleted indices matching {}", pattern);
    } catch (Exception e) {
      log.warn("Failed to delete indices matching {}: {}", pattern, e.getMessage());
    }
  }

  /** Delete usage event data streams, index templates, and ILM/ISM policies. */
  private void deleteUsageEventResources(String prefix, boolean isOpenSearch) {
    String prefixedDataStream = prefix + "datahub_usage_event";
    String prefixedTemplate = prefix + "datahub_usage_event_index_template";
    String prefixedPolicy = prefix + "datahub_usage_event_policy";

    if (isOpenSearch) {
      // OpenSearch: delete alias, then ISM policy
      safeDelete("/" + prefixedDataStream, "usage event alias");
      safeDelete("/_index_template/" + prefixedTemplate, "usage event index template");
      // Try both ISM API roots: /_plugins (current) and /_opendistro (legacy)
      safeDelete("/_plugins/_ism/policies/" + prefixedPolicy, "ISM policy (plugins)");
      safeDelete("/_opendistro/_ism/policies/" + prefixedPolicy, "ISM policy (opendistro)");
    } else {
      // Elasticsearch: delete data stream, index template, ILM policy
      safeDelete("/_data_stream/" + prefixedDataStream, "usage event data stream");
      safeDelete("/_index_template/" + prefixedTemplate, "usage event index template");
      safeDelete("/_ilm/policy/" + prefixedPolicy, "ILM policy");
    }
  }

  /**
   * Delete the security role and user created by CreateUserStep. For OpenSearch, every security
   * resource is attempted via both API roots — /_plugins/_security (current) and
   * /_opendistro/_security (legacy) — mirroring the dual-path handling of ISM policies, since only
   * one root is served depending on the cluster version.
   */
  private void deleteSecurityResources(String prefix, boolean isOpenSearch) {
    String roleName = prefix + "access";
    String username = EnvironmentUtils.getString("CREATE_USER_ES_USERNAME");

    if (isOpenSearch) {
      // Role mapping
      safeDelete("/_plugins/_security/api/rolesmapping/" + roleName, "OpenSearch role mapping");
      safeDelete(
          "/_opendistro/_security/api/rolesmapping/" + roleName,
          "OpenSearch role mapping (opendistro)");
      // User — try both roots, like the role mapping above
      if (username != null && !username.isEmpty()) {
        safeDelete(
            "/_plugins/_security/api/internalusers/" + username,
            "OpenSearch internal user (plugins)");
        safeDelete(
            "/_opendistro/_security/api/internalusers/" + username, "OpenSearch internal user");
      }
      // Role — try both roots, like the role mapping above
      safeDelete("/_plugins/_security/api/roles/" + roleName, "OpenSearch role (plugins)");
      safeDelete("/_opendistro/_security/api/roles/" + roleName, "OpenSearch role");
    } else {
      // Elasticsearch Cloud
      if (username != null && !username.isEmpty()) {
        safeDelete("/_security/user/" + username, "Elasticsearch user");
      }
      safeDelete("/_security/role/" + roleName, "Elasticsearch role");
    }
  }

  /** Perform a DELETE request, logging but not throwing on 404. */
  private void safeDelete(String endpoint, String description) {
    try {
      performDelete(endpoint);
      log.info("Deleted {}: {}", description, endpoint);
    } catch (ResponseException e) {
      int status = e.getResponse().getStatusLine().getStatusCode();
      if (status == 404) {
        // Resource already gone — expected on re-runs and partial installs.
        log.info("{} not found (already absent): {}", description, endpoint);
      } else {
        log.warn("Failed to delete {} (HTTP {}): {}", description, status, e.getMessage());
      }
    } catch (Exception e) {
      log.warn("Failed to delete {}: {}", description, e.getMessage());
    }
  }

  /**
   * Issue a raw DELETE against the low-level client.
   *
   * @throws RuntimeException if the response arrives with an error status without the client
   *     raising a {@link ResponseException}
   */
  private void performDelete(String endpoint) throws Exception {
    log.info("DELETE => {}", endpoint);
    Request request = new Request("DELETE", endpoint);
    RawResponse response = esComponents.getSearchClient().performLowLevelRequest(request);
    int statusCode = response.getStatusLine().getStatusCode();
    if (statusCode >= 400) {
      throw new RuntimeException("DELETE " + endpoint + " returned HTTP " + statusCode);
    }
  }
}
Loading
Loading