Skip to content

Commit

Permalink
[Enhancement][FlatJson] Improve flat json performace and extract stra…
Browse files Browse the repository at this point in the history
…tegy (#50696)

Signed-off-by: Seaven <[email protected]>
(cherry picked from commit 45d72ac)

# Conflicts:
#	be/src/exec/olap_scan_prepare.cpp
  • Loading branch information
Seaven authored and mergify[bot] committed Sep 20, 2024
1 parent 468b8d7 commit 5014e52
Show file tree
Hide file tree
Showing 30 changed files with 876 additions and 358 deletions.
26 changes: 5 additions & 21 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "column/column_access_path.h"

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -31,6 +32,7 @@
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "types/logical_type.h"
#include "util/json_flattener.h"

namespace starrocks {

Expand Down Expand Up @@ -209,39 +211,21 @@ StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAcce
return std::move(p);
}

std::pair<std::string, std::string> _split_path(const std::string& path) {
size_t pos = 0;
if (path.starts_with("\"")) {
pos = path.find('\"', 1);
DCHECK(pos != std::string::npos);
}
pos = path.find('.', pos);
std::string key;
std::string next;
if (pos == std::string::npos) {
key = path;
} else {
key = path.substr(0, pos);
next = path.substr(pos + 1);
}

return {key, next};
}

ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
if (path.empty()) {
return root;
}

auto [key, next] = _split_path(path);
auto [key_view, next] = JsonFlatPath::split_path(path);
auto key = std::string(key_view);
auto child = root->get_child(key);
if (child == nullptr) {
auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path());
DCHECK(n.ok());
root->children().emplace_back(std::move(n.value()));
child = root->children().back().get();
}
return insert_json_path_impl(next, child);
return insert_json_path_impl(std::string(next), child);
}

void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) {
Expand Down
22 changes: 22 additions & 0 deletions be/src/column/json_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,28 @@ bool JsonColumn::has_flat_column(const std::string& path) const {
return false;
}

bool JsonColumn::is_equallity_schema(const Column* other) const {
if (!other->is_json()) {
return false;
}
auto* other_json = down_cast<const JsonColumn*>(other);
if (this->is_flat_json() && other_json->is_flat_json()) {
if (this->_flat_column_paths.size() != other_json->_flat_column_paths.size()) {
return false;
}
for (size_t i = 0; i < this->_flat_column_paths.size(); i++) {
if (this->_flat_column_paths[i] != other_json->_flat_column_paths[i]) {
return false;
}
if (this->_flat_column_types[i] != other_json->_flat_column_types[i]) {
return false;
}
}
return _flat_columns.size() == other_json->_flat_columns.size();
}
return !this->is_flat_json() && !other_json->is_flat_json();
}

std::string JsonColumn::debug_flat_paths() const {
if (_flat_column_paths.empty()) {
return "[]";
Expand Down
2 changes: 2 additions & 0 deletions be/src/column/json_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class JsonColumn final : public ColumnFactory<ObjectColumn<JsonValue>, JsonColum
void set_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types,
const Columns& flat_columns);

bool is_equallity_schema(const Column* other) const;

std::string debug_flat_paths() const;

private:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,9 @@ CONF_mDouble(json_flat_sparsity_factor, "0.9");
// the maximum number of extracted JSON sub-field
CONF_mInt32(json_flat_column_max, "100");

// for whitelist on flat json remain data, max set 1kb
CONF_mInt32(json_flat_remain_filter_max_bytes, "1024");

// Allowable intervals for continuous generation of pk dumps
// Disable when pk_dump_interval_seconds <= 0
CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour
Expand Down
8 changes: 4 additions & 4 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ Status LakeDataSource::build_scan_range(RuntimeState* state) {
}

void LakeDataSource::init_counter(RuntimeState* state) {
_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);

_bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES);
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);

Expand Down Expand Up @@ -580,9 +583,6 @@ void LakeDataSource::init_counter(RuntimeState* state) {
_prefetch_hit_counter = ADD_CHILD_COUNTER(_runtime_profile, "PrefetchHitCount", TUnit::UNIT, io_statistics_name);
_prefetch_wait_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchWaitFinishTime", io_statistics_name);
_prefetch_pending_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchPendingTime", io_statistics_name);

_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);
}

