Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement][FlatJson] Improve flat json performance and extract strategy (backport #50696) #51215

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions be/src/column/column_access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "column/column_access_path.h"

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -31,6 +32,7 @@
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "types/logical_type.h"
#include "util/json_flattener.h"

namespace starrocks {

Expand Down Expand Up @@ -209,39 +211,21 @@ StatusOr<std::unique_ptr<ColumnAccessPath>> ColumnAccessPath::create(const TAcce
return std::move(p);
}

// Splits a dotted JSON path into its first segment and the remainder.
// A leading double-quoted segment (e.g. `"a.b".c`) is kept intact: the
// separator search only starts after the closing quote.
// Returns {first_segment, rest}; `rest` is empty when no '.' separator exists.
std::pair<std::string, std::string> _split_path(const std::string& path) {
    size_t search_from = 0;
    if (!path.empty() && path.front() == '"') {
        // Skip over the quoted key so a '.' embedded in it is not treated as a separator.
        search_from = path.find('"', 1);
        DCHECK(search_from != std::string::npos);
    }

    const size_t dot = path.find('.', search_from);
    if (dot == std::string::npos) {
        // No separator: the whole path is a single segment, nothing remains.
        return {path, std::string()};
    }
    return {path.substr(0, dot), path.substr(dot + 1)};
}

// Recursively inserts a dotted JSON path (e.g. "a.b.c") into the access-path
// tree rooted at `root`, creating any missing FIELD children along the way.
// Quoted keys are handled by JsonFlatPath::split_path, which only splits on
// separators outside quotes. Returns the node for the deepest path segment.
// NOTE: the diff residue for this hunk interleaved the removed `_split_path`
// call with its replacement; this is the coherent post-change version.
ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) {
    if (path.empty()) {
        return root;
    }

    auto [key_view, next] = JsonFlatPath::split_path(path);
    auto key = std::string(key_view);
    auto child = root->get_child(key);
    if (child == nullptr) {
        // Child for this segment does not exist yet: create it under root.
        auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path());
        DCHECK(n.ok());
        root->children().emplace_back(std::move(n.value()));
        child = root->children().back().get();
    }
    return insert_json_path_impl(std::string(next), child);
}

void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) {
Expand Down
22 changes: 22 additions & 0 deletions be/src/column/json_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,28 @@ bool JsonColumn::has_flat_column(const std::string& path) const {
return false;
}

bool JsonColumn::is_equallity_schema(const Column* other) const {
if (!other->is_json()) {
return false;
}
auto* other_json = down_cast<const JsonColumn*>(other);
if (this->is_flat_json() && other_json->is_flat_json()) {
if (this->_flat_column_paths.size() != other_json->_flat_column_paths.size()) {
return false;
}
for (size_t i = 0; i < this->_flat_column_paths.size(); i++) {
if (this->_flat_column_paths[i] != other_json->_flat_column_paths[i]) {
return false;
}
if (this->_flat_column_types[i] != other_json->_flat_column_types[i]) {
return false;
}
}
return _flat_columns.size() == other_json->_flat_columns.size();
}
return !this->is_flat_json() && !other_json->is_flat_json();
}

std::string JsonColumn::debug_flat_paths() const {
if (_flat_column_paths.empty()) {
return "[]";
Expand Down
2 changes: 2 additions & 0 deletions be/src/column/json_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class JsonColumn final : public ColumnFactory<ObjectColumn<JsonValue>, JsonColum
void set_flat_columns(const std::vector<std::string>& paths, const std::vector<LogicalType>& types,
const Columns& flat_columns);

bool is_equallity_schema(const Column* other) const;

std::string debug_flat_paths() const;

private:
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,9 @@ CONF_mDouble(json_flat_sparsity_factor, "0.9");
// the maximum number of extracted JSON sub-fields
CONF_mInt32(json_flat_column_max, "100");

// maximum byte size of the whitelist filter on flat json remain data; capped at 1 KB
CONF_mInt32(json_flat_remain_filter_max_bytes, "1024");

// Allowable intervals for continuous generation of pk dumps
// Disable when pk_dump_interval_seconds <= 0
CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour
Expand Down
8 changes: 4 additions & 4 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ Status LakeDataSource::build_scan_range(RuntimeState* state) {
}

void LakeDataSource::init_counter(RuntimeState* state) {
_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);

_bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES);
_rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);

Expand Down Expand Up @@ -580,9 +583,6 @@ void LakeDataSource::init_counter(RuntimeState* state) {
_prefetch_hit_counter = ADD_CHILD_COUNTER(_runtime_profile, "PrefetchHitCount", TUnit::UNIT, io_statistics_name);
_prefetch_wait_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchWaitFinishTime", io_statistics_name);
_prefetch_pending_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchPendingTime", io_statistics_name);

_access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT);
_access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT);
}

