From 7ff01639786717e6b7c87602382f88ca0363f5fa Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Tue, 26 May 2026 13:59:15 +0800 Subject: [PATCH] [Fix-18292][Registry] Carry deleted node value in JDBC registry REMOVE event The JDBC registry built REMOVE events without eventData, so downstream listeners (AbstractClusterSubscribeListener / AbstractHAServer) parsed a null heartbeat, logged "Unknown cluster change event" and dropped the event, leaving stale master/slot state in memory after ephemeral nodes were purged. Thread the deleted node value through onJdbcRegistryDataDeleted so the REMOVE event carries it as eventData, matching the ZooKeeper and Etcd registries. --- ...JdbcRegistryDataChangeListenerAdapter.java | 3 +- .../JdbcRegistryDataChangeListener.java | 2 +- .../jdbc/server/JdbcRegistryServer.java | 3 +- ...RegistryDataChangeListenerAdapterTest.java | 72 +++++++++++++++++++ 4 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapterTest.java diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java index ccfb95f96391..a39426f9bb65 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java @@ -46,13 +46,14 @@ public void onJdbcRegistryDataChanged(String eventPath, String value) { } @Override - public void onJdbcRegistryDataDeleted(String eventPath) { + public void onJdbcRegistryDataDeleted(String eventPath, String value) { if (!isPathMatch(watchedPath, eventPath, listener.getSubscribeScope())) { return; } final Event event = Event.builder() .watchedPath(watchedPath) .eventPath(eventPath) + .eventData(value) .type(Event.Type.REMOVE) .build(); listener.notify(event); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataChangeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataChangeListener.java index cea1df2149d4..d02e4259ef48 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataChangeListener.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataChangeListener.java @@ -21,7 +21,7 @@ public interface JdbcRegistryDataChangeListener { void onJdbcRegistryDataChanged(String key, String value); - void onJdbcRegistryDataDeleted(String key); + void onJdbcRegistryDataDeleted(String key, String value); void onJdbcRegistryDataAdded(String key, String value); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java index 3e8b11c81351..61a934bd148e 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java @@ -196,7 +196,8 @@ public void onRegistryRowAdded(JdbcRegistryDataDTO data) { @Override public void onRegistryRowDeleted(JdbcRegistryDataDTO data) { - jdbcRegistryDataChangeListener.onJdbcRegistryDataDeleted(data.getDataKey()); + jdbcRegistryDataChangeListener.onJdbcRegistryDataDeleted(data.getDataKey(), + data.getDataValue()); } }); } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapterTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapterTest.java new file mode 100644 index 000000000000..efe16ac7d802 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapterTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.jdbc; + +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import com.google.common.truth.Truth; + +class JdbcRegistryDataChangeListenerAdapterTest { + + private static final String WATCHED_PATH = "/nodes/master"; + + /** + * The REMOVE event must carry the deleted node value as eventData, otherwise downstream listeners + * (e.g. AbstractClusterSubscribeListener / AbstractHAServer) cannot parse the removed server and will + * silently drop the event, leaving stale registry data in memory. + */ + @Test + void onJdbcRegistryDataDeleted_shouldInjectDeletedValueAsEventData() { + final String eventPath = "/nodes/master/127.0.0.1:5678"; + final String deletedValue = "{\"address\":\"127.0.0.1:5678\"}"; + final AtomicReference notified = new AtomicReference<>(); + + final JdbcRegistryDataChangeListenerAdapter adapter = new JdbcRegistryDataChangeListenerAdapter( + WATCHED_PATH, capturingListener(notified, SubscribeListener.SubscribeScope.CHILDREN_ONLY)); + + adapter.onJdbcRegistryDataDeleted(eventPath, deletedValue); + + final Event event = notified.get(); + Truth.assertThat(event).isNotNull(); + Truth.assertThat(event.getType()).isEqualTo(Event.Type.REMOVE); + Truth.assertThat(event.getWatchedPath()).isEqualTo(WATCHED_PATH); + Truth.assertThat(event.getEventPath()).isEqualTo(eventPath); + Truth.assertThat(event.getEventData()).isEqualTo(deletedValue); + } + + private SubscribeListener capturingListener(final AtomicReference holder, + final SubscribeListener.SubscribeScope scope) { + return new SubscribeListener() { + + @Override + public void notify(final Event event) { + holder.set(event); + } + + @Override + public SubscribeScope getSubscribeScope() { + return scope; + } + }; + } +}