Skip to content

Commit f76cd88

Browse files
gustavodemorais authored and dawidwys committed
[FLINK-38554][table] Fix rowCount cost for FlinkLogicalMultiJoin
1 parent 31b44d2 commit f76cd88

File tree

3 files changed

+102
-1
lines changed

3 files changed

+102
-1
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -155,7 +155,7 @@ public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadata
155155

156156
final Double averageRowSize = mq.getAverageRowSize(input);
157157
final double dAverageRowSize = averageRowSize == null ? 100.0 : averageRowSize;
158-
rowCount *= inputRowCount;
158+
rowCount += inputRowCount;
159159
cpu += inputRowCount;
160160
io += inputRowCount * dAverageRowSize;
161161
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -230,6 +230,45 @@ void testThreeWayLeftOuterJoinExecPlan() {
230230
+ "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2");
231231
}
232232

233+
@Test
234+
void testTwoWayJoinWithUnion() {
235+
util.tableEnv()
236+
.executeSql(
237+
"CREATE TABLE Orders2 ("
238+
+ " order_id STRING PRIMARY KEY NOT ENFORCED,"
239+
+ " user_id_1 STRING,"
240+
+ " product STRING"
241+
+ ") WITH ('connector' = 'values', 'changelog-mode' = 'I,D')");
242+
243+
util.verifyRelPlan(
244+
"WITH OrdersUnion as ("
245+
+ "SELECT * FROM Orders "
246+
+ "UNION ALL "
247+
+ "SELECT * FROM Orders2"
248+
+ ") "
249+
+ "SELECT * FROM OrdersUnion o "
250+
+ "LEFT JOIN Users u "
251+
+ "ON o.user_id_1 = u.user_id_0");
252+
}
253+
254+
@Test
255+
void testTwoWayJoinWithRank() {
256+
util.getTableEnv()
257+
.getConfig()
258+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
259+
260+
util.verifyRelPlan(
261+
"WITH JoinedEvents as ("
262+
+ "SELECT e1.id as id, e1.val, e1.rowtime as `rowtime`, e2.price "
263+
+ "FROM EventTable1 e1 "
264+
+ "JOIN EventTable2 e2 ON e1.id = e2.id) "
265+
+ "SELECT id, val, `rowtime` FROM ("
266+
+ "SELECT *, "
267+
+ "ROW_NUMBER() OVER (PARTITION BY id ORDER BY `rowtime` DESC) as ts "
268+
+ "FROM JoinedEvents) "
269+
+ "WHERE ts = 1");
270+
}
271+
233272
@Test
234273
void testFourWayComplexJoinRelPlan() {
235274
util.verifyRelPlan(

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -946,6 +946,68 @@ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CA
946946
+- Exchange(distribution=[hash[user_id_2]])
947947
+- Calc(select=[payment_id, user_id_2], where=[(price > 10)])
948948
+- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id_2])
949+
]]>
950+
</Resource>
951+
</TestCase>
952+
<TestCase name="testTwoWayJoinWithUnion">
953+
<Resource name="sql">
954+
<![CDATA[WITH OrdersUnion as (SELECT * FROM Orders UNION ALL SELECT * FROM Orders2) SELECT * FROM OrdersUnion o LEFT JOIN Users u ON o.user_id_1 = u.user_id_0]]>
955+
</Resource>
956+
<Resource name="ast">
957+
<![CDATA[
958+
LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2], user_id_0=[$3], name=[$4], cash=[$5])
959+
+- LogicalJoin(condition=[=($1, $3)], joinType=[left])
960+
:- LogicalUnion(all=[true])
961+
: :- LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2])
962+
: : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
963+
: +- LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2])
964+
: +- LogicalTableScan(table=[[default_catalog, default_database, Orders2]])
965+
+- LogicalTableScan(table=[[default_catalog, default_database, Users]])
966+
]]>
967+
</Resource>
968+
<Resource name="optimized rel plan">
969+
<![CDATA[
970+
MultiJoin(commonJoinKey=[user_id_1], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (user_id_0)], joinConditions=[=(user_id_1, user_id_0)], select=[order_id,user_id_1,product,user_id_0,name,cash], rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash)])
971+
:- Exchange(distribution=[hash[user_id_1]])
972+
: +- Union(all=[true], union=[order_id, user_id_1, product])
973+
: :- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id_1, product])
974+
: +- TableSourceScan(table=[[default_catalog, default_database, Orders2]], fields=[order_id, user_id_1, product])
975+
+- Exchange(distribution=[hash[user_id_0]])
976+
+- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash])
977+
]]>
978+
</Resource>
979+
</TestCase>
980+
<TestCase name="testTwoWayJoinWithRank">
981+
<Resource name="sql">
982+
<![CDATA[WITH JoinedEvents as (SELECT e1.id as id, e1.val, e1.rowtime as `rowtime`, e2.price FROM EventTable1 e1 JOIN EventTable2 e2 ON e1.id = e2.id) SELECT id, val, `rowtime` FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY `rowtime` DESC) as ts FROM JoinedEvents) WHERE ts = 1]]>
983+
</Resource>
984+
<Resource name="ast">
985+
<![CDATA[
986+
LogicalProject(id=[$0], val=[$1], rowtime=[$2])
987+
+- LogicalFilter(condition=[=($4, 1)])
988+
+- LogicalProject(id=[$0], val=[$1], rowtime=[$2], price=[$3], ts=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
989+
+- LogicalProject(id=[$0], val=[$1], rowtime=[$2], price=[$4])
990+
+- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
991+
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 5000:INTERVAL SECOND)])
992+
: +- LogicalTableScan(table=[[default_catalog, default_database, EventTable1]])
993+
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 5000:INTERVAL SECOND)])
994+
+- LogicalTableScan(table=[[default_catalog, default_database, EventTable2]])
995+
]]>
996+
</Resource>
997+
<Resource name="optimized rel plan">
998+
<![CDATA[
999+
Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[id], orderBy=[rowtime DESC], select=[id, val, rowtime])
1000+
+- Exchange(distribution=[hash[id]])
1001+
+- Calc(select=[id, val, rowtime])
1002+
+- MultiJoin(commonJoinKey=[id], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(id, id0)], select=[id,val,rowtime,id0,price,rowtime0], rowType=[RecordType(VARCHAR(2147483647) id, INTEGER val, TIMESTAMP(3) rowtime, VARCHAR(2147483647) id0, DOUBLE price, TIMESTAMP(3) rowtime0)])
1003+
:- Exchange(distribution=[hash[id]])
1004+
: +- Calc(select=[id, val, CAST(rowtime AS TIMESTAMP(3)) AS rowtime])
1005+
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
1006+
: +- TableSourceScan(table=[[default_catalog, default_database, EventTable1]], fields=[id, val, rowtime])
1007+
+- Exchange(distribution=[hash[id]])
1008+
+- Calc(select=[id, price, CAST(rowtime AS TIMESTAMP(3)) AS rowtime])
1009+
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
1010+
+- TableSourceScan(table=[[default_catalog, default_database, EventTable2]], fields=[id, price, rowtime])
9491011
]]>
9501012
</Resource>
9511013
</TestCase>

0 commit comments

Comments
 (0)