Skip to content

Commit e89b6e6

Browse files
committed
update partSizeThreshold and add doc
1 parent ee388e3 commit e89b6e6

File tree

6 files changed

+26
-37
lines changed

6 files changed

+26
-37
lines changed

paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ public MultiPartUploadTwoPhaseOutputStream(
5757
this.position = 0;
5858
}
5959

60-
public abstract int partSizeThreshold();
60+
// OSS limit: 100KB ~ 5GB
61+
// S3 limit: 5MiB ~ 5GiB
62+
// Considering memory usage, and referencing Flink's setting of 10MiB.
63+
public int partSizeThreshold() {
64+
return 10 << 20;
65+
}
6166

6267
public abstract Committer committer(
6368
String uploadId, List<T> uploadedParts, String objectName, long position);
@@ -102,6 +107,10 @@ public void write(byte[] b, int off, int len) throws IOException {
102107
offset += count;
103108
remaining -= count;
104109
position += count;
110+
// consume buffer if it is full
111+
if (buffer.size() >= partSizeThreshold()) {
112+
uploadPart();
113+
}
105114
}
106115
}
107116

@@ -144,15 +153,18 @@ private void uploadPart() throws IOException {
144153
File tempFile = null;
145154
int partNumber = uploadedParts.size() + 1;
146155
try {
147-
byte[] data = buffer.toByteArray();
148156
tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile();
149157
try (FileOutputStream fos = new FileOutputStream(tempFile)) {
150-
fos.write(data);
158+
buffer.writeTo(fos);
151159
fos.flush();
152160
}
153161
T partETag =
154162
multiPartUploadStore.uploadPart(
155-
objectName, uploadId, uploadedParts.size() + 1, tempFile, data.length);
163+
objectName,
164+
uploadId,
165+
uploadedParts.size() + 1,
166+
tempFile,
167+
checkedDownCast(tempFile.length()));
156168
uploadedParts.add(partETag);
157169
buffer.reset();
158170
} catch (Exception e) {
@@ -166,4 +178,13 @@ private void uploadPart() throws IOException {
166178
}
167179
}
168180
}
181+
182+
private static int checkedDownCast(long value) {
183+
int downCast = (int) value;
184+
if (downCast != value) {
185+
throw new IllegalArgumentException(
186+
"Cannot downcast long value " + value + " to integer.");
187+
}
188+
return downCast;
189+
}
169190
}

paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ void testWriteFlushAndCommit() throws IOException {
5555
new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath, 5);
5656

5757
stream.write("hello".getBytes(StandardCharsets.UTF_8));
58-
assertThat(store.getUploadedParts()).hasSize(0);
59-
stream.flush();
6058
assertThat(store.getUploadedParts()).hasSize(1);
6159
assertThat(store.getUploadedParts()).extracting(TestPart::getPartNumber).containsExactly(1);
6260
assertThat(store.getUploadedParts())

paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ public JindoTwoPhaseOutputStream(
3737
super(multiPartUploadStore, hadoopPath);
3838
}
3939

40-
@Override
41-
public int partSizeThreshold() {
42-
return 8 << 20;
43-
}
44-
4540
@Override
4641
public Committer committer(
4742
String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {

paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ public OssTwoPhaseOutputStream(
3838
super(multiPartUploadStore, hadoopPath);
3939
}
4040

41-
@Override
42-
public int partSizeThreshold() {
43-
return 8 << 20;
44-
}
45-
4641
@Override
4742
public Committer committer(
4843
String uploadId, List<PartETag> uploadedParts, String objectName, long position) {

paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3MultiPartUpload.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,7 @@ public PartETag uploadPart(
8282
throws IOException {
8383
final UploadPartRequest uploadRequest =
8484
s3accessHelper.newUploadPartRequest(
85-
objectName,
86-
uploadId,
87-
partNumber,
88-
checkedDownCast(byteLength),
89-
null,
90-
file,
91-
0L);
85+
objectName, uploadId, partNumber, byteLength, null, file, 0L);
9286
return s3accessHelper.uploadPart(uploadRequest).getPartETag();
9387
}
9488

@@ -108,13 +102,4 @@ private static final class InternalWriteOperationHelper extends WriteOperationHe
108102
super(owner, conf, statisticsContext, auditSpanSource, auditSpan);
109103
}
110104
}
111-
112-
private static int checkedDownCast(long value) {
113-
int downCast = (int) value;
114-
if (downCast != value) {
115-
throw new IllegalArgumentException(
116-
"Cannot downcast long value " + value + " to integer.");
117-
}
118-
return downCast;
119-
}
120105
}

paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/S3TwoPhaseOutputStream.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ public S3TwoPhaseOutputStream(
3838
super(multiPartUploadStore, hadoopPath);
3939
}
4040

41-
@Override
42-
public int partSizeThreshold() {
43-
return 5 << 20;
44-
}
45-
4641
@Override
4742
public Committer committer(
4843
String uploadId, List<PartETag> uploadedParts, String objectName, long position) {

0 commit comments

Comments
 (0)