Redesign projection pushdown to be more granular #19550

@adriangb

Description

The current physical layer projection pushdown rule is:

fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {

Operators like FilterExec then treat the pushdown as all-or-nothing:

// If the projection does not narrow the schema, we should not try to push it down:
if projection.expr().len() < projection.input().schema().fields().len() {
    // Each column in the predicate expression must exist after the projection.
    if let Some(new_predicate) =
        update_expr(self.predicate(), projection.expr(), false)?
    { ... }
}

This poses a problem for #19387 for cases like complex_function(struct_col['field']).
To handle this optimally, I think we have to change the rule to operate on a per-expression, per-subtree basis.
That is: for each projection expression, look for subtrees that would be beneficial to push down, split the projection into two, and push down the qualifying subtrees of each expression. For example, given the data:

copy (select {email: '[email protected]', address: '123 Main St, NYC'} as user) to 'test.parquet';
select lower(user['email'])
from 'test.parquet' t
where user['address'] ilike '%nyc%';

this will produce a plan of the form:

ProjectionExec: expr=[lower(get_field(user@0, email)) as lower(t.user[email])]
  RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
    CoalesceBatchesExec: target_batch_size=8192
      DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[user], predicate = [get_field(user@0, address) ILIKE %nyc%], file_type=parquet

(The actual plan is a bit different because filters referencing struct columns are currently not pushed down, but that's an orthogonal issue that I will expand on in a separate issue and link to here.)

Ideally we want a plan of the form:

ProjectionExec: expr=[lower("get_field(user@0, email)"@0) as lower(t.user[email])]
  RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1
    CoalesceBatchesExec: target_batch_size=8192
      DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[get_field(user@0, email) as get_field(user@0, email)], predicate = [get_field(user@0, address) ILIKE %nyc%], file_type=parquet

To achieve this I propose we add some helper methods to ProjectionExprs along the lines of:

fn split_expressions<F>(&self, f: F) -> Result<(Option<ProjectionExprs>, Option<ProjectionExprs>)>
where
    F: Fn(Arc<dyn PhysicalExpr>) -> Result<bool>,
{ ... }

Then operators would use this as:

fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn DataSource>>> {
    let (keep, pushdown) = projection.expr().split_expressions(|expr| expr.is_trivial())?;
    // Proceed as normal with `pushdown`; if `keep` is Some, wrap the result in a projection.
    ...
}

Where PhysicalExpr::is_trivial comes from #19538
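To make the intended split behavior concrete, here is a minimal self-contained sketch. The enum, the trivial/non-trivial classification, and the split function below are toy stand-ins for DataFusion's PhysicalExpr tree, the proposed is_trivial from #19538, and the proposed split_expressions; none of these are DataFusion's actual types or APIs. The sketch pulls maximal trivial subtrees out of an expression (the "pushdown" half) and rewrites the remainder to reference them by name (the "keep" half), mirroring how lower(get_field(user, email)) would be split in the example plan above.

```rust
// Toy expression tree standing in for DataFusion's PhysicalExpr (assumption,
// not the real API).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(String),                 // column reference: always trivial
    GetField(Box<Expr>, String), // struct field access: trivial if its input is
    Func(String, Box<Expr>),     // arbitrary function call: never trivial
}

impl Expr {
    // Stand-in for the proposed PhysicalExpr::is_trivial from #19538:
    // a subtree is trivial if it contains only column refs and field accesses.
    fn is_trivial(&self) -> bool {
        match self {
            Expr::Col(_) => true,
            Expr::GetField(inner, _) => inner.is_trivial(),
            Expr::Func(_, _) => false,
        }
    }

    // Display name, used here as the generated output column name.
    fn name(&self) -> String {
        match self {
            Expr::Col(c) => c.clone(),
            Expr::GetField(inner, f) => format!("get_field({}, {})", inner.name(), f),
            Expr::Func(f, arg) => format!("{}({})", f, arg.name()),
        }
    }
}

// Pull maximal trivial subtrees into `pushdown` and rewrite the remainder
// (the "keep" half) to reference them as plain columns.
fn split(expr: &Expr, pushdown: &mut Vec<Expr>) -> Expr {
    if expr.is_trivial() {
        let name = expr.name();
        pushdown.push(expr.clone());
        return Expr::Col(name);
    }
    match expr {
        Expr::Func(f, arg) => Expr::Func(f.clone(), Box::new(split(arg, pushdown))),
        Expr::GetField(inner, field) => {
            Expr::GetField(Box::new(split(inner, pushdown)), field.clone())
        }
        // Unreachable: Col is always trivial and handled above.
        Expr::Col(_) => expr.clone(),
    }
}

fn main() {
    // lower(get_field(user, email)) from the example query above.
    let expr = Expr::Func(
        "lower".to_string(),
        Box::new(Expr::GetField(
            Box::new(Expr::Col("user".to_string())),
            "email".to_string(),
        )),
    );
    let mut pushdown = Vec::new();
    let keep = split(&expr, &mut pushdown);
    println!("keep:     {}", keep.name());
    for p in &pushdown {
        println!("pushdown: {}", p.name());
    }
}
```

Running this splits the expression into pushdown = get_field(user, email) (which the data source can evaluate) and keep = lower(...) applied to the pushed-down column, matching the shape of the desired plan.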
