Skip to content

Commit

Permalink
[BugFix] fix feedback npe when processing local agg
Browse files Browse the repository at this point in the history
Signed-off-by: stephen <[email protected]>
  • Loading branch information
stephen-shelby committed Nov 12, 2024
1 parent bb07a54 commit 5f803f5
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 7 deletions.
4 changes: 4 additions & 0 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,10 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD

if (rf->has_null()) continue;

if (rf->in_values() != nullptr) {
range->add_fixed_values(FILTER_IN, rf->in_values());
}

// If this column doesn't have other filter, we use join runtime filter
// to fast comput row range in storage engine
if (range->is_init_state()) {
Expand Down
17 changes: 17 additions & 0 deletions be/src/exec/pipeline/hashjoin/hash_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@ Status HashJoinBuildOperator::set_finishing(RuntimeState* state) {
auto&& in_filters = _partial_rf_merger->get_total_in_filters();
auto&& bloom_filters = _partial_rf_merger->get_total_bloom_filters();

if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
auto in_filter_it = in_filters.begin();
auto bloom_filter_it = bloom_filters.begin();
while (in_filter_it != in_filters.end() && bloom_filter_it != bloom_filters.end()) {
DeferOp defer([&] {
++in_filter_it;
++bloom_filter_it;
});
auto* in_filter = *in_filter_it;
auto* bloom_filter = *bloom_filter_it;
if (in_filter == nullptr || bloom_filter == nullptr || bloom_filter->runtime_filter() == nullptr) {
continue;
}
bloom_filter->runtime_filter()->set_in_values(in_filter);
}
}

{
size_t total_bf_bytes = std::accumulate(bloom_filters.begin(), bloom_filters.end(), 0ull,
[](size_t total, RuntimeFilterBuildDescriptor* desc) -> size_t {
Expand Down
2 changes: 2 additions & 0 deletions be/src/exprs/in_const_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ class VectorizedInConstPredicate final : public Predicate {

bool is_use_array() const { return _array_size != 0; }

bool is_eq_null() const { return _eq_null; }

private:
// Note(yan): It's very tempting to use real bitmap, but the real scenario is, the array size is usually small like dict codes.
// To usse real bitmap involves bit shift, and/or ops, which eats much cpu cycles.
Expand Down
13 changes: 13 additions & 0 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "exprs/runtime_filter.h"

#include "types/logical_type_infra.h"
#include "in_const_predicate.hpp"
#include "util/compression/stream_compression.h"

namespace starrocks {
Expand Down Expand Up @@ -247,4 +248,16 @@ void JoinRuntimeFilter::clear_bf() {
_size = 0;
}

template <LogicalType Type>
void RuntimeBloomFilter<Type>::set_in_values(ExprContext* in_filter) {
if (auto* cast_in_filter = dynamic_cast<VectorizedInConstPredicate<Type>*>(in_filter->root());
cast_in_filter != nullptr) {
_in_values = cast_in_filter->get_all_values();
_is_in_null_safe = cast_in_filter->is_eq_null();
}
}
#define INSTANTIATE_RUNTIME_BLOOM_FILTER(TYPE) template class RuntimeBloomFilter<TYPE>;
APPLY_FOR_ALL_SCALAR_TYPE(INSTANTIATE_RUNTIME_BLOOM_FILTER);
#undef INSTANTIATE_RUNTIME_BLOOM_FILTER

} // namespace starrocks
45 changes: 45 additions & 0 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
#include "types/logical_type.h"
#include "serde/column_array_serde.h"

namespace starrocks {
// 0x1. initial global runtime filter impl
Expand All @@ -39,6 +40,7 @@ inline const constexpr uint8_t RF_VERSION = 0x2;
inline const constexpr uint8_t RF_VERSION_V2 = 0x3;
static_assert(sizeof(RF_VERSION_V2) == sizeof(RF_VERSION));
inline const constexpr int32_t RF_VERSION_SZ = sizeof(RF_VERSION_V2);
class ExprContext;

// compatible code from 2.5 to 3.0
// TODO: remove it
Expand Down Expand Up @@ -343,6 +345,10 @@ class JoinRuntimeFilter {
[](size_t total, const SimdBlockFilter& bf) -> size_t { return total + bf.get_alloc_size(); });
}

virtual void set_in_values(ExprContext* in_filter) = 0;
ColumnPtr in_values() const { return _in_values; }
bool is_in_null_safe() const { return _is_in_null_safe; }

// RuntimeFilter version
// if the RuntimeFilter is updated, the version will be updated as well,
// (usually used for TopN Filter)
Expand Down Expand Up @@ -393,6 +399,9 @@ class JoinRuntimeFilter {
size_t _rf_version = 0;
// local colocate filters is local filter we don't have to serialize them
std::vector<JoinRuntimeFilter*> _group_colocate_filters;

ColumnPtr _in_values = nullptr;
bool _is_in_null_safe = false;
};

template <typename ModuloFunc>
Expand Down Expand Up @@ -603,6 +612,8 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
return ss.str();
}

void set_in_values(ExprContext* in_filter) override;

size_t max_serialized_size() const override {
size_t size = sizeof(Type) + JoinRuntimeFilter::max_serialized_size();
// _has_min_max. for backward compatibility.
Expand All @@ -616,6 +627,12 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
size += sizeof(_max.size) + _max.size;
}

size += sizeof(bool); // has_in_values
if (_in_values != nullptr) {
size += serde::ColumnArraySerde::max_serialized_size(*_in_values);
size += sizeof(bool); // _is_in_null_safe
}

return size;
}

Expand Down Expand Up @@ -656,6 +673,21 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
offset += _max.size;
}
}

if (_in_values != nullptr) {
const bool has_data = true;
memcpy(data + offset, &has_data, sizeof(has_data));
offset += sizeof(has_data);
auto* buf = data + offset;
offset += serde::ColumnArraySerde::serialize(*_in_values, buf) - buf;
memcpy(data + offset, &_is_in_null_safe, sizeof(_is_in_null_safe));
offset += sizeof(_is_in_null_safe);
} else {
const bool has_data = false;
memcpy(data + offset, &has_data, sizeof(has_data));
offset += sizeof(has_data);
}

return offset;
}

