|
22 | 22 | import io.trino.plugin.iceberg.MetricsWrapper; |
23 | 23 | import io.trino.plugin.iceberg.PartitionData; |
24 | 24 | import io.trino.spi.Page; |
25 | | -import io.trino.spi.PageBuilder; |
26 | 25 | import io.trino.spi.block.Block; |
| 26 | +import io.trino.spi.block.LongArrayBlock; |
27 | 27 | import io.trino.spi.block.RunLengthEncodedBlock; |
28 | 28 | import io.trino.spi.connector.ConnectorSession; |
29 | 29 | import org.apache.iceberg.FileContent; |
|
38 | 38 |
|
39 | 39 | import static io.airlift.slice.Slices.utf8Slice; |
40 | 40 | import static io.trino.spi.predicate.Utils.nativeValueToBlock; |
41 | | -import static io.trino.spi.type.BigintType.BIGINT; |
42 | 41 | import static io.trino.spi.type.VarcharType.VARCHAR; |
43 | 42 | import static java.util.Objects.requireNonNull; |
44 | 43 | import static java.util.UUID.randomUUID; |
@@ -103,26 +102,69 @@ public void abort() |
103 | 102 |
|
104 | 103 | private void writeDeletes(ImmutableLongBitmapDataProvider rowsToDelete) |
105 | 104 | { |
106 | | - PageBuilder pageBuilder = new PageBuilder(List.of(BIGINT)); |
107 | | - |
| 105 | + PositionsList deletedPositions = new PositionsList(4 * 1024); |
108 | 106 | rowsToDelete.forEach(rowPosition -> { |
109 | | - pageBuilder.declarePosition(); |
110 | | - BIGINT.writeLong(pageBuilder.getBlockBuilder(0), rowPosition); |
111 | | - if (pageBuilder.isFull()) { |
112 | | - writePage(pageBuilder.build()); |
113 | | - pageBuilder.reset(); |
| 107 | + deletedPositions.add(rowPosition); |
| 108 | + if (deletedPositions.isFull()) { |
| 109 | + writePage(deletedPositions); |
| 110 | + deletedPositions.reset(); |
114 | 111 | } |
115 | 112 | }); |
116 | 113 |
|
117 | | - if (!pageBuilder.isEmpty()) { |
118 | | - writePage(pageBuilder.build()); |
| 114 | + if (!deletedPositions.isEmpty()) { |
| 115 | + writePage(deletedPositions); |
119 | 116 | } |
120 | 117 | } |
121 | 118 |
|
122 | | - private void writePage(Page page) |
| 119 | + private void writePage(PositionsList deletedPositions) |
123 | 120 | { |
124 | 121 | writer.appendRows(new Page( |
125 | | - RunLengthEncodedBlock.create(dataFilePathBlock, page.getPositionCount()), |
126 | | - page.getBlock(0))); |
| 122 | + deletedPositions.size(), |
| 123 | + RunLengthEncodedBlock.create(dataFilePathBlock, deletedPositions.size()), |
| 124 | + new LongArrayBlock(deletedPositions.size(), Optional.empty(), deletedPositions.elements()))); |
| 125 | + } |
| 126 | + |
| 127 | + // Wrapper around a long[] to provide an effectively final variable for lambda use |
| 128 | + private static class PositionsList |
| 129 | + { |
| 130 | + private long[] positions; |
| 131 | + private int size; |
| 132 | + |
| 133 | + PositionsList(int initialCapacity) |
| 134 | + { |
| 135 | + this.positions = new long[initialCapacity]; |
| 136 | + this.size = 0; |
| 137 | + } |
| 138 | + |
| 139 | + void add(long position) |
| 140 | + { |
| 141 | + positions[size++] = position; |
| 142 | + } |
| 143 | + |
| 144 | + boolean isEmpty() |
| 145 | + { |
| 146 | + return size == 0; |
| 147 | + } |
| 148 | + |
| 149 | + boolean isFull() |
| 150 | + { |
| 151 | + return size == positions.length; |
| 152 | + } |
| 153 | + |
| 154 | + long[] elements() |
| 155 | + { |
| 156 | + return positions; |
| 157 | + } |
| 158 | + |
| 159 | + int size() |
| 160 | + { |
| 161 | + return size; |
| 162 | + } |
| 163 | + |
| 164 | + void reset() |
| 165 | + { |
| 166 | + size = 0; |
| 167 | + positions = new long[positions.length]; |
| 168 | + } |
127 | 169 | } |
128 | 170 | } |
0 commit comments