Skip to content

Commit

Permalink
[Enhancement][FlatJson] Improve flat json performance and extract stra…
Browse files Browse the repository at this point in the history
…tegy (backport #50696) (#51215)

Co-authored-by: Seaven <[email protected]>
  • Loading branch information
mergify[bot] and Seaven committed Sep 20, 2024
1 parent 458756b commit 203310f
Show file tree
Hide file tree
Showing 29 changed files with 685 additions and 358 deletions.
26 changes: 5 additions & 21 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "column/column_access_path.h"

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -31,6 +32,7 @@
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "types/logical_type.h"
#include "util/json_flattener.h"

namespace starrocks {

Expand Down Expand Up @@ -209,39 +211,21 @@ StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAcce
return std::move(p);
}

std::pair<std::string, std::string> _split_path(const std::string& path) {
size_t pos = 0;
if (path.starts_with("\"")) {
pos = path.find('\"', 1);
DCHECK(pos != std::string::npos);
}
pos = path.find('.', pos);
std::string key;
std::string next;
if (pos == std::string::npos) {
key = path;
} else {
key = path.substr(0, pos);
next = path.substr(pos + 1);
}

return {key, next};
}

ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
if (path.empty()) {
return root;
}

auto [key, next] = _split_path(path);
auto [key_view, next] = JsonFlatPath::split_path(path);
auto key = std::string(key_view);
auto child = root->get_child(key);
if (child == nullptr) {
auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path());
DCHECK(n.ok());
root->children().emplace_back(std::move(n.value()));
child = root->children().back().get();
}
return insert_json_path_impl(next, child);
return insert_json_path_impl(std::string(next), child);
}

void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) {
Expand Down
22 changes: 22 additions & 0 deletions be/src/column/json_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,28 @@ bool JsonColumn::has_flat_column(const std::string& path) const {
return false;
}

bool JsonColumn::is_equallity_schema(const Column* other) const {
if (!other->is_json()) {
return false;
}
auto* other_json = down_cast<const JsonColumn*>(other);
if (this->is_flat_json() && other_json->is_flat_json()) {
if (this->_flat_column_paths.size() != other_json->_flat_column_paths.size()) {
return false;
}
for (size_t i = 0; i < this->_flat_column_paths.size(); i++) {
if (this->_flat_column_paths[i] != other_json->_flat_column_paths[i]) {
return false;
}
if (this->_flat_column_types[i] != other_json->_flat_column_types[i]) {
return false;
}
}
return _flat_columns.size() == other_json->_flat_columns.size();
}
return !this->is_flat_json() && !other_json->is_flat_json();
}

std::string JsonColumn::debug_flat_paths() const {
if (_flat_column_paths.empty()) {
return "[]";
Expand Down
2 changes: 2 additions & 0 deletions be/src/column/json_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class JsonColumn final : public ColumnFactory<ObjectColumn<JsonValue>, JsonColum
void set_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types,
const Columns& flat_columns);

bool is_equallity_schema(const Column* other) const;

std::string debug_flat_paths() const;

private:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,9 @@ CONF_mDouble(json_flat_sparsity_factor, "0.9");
// the maximum number of extracted JSON sub-field
CONF_mInt32(json_flat_column_max, "100");

// Whitelist size limit for flat json remain data; capped at 1 KB (1024 bytes)
CONF_mInt32(json_flat_remain_filter_max_bytes, "1024");

// Allowable intervals for continuous generation of pk dumps
// Disable when pk_dump_interval_seconds <= 0
CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour
Expand Down
8 changes: 4 additions & 4 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ Status LakeDataSource::build_scan_range(RuntimeState* state) {
}

void LakeDataSource::init_counter(RuntimeState* state) {
_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);

_bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES);
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);

Expand Down Expand Up @@ -580,9 +583,6 @@ void LakeDataSource::init_counter(RuntimeState* state) {
_prefetch_hit_counter = ADD_CHILD_COUNTER(_runtime_profile, "PrefetchHitCount", TUnit::UNIT, io_statistics_name);
_prefetch_wait_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchWaitFinishTime", io_statistics_name);
_prefetch_pending_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchPendingTime", io_statistics_name);

_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);
}