Expand Down Expand Up @@ -707,6 +739,19 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
}
}

{
bool has_in_values = false;
memcpy(&has_in_values, data + offset, sizeof(has_in_values));
offset += sizeof(has_in_values);
if (has_in_values) {
_in_values = ColumnHelper::create_column(TypeDescriptor{Type}, true);
auto* buf = data + offset;
offset += serde::ColumnArraySerde::deserialize(buf, _in_values.get()) - buf;
memcpy(&_is_in_null_safe, data + offset, sizeof(_is_in_null_safe));
offset += sizeof(_is_in_null_safe);
}
}

return offset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ protected void toThrift(TPlanNode msg) {
}
msg.agg_node.setUse_sort_agg(useSortAgg);
msg.agg_node.setUse_per_bucket_optimize(usePerBucketOptimize);

List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGrouping_exprs(Expr.treesToThrift(groupingExprs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public Void visitPhysicalHashAggregate(OptExpression optExpression, Void context
double inputRows = skeletonNode.getNodeExecStats().getPushRows();
double streamingOutputRows = skeletonNode.getNodeExecStats().getPullRows();
BlockingAggNode blockingAggNode = findBlockingAggNode(skeletonNode);
double blockingOutputRows = blockingAggNode.getNodeExecStats().getPullRows();
if (blockingOutputRows < inputRows && (inputRows / streamingOutputRows) < STREAMING_AGGREGATION_THRESHOLD
&& (inputRows / blockingOutputRows) > AGGREGATION_THRESHOLD) {
tuningGuides.addTuningGuide(skeletonNode.getNodeId(),
new StreamingAggTuningGuide((StreamingAggNode) skeletonNode));
if (blockingAggNode != null) {
double blockingOutputRows = blockingAggNode.getNodeExecStats().getPullRows();
if (blockingOutputRows < inputRows && (inputRows / streamingOutputRows) < STREAMING_AGGREGATION_THRESHOLD
&& (inputRows / blockingOutputRows) > AGGREGATION_THRESHOLD) {
tuningGuides.addTuningGuide(skeletonNode.getNodeId(),
new StreamingAggTuningGuide((StreamingAggNode) skeletonNode));
}
}
}
visit(optExpression, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SkeletonNode visitPhysicalJoin(OptExpression optExpression, SkeletonNode
public SkeletonNode visitPhysicalHashAggregate(OptExpression optExpression, SkeletonNode parent) {
int planNodeId = optExpression.getOp().getPlanNodeId();
PhysicalHashAggregateOperator aggOperator = (PhysicalHashAggregateOperator) optExpression.getOp();
if (aggOperator.getType().isAnyGlobal()) {
if (aggOperator.getType().isAnyGlobal() || !aggOperator.isSplit()) {
BlockingAggNode node = new BlockingAggNode(optExpression, nodeExecStatsMap.get(planNodeId), parent);
visitChildren(node, optExpression.getInputs());
fillNodeId(optExpression.getOp(), node);
Expand Down

0 comments on commit 5f803f5

Please sign in to comment.