Skip to content

Commit ad8ee0c

Browse files
committed
Add support for reading parquet file thanks to arrow-dataset #576
1 parent 1bb565b commit ad8ee0c

File tree

8 files changed

+84
-13
lines changed

8 files changed

+84
-13
lines changed

dataframe-arrow/api/dataframe-arrow.api

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public final class org/jetbrains/kotlinx/dataframe/io/ArrowReadingKt {
3737
public static synthetic fun readArrowIPC$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;[BLorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
3838
public static final fun toDataFrame (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
3939
public static synthetic fun toDataFrame$default (Lorg/apache/arrow/vector/ipc/ArrowReader;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
40+
public static final fun readParquet (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
41+
public static synthetic fun readParquet$default (Lorg/jetbrains/kotlinx/dataframe/DataFrame$Companion;Ljava/net/URL;Lorg/jetbrains/kotlinx/dataframe/api/NullabilityOptions;ILjava/lang/Object;)Lorg/jetbrains/kotlinx/dataframe/DataFrame;
4042
}
4143

4244
public final class org/jetbrains/kotlinx/dataframe/io/ArrowTypesMatchingKt {

dataframe-arrow/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ dependencies {
2020
implementation(libs.arrow.vector)
2121
implementation(libs.arrow.format)
2222
implementation(libs.arrow.memory)
23+
implementation(libs.arrow.dataset)
2324
implementation(libs.commonsCompress)
2425
implementation(libs.kotlin.reflect)
2526
implementation(libs.kotlin.datetimeJvm)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

3+
import org.apache.arrow.dataset.file.FileFormat
34
import org.apache.arrow.memory.RootAllocator
45
import org.apache.arrow.vector.ipc.ArrowReader
56
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
@@ -185,3 +186,11 @@ public fun DataFrame.Companion.readArrow(
185186
*/
186187
public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
187188
DataFrame.Companion.readArrowImpl(this, nullability)
189+
190+
/**
191+
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
192+
*/
193+
public fun DataFrame.Companion.readParquet(
194+
url: URL,
195+
nullability: NullabilityOptions = NullabilityOptions.Infer,
196+
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime
66
import kotlinx.datetime.toKotlinLocalDate
77
import kotlinx.datetime.toKotlinLocalDateTime
88
import kotlinx.datetime.toKotlinLocalTime
9+
import org.apache.arrow.dataset.file.FileFormat
10+
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
11+
import org.apache.arrow.dataset.jni.DirectReservationListener
12+
import org.apache.arrow.dataset.jni.NativeMemoryPool
13+
import org.apache.arrow.dataset.scanner.ScanOptions
914
import org.apache.arrow.memory.RootAllocator
1015
import org.apache.arrow.vector.BigIntVector
1116
import org.apache.arrow.vector.BitVector
@@ -414,3 +419,27 @@ internal fun DataFrame.Companion.readArrowImpl(
414419
return flattened.concatKeepingSchema()
415420
}
416421
}
422+
423+
internal fun DataFrame.Companion.readArrowDataset(
424+
fileUri: String,
425+
fileFormat: FileFormat,
426+
nullability: NullabilityOptions = NullabilityOptions.Infer,
427+
): AnyFrame {
428+
val scanOptions = ScanOptions(32768)
429+
RootAllocator().use { allocator ->
430+
FileSystemDatasetFactory(
431+
allocator,
432+
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
433+
fileFormat,
434+
fileUri,
435+
).use { datasetFactory ->
436+
datasetFactory.finish().use { dataset ->
437+
dataset.newScan(scanOptions).use { scanner ->
438+
scanner.scanBatches().use { reader ->
439+
return readArrow(reader, nullability)
440+
}
441+
}
442+
}
443+
}
444+
}
445+
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,4 +653,17 @@ internal class ArrowKtTest {
653653
DataFrame.readArrow(dbArrowReader) shouldBe expected
654654
}
655655
}
656+
657+
@Test
658+
fun testReadParquet() {
659+
val path = testResource("test.arrow.parquet").path
660+
val dataFrame = DataFrame.readParquet(URL("file:$path"))
661+
dataFrame.rowsCount() shouldBe 300
662+
assertEstimations(
663+
exampleFrame = dataFrame,
664+
expectedNullable = false,
665+
hasNulls = false,
666+
fromParquet = true,
667+
)
668+
}
656669
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime
2424
* Assert that we have got the same data that was originally saved on example creation.
2525
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
2626
*/
27-
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
27+
internal fun assertEstimations(
28+
exampleFrame: AnyFrame,
29+
expectedNullable: Boolean,
30+
hasNulls: Boolean,
31+
fromParquet: Boolean = false,
32+
) {
2833
/**
2934
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
3035
*/
@@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
142147
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
143148
}
144149

145-
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
146-
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
147-
datetimeCol.forEachIndexed { i, element ->
148-
assertValueOrNull(
149-
rowNumber = iBatch(i),
150-
actual = element,
151-
expected = JavaLocalDateTime
152-
.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
153-
.toKotlinLocalDateTime(),
154-
)
150+
if (fromParquet) {
151+
// parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
152+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
153+
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
154+
datetimeCol.forEachIndexed { i, element ->
155+
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
156+
}
157+
} else {
158+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
159+
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
160+
datetimeCol.forEachIndexed { i, element ->
161+
assertValueOrNull(
162+
rowNumber = iBatch(i),
163+
actual = element,
164+
expected = JavaLocalDateTime.ofEpochSecond(
165+
iBatch(i).toLong() * 60 * 60 * 24 * 30,
166+
0,
167+
ZoneOffset.UTC,
168+
).toKotlinLocalDateTime(),
169+
)
170+
}
155171
}
156172

157173
val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Binary file not shown.

gradle/libs.versions.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ junit-platform = "1.11.3"
4444
kotestAsserions = "6.0.0.M1"
4545

4646
jsoup = "1.18.3"
47-
arrow = "18.1.0"
47+
arrow = "18.3.0"
4848
kodex = "0.4.3"
4949
simpleGit = "2.2.1"
5050
dependencyVersions = "0.51.0"
@@ -53,7 +53,7 @@ shadow = "8.3.5"
5353
android-gradle-api = "7.3.1" # need to revise our tests to update
5454
ktor = "3.0.1" # needs jupyter compatibility with Kotlin 2.1 to update
5555
kotlin-compile-testing = "1.6.0"
56-
duckdb = "1.1.3"
56+
duckdb = "1.2.2.0"
5757
buildconfig = "5.5.1"
5858
benchmark = "0.4.12"
5959

@@ -116,6 +116,7 @@ arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref
116116
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
117117
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
118118
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }
119+
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
119120

120121
geotools-main = { module = "org.geotools:gt-main", version.ref = "geotools" }
121122
geotools-shapefile = { module = "org.geotools:gt-shapefile", version.ref = "geotools" }

0 commit comments

Comments
 (0)