void LakeDataSource::update_realtime_counter(Chunk* chunk) {
Expand Down Expand Up @@ -703,7 +703,7 @@ void LakeDataSource::update_counter() {
_runtime_state->update_num_datacache_count(1);
}

if (_reader->stats().flat_json_hits.size() > 0) {
if (_reader->stats().flat_json_hits.size() > 0 || _reader->stats().merge_json_hits.size() > 0) {
std::string access_path_hits = "AccessPathHits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().flat_json_hits) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ static StatusOr<ColumnPtr> _extract_with_hyper(NativeJsonState* state, const std
state->real_path.paths.emplace_back(p);
continue;
}
if (p.key.find('.') != std::string::npos) {
in_flat = false;
}
if (in_flat) {
flat_path += "." + p.key;
if (p.array_selector->type != NONE) {
Expand Down
30 changes: 29 additions & 1 deletion be/src/storage/chunk_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "column/json_column.h"
#include "column/map_column.h"
#include "column/schema.h"
#include "column/struct_column.h"
Expand Down Expand Up @@ -545,14 +546,41 @@ void ChunkAccumulator::finalize() {
_accumulate_count = 0;
}

bool ChunkPipelineAccumulator::_check_json_schema_equallity(const Chunk* one, const Chunk* two) {
if (one->num_columns() != two->num_columns()) {
return false;
}

for (size_t i = 0; i < one->num_columns(); i++) {
auto& c1 = one->get_column_by_index(i);
auto& c2 = two->get_column_by_index(i);
const auto* a1 = ColumnHelper::get_data_column(c1.get());
const auto* a2 = ColumnHelper::get_data_column(c2.get());

if (a1->is_json() && a2->is_json()) {
auto json1 = down_cast<const JsonColumn*>(a1);
if (!json1->is_equallity_schema(a2)) {
return false;
}
} else if (a1->is_json() || a2->is_json()) {
// never hit
DCHECK_EQ(a1->is_json(), a2->is_json());
return false;
}
}

return true;
}

void ChunkPipelineAccumulator::push(const ChunkPtr& chunk) {
chunk->check_or_die();
DCHECK(_out_chunk == nullptr);
if (_in_chunk == nullptr) {
_in_chunk = chunk;
_mem_usage = chunk->bytes_usage();
} else if (_in_chunk->num_rows() + chunk->num_rows() > _max_size ||
_in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk()) {
_in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk() ||
!_check_json_schema_equallity(chunk.get(), _in_chunk.get())) {
_out_chunk = std::move(_in_chunk);
_in_chunk = chunk;
_mem_usage = chunk->bytes_usage();
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/chunk_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class ChunkPipelineAccumulator {
bool need_input() const;
bool is_finished() const;

private:
static bool _check_json_schema_equallity(const Chunk* one, const Chunk* two);

private:
static constexpr double LOW_WATERMARK_ROWS_RATE = 0.75; // 0.75 * chunk_size
#ifdef BE_TEST
Expand Down
42 changes: 37 additions & 5 deletions be/src/storage/meta_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "storage/meta_reader.h"

#include <sstream>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -237,12 +238,43 @@ Status SegmentMetaCollecter::_collect(const std::string& name, ColumnId cid, Col
return Status::NotSupported("Not Support Collect Meta: " + name);
}

// Builds a human-readable description of the flattened sub-fields reachable
// under `col_reader`, e.g. "a(INT), b(VARCHAR)" for a flat JSON column.
// Nested containers are wrapped: "[...]" for an array's element, "{...}" for
// a map's value, and "name(...)" for each struct field. Returns an empty
// string when there is nothing to describe.
//
// Note: the previous implementation trimmed the trailing ", " separator with
// `stream.str().substr(0, stream.view().size() - 2)`, which underflows to
// SIZE_MAX on an empty stream and only works because substr clamps the count.
// Joining with a separator up front avoids relying on that.
std::string append_read_name(const ColumnReader* col_reader) {
    // Joins parts with ", " without ever producing a trailing separator.
    auto join = [](const std::vector<std::string>& parts) {
        std::string out;
        for (const auto& part : parts) {
            if (!out.empty()) {
                out += ", ";
            }
            out += part;
        }
        return out;
    };

    switch (col_reader->column_type()) {
    case LogicalType::TYPE_JSON: {
        std::vector<std::string> parts;
        for (const auto& sub_reader : *col_reader->sub_readers()) {
            parts.emplace_back(
                    fmt::format("{}({})", sub_reader->name(), type_to_string(sub_reader->column_type())));
        }
        return join(parts);
    }
    case LogicalType::TYPE_ARRAY: {
        // An array has a single sub-reader describing its element.
        auto child = append_read_name((*col_reader->sub_readers())[0].get());
        return child.empty() ? std::string() : fmt::format("[{}]", child);
    }
    case LogicalType::TYPE_MAP: {
        // Index 1 is the value reader. NOTE(review): assumes the key reader
        // at index 0 never carries flat sub-fields — confirm.
        auto child = append_read_name((*col_reader->sub_readers())[1].get());
        return child.empty() ? std::string() : fmt::format("{{{}}}", child);
    }
    case LogicalType::TYPE_STRUCT: {
        std::vector<std::string> parts;
        for (const auto& sub_reader : *col_reader->sub_readers()) {
            auto child = append_read_name(sub_reader.get());
            if (!child.empty()) {
                parts.emplace_back(fmt::format("{}({})", sub_reader->name(), child));
            }
        }
        return join(parts);
    }
    default:
        // Scalar leaf types contribute nothing at this level.
        return {};
    }
}

Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) {
const ColumnReader* col_reader = _segment->column(cid);
if (col_reader == nullptr) {
return Status::NotFound("don't found column");
}
if (col_reader->column_type() != TYPE_JSON) {

if (!is_semi_type(col_reader->column_type())) {
return Status::InternalError("column type mismatch");
}

Expand All @@ -253,11 +285,11 @@ Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) {

ArrayColumn* array_column = down_cast<ArrayColumn*>(column);
size_t size = array_column->offsets_column()->get_data().back();
for (const auto& sub_reader : *col_reader->sub_readers()) {
std::string str = fmt::format("{}({})", sub_reader->name(), type_to_string(sub_reader->column_type()));
array_column->elements_column()->append_datum(Slice(str));
auto res = append_read_name(col_reader);
if (!res.empty()) {
array_column->elements_column()->append_datum(Slice(res));
array_column->offsets_column()->append(size + 1);
}
array_column->offsets_column()->append(size + col_reader->sub_readers()->size());
return Status::OK();
}

Expand Down
5 changes: 0 additions & 5 deletions be/src/storage/olap_meta_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ Status OlapMetaReader::_init_seg_meta_collecters(const OlapMetaReaderParams& par

Status OlapMetaReader::_get_segments(const TabletSharedPtr& tablet, const Version& version,
std::vector<SegmentSharedPtr>* segments) {
if (tablet->updates() != nullptr) {
LOG(INFO) << "Skipped Update tablet";
return Status::OK();
}

Status acquire_rowset_st;
{
std::shared_lock l(tablet->get_header_lock());
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset/array_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ StatusOr<std::unique_ptr<ColumnWriter>> create_array_column_writer(const ColumnW
element_options.need_zone_map = false;
element_options.need_bloom_filter = element_column.is_bf_column();
element_options.need_bitmap_index = element_column.has_bitmap_index();
element_options.need_flat = opts.need_flat;
element_options.is_compaction = opts.is_compaction;
if (element_column.type() == LogicalType::TYPE_ARRAY) {
if (element_options.need_bloom_filter) {
return Status::NotSupported("Do not support bloom filter for array type");
Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/rowset/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,15 @@ class BloomFilter {
virtual void add_hash(uint64_t hash) = 0;
virtual bool test_hash(uint64_t hash) const = 0;

static uint32_t estimate_bytes(uint64_t n, double fpp) { return _optimal_bit_num(n, fpp) / 8 + 1; }

private:
// Compute the optimal bit number according to the following rule:
// m = -n * ln(fpp) / (ln(2) ^ 2)
// n: expected distinct record number
// fpp: false positive probablity
// the result will be power of 2
uint32_t _optimal_bit_num(uint64_t n, double fpp);
static uint32_t _optimal_bit_num(uint64_t n, double fpp);

protected:
// bloom filter data
Expand Down
Loading

0 comments on commit 203310f

Please sign in to comment.