Skip to content

Commit fbe6f19

Browse files
authored
fix: Patch NiFi to avoid FlowSynchronizationException issue (#1481)
* fix: Patch NiFi to avoid FlowSynchronizationException issue * propagate exception * add patch for 2.7.2 * add patch for 2.6.0 * rename patches to be trackable * changelog * add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch * add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch: version 2.7.2 * add patch for sibling issue NIFI-15801 before (updated) NIFI-15901 patch: version 2.6.0 * updated changelog
1 parent 26d6f74 commit fbe6f19

7 files changed

Lines changed: 582 additions & 0 deletions

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ All notable changes to this project will be documented in this file.
88

99
- hadoop: Add precompiled hadoop for later reuse in dependent images ([#1466], [#1474]).
1010
- nifi: Add version `2.9.0` ([#1463]).
11+
- nifi: Backport NIFI-15801 to 2.x versions ([#1481]).
12+
- nifi: Backport NIFI-15901 to 2.x versions ([#1481]).
1113

1214
### Changed
1315

@@ -29,6 +31,7 @@ All notable changes to this project will be documented in this file.
2931
[#1471]: https://github.com/stackabletech/docker-images/pull/1471
3032
[#1474]: https://github.com/stackabletech/docker-images/pull/1474
3133
[#1476]: https://github.com/stackabletech/docker-images/pull/1476
34+
[#1481]: https://github.com/stackabletech/docker-images/pull/1481
3235

3336
## [26.3.0] - 2026-03-16
3437

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
From ec247bbf5e8b607267abaa2b302dc4a355e9767e Mon Sep 17 00:00:00 2001
2+
From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
3+
Date: Tue, 5 May 2026 17:44:42 +0200
4+
Subject: NIFI-15801 Stop processors in synchronizeProcessors before updating
5+
6+
---
7+
...tandardVersionedComponentSynchronizer.java | 61 ++++++++++++-------
8+
1 file changed, 40 insertions(+), 21 deletions(-)
9+
10+
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
11+
index b79ee4d6e8..c3e059171e 100644
12+
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
13+
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
14+
@@ -269,8 +269,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
15+
final ProcessGroup newProcessGroup = addProcessGroup(group, processGroup, options.getComponentIdGenerator(),
16+
additions.getParameterContexts(), additions.getParameterProviders(), group);
17+
additionsBuilder.addProcessGroup(newProcessGroup);
18+
- } catch (final ProcessorInstantiationException pie) {
19+
- throw new RuntimeException(pie);
20+
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
21+
+ throw new RuntimeException(e);
22+
}
23+
});
24+
25+
@@ -392,8 +392,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
26+
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId());
27+
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
28+
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
29+
- } catch (final ProcessorInstantiationException pie) {
30+
- throw new RuntimeException(pie);
31+
+ } catch (final ProcessorInstantiationException | FlowSynchronizationException e) {
32+
+ throw new RuntimeException(e);
33+
}
34+
});
35+
36+
@@ -422,7 +422,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
37+
38+
private void synchronize(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
39+
final Map<String, ParameterProviderReference> parameterProviderReferences, final ProcessGroup topLevelGroup, final boolean updateGroupSettings)
40+
- throws ProcessorInstantiationException {
41+
+ throws ProcessorInstantiationException, FlowSynchronizationException {
42+
43+
// Some components, such as Processors, may have a Scheduled State of RUNNING in the proposed flow. However, if we
44+
// transition the service into the RUNNING state, and then we need to update a Connection that is connected to it,
45+
@@ -687,7 +687,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
46+
47+
private void synchronizeChildGroups(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, VersionedParameterContext> versionedParameterContexts,
48+
final Map<String, ProcessGroup> childGroupsByVersionedId, final Map<String, ParameterProviderReference> parameterProviderReferences,
49+
- final ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
50+
+ final ProcessGroup topLevelGroup) throws ProcessorInstantiationException, FlowSynchronizationException {
51+
52+
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
53+
final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
54+
@@ -1189,21 +1189,39 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
55+
56+
private void synchronizeProcessors(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ProcessorNode> processorsByVersionedId,
57+
final ProcessGroup topLevelGroup)
58+
- throws ProcessorInstantiationException {
59+
+ throws ProcessorInstantiationException, FlowSynchronizationException {
60+
61+
- for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
62+
- final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
63+
- if (processor == null) {
64+
- final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
65+
- LOG.info("Added {} to {}", added, group);
66+
- } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
67+
- updateProcessor(processor, proposedProcessor, topLevelGroup);
68+
- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
69+
- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
70+
- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
71+
- LOG.info("Updated {}", processor);
72+
- } else {
73+
- processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
74+
+ final Set<ProcessorNode> processorsToRestart = new HashSet<>();
75+
+
76+
+ try {
77+
+ for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
78+
+ final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
79+
+ if (processor == null) {
80+
+ final ProcessorNode added = addProcessor(group, proposedProcessor, context.getComponentIdGenerator(), topLevelGroup);
81+
+ LOG.info("Added {} to {}", added, group);
82+
+ } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
83+
+ final long processorStopDeadline = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
84+
+ try {
85+
+ final boolean stopped = stopOrTerminate(processor, processorStopDeadline, syncOptions);
86+
+ if (stopped && proposedProcessor.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
87+
+ processorsToRestart.add(processor);
88+
+ }
89+
+ } catch (final TimeoutException e) {
90+
+ throw new FlowSynchronizationException("Failed to stop processor " + processor + " in preparation for update", e);
91+
+ }
92+
+ updateProcessor(processor, proposedProcessor, topLevelGroup);
93+
+ // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
94+
+ // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
95+
+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(processor, getPropertyValues(processor)));
96+
+ LOG.info("Updated {}", processor);
97+
+ } else {
98+
+ processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
99+
+ }
100+
+ }
101+
+ } finally {
102+
+ for (final ProcessorNode processor : processorsToRestart) {
103+
+ processor.getProcessGroup().startProcessor(processor, false);
104+
+ notifyScheduledStateChange((ComponentNode) processor, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
105+
}
106+
}
107+
}
108+
@@ -1375,7 +1393,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
109+
110+
private ProcessGroup addProcessGroup(final ProcessGroup destination, final VersionedProcessGroup proposed, final ComponentIdGenerator componentIdGenerator,
111+
final Map<String, VersionedParameterContext> versionedParameterContexts,
112+
- final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup) throws ProcessorInstantiationException {
113+
+ final Map<String, ParameterProviderReference> parameterProviderReferences, ProcessGroup topLevelGroup)
114+
+ throws ProcessorInstantiationException, FlowSynchronizationException {
115+
final String id = componentIdGenerator.generateUuid(proposed.getIdentifier(), proposed.getInstanceIdentifier(), destination.getIdentifier());
116+
final ProcessGroup group = context.getFlowManager().createProcessGroup(id);
117+
group.setVersionedComponentId(proposed.getIdentifier());
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
From 67b404db6005a90b9ef6fce018524b181dae9b98 Mon Sep 17 00:00:00 2001
2+
From: Andrew Kenworthy <andrew.kenworthy@stackable.tech>
3+
Date: Tue, 5 May 2026 17:50:06 +0200
4+
Subject: NIFI-15901 Disable controller services before updating
5+
6+
---
7+
...tandardVersionedComponentSynchronizer.java | 44 ++++++++++++++++---
8+
1 file changed, 37 insertions(+), 7 deletions(-)
9+
10+
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
11+
index c3e059171e..030f7f5a90 100644
12+
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
13+
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
14+
@@ -707,7 +707,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
15+
}
16+
17+
private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId,
18+
- final ProcessGroup topLevelGroup) {
19+
+ final ProcessGroup topLevelGroup) throws FlowSynchronizationException {
20+
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
21+
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
22+
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
23+
@@ -741,17 +741,47 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
24+
updateControllerService(addedService, proposedService, topLevelGroup);
25+
}
26+
27+
- // Update all of the Controller Services to match the VersionedControllerService
28+
+ // Update all Controller Services to match the VersionedControllerService.
29+
+ // Services may still be ENABLED here because not all callers disable them before sync-ing.
30+
+ // We must disable before calling updateControllerService, which calls setProperties
31+
+ // which calls verifyModifiable and throws IllegalStateException on ENABLED services.
32+
for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
33+
final ControllerServiceNode service = entry.getKey();
34+
final VersionedControllerService proposedService = entry.getValue();
35+
36+
if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
37+
- updateControllerService(service, proposedService, topLevelGroup);
38+
- // Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
39+
- // so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
40+
- createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
41+
- LOG.info("Updated {}", service);
42+
+ final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
43+
+ final Set<ComponentNode> referencesToRestart = new HashSet<>();
44+
+ final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
45+
+
46+
+ try {
47+
+ try {
48+
+ stopControllerService(service, proposedService, stopTimeout,
49+
+ syncOptions.getComponentStopTimeoutAction(),
50+
+ referencesToRestart, servicesToRestart, syncOptions);
51+
+ } catch (final TimeoutException e) {
52+
+ throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e);
53+
+ } catch (final InterruptedException e) {
54+
+ Thread.currentThread().interrupt();
55+
+ throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e);
56+
+ }
57+
+ updateControllerService(service, proposedService, topLevelGroup);
58+
+ createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
59+
+ LOG.info("Updated {}", service);
60+
+ } finally {
61+
+ // Re-enable services and restart components that were stopped for the update,
62+
+ // restoring the controller to its pre-update running state.
63+
+ if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
64+
+ // Use the component scheduler (not the provider directly) which has
65+
+ // already been paused, avoiding a race with the enable loop below.
66+
+ context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart);
67+
+ notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
68+
+ context.getControllerServiceProvider().scheduleReferencingComponents(
69+
+ service, referencesToRestart, context.getComponentScheduler());
70+
+ referencesToRestart.forEach(componentNode ->
71+
+ notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING));
72+
+ }
73+
+ }
74+
}
75+
}
76+

0 commit comments

Comments
 (0)