void LakeDataSource::update_realtime_counter(Chunk* chunk) {
Expand Down Expand Up @@ -703,7 +703,7 @@ void LakeDataSource::update_counter() {
_runtime_state->update_num_datacache_count(1);
}

if (_reader->stats().flat_json_hits.size() > 0) {
if (_reader->stats().flat_json_hits.size() > 0 || _reader->stats().merge_json_hits.size() > 0) {
std::string access_path_hits = "AccessPathHits";
int64_t total = 0;
for (auto& [k, v] : _reader->stats().flat_json_hits) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ static StatusOr<ColumnPtr> _extract_with_hyper(NativeJsonState* state, const std
state->real_path.paths.emplace_back(p);
continue;
}
if (p.key.find('.') != std::string::npos) {
in_flat = false;
}
if (in_flat) {
flat_path += "." + p.key;
if (p.array_selector->type != NONE) {
Expand Down
30 changes: 29 additions & 1 deletion be/src/storage/chunk_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "column/chunk.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "column/json_column.h"
#include "column/map_column.h"
#include "column/schema.h"
#include "column/struct_column.h"
Expand Down Expand Up @@ -545,14 +546,41 @@ void ChunkAccumulator::finalize() {
_accumulate_count = 0;
}

bool ChunkPipelineAccumulator::_check_json_schema_equallity(const Chunk* one, const Chunk* two) {
if (one->num_columns() != two->num_columns()) {
return false;
}

for (size_t i = 0; i < one->num_columns(); i++) {
auto& c1 = one->get_column_by_index(i);
auto& c2 = two->get_column_by_index(i);
const auto* a1 = ColumnHelper::get_data_column(c1.get());
const auto* a2 = ColumnHelper::get_data_column(c2.get());

if (a1->is_json() && a2->is_json()) {
auto json1 = down_cast<const JsonColumn*>(a1);
if (!json1->is_equallity_schema(a2)) {
return false;
}
} else if (a1->is_json() || a2->is_json()) {
// never hit
DCHECK_EQ(a1->is_json(), a2->is_json());
return false;
}
}

return true;
}

void ChunkPipelineAccumulator::push(const ChunkPtr& chunk) {
chunk->check_or_die();
DCHECK(_out_chunk == nullptr);
if (_in_chunk == nullptr) {
_in_chunk = chunk;
_mem_usage = chunk->bytes_usage();
} else if (_in_chunk->num_rows() + chunk->num_rows() > _max_size ||
_in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk()) {
_in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk() ||
!_check_json_schema_equallity(chunk.get(), _in_chunk.get())) {
_out_chunk = std::move(_in_chunk);
_in_chunk = chunk;
_mem_usage = chunk->bytes_usage();
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/chunk_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class ChunkPipelineAccumulator {
bool need_input() const;
bool is_finished() const;

private:
static bool _check_json_schema_equallity(const Chunk* one, const Chunk* two);

private:
static constexpr double LOW_WATERMARK_ROWS_RATE = 0.75; // 0.75 * chunk_size
#ifdef BE_TEST
Expand Down
42 changes: 37 additions & 5 deletions be/src/storage/meta_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "storage/meta_reader.h"

#include <sstream>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -237,12 +238,43 @@ Status SegmentMetaCollecter::_collect(const std::string& name, ColumnId cid, Col
return Status::NotSupported("Not Support Collect Meta: " + name);
}

// Builds a human-readable description of the JSON sub-fields reachable under
// `col_reader`: flat JSON yields "name(type), name(type)"; arrays wrap the
// element description in "[...]"; maps wrap the value description in "{...}";
// structs emit "field(child), ..." for each field with a non-empty child.
// Returns an empty string for types with nothing to report.
std::string append_read_name(const ColumnReader* col_reader) {
    const auto type = col_reader->column_type();

    if (type == LogicalType::TYPE_JSON) {
        std::string out;
        for (const auto& sub_reader : *col_reader->sub_readers()) {
            if (!out.empty()) {
                out += ", ";
            }
            out += fmt::format("{}({})", sub_reader->name(), type_to_string(sub_reader->column_type()));
        }
        return out;
    }

    if (type == LogicalType::TYPE_ARRAY) {
        // Describe the single element reader.
        auto child = append_read_name((*col_reader->sub_readers())[0].get());
        return child.empty() ? std::string() : fmt::format("[{}]", child);
    }

    if (type == LogicalType::TYPE_MAP) {
        // Index 1 is the value reader (index 0 is the key reader).
        auto child = append_read_name((*col_reader->sub_readers())[1].get());
        return child.empty() ? std::string() : fmt::format("{{{}}}", child);
    }

    if (type == LogicalType::TYPE_STRUCT) {
        std::string out;
        for (const auto& sub_reader : *col_reader->sub_readers()) {
            auto child = append_read_name(sub_reader.get());
            if (child.empty()) {
                continue;
            }
            if (!out.empty()) {
                out += ", ";
            }
            out += fmt::format("{}({})", sub_reader->name(), child);
        }
        return out;
    }

    return {};
}

Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) {
const ColumnReader* col_reader = _segment->column(cid);
if (col_reader == nullptr) {
return Status::NotFound("don't found column");
}
if (col_reader->column_type() != TYPE_JSON) {

if (!is_semi_type(col_reader->column_type())) {
return Status::InternalError("column type mismatch");
}

Expand All @@ -253,11 +285,11 @@ Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) {

ArrayColumn* array_column = down_cast<ArrayColumn*>(column);
size_t size = array_column->offsets_column()->get_data().back();
for (const auto& sub_reader : *col_reader->sub_readers()) {
std::string str = fmt::format("{}({})", sub_reader->name(), type_to_string(sub_reader->column_type()));
array_column->elements_column()->append_datum(Slice(str));
auto res = append_read_name(col_reader);
if (!res.empty()) {
array_column->elements_column()->append_datum(Slice(res));
array_column->offsets_column()->append(size + 1);
}
array_column->offsets_column()->append(size + col_reader->sub_readers()->size());
return Status::OK();
}

Expand Down
5 changes: 0 additions & 5 deletions be/src/storage/olap_meta_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ Status OlapMetaReader::_init_seg_meta_collecters(const OlapMetaReaderParams& par

Status OlapMetaReader::_get_segments(const TabletSharedPtr& tablet, const Version& version,
std::vector<SegmentSharedPtr>* segments) {
if (tablet->updates() != nullptr) {
LOG(INFO) << "Skipped Update tablet";
return Status::OK();
}

Status acquire_rowset_st;
{
std::shared_lock l(tablet->get_header_lock());
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset/array_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ StatusOr<std::unique_ptr<ColumnWriter>> create_array_column_writer(const ColumnW
element_options.need_zone_map = false;
element_options.need_bloom_filter = element_column.is_bf_column();
element_options.need_bitmap_index = element_column.has_bitmap_index();
element_options.need_flat = opts.need_flat;
element_options.is_compaction = opts.is_compaction;
if (element_column.type() == LogicalType::TYPE_ARRAY) {
if (element_options.need_bloom_filter) {
return Status::NotSupported("Do not support bloom filter for array type");
Expand Down
4 changes: 3 additions & 1 deletion be/src/storage/rowset/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,15 @@ class BloomFilter {
virtual void add_hash(uint64_t hash) = 0;
virtual bool test_hash(uint64_t hash) const = 0;

static uint32_t estimate_bytes(uint64_t n, double fpp) { return _optimal_bit_num(n, fpp) / 8 + 1; }

private:
// Compute the optimal bit number according to the following rule:
// m = -n * ln(fpp) / (ln(2) ^ 2)
// n: expected distinct record number
// fpp: false positive probablity
// the result will be power of 2
uint32_t _optimal_bit_num(uint64_t n, double fpp);
static uint32_t _optimal_bit_num(uint64_t n, double fpp);

protected:
// bloom filter data
Expand Down
Loading
Loading