Skip to content

Commit 4634c0b

Browse files
Ship OUTER JOINs to datanodes in following cases
1. An equi-outer-join between the distribution columns of two distributed tables such that the tables are distributed on same set of nodes and distribution strategy is same and datatype of distribution column is same, is shippable to the datanodes where the distributed tables are distributed. 2. An outer-join between a distributed and a replicated relation is shippable to the datanodes where distributed relation is distributed if the replicated relation is replicated on those nodes and the outer side of join is distributed relation. 3. An outer-join between two replicated relations shippable to a datanode where both the relations are available. Justification for shippability can be found in mail thread with subject "shipping outer joins" on pgxc-hackers mailing list.
1 parent 15f9527 commit 4634c0b

File tree

7 files changed

+474
-155
lines changed

7 files changed

+474
-155
lines changed

src/backend/optimizer/path/pgxcpath.c

+1-74
Original file line numberDiff line numberDiff line change
@@ -154,79 +154,6 @@ pgxc_find_remotequery_path(RelOptInfo *rel)
154154
return NULL;
155155
}
156156

157-
/*
158-
* See if the nodelists corresponding to the RemoteQuery paths being joined can
159-
* be merged.
160-
*/
161-
ExecNodes *
162-
pgxc_is_join_reducible(ExecNodes *inner_en, ExecNodes *outer_en, Relids in_relids,
163-
Relids out_relids, JoinType jointype, List *join_quals,
164-
List *rtables)
165-
{
166-
ExecNodes *join_exec_nodes;
167-
bool merge_dist_equijoin = false;
168-
bool merge_replicated_only;
169-
ListCell *cell;
170-
171-
/*
172-
* If either of inner_en or outer_en is NULL, return NULL. We can't ship the
173-
* join when either of the sides do not have datanodes to ship to.
174-
*/
175-
if (!outer_en || !inner_en)
176-
return NULL;
177-
/*
178-
* We only support reduction of INNER, LEFT [OUTER] and FULL [OUTER] joins.
179-
* RIGHT [OUTER] join is converted to LEFT [OUTER] join during join tree
180-
* deconstruction.
181-
*/
182-
if (jointype != JOIN_INNER && jointype != JOIN_LEFT && jointype != JOIN_FULL)
183-
return NULL;
184-
/*
185-
* When join type is other than INNER, we will get the unmatched rows on
186-
* either side. The result will be correct only in case both the sides of
187-
* join are replicated. In case one of the sides is replicated, and the
188-
* unmatched results are not coming from that side, it might be possible to
189-
* ship such join, but this needs to be validated from correctness
190-
* perspective.
191-
*/
192-
merge_replicated_only = (jointype != JOIN_INNER);
193-
194-
/*
195-
* If both the relations are distributed with similar distribution strategy
196-
* walk through the restriction info for this JOIN to find if there is an
197-
* equality condition on the distributed columns of both the relations. In
198-
* such case, we can reduce the JOIN if the distribution nodelist is also
199-
* same.
200-
*/
201-
if (IsExecNodesDistributedByValue(inner_en) &&
202-
inner_en->baselocatortype == outer_en->baselocatortype &&
203-
!merge_replicated_only)
204-
{
205-
foreach(cell, join_quals)
206-
{
207-
Node *qual = (Node *)lfirst(cell);
208-
if (pgxc_qual_has_dist_equijoin(in_relids,
209-
out_relids, InvalidOid,
210-
qual, rtables) &&
211-
pgxc_is_expr_shippable((Expr *)qual, NULL))
212-
{
213-
merge_dist_equijoin = true;
214-
break;
215-
}
216-
}
217-
}
218-
/*
219-
* If the ExecNodes of inner and outer nodes can be merged, the JOIN is
220-
* shippable
221-
* PGXCTODO: Can we take into consideration the JOIN conditions to optimize
222-
* further?
223-
*/
224-
join_exec_nodes = pgxc_merge_exec_nodes(inner_en, outer_en,
225-
merge_dist_equijoin,
226-
merge_replicated_only);
227-
return join_exec_nodes;
228-
}
229-
230157
/*
231158
* pgxc_ship_remotejoin
232159
* If there are RemoteQuery paths for the rels being joined, check if the join
@@ -296,7 +223,7 @@ create_joinrel_rqpath(PlannerInfo *root, RelOptInfo *joinrel,
296223
* If the nodelists on both the sides of JOIN can be merged, the JOIN is
297224
* shippable.
298225
*/
299-
join_en = pgxc_is_join_reducible(inner_en, outer_en,
226+
join_en = pgxc_is_join_shippable(inner_en, outer_en,
300227
innerrel->relids, outerrel->relids,
301228
jointype, join_quals, root->parse->rtable);
302229
if (join_en)

src/backend/optimizer/util/pgxcship.c

+159-50
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ pgxc_FQS_find_datanodes_recurse(Node *node, Query *query, Bitmapset **relids)
300300
* Check whether the JOIN is pushable to the datanodes and
301301
* find the datanodes where the JOIN can be pushed to
302302
*/
303-
result_en = pgxc_is_join_reducible(result_en, en, from_relids,
303+
result_en = pgxc_is_join_shippable(result_en, en, from_relids,
304304
fle_relids, JOIN_INNER,
305305
make_ands_implicit((Expr *)from_expr->quals),
306306
query_rtable);
@@ -349,7 +349,7 @@ pgxc_FQS_find_datanodes_recurse(Node *node, Query *query, Bitmapset **relids)
349349
* Check whether the JOIN is pushable or not, and find the datanodes
350350
* where the JOIN can be pushed to.
351351
*/
352-
result_en = pgxc_is_join_reducible(ren, len, r_relids, l_relids,
352+
result_en = pgxc_is_join_shippable(ren, len, r_relids, l_relids,
353353
join_expr->jointype,
354354
make_ands_implicit((Expr *)join_expr->quals),
355355
query_rtable);
@@ -1074,14 +1074,36 @@ pgxc_shippability_walker(Node *node, Shippability_context *sc_context)
10741074
*/
10751075
if (!pgxc_test_shippability_reason(sc_context, SS_NO_NODES))
10761076
{
1077-
sc_context->sc_subquery_en = pgxc_merge_exec_nodes(sublink_en,
1078-
sc_context->sc_subquery_en,
1079-
false,
1080-
true);
1077+
/*
1078+
* If this is the first time we are finding out the nodes for
1079+
* SubLink, we don't have anything to merge, just assign.
1080+
*/
1081+
if (!sc_context->sc_subquery_en)
1082+
sc_context->sc_subquery_en = sublink_en;
1083+
/*
1084+
* Merge if only the accumulated SubLink ExecNodes and the
1085+
* ExecNodes for this subquery are both replicated.
1086+
*/
1087+
else if (sublink_en && IsExecNodesReplicated(sublink_en) &&
1088+
IsExecNodesReplicated(sc_context->sc_subquery_en))
1089+
{
1090+
sc_context->sc_subquery_en = pgxc_merge_exec_nodes(sublink_en,
1091+
sc_context->sc_subquery_en);
1092+
}
1093+
else
1094+
sc_context->sc_subquery_en = NULL;
1095+
1096+
/*
1097+
* If we didn't find a cumulative ExecNodes, set shippability
1098+
* reason, so that we don't bother merging future sublinks.
1099+
*/
10811100
if (!sc_context->sc_subquery_en)
10821101
pgxc_set_shippability_reason(sc_context, SS_NO_NODES);
10831102
}
1103+
else
1104+
Assert(!sc_context->sc_subquery_en);
10841105

1106+
/* Check if the type of sublink result is shippable */
10851107
pgxc_set_exprtype_shippability(exprType(node), sc_context);
10861108

10871109
/* Wipe out subselect as explained above and walk the copied tree */
@@ -1226,20 +1248,29 @@ pgxc_is_query_shippable(Query *query, int query_level)
12261248
* shipped.
12271249
*/
12281250
pgxc_shippability_walker((Node *)query, &sc_context);
1251+
1252+
exec_nodes = sc_context.sc_exec_nodes;
12291253
/*
1230-
* We have merged the nodelists and distributions of all subqueries seen in
1231-
* the query tree, merge it with the same obtained for the relations
1232-
* involved in the query.
1254+
* The shippability context contains two ExecNodes, one for the subLinks
1255+
* involved in the Query and other for the relation involved in FromClause.
1256+
* They are computed at different times while scanning the query. Merge both
1257+
* of them if they are both replicated. If query doesn't have SubLinks, we
1258+
* don't need to consider corresponding ExecNodes.
12331259
* PGXC_FQS_TODO:
12341260
* Merge the subquery ExecNodes if both of them are replicated.
12351261
* The logic to merge node lists with other distribution
12361262
* strategy is not clear yet.
12371263
*/
1238-
exec_nodes = sc_context.sc_exec_nodes;
1239-
if (exec_nodes)
1240-
exec_nodes = pgxc_merge_exec_nodes(exec_nodes,
1241-
sc_context.sc_subquery_en, false,
1242-
true);
1264+
if (query->hasSubLinks)
1265+
{
1266+
if (exec_nodes && IsExecNodesReplicated(exec_nodes) &&
1267+
sc_context.sc_subquery_en &&
1268+
IsExecNodesReplicated(sc_context.sc_subquery_en))
1269+
exec_nodes = pgxc_merge_exec_nodes(exec_nodes,
1270+
sc_context.sc_subquery_en);
1271+
else
1272+
exec_nodes = NULL;
1273+
}
12431274

12441275
/*
12451276
* Look at the information gathered by the walker in Shippability_context and that
@@ -1366,11 +1397,11 @@ pgxc_is_func_shippable(Oid funcid)
13661397

13671398

13681399
/*
1369-
* pgxc_qual_has_dist_equijoin
1400+
* pgxc_find_dist_equijoin_qual
13701401
* Check equijoin conditions on given relations
13711402
*/
1372-
bool
1373-
pgxc_qual_has_dist_equijoin(Relids varnos_1,
1403+
Expr *
1404+
pgxc_find_dist_equijoin_qual(Relids varnos_1,
13741405
Relids varnos_2, Oid distcol_type, Node *quals, List *rtable)
13751406
{
13761407
List *lquals;
@@ -1446,9 +1477,9 @@ pgxc_qual_has_dist_equijoin(Relids varnos_1,
14461477
!op_hashjoinable(op->opno, exprType((Node *)lvar)))
14471478
continue;
14481479
/* Found equi-join condition on distribution columns */
1449-
return true;
1480+
return qual_expr;
14501481
}
1451-
return false;
1482+
return NULL;
14521483
}
14531484

14541485

@@ -1459,8 +1490,7 @@ pgxc_qual_has_dist_equijoin(Relids varnos_1,
14591490
* If both exec_nodes can not be merged, it returns NULL.
14601491
*/
14611492
ExecNodes *
1462-
pgxc_merge_exec_nodes(ExecNodes *en1, ExecNodes *en2, bool merge_dist_equijoin,
1463-
bool merge_replicated_only)
1493+
pgxc_merge_exec_nodes(ExecNodes *en1, ExecNodes *en2)
14641494
{
14651495
ExecNodes *merged_en = makeNode(ExecNodes);
14661496
ExecNodes *tmp_en;
@@ -1499,22 +1529,11 @@ pgxc_merge_exec_nodes(ExecNodes *en1, ExecNodes *en2, bool merge_dist_equijoin,
14991529
merged_en->nodeList = list_intersection_int(en1->nodeList,
15001530
en2->nodeList);
15011531
merged_en->baselocatortype = LOCATOR_TYPE_REPLICATED;
1502-
/* No intersection, so has to go though standard planner... */
15031532
if (!merged_en->nodeList)
15041533
FreeExecNodes(&merged_en);
15051534
return merged_en;
15061535
}
15071536

1508-
/*
1509-
* We are told to merge the nodelists if both the distributions are
1510-
* replicated. We checked that above, so bail out
1511-
*/
1512-
if (merge_replicated_only)
1513-
{
1514-
FreeExecNodes(&merged_en);
1515-
return merged_en;
1516-
}
1517-
15181537
if (IsExecNodesReplicated(en1) &&
15191538
IsExecNodesColumnDistributed(en2))
15201539
{
@@ -1572,29 +1591,19 @@ pgxc_merge_exec_nodes(ExecNodes *en1, ExecNodes *en2, bool merge_dist_equijoin,
15721591
/*
15731592
* Distributed/distributed case
15741593
* If the caller has suggested that this is an equi-join between two
1575-
* distributed results, check if both are distributed by the same
1576-
* distribution strategy, and have the same nodes in the distribution
1577-
* node list. The caller should have made sure that distribution column
1578-
* type is same.
1594+
* distributed results, check that they have the same nodes in the distribution
1595+
* node list. The caller is expected to fully decide whether to merge
1596+
* the nodes or not.
15791597
*/
1580-
if (merge_dist_equijoin &&
1581-
en1->baselocatortype == en2->baselocatortype &&
1582-
!list_difference_int(en1->nodeList, en2->nodeList) &&
1598+
if (!list_difference_int(en1->nodeList, en2->nodeList) &&
15831599
!list_difference_int(en2->nodeList, en1->nodeList))
15841600
{
15851601
merged_en->nodeList = list_copy(en1->nodeList);
1586-
merged_en->baselocatortype = en1->baselocatortype;
1602+
if (en1->baselocatortype == en2->baselocatortype)
1603+
merged_en->baselocatortype = en1->baselocatortype;
1604+
else
1605+
merged_en->baselocatortype = LOCATOR_TYPE_DISTRIBUTED;
15871606
}
1588-
/*
1589-
* If both the relations are distributed but have only one node in the
1590-
* node list, the JOIN can be pushed down if the single node is same for
1591-
* both the relations.
1592-
* PGXCTODO: Should we set the locatortype as REPLICATED for such
1593-
* relation/s in first place?
1594-
*/
1595-
else if (list_length(en1->nodeList) == 1 && list_length(en2->nodeList) == 1 &&
1596-
(merged_en->nodeList = list_intersection_int(en1->nodeList, en2->nodeList)))
1597-
merged_en->baselocatortype = LOCATOR_TYPE_DISTRIBUTED;
15981607
else
15991608
FreeExecNodes(&merged_en);
16001609
return merged_en;
@@ -1894,3 +1903,103 @@ pgxc_check_fk_shippability(RelationLocInfo *parentLocInfo,
18941903

18951904
return result;
18961905
}
1906+
1907+
/*
1908+
* pgxc_is_join_reducible
1909+
* The shippability of JOIN is decided in following steps
1910+
* 1. Are the JOIN conditions shippable?
1911+
* For INNER JOIN it's possible to apply some of the conditions at the
1912+
* Datanodes and others at coordinator. But for other JOINs, JOIN conditions
1913+
* decide which tuples on the OUTER side are appended with NULL columns from
1914+
* INNER side, we need all the join conditions to be shippable for the join to
1915+
* be shippable.
1916+
* 2. Do the JOIN conditions have quals that will make it shippable?
1917+
* When both sides of JOIN are replicated, irrespective of the quals the JOIN
1918+
* is shippable.
1919+
* INNER joins between replicated and distributed relation are shippable
1920+
* irrespective of the quals. OUTER join between replicated and distributed
1921+
* relation is shippable if distributed relation is the outer relation.
1922+
* All joins between hash/modulo distributed relations are shippable if they
1923+
* have equi-join on the distributed column, such that distribution columns
1924+
* have same datatype and same distribution strategy.
1925+
* 3. Are datanodes where the joining relations exist, compatible?
1926+
* Joins between replicated relations are shippable if both relations share a
1927+
* datanode. Joins between distributed relations are shippable if both
1928+
* relations are distributed on same set of Datanodes. Join between replicated
1929+
* and distributed relations is shippable is replicated relation is replicated
1930+
* on all nodes where distributed relation is distributed.
1931+
*
1932+
* The first step is to be applied by the caller of this function.
1933+
*/
1934+
ExecNodes *
1935+
pgxc_is_join_shippable(ExecNodes *inner_en, ExecNodes *outer_en, Relids in_relids,
1936+
Relids out_relids, JoinType jointype, List *join_quals,
1937+
List *rtables)
1938+
{
1939+
bool merge_nodes = false;
1940+
1941+
/*
1942+
* If either of inner_en or outer_en is NULL, return NULL. We can't ship the
1943+
* join when either of the sides do not have datanodes to ship to.
1944+
*/
1945+
if (!outer_en || !inner_en)
1946+
return NULL;
1947+
/*
1948+
* We only support reduction of INNER, LEFT [OUTER] and FULL [OUTER] joins.
1949+
* RIGHT [OUTER] join is converted to LEFT [OUTER] join during join tree
1950+
* deconstruction.
1951+
*/
1952+
if (jointype != JOIN_INNER && jointype != JOIN_LEFT && jointype != JOIN_FULL)
1953+
return NULL;
1954+
1955+
/* If both sides are replicated or have single node each, we ship any kind of JOIN */
1956+
if ((IsExecNodesReplicated(inner_en) && IsExecNodesReplicated(outer_en)) ||
1957+
(list_length(inner_en->nodeList) == 1 &&
1958+
list_length(outer_en->nodeList) == 1))
1959+
merge_nodes = true;
1960+
1961+
/* If both sides are distributed, ... */
1962+
else if (IsExecNodesColumnDistributed(inner_en) &&
1963+
IsExecNodesColumnDistributed(outer_en))
1964+
{
1965+
/*
1966+
* If two sides are distributed in the same manner by a value, with an
1967+
* equi-join on the distribution column and that condition
1968+
* is shippable, ship the join if node lists from both sides can be
1969+
* merged.
1970+
*/
1971+
if (inner_en->baselocatortype == outer_en->baselocatortype &&
1972+
IsExecNodesDistributedByValue(inner_en))
1973+
{
1974+
Expr *equi_join_expr = pgxc_find_dist_equijoin_qual(in_relids,
1975+
out_relids, InvalidOid,
1976+
(Node *)join_quals, rtables);
1977+
if (equi_join_expr && pgxc_is_expr_shippable(equi_join_expr, NULL))
1978+
merge_nodes = true;
1979+
}
1980+
}
1981+
/*
1982+
* If outer side is distributed and inner side is replicated, we can ship
1983+
* LEFT OUTER and INNER join.
1984+
*/
1985+
else if (IsExecNodesColumnDistributed(outer_en) &&
1986+
IsExecNodesReplicated(inner_en) &&
1987+
(jointype == JOIN_INNER || jointype == JOIN_LEFT))
1988+
merge_nodes = true;
1989+
/*
1990+
* If outer side is replicated and inner side is distributed, we can ship
1991+
* only for INNER join.
1992+
*/
1993+
else if (IsExecNodesReplicated(outer_en) &&
1994+
IsExecNodesColumnDistributed(inner_en) &&
1995+
jointype == JOIN_INNER)
1996+
merge_nodes = true;
1997+
/*
1998+
* If the ExecNodes of inner and outer nodes can be merged, the JOIN is
1999+
* shippable
2000+
*/
2001+
if (merge_nodes)
2002+
return pgxc_merge_exec_nodes(inner_en, outer_en);
2003+
else
2004+
return NULL;
2005+
}

src/include/optimizer/pgxcplan.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ extern bool pgxc_query_contains_utility(List *queries);
132132
extern void pgxc_rqplan_adjust_tlist(RemoteQuery *rqplan);
133133

134134
extern Plan *pgxc_make_modifytable(PlannerInfo *root, Plan *topplan);
135-
extern ExecNodes *pgxc_is_join_reducible(ExecNodes *inner_en, ExecNodes *outer_en,
135+
extern ExecNodes *pgxc_is_join_shippable(ExecNodes *inner_en, ExecNodes *outer_en,
136136
Relids in_relids, Relids out_relids, JoinType jointype,
137137
List *join_quals, List *rtables);
138138

0 commit comments

Comments
 (0)