Skip to content

Commit 569448e

Browse files
committed
spark filter pushdown
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent e0a0527 commit 569448e

14 files changed

Lines changed: 652 additions & 108 deletions

File tree

java/build.gradle.kts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,10 @@ allprojects {
8181
}
8282
}
8383

84-
tasks.register("format").get().dependsOn("spotlessApply")
84+
if (project.name == "vortex-spark_2.12") {
85+
// vortex-spark_2.12 and vortex-spark_2.13 share a projectDir; format from the 2.13 variant only.
86+
tasks.register("format") { enabled = false }
87+
} else {
88+
tasks.register("format").get().dependsOn("spotlessApply")
89+
}
8590
}

java/vortex-jni/src/main/java/dev/vortex/api/Expression.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public static Expression isNull(Expression child) {
7373
return new Expression(NativeExpression.isNull(child.nativePointer()));
7474
}
7575

76+
public static Expression isNotNull(Expression child) {
77+
return new Expression(NativeExpression.isNotNull(child.nativePointer()));
78+
}
79+
7680
public static Expression literal(boolean value) {
7781
return new Expression(NativeExpression.literalBool(value, false));
7882
}

java/vortex-jni/src/main/java/dev/vortex/jni/NativeExpression.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ private NativeExpression() {}
2727

2828
public static native long isNull(long childPointer);
2929

30+
public static native long isNotNull(long childPointer);
31+
3032
public static native long literalBool(boolean value, boolean isNull);
3133

3234
public static native long literalI8(byte value, boolean isNull);

java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.spark.sql.SparkSession;
2121
import org.apache.spark.sql.connector.catalog.Table;
2222
import org.apache.spark.sql.connector.catalog.TableProvider;
23+
import org.apache.spark.sql.connector.expressions.Expressions;
2324
import org.apache.spark.sql.connector.expressions.Transform;
2425
import org.apache.spark.sql.sources.DataSourceRegister;
2526
import org.apache.spark.sql.types.DataType;
@@ -118,6 +119,38 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
118119
return dataSchema;
119120
}
120121

122+
/**
123+
* Infers partition transforms by inspecting Hive-style {@code key=value} segments in the first listed file path.
124+
*
125+
* <p>Spark calls this before {@link #getTable(StructType, Transform[], Map)} when the caller did not provide
126+
* explicit partitioning. Returning identity transforms here lets downstream components (notably
127+
* {@link dev.vortex.spark.read.VortexScanBuilder}) tell which schema columns are encoded in the directory layout
128+
* rather than stored inside the Vortex files, which matters for predicate pushdown.
129+
*/
130+
@Override
131+
public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
132+
var paths = getPaths(options);
133+
if (paths.isEmpty()) {
134+
return new Transform[0];
135+
}
136+
var formatOptions = buildDataSourceOptions(options.asCaseSensitiveMap());
137+
String pathToInfer = Objects.requireNonNull(Iterables.getLast(paths));
138+
if (!pathToInfer.endsWith(".vortex")) {
139+
Optional<String> firstFile =
140+
NativeFiles.listFiles(VortexSparkSession.get(formatOptions), pathToInfer, formatOptions).stream()
141+
.findFirst();
142+
if (firstFile.isEmpty()) {
143+
return new Transform[0];
144+
}
145+
pathToInfer = firstFile.get();
146+
}
147+
Map<String, String> partitionValues = PartitionPathUtils.parsePartitionValues(pathToInfer);
148+
if (partitionValues.isEmpty()) {
149+
return new Transform[0];
150+
}
151+
return partitionValues.keySet().stream().map(Expressions::identity).toArray(Transform[]::new);
152+
}
153+
121154
/**
122155
* Creates a Vortex table instance with the given schema and properties.
123156
*

java/vortex-spark/src/main/java/dev/vortex/spark/VortexTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
5858
Map<String, String> opts = Maps.newHashMap();
5959
opts.putAll(formatOptions);
6060
opts.putAll(options);
61-
return new VortexScanBuilder(opts)
61+
return new VortexScanBuilder(opts, partitionTransforms)
6262
.addAllPaths(paths)
6363
.addAllColumns(Arrays.asList(CatalogV2Util.structTypeToV2Columns(schema)));
6464
}

java/vortex-spark/src/main/java/dev/vortex/spark/read/PrefetchingIterator.java

Lines changed: 0 additions & 94 deletions
This file was deleted.

0 commit comments

Comments
 (0)