Skip to content

Commit

Permalink
[BugFix] fix feedback npe when processing local agg
Browse files Browse the repository at this point in the history
Signed-off-by: stephen <[email protected]>
  • Loading branch information
stephen-shelby committed Nov 12, 2024
1 parent bb07a54 commit c1f762d
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 7 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/aggregate/aggregate_blocking_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ pipeline::OpFactories AggregateBlockingNode::decompose_to_pipeline(pipeline::Pip
if (!has_group_by_keys) {
ops_with_sink =
context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), ops_with_sink);
} else if (agg_node.local_not_split) {
// ignore
} else if (could_local_shuffle) {
ops_with_sink = try_interpolate_local_shuffle(ops_with_sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class AggregationNode extends PlanNode {
// OlapScanNode and these PlanNodes have the same data partition policy.
private boolean identicallyDistributed = false;

private boolean locallNotSplit = false;

/**
* Create an agg node that is not an intermediate node.
* isIntermediate is true if it is a slave node in a 2-part agg plan.
Expand All @@ -121,6 +123,14 @@ public void unsetNeedsFinalize() {
updateplanNodeName();
}

public boolean isLocallNotSplit() {
return locallNotSplit;
}

public void setLocallNotSplit(boolean locallNotSplit) {
this.locallNotSplit = locallNotSplit;
}

/**
* Sets this node as a preaggregation. Only valid to call this if it is not marked
* as a preaggregation
Expand Down Expand Up @@ -235,7 +245,7 @@ protected void toThrift(TPlanNode msg) {
}
msg.agg_node.setUse_sort_agg(useSortAgg);
msg.agg_node.setUse_per_bucket_optimize(usePerBucketOptimize);

msg.agg_node.setLocal_not_split(locallNotSplit);
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public Void visitPhysicalHashAggregate(OptExpression optExpression, Void context
double inputRows = skeletonNode.getNodeExecStats().getPushRows();
double streamingOutputRows = skeletonNode.getNodeExecStats().getPullRows();
BlockingAggNode blockingAggNode = findBlockingAggNode(skeletonNode);
double blockingOutputRows = blockingAggNode.getNodeExecStats().getPullRows();
if (blockingOutputRows < inputRows && (inputRows / streamingOutputRows) < STREAMING_AGGREGATION_THRESHOLD
&& (inputRows / blockingOutputRows) > AGGREGATION_THRESHOLD) {
tuningGuides.addTuningGuide(skeletonNode.getNodeId(),
new StreamingAggTuningGuide((StreamingAggNode) skeletonNode));
if (blockingAggNode != null) {
double blockingOutputRows = blockingAggNode.getNodeExecStats().getPullRows();
if (blockingOutputRows < inputRows && (inputRows / streamingOutputRows) < STREAMING_AGGREGATION_THRESHOLD
&& (inputRows / blockingOutputRows) > AGGREGATION_THRESHOLD) {
tuningGuides.addTuningGuide(skeletonNode.getNodeId(),
new StreamingAggTuningGuide((StreamingAggNode) skeletonNode));
}
}
}
visit(optExpression, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SkeletonNode visitPhysicalJoin(OptExpression optExpression, SkeletonNode
public SkeletonNode visitPhysicalHashAggregate(OptExpression optExpression, SkeletonNode parent) {
int planNodeId = optExpression.getOp().getPlanNodeId();
PhysicalHashAggregateOperator aggOperator = (PhysicalHashAggregateOperator) optExpression.getOp();
if (aggOperator.getType().isAnyGlobal()) {
if (aggOperator.getType().isAnyGlobal() || !aggOperator.isSplit()) {
BlockingAggNode node = new BlockingAggNode(optExpression, nodeExecStatsMap.get(planNodeId), parent);
visitChildren(node, optExpression.getInputs());
fillNodeId(optExpression.getOp(), node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2222,6 +2222,7 @@ public PlanFragment visitPhysicalHashAggregate(OptExpression optExpr, ExecPlan c
aggregationNode =
new AggregationNode(context.getNextNodeId(), inputFragment.getPlanRoot(),
aggInfo);
aggregationNode.setGlobalNotSplit(true);
} else {
aggregateExprList.forEach(FunctionCallExpr::setMergeAggFn);
AggregateInfo aggInfo = AggregateInfo.create(
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,8 @@ struct TAggregationNode {

// enable runtime limit, pipelines share one limit
29: optional bool enable_pipeline_share_limit = false

30: optional bool local_not_split = false
}

struct TRepeatNode {
Expand Down

0 comments on commit c1f762d

Please sign in to comment.