Skip to content

Commit

Permalink
feat: support parsing of SQL queries with APPLY
Browse files Browse the repository at this point in the history
This change adds support for parsing SQL queries with APPLY (a join with a
correlated subquery) and for building the OuterReferences map of the correlated
variables present in the query's join predicates. The OuterRefs will be used while
constructing Substrait plans to bind correlated variables. The change also
adds a few example queries that depend on the APPLY / LATERAL operators.

This change does not yet map Calcite's correlated join to Substrait, because the
spec for APPLY has not been approved. As a result, although parsing of Calcite
query plans will succeed after this change, the unit tests and runtime
conversion will continue to fail in the final step of building the
Substrait plan. Additional changes are needed to fully support APPLY.

Refs #substrait-io/substrait/issues/357
  • Loading branch information
ashvina committed Nov 24, 2022
1 parent 6319d54 commit 67cd3c5
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.*;
Expand Down Expand Up @@ -48,6 +49,32 @@ public RelNode visit(LogicalFilter filter) throws RuntimeException {
return super.visit(filter);
}

/**
 * Visits a correlated join and keeps the {@code nestedDepth} bookkeeping in step with the
 * traversal of its two inputs.
 *
 * <p>Every correlation id in the node's variable set is registered at depth 0 unless it is
 * already tracked. The left input is visited at the current depth. All tracked depths are then
 * incremented by one while the right input is visited, and decremented again afterwards.
 *
 * @param correlate the correlated join to traverse
 * @return the same {@code correlate} node, unchanged
 * @throws RuntimeException if visiting either input fails
 */
@Override
public RelNode visit(LogicalCorrelate correlate) throws RuntimeException {
  // Start tracking any correlation variable introduced by this node; keep existing depths.
  for (CorrelationId id : correlate.getVariablesSet()) {
    nestedDepth.putIfAbsent(id, 0);
  }

  apply(correlate.getLeft());

  // Correlated join is a special case. The right-rel is a correlated sub-query but not a REX. So,
  // the RexVisitor cannot be applied to it to correctly compute the depth map. Hence, we need to
  // manually compute the depth map for the right-rel.
  // replaceAll updates the values in place instead of calling put() on the map while iterating
  // its own entry set.
  nestedDepth.replaceAll((id, depth) -> depth + 1);

  apply(correlate.getRight()); // look inside sub-queries

  // Leaving the right input: restore the depths that were in effect before descending.
  nestedDepth.replaceAll((id, depth) -> depth - 1);

  return correlate;
}

@Override
public RelNode visitOther(RelNode other) throws RuntimeException {
for (RelNode child : other.getInputs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ WHERE p_size <
AND PS.ps_suppkey = l.l_suppkey))
Filter --- $coor0
Filter --- $corr0
/ \ condition
/ p_size < RexSubquery
Scan(P) |
Expand All @@ -23,7 +23,7 @@ WHERE p_size <
|
Project
|
Filter --- $coor2
Filter --- $corr2
/ \
/ \
Scan (L) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ protected SqlConverterBase(FeatureBoard features) {
new ProxyingMetadataHandlerProvider(DefaultRelMetadataProvider.INSTANCE);
return new RelMetadataQuery(handler);
});
parserConfig = SqlParser.Config.DEFAULT.withParserFactory(SqlDdlParserImpl.FACTORY);
featureBoard = features == null ? FEATURES_DEFAULT : features;
parserConfig =
SqlParser.Config.DEFAULT
.withParserFactory(SqlDdlParserImpl.FACTORY)
.withConformance(featureBoard.sqlConformanceMode());
}