void LakeDataSource::update_realtime_counter(Chunk* chunk) {
Expand Down Expand Up @@ -703,7 +703,7 @@ void LakeDataSource::update_counter() {
_runtime_state->update_num_datacache_count(1);
}

if (_reader->stats().flat_json_hits.size() > 0) {
if (_reader->stats().flat_json_hits.size() > 0 || _reader->stats().merge_json_hits.size() > 0) {
std::string access_path_hits = "AccessPathHits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().flat_json_hits) {
Expand Down
191 changes: 191 additions & 0 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,191 @@ static bool get_predicate_value(ObjectPool* obj_pool, const SlotDescriptor& slot
return true;
}

<<<<<<< HEAD
template <LogicalType SlotType, typename RangeValueType>
void OlapScanConjunctsManager::normalize_in_or_equal_predicate(const SlotDescriptor& slot,
ColumnValueRange<RangeValueType>* range) {
=======
static std::vector<BoxedExprContext> build_expr_context_containers(const std::vector<ExprContext*>& expr_contexts) {
std::vector<BoxedExprContext> containers;
for (auto* expr_ctx : expr_contexts) {
containers.emplace_back(expr_ctx);
}
return containers;
}

static std::vector<BoxedExpr> build_raw_expr_containers(const std::vector<Expr*>& exprs) {
std::vector<BoxedExpr> containers;
for (auto* expr : exprs) {
containers.emplace_back(expr);
}
return containers;
}

template <bool Negative>
static TExprOpcode::type maybe_invert_in_and_equal_op(const TExprOpcode::type op) {
if constexpr (!Negative) {
return op;
} else {
switch (op) {
case TExprOpcode::FILTER_IN:
return TExprOpcode::FILTER_NOT_IN;
case TExprOpcode::FILTER_NOT_IN:
return TExprOpcode::FILTER_IN;
case TExprOpcode::EQ:
return TExprOpcode::NE;
case TExprOpcode::NE:
return TExprOpcode::EQ;

default:
return op;
}
}
}

// ------------------------------------------------------------------------------------
// ChunkPredicateBuilder
// ------------------------------------------------------------------------------------

BoxedExpr::BoxedExpr(Expr* root_expr) : root_expr(root_expr) {}
Expr* BoxedExpr::root() const {
return get_root_expr(root_expr);
}
StatusOr<ExprContext*> BoxedExpr::expr_context(ObjectPool* obj_pool, RuntimeState* state) const {
if (new_expr_ctx == nullptr) {
// Copy expr to prevent two ExprContexts from owning the same Expr, which will cause the same Expr to be
// closed twice.
// - The ExprContext in the `original _opts.conjunct_ctxs_ptr` will own an Expr and all its children.
// - The newly created ExprContext here will also own this Expr.
auto* new_expr = Expr::copy(obj_pool, root_expr);
new_expr_ctx = obj_pool->add(new ExprContext(new_expr));
RETURN_IF_ERROR(new_expr_ctx->prepare(state));
RETURN_IF_ERROR(new_expr_ctx->open(state));
}
return new_expr_ctx;
}

BoxedExprContext::BoxedExprContext(ExprContext* expr_ctx) : expr_ctx(expr_ctx) {}
Expr* BoxedExprContext::root() const {
return get_root_expr(expr_ctx);
}
StatusOr<ExprContext*> BoxedExprContext::expr_context(ObjectPool* obj_pool, RuntimeState* state) const {
return expr_ctx;
}

template <BoxedExprType E, CompoundNodeType Type>
ChunkPredicateBuilder<E, Type>::ChunkPredicateBuilder(const OlapScanConjunctsManagerOptions& opts, std::vector<E> exprs,
bool is_root_builder)
: _opts(opts), _exprs(std::move(exprs)), _is_root_builder(is_root_builder), _normalized_exprs(_exprs.size()) {}

template <BoxedExprType E, CompoundNodeType Type>
StatusOr<bool> ChunkPredicateBuilder<E, Type>::parse_conjuncts() {
RETURN_IF_ERROR(normalize_expressions());
RETURN_IF_ERROR(build_olap_filters());

// Only the root builder builds scan keys.
if (_is_root_builder) {
RETURN_IF_ERROR(build_scan_keys(_opts.scan_keys_unlimited, _opts.max_scan_key_num));
}

if (_opts.enable_column_expr_predicate) {
VLOG_FILE << "OlapScanConjunctsManager: enable_column_expr_predicate = true. push down column expr predicates";
RETURN_IF_ERROR(build_column_expr_predicates());
}

ASSIGN_OR_RETURN(auto normalized, _normalize_compound_predicates());
if (_is_root_builder) {
return normalized;
}
// Non-root builder return true only when all the child predicates are normalized.
return normalized && !SIMD::contain_zero(_normalized_exprs);
}

template <BoxedExprType E, CompoundNodeType Type>
StatusOr<bool> ChunkPredicateBuilder<E, Type>::_normalize_compound_predicates() {
const size_t num_preds = _exprs.size();
for (size_t i = 0; i < num_preds; i++) {
if (_normalized_exprs[i]) {
continue;
}

ASSIGN_OR_RETURN(const bool normalized, _normalize_compound_predicate(_exprs[i].root()));
if (!normalized && !_is_root_builder) {
return false;
}
_normalized_exprs[i] = normalized;
}

return true;
}

template <BoxedExprType E, CompoundNodeType Type>
StatusOr<bool> ChunkPredicateBuilder<E, Type>::_normalize_compound_predicate(const Expr* root_expr) {
auto process = [&]<CompoundNodeType ChildType>() -> StatusOr<bool> {
ChunkPredicateBuilder<BoxedExpr, ChildType> child_builder(
_opts, build_raw_expr_containers(root_expr->children()), false);
ASSIGN_OR_RETURN(const bool normalized, child_builder.parse_conjuncts());
if (normalized) {
_child_builders.emplace_back(std::move(child_builder));
}
return normalized;
};

if (TExprOpcode::COMPOUND_OR == root_expr->op()) {
if (!_opts.pred_tree_params.enable_or) {
return false;
}
return process.template operator()<CompoundNodeType::OR>();
}

if (TExprOpcode::COMPOUND_AND == root_expr->op()) {
return process.template operator()<CompoundNodeType::AND>();
}

return false;
}

template <BoxedExprType E, CompoundNodeType Type>
StatusOr<PredicateCompoundNode<Type>> ChunkPredicateBuilder<E, Type>::get_predicate_tree_root(
PredicateParser* parser, ColumnPredicatePtrs& col_preds_owner) {
auto compound_node = PredicateCompoundNode<Type>{};

const size_t start_i = col_preds_owner.size();
RETURN_IF_ERROR(_get_column_predicates(parser, col_preds_owner));
for (size_t i = start_i; i < col_preds_owner.size(); i++) {
compound_node.add_child(PredicateColumnNode{col_preds_owner[i].get()});
}

for (auto& child_builder : _child_builders) {
RETURN_IF_ERROR(std::visit(
[&](auto& c) {
ASSIGN_OR_RETURN(auto child_node, c.get_predicate_tree_root(parser, col_preds_owner));
compound_node.add_child(std::move(child_node));
return Status::OK();
},
child_builder));
}

return compound_node;
}

template <bool Negative>
static bool is_not_in(const auto* pred) {
if constexpr (Negative) {
return !pred->is_not_in();
} else {
return pred->is_not_in();
}
};

// clang-format off
template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, typename RangeValueType, bool Negative>
requires(!lt_is_date<SlotType>) Status ChunkPredicateBuilder<E, Type>::normalize_in_or_equal_predicate(
const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range) {
// clang-format on

>>>>>>> 45d72ace19 ([Enhancement][FlatJson] Improve flat json performace and extract strategy (#50696))
Status status;
const auto& conjunct_ctxs = (*conjunct_ctxs_ptr);
for (size_t i = 0; i < conjunct_ctxs.size(); i++) {
Expand Down Expand Up @@ -228,10 +410,19 @@ void OlapScanConjunctsManager::normalize_in_or_equal_predicate(const SlotDescrip
return;
}

// clang-format off
// explicit specialization for DATE.
<<<<<<< HEAD
template <>
void OlapScanConjunctsManager::normalize_in_or_equal_predicate<starrocks::TYPE_DATE, DateValue>(
const SlotDescriptor& slot, ColumnValueRange<DateValue>* range) {
=======
template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, typename RangeValueType, bool Negative>
requires lt_is_date<SlotType> Status ChunkPredicateBuilder<E, Type>::normalize_in_or_equal_predicate(
const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range) {
// clang-format on
>>>>>>> 45d72ace19 ([Enhancement][FlatJson] Improve flat json performace and extract strategy (#50696))
Status status;
const auto& conjunct_ctxs = (*conjunct_ctxs_ptr);

Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ static StatusOr<ColumnPtr> _extract_with_hyper(NativeJsonState* state, const std
state->real_path.paths.emplace_back(p);
continue;
}
if (p.key.find('.') != std::string::npos) {
in_flat = false;
}
if (in_flat) {
flat_path += "." + p.key;
if (p.array_selector->type != NONE) {
Expand Down
Loading

0 comments on commit 5014e52

Please sign in to comment.