Skip to content

Commit 7b25800

Browse files
NIFI-15904 Fixed Lineage Start Index in Session.create() (apache#11205)
- Set explicit Lineage Start Index using FlowFile ID - Set explicit Entry Date and Lineage Start Date on new FlowFiles
1 parent c9fd0be commit 7b25800

2 files changed

Lines changed: 28 additions & 1 deletion

File tree

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2069,8 +2069,13 @@ public FlowFile create() {
20692069
attrs.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
20702070
attrs.put(CoreAttributes.UUID.key(), uuid);
20712071

2072-
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
2072+
final long entryDate = System.currentTimeMillis();
2073+
final long id = context.getNextFlowFileSequence();
2074+
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(id)
20732075
.addAttributes(attrs)
2076+
// Set Lineage Start to Entry Date and use Identifier as Lineage Start Index for unambiguous ordering
2077+
.entryDate(entryDate)
2078+
.lineageStart(entryDate, id)
20742079
.build();
20752080
final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null);
20762081
record.setWorking(fFile, attrs, false);

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import java.nio.file.Path;
4646

4747
import static org.junit.jupiter.api.Assertions.assertEquals;
48+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
49+
import static org.junit.jupiter.api.Assertions.assertNotNull;
4850
import static org.mockito.ArgumentMatchers.any;
4951
import static org.mockito.ArgumentMatchers.anyLong;
5052
import static org.mockito.ArgumentMatchers.anyString;
@@ -191,6 +193,26 @@ void testRecordGaugeSessionCommitted() {
191193
assertEquals(GAUGE_VALUE, gaugeRecord.value());
192194
}
193195

196+
@Test
197+
void testCreateLineage() {
198+
final long firstFlowFileId = 1;
199+
final long secondFlowFileId = 2;
200+
when(repositoryContext.getNextFlowFileSequence()).thenReturn(firstFlowFileId, secondFlowFileId);
201+
202+
final FlowFile firstFlowFile = session.create();
203+
204+
assertNotNull(firstFlowFile);
205+
assertNotEquals(0, firstFlowFile.getLineageStartDate());
206+
assertEquals(firstFlowFile.getEntryDate(), firstFlowFile.getLineageStartDate());
207+
assertEquals(firstFlowFileId, firstFlowFile.getId());
208+
assertEquals(firstFlowFileId, firstFlowFile.getLineageStartIndex());
209+
210+
final FlowFile secondFlowFile = session.create();
211+
assertNotNull(secondFlowFile);
212+
assertEquals(secondFlowFileId, secondFlowFile.getId());
213+
assertEquals(secondFlowFileId, secondFlowFile.getLineageStartIndex());
214+
}
215+
194216
private void assertFlowFileEventMatched(final long bytesRead, final long bytesWritten) throws IOException {
195217
verify(flowFileEventRepository).updateRepository(flowFileEventCaptor.capture(), anyString());
196218
final FlowFileEvent flowFileEvent = flowFileEventCaptor.getValue();

0 commit comments

Comments
 (0)