protected static final SimpleExtension.ExtensionCollection EXTENSION_COLLECTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
@SuppressWarnings("UnstableApiUsage")
@Value.Enclosing
public class SubstraitRelVisitor extends RelNodeVisitor<Rel, RuntimeException> {

static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(SubstraitRelVisitor.class);
private static final FeatureBoard FEATURES_DEFAULT = ImmutableFeatureBoard.builder().build();
Expand Down Expand Up @@ -196,6 +197,21 @@ public Rel visit(LogicalJoin join) {

/**
 * Visits a Calcite correlated join (the plan shape produced for APPLY / LATERAL queries).
 *
 * <p>Both inputs are converted and the join type is validated, but none of the resulting values
 * are consumed yet; the node is finally delegated to {@code super.visit}.
 *
 * @param correlate the correlated join to convert
 * @return whatever {@code super.visit(correlate)} returns
 * @throws UnsupportedOperationException if the join type is neither INNER nor LEFT
 */
@Override
public Rel visit(LogicalCorrelate correlate) {
  // The outer (left) side is converted the same way as the left input of a logical join.
  var outerInput = apply(correlate.getLeft());

  // The inner (right) side is converted the same way as a correlated sub-query.
  var correlatedInput = apply(correlate.getRight());

  var correlationNumber = correlate.getCorrelationId().getId();

  // INNER corresponds to CROSS APPLY and LEFT to OUTER APPLY; every other type is rejected.
  var substraitJoinType =
      switch (correlate.getJoinType()) {
        case LEFT -> Join.JoinType.LEFT;
        case INNER -> Join.JoinType.INNER;
        default -> throw new UnsupportedOperationException(
            "Unsupported correlated join type: " + correlate.getJoinType());
      };

  // NOTE(review): the locals above are intentionally unused for now. Presumably super.visit
  // rejects the node until a Substrait mapping for APPLY exists — confirm against the base class.
  return super.visit(correlate);
}

Expand Down
69 changes: 69 additions & 0 deletions isthmus/src/test/java/io/substrait/isthmus/ApplyJoinPlanTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.substrait.isthmus;

import org.apache.calcite.adapter.tpcds.TpcdsSchema;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Checks that queries using LATERAL / APPLY joins are rejected with an
 * {@link UnsupportedOperationException} when run through {@link SqlToSubstrait}.
 */
public class ApplyJoinPlanTest {

  /** CROSS JOIN LATERAL, converted with a default {@link SqlToSubstrait}. */
  @Test
  public void lateralJoinQuery() {
    String sql =
        """
        SELECT ss_sold_date_sk, ss_item_sk, ss_customer_sk
        FROM store_sales CROSS JOIN LATERAL
        (select i_item_sk from item where item.i_item_sk = store_sales.ss_item_sk)""";

    assertConversionUnsupported(new SqlToSubstrait(), sql, "Lateral join is not supported");
  }

  /** OUTER APPLY, converted with SQL Server 2008 conformance. */
  @Test
  public void outerApplyQuery() throws SqlParseException {
    String sql =
        """
        SELECT ss_sold_date_sk, ss_item_sk, ss_customer_sk
        FROM store_sales OUTER APPLY
        (select i_item_sk from item where item.i_item_sk = store_sales.ss_item_sk)""";

    assertConversionUnsupported(sqlServerConverter(), sql, "APPLY is not supported");
  }

  /** CROSS APPLY, converted with SQL Server 2008 conformance. */
  @Test
  public void crossApplyQuery() throws SqlParseException {
    String sql =
        """
        SELECT ss_sold_date_sk, ss_item_sk, ss_customer_sk
        FROM store_sales CROSS APPLY
        (select i_item_sk from item where item.i_item_sk = store_sales.ss_item_sk)""";

    assertConversionUnsupported(sqlServerConverter(), sql, "APPLY is not supported");
  }

  /** Builds a converter whose feature board uses the SQL_SERVER_2008 conformance mode. */
  private static SqlToSubstrait sqlServerConverter() {
    FeatureBoard sqlServerFeatures =
        ImmutableFeatureBoard.builder()
            .sqlConformanceMode(SqlConformanceEnum.SQL_SERVER_2008)
            .build();
    return new SqlToSubstrait(sqlServerFeatures);
  }

  /**
   * Runs {@code sql} against a scale-1.0 TPC-DS schema registered as "tpcds" and asserts that the
   * conversion throws {@link UnsupportedOperationException}.
   *
   * @param converter the converter under test
   * @param sql the query to convert
   * @param failureMessage message reported by JUnit if the expected exception is not thrown
   */
  private static void assertConversionUnsupported(
      SqlToSubstrait converter, String sql, String failureMessage) {
    TpcdsSchema schema = new TpcdsSchema(1.0);
    Assertions.assertThrows(
        UnsupportedOperationException.class,
        () -> converter.execute(sql, "tpcds", schema),
        failureMessage);
  }
}

0 comments on commit 67cd3c5

Please sign in to comment.