
Commit 3ae00a3

JindoFileIO: use multipart upload to support two-phase commit
1 parent f50733b commit 3ae00a3

8 files changed (+274, -16 lines)
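For orientation, here is a minimal sketch of the write path this commit enables for Jindo-backed object stores: phase one buffers data and uploads it as multipart-upload parts, closeForCommit() returns a Committer, and phase two completes the multipart upload. It assumes a JindoFileIO (or another FileIO with two-phase output support) as exercised in the diffs below; the wrapper class, target path, and payload are illustrative, not part of the commit.

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.TwoPhaseOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative only: the target path and payload are made up; `fileIO` is assumed
// to be a JindoFileIO (or any FileIO that implements newTwoPhaseOutputStream).
class TwoPhaseWriteSketch {
    static void writeAndCommit(FileIO fileIO) throws IOException {
        Path target = new Path("oss://bucket/warehouse/t/data-0.parquet");
        // Phase 1: data is buffered and flushed to the store as multipart-upload parts.
        TwoPhaseOutputStream out = fileIO.newTwoPhaseOutputStream(target, false);
        out.write("hello world!".getBytes(StandardCharsets.UTF_8));
        TwoPhaseOutputStream.Committer committer = out.closeForCommit();
        // Phase 2: completing the multipart upload makes the object visible at `target`.
        committer.commit(fileIO);
    }
}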

paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java

Lines changed: 19 additions & 9 deletions
@@ -89,10 +89,22 @@ public void write(byte[] b, int off, int len) throws IOException {
         if (closed) {
             throw new IOException("Stream is closed");
         }
-        buffer.write(b, off, len);
-        position += len;
-        if (buffer.size() >= partSizeThreshold()) {
-            uploadPart();
+        int remaining = len;
+        int offset = off;
+        final long threshold = partSizeThreshold();
+
+        while (remaining > 0) {
+            if (buffer.size() >= threshold) {
+                uploadPart();
+            }
+            int currentSize = buffer.size();
+            long spaceLong = threshold - currentSize;
+            int space = spaceLong > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) spaceLong;
+            int count = Math.min(remaining, space);
+            buffer.write(b, offset, count);
+            offset += count;
+            remaining -= count;
+            position += count;
         }
     }

@@ -133,9 +145,11 @@ private void uploadPart() throws IOException {
         }

         File tempFile = null;
+        int partNumber = uploadedParts.size() + 1;
         try {
             byte[] data = buffer.toByteArray();
             tempFile = Files.createTempFile("multi-part-" + UUID.randomUUID(), ".tmp").toFile();
+            tempFile.deleteOnExit();
             try (FileOutputStream fos = new FileOutputStream(tempFile)) {
                 fos.write(data);
                 fos.flush();

@@ -147,11 +161,7 @@ private void uploadPart() throws IOException {
             buffer.reset();
         } catch (Exception e) {
             throw new IOException(
-                    "Failed to upload part "
-                            + (uploadedParts.size() + 1)
-                            + " for upload ID: "
-                            + uploadId,
-                    e);
+                    "Failed to upload part " + partNumber + " for upload ID: " + uploadId, e);
         } finally {
             if (tempFile != null && tempFile.exists()) {
                 if (!tempFile.delete()) {

paimon-common/src/test/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStreamTest.java

Lines changed: 35 additions & 5 deletions
@@ -66,7 +66,7 @@ void testWriteFlushAndCommit() throws IOException {
                 .isEqualTo("hello world!".getBytes(StandardCharsets.UTF_8).length);

         TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
-        assertThat(store.getUploadedParts()).hasSize(2);
+        assertThat(store.getUploadedParts()).hasSize(3);
         assertThat(committer.targetFilePath().toString()).isEqualTo(store.getStartedObjectName());

         committer.commit(fileIO);

@@ -122,6 +122,40 @@ void testCloseForCommitIdempotent() throws IOException {
         first.commit(fileIO);
     }

+    @Test
+    void testBigWriteSplitByThreshold() throws IOException {
+        TestMultiPartUploadTwoPhaseOutputStream stream =
+                new TestMultiPartUploadTwoPhaseOutputStream(store, objectPath, 5);
+
+        byte[] data1 = "abc".getBytes(StandardCharsets.UTF_8); // 3 bytes, threshold=5
+        stream.write(data1);
+        byte[] data2 = "abcdefghij".getBytes(StandardCharsets.UTF_8); // 10 bytes, threshold=5
+        stream.write(data2);
+
+        assertThat(store.getUploadedParts()).hasSize(2);
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getPartNumber)
+                .containsExactly(1, 2);
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg");
+        assertThat(stream.getPos()).isEqualTo(data1.length + data2.length);
+        stream.flush();
+        assertThat(store.getUploadedParts())
+                .extracting(TestPart::getContent)
+                .containsExactly("abcab", "cdefg", "hij");
+        TwoPhaseOutputStream.Committer committer = stream.closeForCommit();
+        assertThat(store.getUploadedParts()).hasSize(3);
+
+        committer.commit(fileIO);
+
+        assertThat(store.getCompletedUploadId()).isEqualTo(store.getStartedUploadId());
+        assertThat(store.getCompletedObjectName()).isEqualTo(store.getStartedObjectName());
+        assertThat(store.getCompletedParts()).containsExactlyElementsOf(store.getUploadedParts());
+        assertThat(store.getCompletedBytes()).isEqualTo(stream.getPos());
+        assertThat(store.getAbortedUploadId()).isNull();
+    }
+
     /** Fake store implementation for testing. */
     private static class FakeMultiPartUploadStore
             implements MultiPartUploadStore<TestPart, String> {

@@ -308,9 +342,5 @@ int getPartNumber() {
         String getContent() {
             return content;
         }
-
-        long getByteLength() {
-            return byteLength;
-        }
     }
 }
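The expected part contents in testBigWriteSplitByThreshold follow directly from the new write loop with threshold 5: the first write leaves "abc" (3 bytes) buffered; the second write first tops the buffer up to the threshold ("abcab") and uploads it as part 1, then fills and uploads "cdefg" as part 2, and leaves "hij" buffered until flush() pushes it as part 3.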

paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ protected JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws
         return getFileSystemPair(path).getKey();
     }

-    private Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
+    protected Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
             throws IOException {
         if (fsMap == null) {
             synchronized (this) {

paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java

Lines changed: 14 additions & 0 deletions
@@ -20,6 +20,8 @@

 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;

@@ -122,6 +124,18 @@ public Options hadoopOptions() {
         return hadoopOptions;
     }

+    @Override
+    public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite)
+            throws IOException {
+        if (!overwrite && this.exists(path)) {
+            throw new IOException("File " + path + " already exists.");
+        }
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        return new JindoTwoPhaseOutputStream(new JindoMultiPartUpload(fs, hadoopPath), hadoopPath);
+    }
+
     @Override
     protected Pair<JindoHadoopSystem, String> createFileSystem(org.apache.hadoop.fs.Path path) {
         final String scheme = path.toUri().getScheme();
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUpload.java

Lines changed: 104 additions & 0 deletions

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+
+import com.aliyun.jindodata.api.spec.protos.JdoMpuUploadPartReply;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPartList;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import com.aliyun.jindodata.store.JindoMpuStore;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+/** Provides the multipart upload by Jindo. */
+public class JindoMultiPartUpload implements MultiPartUploadStore<JdoObjectPart, String> {
+
+    private final JindoHadoopSystem fs;
+    private final JindoMpuStore mpuStore;
+
+    public JindoMultiPartUpload(JindoHadoopSystem fs, Path filePath) {
+        this.fs = fs;
+        this.mpuStore = fs.getMpuStore(filePath);
+    }
+
+    @Override
+    public Path workingDirectory() {
+        return fs.getWorkingDirectory();
+    }
+
+    @Override
+    public String startMultiPartUpload(String objectName) throws IOException {
+        return mpuStore.initMultiPartUpload(new Path(objectName));
+    }
+
+    @Override
+    public String completeMultipartUpload(
+            String objectName,
+            String uploadId,
+            List<JdoObjectPart> partETags,
+            long numBytesInParts) {
+        try {
+            JdoObjectPartList partList =
+                    new com.aliyun.jindodata.api.spec.protos.JdoObjectPartList();
+            partList.setParts(partETags.toArray(new JdoObjectPart[0]));
+            mpuStore.commitMultiPartUpload(new Path(objectName), uploadId, partList);
+            return uploadId;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to complete multipart upload for: " + objectName, e);
+        }
+    }
+
+    @Override
+    public JdoObjectPart uploadPart(
+            String objectName, String uploadId, int partNumber, File file, long byteLength)
+            throws IOException {
+        try {
+            ByteBuffer buffer;
+            try (FileInputStream fis = new FileInputStream(file);
+                    FileChannel channel = fis.getChannel()) {
+                buffer = ByteBuffer.allocate((int) byteLength);
+                channel.read(buffer);
+                buffer.flip();
+            }
+
+            JdoMpuUploadPartReply result =
+                    mpuStore.uploadPart(new Path(objectName), uploadId, partNumber, buffer);
+            return result.getPartInfo();
+        } catch (Exception e) {
+            throw new IOException("Failed to upload part " + partNumber + " for: " + objectName, e);
+        }
+    }
+
+    @Override
+    public void abortMultipartUpload(String objectName, String uploadId) {
+        try {
+            mpuStore.abortMultipartUpload(new Path(objectName), uploadId);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to abort multipart upload for: " + objectName, e);
+        }
+    }
+}
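JindoMultiPartUpload above only adapts Jindo's MPU store to the generic MultiPartUploadStore contract; the ordering of the calls is driven by the shared stream and committer code. The sketch below illustrates that call order under the interface as shown above (start, upload numbered parts, then complete, aborting on failure); the class name, helper method, and the list of buffered part files are hypothetical, not part of the commit.

import org.apache.paimon.fs.MultiPartUploadStore;

import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the lifecycle the two-phase stream drives against the store:
// startMultiPartUpload -> uploadPart x N -> completeMultipartUpload, aborting on failure.
// The part files stand in for the temp files the stream produces from its buffer.
class JindoMultiPartLifecycleSketch {
    static void upload(
            MultiPartUploadStore<JdoObjectPart, String> store,
            String objectName,
            List<File> partFiles)
            throws IOException {
        String uploadId = store.startMultiPartUpload(objectName);
        List<JdoObjectPart> parts = new ArrayList<>();
        long numBytes = 0;
        try {
            for (int i = 0; i < partFiles.size(); i++) {
                File part = partFiles.get(i);
                // Part numbers are 1-based, mirroring uploadedParts.size() + 1 in the stream.
                parts.add(store.uploadPart(objectName, uploadId, i + 1, part, part.length()));
                numBytes += part.length();
            }
            // Phase 2: completing the upload makes the object visible under objectName.
            store.completeMultipartUpload(objectName, uploadId, parts, numBytes);
        } catch (Exception e) {
            // Abort so no orphaned multipart upload (and its parts) is left behind.
            store.abortMultipartUpload(objectName, uploadId);
            throw new IOException("Multipart upload failed for: " + objectName, e);
        }
    }
}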
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoMultiPartUploadCommitter.java

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.BaseMultiPartUploadCommitter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Jindo implementation of MultiPartUploadCommitter. */
+public class JindoMultiPartUploadCommitter
+        extends BaseMultiPartUploadCommitter<JdoObjectPart, String> {
+    public JindoMultiPartUploadCommitter(
+            String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {
+        super(uploadId, uploadedParts, objectName, position);
+    }
+
+    @Override
+    protected MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore(
+            FileIO fileIO, Path targetPath) throws IOException {
+        JindoFileIO jindoFileIO = (JindoFileIO) fileIO;
+        org.apache.hadoop.fs.Path hadoopPath = jindoFileIO.path(targetPath);
+        Pair<JindoHadoopSystem, String> pair = jindoFileIO.getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        return new JindoMultiPartUpload(fs, hadoopPath);
+    }
+}
paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoTwoPhaseOutputStream.java

Lines changed: 50 additions & 0 deletions

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.MultiPartUploadStore;
+import org.apache.paimon.fs.MultiPartUploadTwoPhaseOutputStream;
+
+import com.aliyun.jindodata.api.spec.protos.JdoObjectPart;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Jindo implementation of TwoPhaseOutputStream using multipart upload. */
+public class JindoTwoPhaseOutputStream
+        extends MultiPartUploadTwoPhaseOutputStream<JdoObjectPart, String> {
+
+    public JindoTwoPhaseOutputStream(
+            MultiPartUploadStore<JdoObjectPart, String> multiPartUploadStore,
+            org.apache.hadoop.fs.Path hadoopPath)
+            throws IOException {
+        super(multiPartUploadStore, hadoopPath);
+    }
+
+    @Override
+    public long partSizeThreshold() {
+        return 8L << 20;
+    }
+
+    @Override
+    public Committer committer(
+            String uploadId, List<JdoObjectPart> uploadedParts, String objectName, long position) {
+        return new JindoMultiPartUploadCommitter(uploadId, uploadedParts, objectName, position);
+    }
+}

paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OssTwoPhaseOutputStream.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ public OssTwoPhaseOutputStream(

     @Override
     public long partSizeThreshold() {
-        return 10L << 20;
+        return 8L << 20;
     }

     @Override
