diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 1fedcd8f3a290..c01096716c972 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -423,7 +423,7 @@ public Job translate(List packages) { if (options.getDataflowKmsKey() != null) { environment.setServiceKmsKeyName(options.getDataflowKmsKey()); } - if (options.isHotKeyLoggingEnabled()) { + if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) { DebugOptions debugOptions = new DebugOptions(); debugOptions.setEnableHotKeyLogging(true); environment.setDebugOptions(debugOptions); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java index f901b01ed5663..b94759c239a32 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java @@ -19,6 +19,7 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration; import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import com.google.api.client.util.Clock; import com.google.api.services.dataflow.model.ApproximateSplitRequest; @@ -132,7 +133,7 @@ protected void reportProgressHelper() throws Exception { // The key set the in BatchModeExecutionContext is only set in the GroupingShuffleReader // which is the correct key. The key is also translated into a Java object in the reader. - if (options.isHotKeyLoggingEnabled()) { + if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) { hotKeyLogger.logHotKeyDetection( hotKeyDetection.getUserStepName(), TimeUtil.fromCloudDuration(hotKeyDetection.getHotKeyAge()), diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index b0b6377dd8b17..86f2cffe604c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.processing; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; + import com.google.api.services.dataflow.model.MapTask; import com.google.auto.value.AutoValue; import java.util.Collection; @@ -368,7 +370,8 @@ private ExecuteWorkResult executeWork( Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000); String stepName = getShuffleTaskStepName(computationState.getMapTask()); - if (options.isHotKeyLoggingEnabled() && keyCoder.isPresent()) { + if ((options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) + && keyCoder.isPresent()) { hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey); } else { hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);