Skip to content

Commit

Permalink
[Feature] Add new window function 'approx_top_k'
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
  • Loading branch information
liuyehcf committed Aug 22, 2023
1 parent 2980b0b commit a086a12
Show file tree
Hide file tree
Showing 22 changed files with 2,855 additions and 78 deletions.
20 changes: 18 additions & 2 deletions be/src/exec/analytor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ Status Analytor::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile*
func = get_window_function(real_fn_name, arg_type.type, return_type.type, is_input_nullable, fn.binary_type,
state->func_version());
if (func == nullptr) {
return Status::InternalError(strings::Substitute("Invalid window function plan: $0", real_fn_name));
return Status::InternalError(strings::Substitute(
"Invalid window function plan: ($0, $1, $2, $3, $4, $5, $6)", real_fn_name, arg_type.type,
return_type.type, is_input_nullable, fn.binary_type, state->func_version()));
}
_agg_functions[i] = func;
_agg_fn_types[i] = {return_type, is_input_nullable, desc.nodes[0].is_nullable};
Expand Down Expand Up @@ -301,6 +303,7 @@ Status Analytor::open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::open(_order_ctxs, state));
for (int i = 0; i < _agg_fn_ctxs.size(); ++i) {
RETURN_IF_ERROR(Expr::open(_agg_expr_ctxs[i], state));
RETURN_IF_ERROR(_evaluate_const_columns(i));
}

_has_udaf = std::any_of(_fns.begin(), _fns.end(),
Expand Down Expand Up @@ -462,7 +465,8 @@ void Analytor::create_agg_result_columns(int64_t chunk_size) {
// binary column cound't call resize method like Numeric Column,
// so we only reserve it.
if (_agg_fn_types[i].result_type.type == LogicalType::TYPE_CHAR ||
_agg_fn_types[i].result_type.type == LogicalType::TYPE_VARCHAR) {
_agg_fn_types[i].result_type.type == LogicalType::TYPE_VARCHAR ||
_agg_fn_types[i].result_type.type == LogicalType::TYPE_ARRAY) {
_result_window_columns[i]->reserve(chunk_size);
} else {
_result_window_columns[i]->resize(chunk_size);
Expand Down Expand Up @@ -515,6 +519,18 @@ Status Analytor::add_chunk(const ChunkPtr& chunk) {
return Status::OK();
}

Status Analytor::_evaluate_const_columns(int i) {
// used for const columns.
std::vector<ColumnPtr> const_columns;
const_columns.reserve(_agg_expr_ctxs[i].size());
for (auto& j : _agg_expr_ctxs[i]) {
ASSIGN_OR_RETURN(auto col, j->root()->evaluate_const(j));
const_columns.emplace_back(std::move(col));
}
_agg_fn_ctxs[i]->set_constant_columns(const_columns);
return Status::OK();
}

void Analytor::_append_column(size_t chunk_size, Column* dst_column, ColumnPtr& src_column) {
DCHECK(!(src_column->is_constant() && dst_column->is_constant() && (!dst_column->empty()) &&
(!src_column->empty()) && (src_column->compare_at(0, 0, *dst_column, 1) != 0)));
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/analytor.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class Analytor final : public pipeline::ContextWithDependency {
bool _support_cumulative_algo = false;

private:
Status _evaluate_const_columns(int i);
// if src_column is const, but dst is not, unpack src_column then append. Otherwise just append
void _append_column(size_t chunk_size, Column* dst_column, ColumnPtr& src_column);
void _update_window_batch_normal(int64_t peer_group_start, int64_t peer_group_end, int64_t frame_start,
Expand Down
Loading

0 comments on commit a086a12

Please sign in to comment.