From 8970cb5e13a422297ebdad92b82a8b89ecbf5905 Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Sat, 9 May 2026 22:22:49 +0800 Subject: [PATCH] [Chore][Master] Quietly exit WorkerGroupDispatcher loop on interrupt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WorkerGroupDispatcher#run consumed TaskDispatchableEventBus#take() which was annotated with @SneakyThrows, so an InterruptedException raised when the master shuts down (the dispatch thread is parked on the queue) was rethrown as-is as an undeclared checked exception (@SneakyThrows does not wrap it in a RuntimeException) and surfaced with a full stack trace — alarming "thread died" noise during a perfectly graceful shutdown. Drop @SneakyThrows from take() so it declares InterruptedException, and let the dispatch loop catch it: re-set the interrupt flag, log a single info line, and return so the daemon thread exits cleanly. Also clamp the dispatch-retry waiting time to >= 1s so a freshly-counted failure does not immediately re-enqueue the task against the same unhealthy worker group. In addition, document how to run dolphinscheduler-master tests in the module's CLAUDE.md: no Docker required, watch out for stale JaCoCo classes, surefire forks 4 JVMs in parallel, and the trailing "kill self fork JVM ... 30 seconds after System.exit(0)" line is a harmless warning. --- dolphinscheduler-master/CLAUDE.md | 38 ++++++++++++++++++- .../dispatcher/TaskDispatchableEventBus.java | 6 +-- .../dispatcher/WorkerGroupDispatcher.java | 15 ++++++-- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-master/CLAUDE.md b/dolphinscheduler-master/CLAUDE.md index 57b1e6bc0b39..0028557dc2cf 100644 --- a/dolphinscheduler-master/CLAUDE.md +++ b/dolphinscheduler-master/CLAUDE.md @@ -52,7 +52,43 @@ Exposed interfaces: ## Tests -`src/test/java` — unit + integration. Integration tests extend `AbstractMasterIntegrationTestCase`; they simulate distributed scenarios including failover. +`src/test/java` — unit + integration. 
Integration tests extend `AbstractMasterIntegrationTestCase` (a `@SpringBootTest`) and live under `server/master/integration/cases/`; they simulate distributed scenarios including failover. + +### Running the suite + +From the repo root: + +```bash +# Whole module (unit + integration). Use `clean` to avoid the JaCoCo +# "Cannot process instrumented class" failure when classes from a previous +# build are still on disk. +./mvnw -pl dolphinscheduler-master -am clean test + +# Single test class. -Dsurefire.failIfNoSpecifiedTests=false is required +# whenever -am pulls in upstream modules: the -Dtest filter applies to +# every reactor module, and surefire fails the build on any module that +# has zero matches without this flag. +./mvnw -pl dolphinscheduler-master -am clean test \ + -Dtest=WorkerGroupDispatcherTest \ + -Dsurefire.failIfNoSpecifiedTests=false + +# Single method +./mvnw -pl dolphinscheduler-master -am clean test \ + -Dtest=WorkerGroupDispatcherTest#dispatch \ + -Dsurefire.failIfNoSpecifiedTests=false +``` + +### Local environment + +- **No Docker required.** Integration tests boot Spring Boot against an in-memory H2 (`spring-it-application.yaml`), with a fake registry — Testcontainers is not used here. The repo-wide `-Dm1_chip=true` flag only applies to `dolphinscheduler-api-test` / `dolphinscheduler-e2e`; ignore it when running this module. +- **Surefire forks 4 JVMs in parallel** (`forkCount=4`, `reuseForks=true`, see root `pom.xml`). Tests must therefore be parallel-safe — never share static mutable state across cases. Per-fork JaCoCo files land at `target/jacoco-${forkNumber}.exec`. +- The `Surefire is going to kill self fork JVM. The exit has elapsed 30 seconds after System.exit(0).` line at the end is **a harmless warning** — `BUILD SUCCESS` on the same run is the real signal. It just means a fork held a non-daemon thread past `System.exit`; do not chase it unless the build itself fails. + +### When tests fail + +1. 
Re-run with `clean` first — the JaCoCo error above and stale generated sources both vanish after a clean rebuild. +2. For integration tests, check `target/surefire-reports/*-output.txt` per fork — exceptions from the embedded master/H2 are logged there, not on stdout. +3. New lifecycle events / failover paths must be exercised against `AbstractMasterIntegrationTestCase` (see the gotcha above) — extend an existing case under `integration/cases/` rather than writing a bare unit test. ## Related modules diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java index f6fdb119b081..e177cf7ec7ba 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/TaskDispatchableEventBus.java @@ -20,8 +20,6 @@ import org.apache.dolphinscheduler.eventbus.AbstractDelayEventBus; import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent; -import lombok.SneakyThrows; - public class TaskDispatchableEventBus, T extends Comparable> extends AbstractDelayEventBus { @@ -30,8 +28,8 @@ public void add(V v) { super.publish(v); } - @SneakyThrows - public V take() { + @Override + public V take() throws InterruptedException { return super.take(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java index 853e8d2c4bf1..cecdbd4bd488 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java @@ -87,7 +87,14 @@ public synchronized void start() { @Override public void run() { while (runningFlag.get()) { - TaskDispatchableEvent taskEntry = workerGroupEventBus.take(); + final TaskDispatchableEvent taskEntry; + try { + taskEntry = workerGroupEventBus.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("{} interrupted, exiting run loop", this.getName()); + return; + } ITaskExecution taskExecution = taskEntry.getData(); try ( TaskExecutorMDCUtils.MDCAutoClosable ignore = @@ -123,9 +130,9 @@ private void doDispatchTask(ITaskExecution taskExecution) { // If dispatch failed, will put the task back to the queue // The task will be dispatched after waiting time. - // the waiting time will increase multiple of times, but will not exceed 60 seconds - long waitingTimeMillis = Math.min( - taskExecution.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L); + // the waiting time grows with the fail count, bounded between 1 and 60 seconds + long waitingTimeMillis = Math.max(1_000L, Math.min( + taskExecution.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L)); dispatchTask(taskExecution, waitingTimeMillis); log.warn("Dispatch Task: {} failed will retry after: {}/ms", taskInstanceId, waitingTimeMillis, ex);