Skip to content

Commit da97bfe

Browse files
committed
[core] Skip processed sequence group fields to improve performance of PartialUpdateMergeFunction
1 parent a9ffd30 commit da97bfe

File tree

1 file changed

+34
-24
lines changed

1 file changed

+34
-24
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private void updateWithSequenceGroup(KeyValue kv) {
199199
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
200200
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;
201201

202-
boolean[] isEmptySequenceGroup = new boolean[getters.length];
202+
boolean[] isProcessedSequenceField = new boolean[getters.length];
203203
for (int i = 0; i < getters.length; i++) {
204204
FieldsComparator seqComparator = null;
205205
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -214,16 +214,18 @@ private void updateWithSequenceGroup(KeyValue kv) {
214214
}
215215

216216
Object accumulator = row.getField(i);
217-
if (seqComparator == null) {
218-
Object field = getters[i].getFieldOrNull(kv.value());
219-
if (aggregator != null) {
220-
row.setField(i, aggregator.agg(accumulator, field));
221-
} else if (field != null) {
222-
row.setField(i, field);
217+
if (seqComparator != null) {
218+
// Skip if this field has already been processed as part of a sequence group
219+
if (isProcessedSequenceField[i]) {
220+
continue;
223221
}
224-
} else {
225-
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
222+
223+
if (isEmptySequenceGroup(kv, seqComparator)) {
226224
// skip null sequence group
225+
// Mark every compare field of this sequence group as processed so later iterations skip them
226+
for (int fieldIndex : seqComparator.compareFields()) {
227+
isProcessedSequenceField[fieldIndex] = true;
228+
}
227229
continue;
228230
}
229231

@@ -237,6 +239,8 @@ private void updateWithSequenceGroup(KeyValue kv) {
237239
for (int fieldIndex : seqComparator.compareFields()) {
238240
row.setField(
239241
fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
242+
// Mark this compare field as processed so the outer loop skips it when it reaches this index
243+
isProcessedSequenceField[fieldIndex] = true;
240244
}
241245
continue;
242246
}
@@ -245,29 +249,24 @@ private void updateWithSequenceGroup(KeyValue kv) {
245249
} else if (aggregator != null) {
246250
row.setField(i, aggregator.aggReversed(accumulator, field));
247251
}
252+
} else {
253+
Object field = getters[i].getFieldOrNull(kv.value());
254+
if (aggregator != null) {
255+
row.setField(i, aggregator.agg(accumulator, field));
256+
} else if (field != null) {
257+
row.setField(i, field);
258+
}
248259
}
249260
}
250261
}
251262

252-
private boolean isEmptySequenceGroup(
253-
KeyValue kv, FieldsComparator comparator, boolean[] isEmptySequenceGroup) {
254-
255-
// If any flag of the sequence fields is set, it means the sequence group is empty.
256-
if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
257-
return true;
258-
}
259-
263+
private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
260264
for (int fieldIndex : comparator.compareFields()) {
261265
if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
262266
return false;
263267
}
264268
}
265269

266-
// Set the flag of all the sequence fields of the sequence group.
267-
for (int fieldIndex : comparator.compareFields()) {
268-
isEmptySequenceGroup[fieldIndex] = true;
269-
}
270-
271270
return true;
272271
}
273272

@@ -280,7 +279,7 @@ private void retractWithSequenceGroup(KeyValue kv) {
280279
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
281280
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;
282281

283-
boolean[] isEmptySequenceGroup = new boolean[getters.length];
282+
boolean[] isProcessedSequenceField = new boolean[getters.length];
284283
for (int i = 0; i < getters.length; i++) {
285284
FieldsComparator seqComparator = null;
286285
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -295,8 +294,17 @@ private void retractWithSequenceGroup(KeyValue kv) {
295294
}
296295

297296
if (seqComparator != null) {
298-
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
297+
// Skip if this field has already been processed as part of a sequence group
298+
if (isProcessedSequenceField[i]) {
299+
continue;
300+
}
301+
302+
if (isEmptySequenceGroup(kv, seqComparator)) {
299303
// skip null sequence group
304+
// Mark every compare field of this sequence group as processed so later iterations skip them
305+
for (int fieldIndex : seqComparator.compareFields()) {
306+
isProcessedSequenceField[fieldIndex] = true;
307+
}
300308
continue;
301309
}
302310

@@ -319,6 +327,8 @@ private void retractWithSequenceGroup(KeyValue kv) {
319327
updatedSequenceFields.add(field);
320328
}
321329
}
330+
// Mark this field as processed so the outer loop skips it when it reaches this index
331+
isProcessedSequenceField[field] = true;
322332
}
323333
} else {
324334
// retract normal field

0 commit comments

Comments
 (0)