Skip to content

Commit 6188125

Browse files
committed
GH-1041: Implement snappy compression
1 parent 39b0593 commit 6188125

File tree

8 files changed

+112
-2
lines changed

8 files changed

+112
-2
lines changed

compression/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,10 @@ under the License.
5757
<artifactId>zstd-jni</artifactId>
5858
<version>1.5.7-6</version>
5959
</dependency>
60+
<dependency>
61+
<groupId>org.xerial.snappy</groupId>
62+
<artifactId>snappy-java</artifactId>
63+
<version>1.1.10.7</version>
64+
</dependency>
6065
</dependencies>
6166
</project>

compression/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
requires org.apache.arrow.memory.core;
2525
requires org.apache.arrow.vector;
2626
requires org.apache.commons.compress;
27+
requires org.xerial.snappy;
2728

2829
// Also defined under META-INF/services to support non-modular applications
2930
provides CompressionCodec.Factory with

compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
3333
switch (codecType) {
3434
case LZ4_FRAME:
3535
return new Lz4CompressionCodec();
36+
case SNAPPY:
37+
return new SnappyCompressionCodec();
3638
case ZSTD:
3739
return new ZstdCompressionCodec();
3840
default:
@@ -45,6 +47,8 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int com
4547
switch (codecType) {
4648
case LZ4_FRAME:
4749
return new Lz4CompressionCodec();
50+
case SNAPPY:
51+
return new SnappyCompressionCodec();
4852
case ZSTD:
4953
return new ZstdCompressionCodec(compressionLevel);
5054
default:
compression/src/main/java/org/apache/arrow/compression/SnappyCompressionCodec.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.compression;
18+
19+
import org.apache.arrow.memory.ArrowBuf;
20+
import org.apache.arrow.memory.BufferAllocator;
21+
import org.apache.arrow.util.Preconditions;
22+
import org.apache.arrow.vector.compression.AbstractCompressionCodec;
23+
import org.apache.arrow.vector.compression.CompressionUtil;
24+
import org.xerial.snappy.Snappy;
25+
26+
/** Compression codec for the Snappy algorithm. */
27+
public class SnappyCompressionCodec extends AbstractCompressionCodec {
28+
29+
@Override
30+
protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
31+
Preconditions.checkArgument(
32+
uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
33+
"The uncompressed buffer size exceeds the integer limit %s.",
34+
Integer.MAX_VALUE);
35+
36+
byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
37+
uncompressedBuffer.getBytes(/* index= */ 0, inBytes);
38+
39+
final byte[] outBytes;
40+
try {
41+
outBytes = Snappy.compress(inBytes);
42+
} catch (Exception e) {
43+
throw new RuntimeException("Error compressing with Snappy", e);
44+
}
45+
46+
ArrowBuf compressedBuffer =
47+
allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
48+
compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes);
49+
compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
50+
return compressedBuffer;
51+
}
52+
53+
@Override
54+
protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
55+
Preconditions.checkArgument(
56+
compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
57+
"The compressed buffer size exceeds the integer limit %s",
58+
Integer.MAX_VALUE);
59+
60+
long decompressedLength = readUncompressedLength(compressedBuffer);
61+
62+
byte[] inBytes =
63+
new byte
64+
[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
65+
compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes);
66+
67+
final byte[] outBytes;
68+
try {
69+
outBytes = Snappy.uncompress(inBytes);
70+
} catch (Exception e) {
71+
throw new RuntimeException("Error decompressing with Snappy", e);
72+
}
73+
74+
if (outBytes.length != decompressedLength) {
75+
throw new RuntimeException(
76+
"Expected != actual decompressed length: "
77+
+ decompressedLength
78+
+ " != "
79+
+ outBytes.length);
80+
}
81+
82+
ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength);
83+
decompressedBuffer.setBytes(/* index= */ 0, outBytes);
84+
decompressedBuffer.writerIndex(decompressedLength);
85+
return decompressedBuffer;
86+
}
87+
88+
@Override
89+
public CompressionUtil.CodecType getCodecType() {
90+
return CompressionUtil.CodecType.SNAPPY;
91+
}
92+
}
93+

compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ static Collection<Arguments> codecs() {
8888
CompressionCodec lz4Codec = new Lz4CompressionCodec();
8989
params.add(Arguments.arguments(len, lz4Codec));
9090

91+
CompressionCodec snappyCodec = new SnappyCompressionCodec();
92+
params.add(Arguments.arguments(len, snappyCodec));
93+
9194
CompressionCodec zstdCodec = new ZstdCompressionCodec();
9295
params.add(Arguments.arguments(len, zstdCodec));
9396

format/src/main/java/org/apache/arrow/flatbuf/CompressionType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ public final class CompressionType {
2121
private CompressionType() { }
2222
public static final byte LZ4_FRAME = 0;
2323
public static final byte ZSTD = 1;
24+
public static final byte SNAPPY = 2;
2425

25-
public static final String[] names = { "LZ4_FRAME", "ZSTD", };
26+
public static final String[] names = { "LZ4_FRAME", "ZSTD", "SNAPPY", };
2627

2728
public static String name(int e) { return names[e]; }
2829
}

vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ public enum CodecType {
3333

3434
LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME),
3535

36-
ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD);
36+
ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD),
37+
38+
SNAPPY(org.apache.arrow.flatbuf.CompressionType.SNAPPY);
3739

3840
private final byte type;
3941

vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
5959
case NO_COMPRESSION:
6060
return NoCompressionCodec.INSTANCE;
6161
case LZ4_FRAME:
62+
case SNAPPY:
6263
case ZSTD:
6364
throw new IllegalArgumentException(
6465
"Please add arrow-compression module to use CommonsCompressionFactory for "

0 commit comments

Comments
 (0)