Add filter pushdown optimizations #300
base: main
…the prev/next tests)
…after pullup transformation to remove now-defunct children and renumber the remainder [RUN CI]
…oval, and name collisions, including multiple pullups happening together [RUN CI]
@@ -1,18 +1,16 @@ | |||
ROOT(columns=[('n', n)], orderings=[]) | |||
PROJECT(columns={'n': agg_0}) | |||
AGGREGATE(keys={}, aggregations={'agg_0': COUNT()}) | |||
FILTER(condition=True:bool, columns={'account_balance': account_balance}) |
Nice side effect: these `True` filters also get deleted.
FILTER(condition=NOT(STARTSWITH(name, 'A':string)), columns={'key': key, 'region_name': region_name}) | ||
PROJECT(columns={'key': key, 'name': name, 'region_name': name}) | ||
PROJECT(columns={'key': key, 'region_name': name}) | ||
FILTER(condition=NOT(STARTSWITH(name, 'A':string)), columns={'key': key, 'name': name}) |
Simple example: filter got pushed before a project
FILTER(condition=size == 15:int64 & ENDSWITH(part_type, 'BRASS':string), columns={'key': key, 'manufacturer': manufacturer}) | ||
SCAN(table=tpch.PART, columns={'key': p_partkey, 'manufacturer': p_mfgr, 'part_type': p_type, 'size': p_size}) |
Good example: these conditions about the size & part type got pushed all the way to the scan
FILTER(condition=YEAR(order_date) == 1992:int64, columns={'customer_key': customer_key, 'part_type': part_type, 'supp_region': supp_region}) | ||
JOIN(conditions=[t0.order_key == t1.key], types=['inner'], columns={'customer_key': t1.customer_key, 'order_date': t1.order_date, 'part_type': t0.part_type, 'supp_region': t0.supp_region}) | ||
PROJECT(columns={'order_key': order_key, 'part_type': part_type, 'supp_region': name_7}) | ||
JOIN(conditions=[t0.supplier_key == t1.key], types=['left'], columns={'name_7': t1.name_7, 'order_key': t0.order_key, 'part_type': t0.part_type}) | ||
FILTER(condition=MONTH(ship_date) == 6:int64 & YEAR(ship_date) == 1992:int64, columns={'order_key': order_key, 'part_type': part_type, 'supplier_key': supplier_key}) |
Good examples: these year/month filters got pushed all the way down to the scans of orders/lineitem.
JOIN(conditions=[t0.region_key == t1.key], types=['inner'], columns={'key': t0.key}) | ||
SCAN(table=tpch.NATION, columns={'key': n_nationkey, 'region_key': n_regionkey}) | ||
FILTER(condition=name == 'EUROPE':string, columns={'key': key}) |
This is an interesting case. In the original version, the filter `name_3 == EUROPE` happened after a left join, and `name_3` referred to a column from the RHS of the left join. In the new version, we've pushed this filter into the RHS of the join, which has become an inner join, because any row that would have been a null match due to the left join would get deleted by the filter anyway. Therefore, the left join + filter is the same as filtering the input and doing an inner join.
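To make the equivalence concrete, here is a minimal sketch in plain Python. The toy rows and the two helper functions are made up for illustration and are not part of the PR; they only mimic a left join followed by a filter, versus a filter on the RHS followed by an inner join.

```python
# Toy tables; the rows are illustrative, not real TPC-H data.
nations = [
    {"key": 1, "region_key": 10},
    {"key": 2, "region_key": 20},
    {"key": 3, "region_key": 99},  # has no matching region
]
regions = [
    {"key": 10, "name": "EUROPE"},
    {"key": 20, "name": "ASIA"},
]


def left_join_then_filter(nations, regions):
    """Original plan shape: LEFT JOIN, then filter on an RHS column."""
    out = []
    for n in nations:
        matches = [r for r in regions if r["key"] == n["region_key"]]
        # A left join keeps unmatched LHS rows, padding the RHS with None.
        for r in matches or [None]:
            name = r["name"] if r is not None else None
            # None == "EUROPE" is False, so every padded row is dropped here.
            if name == "EUROPE":
                out.append(n["key"])
    return out


def filter_then_inner_join(nations, regions):
    """New plan shape: filter the RHS first, then INNER JOIN."""
    europe = [r for r in regions if r["name"] == "EUROPE"]
    return [
        n["key"]
        for n in nations
        for r in europe
        if r["key"] == n["region_key"]
    ]


before = left_join_then_filter(nations, regions)
after = filter_then_inner_join(nations, regions)
```

Both plans return the same nation keys because the filter rejects exactly the rows that only the left join would have kept.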
@@ -2,8 +2,8 @@ ROOT(columns=[('name', name), ('okey', okey)], orderings=[]) | |||
PROJECT(columns={'name': name, 'okey': key_2}) | |||
JOIN(conditions=[t0.key == t1.nation_key], types=['left'], columns={'key_2': t1.key_2, 'name': t0.name}) | |||
SCAN(table=tpch.NATION, columns={'key': n_nationkey, 'name': n_name}) | |||
FILTER(condition=key_2 == 454791:int64, columns={'key_2': key_2, 'nation_key': nation_key}) |
This is another simple example: previously this filter happened after the join, and now it got pushed into the RHS input.
FILTER(condition=r <= 30:int64, columns={'n': n, 'r': r}) | ||
FILTER(condition=ENDSWITH(name, '0':string), columns={'n': n, 'r': r}) | ||
PROJECT(columns={'n': name, 'name': name, 'r': RANKING(args=[], partition=[], order=[(acctbal):desc_first])}) | ||
SCAN(table=tpch.CUSTOMER, columns={'acctbal': c_acctbal, 'name': c_name}) | ||
FILTER(condition=r <= 30:int64 & ENDSWITH(name, '0':string), columns={'n': n, 'r': r}) | ||
PROJECT(columns={'n': name, 'name': name, 'r': RANKING(args=[], partition=[], order=[(acctbal):desc_first])}) | ||
SCAN(table=tpch.CUSTOMER, columns={'acctbal': c_acctbal, 'name': c_name}) |
Importantly: these filters got merged but neither one got pushed beneath the window function call (which would change the outputs to the window function).
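A small sketch of why the order matters, using plain Python rather than the PR's relational nodes. The customer rows and the `with_rank` helper are hypothetical; `with_rank` stands in for the `RANKING` window function ordered by `acctbal` descending.

```python
# Illustrative rows only; names and balances are made up.
customers = [
    {"name": "c10", "acctbal": 900.0},
    {"name": "c21", "acctbal": 800.0},
    {"name": "c30", "acctbal": 700.0},
    {"name": "c41", "acctbal": 600.0},
]


def with_rank(rows):
    """Attach a 1-based rank by descending account balance."""
    ordered = sorted(rows, key=lambda r: r["acctbal"], reverse=True)
    return [{**r, "r": i + 1} for i, r in enumerate(ordered)]


def ends_with_zero(row):
    return row["name"].endswith("0")


# Valid plan: rank over all customers, then filter.
rank_then_filter = [
    (r["name"], r["r"]) for r in with_rank(customers) if ends_with_zero(r)
]

# Invalid pushdown: filtering first changes the input to the window function.
filter_then_rank = [
    (r["name"], r["r"])
    for r in with_rank([c for c in customers if ends_with_zero(c)])
]
```

Here `c30` has rank 3 in the valid plan but rank 2 if the filter runs first, which is why the filter must stay above the window function.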
JOIN(conditions=[t0.key == t1.supplier_key], types=['semi'], columns={'account_balance': t0.account_balance}, correl_name='corr3') | ||
PROJECT(columns={'account_balance': account_balance, 'global_avg_price': global_avg_price, 'key': key, 'supplier_avg_price': agg_0}) | ||
JOIN(conditions=[t0.key == t1.supplier_key], types=['left'], columns={'account_balance': t0.account_balance, 'agg_0': t1.agg_0, 'global_avg_price': t0.global_avg_price, 'key': t0.key}) | ||
FILTER(condition=nation_key == 19:int64, columns={'account_balance': account_balance, 'global_avg_price': global_avg_price, 'key': key}) |
This also got pushed all the way to the scan of SUPPLIER.
JOIN(conditions=[t0.nation_key == t1.key], types=['semi'], columns={'account_balance': t0.account_balance}, correl_name='corr7') | ||
PROJECT(columns={'account_balance': account_balance, 'nation_key': nation_key, 'tile': PERCENTILE(args=[], partition=[], order=[(account_balance):asc_last, (key):asc_last], n_buckets=10000)}) | ||
SCAN(table=tpch.SUPPLIER, columns={'account_balance': s_acctbal, 'key': s_suppkey, 'nation_key': s_nationkey}) | ||
FILTER(condition=rname == 'EUROPE':string & PERCENTILE(args=[], partition=[], order=[(acctbal):asc_last, (key_5):asc_last], n_buckets=10000) == corr7.tile, columns={'key': key}) |
The europe filter can NOT get pushed further than this, because it would change the results of the window function call.
SCAN(table=tpch.PARTSUPP, columns={'availqty': ps_availqty, 'supplier_key': ps_suppkey, 'supplycost': ps_supplycost}) | ||
JOIN(conditions=[t0.nation_key == t1.key], types=['inner'], columns={'key': t0.key}) | ||
SCAN(table=tpch.SUPPLIER, columns={'key': s_suppkey, 'nation_key': s_nationkey}) | ||
FILTER(condition=name == 'GERMANY':string, columns={'key': key}) |
These `GERMANY` filters got pushed all the way to the scans.
FILTER(condition=ship_date >= datetime.date(1995, 9, 1):date & ship_date < datetime.date(1995, 10, 1):date, columns={'discount': discount, 'extended_price': extended_price, 'part_key': part_key}) | ||
FILTER(condition=ship_date < datetime.date(1995, 10, 1):date & ship_date >= datetime.date(1995, 9, 1):date, columns={'discount': discount, 'extended_price': extended_price, 'part_key': part_key}) |
Harmless effect of the pushdown: even when no pushdown occurs, the filter conditions get sorted lexicographically.
…urn it into inner join
JOIN(conditions=[t0.part_key == t1.key], types=['inner'], columns={'brand': t1.brand, 'container': t1.container, 'discount': t0.discount, 'extended_price': t0.extended_price, 'quantity': t0.quantity, 'size': t1.size}) | ||
FILTER(condition=ship_instruct == 'DELIVER IN PERSON':string & ISIN(ship_mode, ['AIR', 'AIR REG']:array[unknown]), columns={'discount': discount, 'extended_price': extended_price, 'part_key': part_key, 'quantity': quantity}) | ||
SCAN(table=tpch.LINEITEM, columns={'discount': l_discount, 'extended_price': l_extendedprice, 'part_key': l_partkey, 'quantity': l_quantity, 'ship_instruct': l_shipinstruct, 'ship_mode': l_shipmode}) | ||
FILTER(condition=size >= 1:int64, columns={'brand': brand, 'container': container, 'key': key, 'size': size}) |
Part of the conjunction got pushed down
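As a rough sketch of what pushing part of a conjunction means: each conjunct is checked against the columns available in a join input, and only those that depend solely on that input are pushed. The `split_conjuncts` helper, the tuple representation, and the second condition below are assumptions for illustration, not the PR's actual data structures.

```python
def split_conjuncts(conjuncts, input_columns):
    """Partition conjuncts into (pushable into the input, kept above the join).

    Each conjunct is a (text, used_columns) pair; a conjunct is pushable only
    if every column it uses is available in the input.
    """
    pushable, remaining = [], []
    for text, used_columns in conjuncts:
        if used_columns <= input_columns:
            pushable.append(text)
        else:
            remaining.append(text)
    return pushable, remaining


# Columns exposed by the PART side of the join in the diff above.
part_columns = {"brand", "container", "key", "size"}

conjuncts = [
    ("size >= 1", {"size"}),
    # Hypothetical condition mixing columns from both join inputs.
    ("quantity >= 1 OR brand == 'Brand#12'", {"quantity", "brand"}),
]

pushed, kept = split_conjuncts(conjuncts, part_columns)
```

Only `size >= 1` lands in `pushed`; the mixed condition stays in `kept` because `quantity` comes from the other input.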
@@ -6,14 +6,14 @@ ROOT(columns=[('NATION', NATION), ('O_YEAR', O_YEAR), ('AMOUNT', AMOUNT)], order | |||
PROJECT(columns={'nation_name': nation_name, 'o_year': YEAR(order_date), 'value': extended_price * 1:int64 - discount - supplycost * quantity}) | |||
JOIN(conditions=[t0.order_key == t1.key], types=['left'], columns={'discount': t0.discount, 'extended_price': t0.extended_price, 'nation_name': t0.nation_name, 'order_date': t1.order_date, 'quantity': t0.quantity, 'supplycost': t0.supplycost}) | |||
JOIN(conditions=[t0.part_key == t1.part_key & t0.supplier_key == t1.supplier_key], types=['inner'], columns={'discount': t1.discount, 'extended_price': t1.extended_price, 'nation_name': t0.nation_name, 'order_key': t1.order_key, 'quantity': t1.quantity, 'supplycost': t0.supplycost}) | |||
FILTER(condition=CONTAINS(name_7, 'green':string), columns={'nation_name': nation_name, 'part_key': part_key, 'supplier_key': supplier_key, 'supplycost': supplycost}) |
This condition is also pushable beneath a left join (turning it into an inner join) since if `name_7` is null, the `CONTAINS` condition will be False.
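A minimal sketch of that reasoning, assuming SQL-style null handling in which a null input makes the condition null and a filter keeps only rows whose condition is true. The `contains` and `keep` helpers are hypothetical stand-ins, not functions from the PR.

```python
def contains(haystack, needle):
    """SQL-like CONTAINS: a null (None) input yields a null (None) result."""
    if haystack is None or needle is None:
        return None
    return needle in haystack


def keep(condition_result):
    """A filter keeps a row only when the condition is exactly True."""
    return condition_result is True


# Row padded with nulls by a left join: the filter would drop it anyway.
padded_row_kept = keep(contains(None, "green"))

# Row with a real match on the RHS.
matched_row_kept = keep(contains("forest green metallic", "green"))
```

Because the null-padded row can never pass the filter, dropping it earlier via an inner join does not change the result.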
if aggregations: | ||
query = query.group_by(*keys) | ||
query = query.group_by(*sorted(keys)) |
This fixes a nondeterminism issue in the generated SQL
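For illustration, a sketch of why sorting helps, assuming `keys` can arrive in an unstable order (for example from a set, whose iteration order for strings can vary between interpreter runs). The `group_by_clause` helper is hypothetical and only builds a string; it is not the query builder used in the PR.

```python
def group_by_clause(keys):
    """Build a GROUP BY clause whose text does not depend on input order."""
    # sorted() returns the keys in one fixed order no matter how they arrive.
    return "GROUP BY " + ", ".join(sorted(keys))


# Same keys, supplied as a set and as a differently ordered list.
clause_a = group_by_clause({"o_year", "nation_name"})
clause_b = group_by_clause(["o_year", "nation_name"])
```

Both calls produce identical text, so a test comparing the generated SQL against a stored string stays stable.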
), | ||
], | ||
) | ||
def test_tpch_relational_to_sqlite_sql( |
Redundant with test_pipeline_until_sql_tpch
Args: | ||
`node`: The current node of the relational tree. | ||
`filters`: The set of filter conditions to push down representing | ||
predicates from ancestor nodes that can be pushed this far down. |
The `Returns:` part of the docstring is missing.
Looks good to me.
Left some questions and minor suggestions
pushable_filters: set[RelationalExpression] | ||
match node: | ||
case Filter(): | ||
remaining_filters = set() |
This is not used: both branches of the if-else return without using `remaining_filters`.
Whoops, good catch
pushable_filters, remaining_filters = set(), filters | ||
else: | ||
# Otherwise push all filters that only depend on on columns in | ||
# the project that are pass-through of another column. |
Adding a simple example to the comment would be useful.
original_name: str | ||
if columns is None: | ||
for original_name in node.calc_terms: | ||
name = renamings.get(original_name, original_name) |
Is it intentional to pass the same variable twice?
Yes, it's saying that if `original_name` is not a key in `renamings`, then just use `original_name` itself as the fallback value of the dictionary lookup.
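A quick illustration of the `dict.get(key, default)` idiom being discussed. The contents of `renamings` and `calc_terms` are made-up sample values.

```python
# Sample data only: one column was renamed, the other was not.
renamings = {"name": "name_7"}
calc_terms = ["key", "name"]

# For each term, use the renamed value if one exists, else the original name.
resolved = [
    renamings.get(original_name, original_name)
    for original_name in calc_terms
]
```

Here `key` is returned unchanged because it has no entry in `renamings`, while `name` maps to `name_7`.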
""" | ||
A set of operators with the property that the output is null if any of the | ||
inputs are null. | ||
""" |
should this be above the list?
I think the reasoning is that when you hover over the variable in VSCode, the details come up in the tooltip.
Yeah, the convention is for it to be below.
return False | ||
case _: | ||
raise NotImplementedError( | ||
f"transpose_expression not implemented for {expr.__class__.__name__}" |
Same
f"transpose_expression not implemented for {expr.__class__.__name__}" | |
f"false_when_null_columns not implemented for {expr.__class__.__name__}" |
Translates an expression to rephrase its columns of a column mapping so | ||
that the expression can be pushed beneath the node that the column mapping | ||
came from (e.g. if the node renamed some columns, so the expression needs | ||
to switch from using the new names to the old names). |
Is that the same as saying this?
Translates an expression to rephrase its columns of a column mapping so | |
that the expression can be pushed beneath the node that the column mapping | |
came from (e.g. if the node renamed some columns, so the expression needs | |
to switch from using the new names to the old names). | |
Rewrites an expression by replacing its column references based on a given column mapping, | |
allowing the expression to be pushed beneath the node that introduced the mapping. | |
For example, if a node renamed columns, this function translates the expression | |
from the new column names back to the original names. |
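To show what such a rewrite looks like, here is a small sketch that uses nested tuples as a stand-in for the expression classes. The tuple encoding, the `transpose` function, and the sample mapping are assumptions for illustration; the PR's real implementation works on its own expression node types.

```python
def transpose(expr, column_mapping):
    """Rewrite column references from a node's output names to its input names.

    Expressions are encoded as tuples:
      ("col", name), ("lit", value), or ("call", operator, args).
    """
    kind = expr[0]
    if kind == "col":
        # Swap the new (output) name for the old (input) name.
        return ("col", column_mapping[expr[1]])
    if kind == "lit":
        return expr
    if kind == "call":
        _, operator, args = expr
        return ("call", operator, tuple(transpose(a, column_mapping) for a in args))
    raise NotImplementedError(f"transpose not implemented for {kind}")


# Mapping for a node like PROJECT(columns={'key': key, 'region_name': name}).
project_columns = {"key": "key", "region_name": "name"}

# NOT(STARTSWITH(region_name, 'A')), phrased in terms of the node's outputs.
condition = (
    "call",
    "NOT",
    (("call", "STARTSWITH", (("col", "region_name"), ("lit", "A"))),),
)

pushed = transpose(condition, project_columns)
```

After the rewrite the condition refers to `name`, the column that exists beneath the project, so it can be evaluated there.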
expressions. | ||
|
||
Returns: | ||
The transposed expression. |
The transposed expression. | |
The transposed expression with updated column references. |
if new_column.input_name is not None: | ||
new_column = new_column.with_input(None) |
For my understanding, why remove input name from the column?
Because the input name is used by the join to say which input the column comes from, but now we are pushing into the input itself, so that input name is no longer valid.
@@ -375,6 +375,38 @@ def defog_test_data( | |||
return request.param | |||
def test_defog_until_sql( |
The name of the test is confusing; it doesn't match the docstring description.
LGTM!
Co-authored-by: Hadia Ahmed <[email protected]>
Resolves #302. Set up a relational optimization pipeline after relational conversion, and add an optimization to push filters down as far as possible. Also allows conditions to be pushed into the RHS of a left join (turning it into an inner join) if the RHS being null implies that the filter would output False. Additionally, converted many of the e2e tests (tpch queries, correlated queries) to also test the generated SQL.