Skip to content

Commit a9ffd30

Browse files
authored
[core] Support writing blob file for rolling. (#6340)
1 parent 534c72e commit a9ffd30

File tree

15 files changed

+977
-161
lines changed

15 files changed

+977
-161
lines changed

paimon-api/src/main/java/org/apache/paimon/types/BlobType.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
package org.apache.paimon.types;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.utils.Pair;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
2226

2327
/**
2428
* Data type of binary large object.
@@ -61,4 +65,21 @@ public String asSQLString() {
6165
public <R> R accept(DataTypeVisitor<R> visitor) {
6266
return visitor.visit(this);
6367
}
68+
69+
public static Pair<RowType, RowType> splitBlob(RowType rowType) {
70+
List<DataField> fields = rowType.getFields();
71+
List<DataField> normalFields = new ArrayList<>();
72+
List<DataField> blobFields = new ArrayList<>();
73+
74+
for (DataField field : fields) {
75+
DataTypeRoot type = field.type().getTypeRoot();
76+
if (type == DataTypeRoot.BLOB) {
77+
blobFields.add(field);
78+
} else {
79+
normalFields.add(field);
80+
}
81+
}
82+
83+
return Pair.of(new RowType(normalFields), new RowType(blobFields));
84+
}
6485
}

paimon-api/src/main/java/org/apache/paimon/types/RowType.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,6 @@ public void collectFieldIds(Set<Integer> fieldIds) {
286286
}
287287
}
288288

289-
public RowType appendDataField(String name, DataType type) {
290-
List<DataField> newFields = new ArrayList<>(fields);
291-
int newId = currentHighestFieldId(fields) + 1;
292-
newFields.add(new DataField(newId, name, type));
293-
return new RowType(newFields);
294-
}
295-
296289
public RowType project(int[] mapping) {
297290
List<DataField> fields = getFields();
298291
return new RowType(
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.append;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.io.FileWriter;
23+
import org.apache.paimon.utils.ProjectedRow;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* A delegating {@link FileWriter} which applies a field projection to each incoming {@link
29+
* InternalRow} before forwarding it to the underlying writer.
30+
*
31+
* <p>This is useful when the physical file schema is a subset of the logical write schema. The
32+
* projection is evaluated via {@link ProjectedRow} to avoid object allocations.
33+
*/
34+
public class PeojectedFileWriter<T extends FileWriter<InternalRow, R>, R>
35+
implements FileWriter<InternalRow, R> {
36+
37+
private final T writer;
38+
private final ProjectedRow projectedRow;
39+
40+
public PeojectedFileWriter(T writer, int[] projection) {
41+
this.writer = writer;
42+
this.projectedRow = ProjectedRow.from(projection);
43+
}
44+
45+
@Override
46+
public void write(InternalRow record) throws IOException {
47+
projectedRow.replaceRow(record);
48+
writer.write(projectedRow);
49+
}
50+
51+
@Override
52+
public long recordCount() {
53+
return writer.recordCount();
54+
}
55+
56+
@Override
57+
public void abort() {
58+
writer.abort();
59+
}
60+
61+
@Override
62+
public R result() throws IOException {
63+
return writer.result();
64+
}
65+
66+
@Override
67+
public void close() throws IOException {
68+
writer.close();
69+
}
70+
71+
public T writer() {
72+
return writer;
73+
}
74+
}

0 commit comments

Comments
 (0)