diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 3b6a89d6640c..2093328b9520 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -199,7 +199,7 @@ private void updateWithSequenceGroup(KeyValue kv) { Iterator> aggIter = fieldAggregators.iterator(); WrapperWithFieldIndex curAgg = aggIter.hasNext() ? aggIter.next() : null; - boolean[] isEmptySequenceGroup = new boolean[getters.length]; + boolean[] isProcessedSequenceField = new boolean[getters.length]; for (int i = 0; i < getters.length; i++) { FieldsComparator seqComparator = null; if (curComparator != null && curComparator.fieldIndex == i) { @@ -214,15 +214,13 @@ private void updateWithSequenceGroup(KeyValue kv) { } Object accumulator = row.getField(i); - if (seqComparator == null) { - Object field = getters[i].getFieldOrNull(kv.value()); - if (aggregator != null) { - row.setField(i, aggregator.agg(accumulator, field)); - } else if (field != null) { - row.setField(i, field); + if (seqComparator != null) { + // Skip if this field has already been processed as part of a sequence group + if (isProcessedSequenceField[i]) { + continue; } - } else { - if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) { + + if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) { // skip null sequence group continue; } @@ -237,6 +235,8 @@ private void updateWithSequenceGroup(KeyValue kv) { for (int fieldIndex : seqComparator.compareFields()) { row.setField( fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value())); + // Mark these sequence fields as processed + isProcessedSequenceField[fieldIndex] = true; } continue; } @@ -245,27 +245,28 @@ private void updateWithSequenceGroup(KeyValue kv) { } else if (aggregator != null) { row.setField(i, aggregator.aggReversed(accumulator, field)); } + } else { + Object field = getters[i].getFieldOrNull(kv.value()); + if (aggregator != null) { + row.setField(i, aggregator.agg(accumulator, field)); + } else if (field != null) { + row.setField(i, field); + } } } } private boolean isEmptySequenceGroup( - KeyValue kv, FieldsComparator comparator, boolean[] isEmptySequenceGroup) { - - // If any flag of the sequence fields is set, it means the sequence group is empty. - if (isEmptySequenceGroup[comparator.compareFields()[0]]) { - return true; - } - + KeyValue kv, FieldsComparator comparator, boolean[] isProcessedSequenceField) { for (int fieldIndex : comparator.compareFields()) { if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) { return false; } } - // Set the flag of all the sequence fields of the sequence group. + // Mark these sequence fields as processed for (int fieldIndex : comparator.compareFields()) { - isEmptySequenceGroup[fieldIndex] = true; + isProcessedSequenceField[fieldIndex] = true; } return true; @@ -280,7 +281,7 @@ private void retractWithSequenceGroup(KeyValue kv) { Iterator> aggIter = fieldAggregators.iterator(); WrapperWithFieldIndex curAgg = aggIter.hasNext() ? aggIter.next() : null; - boolean[] isEmptySequenceGroup = new boolean[getters.length]; + boolean[] isProcessedSequenceField = new boolean[getters.length]; for (int i = 0; i < getters.length; i++) { FieldsComparator seqComparator = null; if (curComparator != null && curComparator.fieldIndex == i) { @@ -295,7 +296,12 @@ private void retractWithSequenceGroup(KeyValue kv) { } if (seqComparator != null) { - if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) { + // Skip if this field has already been processed as part of a sequence group + if (isProcessedSequenceField[i]) { + continue; + } + + if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) { // skip null sequence group continue; } @@ -319,6 +325,8 @@ private void retractWithSequenceGroup(KeyValue kv) { updatedSequenceFields.add(field); } } + // Mark these sequence fields as processed + isProcessedSequenceField[field] = true; } } else { // retract normal field