add alignment process in publish phase for flexible partial update
bobhan1 committed Aug 23, 2024
1 parent 3fb9643 commit 98c6d9d
Showing 9 changed files with 273 additions and 17 deletions.
114 changes: 111 additions & 3 deletions be/src/olap/base_tablet.cpp
@@ -689,8 +689,15 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// So here we should read version 5's columns and build a new row, which
// consists of version 6's update columns and version 5's original columns.
// Here we build 2 read plans: one for the original values and one for the update values.

// - for fixed partial update, we read the update columns from the current load's rowset
//   and the missing columns from previous rowsets to create the final block
// - for flexible partial update, we read all columns from the current load's rowset
//   and the non-sort-key columns from previous rowsets to create the final block
// So in both partial update modes we only need to record which rows to read
read_plan_ori.prepare_to_read(loc, pos);
read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos);

rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculated when memtable flush and
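
The shared position counter is the crux of this bookkeeping: both plans record "output position -> row location", one against previously published rowsets and one against the rowset being published, so the two fetched blocks can later be zipped row by row. A self-contained toy model of the idea (ToyRowLocation and ToyReadPlan are invented names; only prepare_to_read mirrors the call above):

#include <cstdint>
#include <map>

struct ToyRowLocation {
    uint64_t rowset_id;
    uint32_t segment_id;
    uint32_t row_id;
};

struct ToyReadPlan {
    // output position in the merged block -> where to fetch that row from
    std::map<uint32_t, ToyRowLocation> rows;
    void prepare_to_read(const ToyRowLocation& loc, uint32_t pos) { rows[pos] = loc; }
};

int main() {
    ToyReadPlan plan_ori;     // rows to fetch from previously published rowsets
    ToyReadPlan plan_update;  // rows to fetch from the rowset being published
    uint32_t pos = 0;
    // one conflicting key: its old version is row 42 of (rowset 7, segment 0),
    // its new version is row 5 of (rowset 9, segment 0)
    plan_ori.prepare_to_read({7, 0, 42}, pos);
    plan_update.prepare_to_read({9, 0, 5}, pos);
    ++pos;
    // both plans share `pos`, which is what lets the publish phase line them up
    return plan_ori.rows.size() == plan_update.rows.size() ? 0 : 1;
}
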
@@ -738,7 +745,9 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
} else {
// TODO(bobhan1): add support for flexible partial update
RETURN_IF_ERROR(generate_new_block_for_flexible_partial_update(
rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
rsid_to_rowset, &block));
}
RETURN_IF_ERROR(sort_block(block, ordered_block));
RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
@@ -954,7 +963,6 @@ Status BaseTablet::generate_new_block_for_partial_update(
*rowset_schema, missing_cids, partial_update_info->default_values, old_block,
default_value_block));
}
auto mutable_default_value_columns = default_value_block.mutate_columns();

CHECK(update_rows >= old_rows);

@@ -977,7 +985,7 @@
} else if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
mutable_column->insert_from(*mutable_default_value_columns[i], 0);
mutable_column->insert_from(*default_value_block.get_by_position(i).column, 0);
} else if (rs_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
mutable_column.get())
@@ -996,6 +1004,106 @@
return Status::OK();
}

Status BaseTablet::generate_new_block_for_flexible_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block) {
CHECK(output_block);

const auto& non_sort_key_cids = partial_update_info->missing_cids;
std::vector<uint32_t> all_cids(rowset_schema->num_columns());
std::iota(all_cids.begin(), all_cids.end(), 0);
auto old_block = rowset_schema->create_block_by_cids(non_sort_key_cids);
auto update_block = rowset_schema->create_block_by_cids(all_cids);

// rowid in the final block (starts from 0, increases continuously) -> rowid to read in update_block
std::map<uint32_t, uint32_t> read_index_update;

// 1. read the current rowset first; if a row in the current rowset has the delete sign marked,
// we don't need to read values from the old block for that row
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(*rowset_schema, all_cids, rsid_to_rowset,
update_block, &read_index_update));
size_t update_rows = read_index_update.size();

// TODO(bobhan1): add the delete sign optimization here
// // if there is sequence column in the table, we need to read the sequence column,
// // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results
// const auto* __restrict new_block_delete_signs =
// rowset_schema->has_sequence_col()
// ? nullptr
// : get_delete_sign_column_data(update_block, update_rows);

// 2. read previous rowsets
// rowid in the final block (starts from 0, increasing; may not be continuous because we skip reading some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, non_sort_key_cids,
rsid_to_rowset, old_block, &read_index_old));
size_t old_rows = read_index_old.size();
DCHECK(update_rows >= old_rows);
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);
DCHECK(old_block_delete_signs != nullptr);

// 3. build default value block
auto default_value_block = old_block.clone_empty();
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, non_sort_key_cids,
partial_update_info->default_values,
old_block, default_value_block));

// 4. build the final block
auto full_mutable_columns = output_block->mutate_columns();
DCHECK(rowset_schema->has_skip_bitmap_col());
auto skip_bitmap_col_idx = rowset_schema->skip_bitmap_col_idx();
const std::vector<BitmapValue>* skip_bitmaps =
&(assert_cast<const vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>(
update_block.get_by_position(skip_bitmap_col_idx).column->get_ptr().get())
->get_data());
for (std::size_t cid {0}; cid < rowset_schema->num_columns(); cid++) {
if (cid < rowset_schema->num_key_columns()) {
full_mutable_columns[cid] =
std::move(*update_block.get_by_position(cid).column).mutate();
} else {
const auto& rs_column = rowset_schema->column(cid);
auto col_uid = rs_column.unique_id();
auto& cur_col = full_mutable_columns[cid];
for (auto idx = 0; idx < update_rows; ++idx) {
if (skip_bitmaps->at(idx).contains(col_uid)) {
if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
if (rs_column.has_default_value()) {
const auto& src_column =
*default_value_block
.get_by_position(cid -
rowset_schema->num_key_columns())
.column;
cur_col->insert_from(src_column, 0);
} else if (rs_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
cur_col.get())
->insert_null_elements(1);
} else {
cur_col->insert_default();
}
} else {
const auto& src_column =
*old_block.get_by_position(cid - rowset_schema->num_key_columns())
.column;
cur_col->insert_from(src_column, idx);
}
} else {
cur_col->insert_from(*update_block.get_by_position(cid).column, idx);
}
}
}
DCHECK(full_mutable_columns[cid]->size() == update_rows);
}

output_block->set_columns(std::move(full_mutable_columns));
VLOG_DEBUG << "full block when publish: " << output_block->dump_data();
return Status::OK();
}
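
The heart of the new generate_new_block_for_flexible_partial_update above is the per-cell choice in step 4. A condensed, self-contained restatement of that rule (the enum and function names are invented for illustration; the control flow mirrors the loop above):

#include <cstdint>
#include <set>

enum class CellSource { Update, Old, ColumnDefault, Null, TypeDefault };

// Decide where the value of one cell comes from, given the row's skip bitmap
// (the unique ids of the columns this load did NOT provide).
CellSource pick_cell_source(const std::set<uint32_t>& skip_bitmap, uint32_t col_uid,
                            bool old_row_is_deleted, bool col_has_default,
                            bool col_is_nullable) {
    if (skip_bitmap.count(col_uid) == 0) {
        return CellSource::Update;      // the load provided this cell
    }
    if (!old_row_is_deleted) {
        return CellSource::Old;         // fill from the previous version
    }
    // the previous version carried a delete sign, so treat the row as new
    if (col_has_default) return CellSource::ColumnDefault;
    if (col_is_nullable) return CellSource::Null;
    return CellSource::TypeDefault;     // insert_default() in the code above
}
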

Status BaseTablet::commit_phase_update_delete_bitmap(
const BaseTabletSPtr& tablet, const RowsetSharedPtr& rowset,
RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap,
9 changes: 7 additions & 2 deletions be/src/olap/base_tablet.h
@@ -199,8 +199,13 @@ class BaseTablet {

static Status generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const FixedReadPlan& read_plan_ori,
const FixedReadPlan& read_plan_update,
const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);

static Status generate_new_block_for_flexible_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block* output_block);

14 changes: 7 additions & 7 deletions be/src/olap/partial_update_info.h
@@ -112,13 +112,13 @@ class FlexibleReadPlan {
vectorized::Block& old_value_block,
std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index,
const signed char* __restrict skip_map = nullptr) const;
Status fill_non_sort_key_columns(RowsetWriterContext* rowset_ctx,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
const std::vector<bool>& use_default_or_null_flag,
bool has_default_or_nullable, const std::size_t segment_start_pos,
const std::size_t block_start_pos, const vectorized::Block* block,
std::vector<BitmapValue>* skip_bitmaps) const;
Status fill_non_sort_key_columns(
RowsetWriterContext* rowset_ctx,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
const TabletSchema& tablet_schema, vectorized::Block& full_block,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
const std::size_t segment_start_pos, const std::size_t block_start_pos,
const vectorized::Block* block, std::vector<BitmapValue>* skip_bitmaps) const;

private:
// rowset_id -> segment_id -> column unique id -> mappings
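
For flexible loads the read plan tracks rows per column rather than per row, since each row of a flexible load may patch a different column set. A toy of the nested shape the comment above describes (names invented, and the innermost mapping is assumed to be row id -> output position; the real class carries more state):

#include <cstdint>
#include <map>

// rowset id -> segment id -> column unique id -> (row id -> output position)
using ToyFlexiblePlan = std::map<uint64_t,
        std::map<uint32_t, std::map<uint32_t, std::map<uint32_t, uint32_t>>>>;

int main() {
    ToyFlexiblePlan plan;
    // row 42 of (rowset 7, segment 0) must supply column uid 3 for output row 0
    plan[7][0][3][42] = 0;
    return plan.size() == 1 ? 0 : 1;
}
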
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -789,7 +789,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
use_default_or_null_flag, has_default_or_nullable, segment_start_pos, data.row_pos,
data.block, skip_bitmaps));

// TODO(bobhan1): should we replace the skip bitmap column with null literals here to reduce storage occupation?
// TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps here to reduce storage occupation?
// this column is not needed in read path for merge-on-write table

// row column should be filled here
@@ -0,0 +1,6 @@
{"k": 1, "v1": 1999}
{"k": 2, "v3": 3999, "v5": 2999}
{"k": 3, "v3": 3999, "v2": null}
{"k": 4, "v4": 4999, "v1": 1999, "v3": 3999}
{"k": 5, "v5": null}
{"k": 6, "v1": 1999, "v3": 3999}
@@ -0,0 +1,7 @@
{"k": 1, "v5": 5777, "v3": 3777, "v2": 2777}
{"k": 2, "v4": 4777, "v1": 1777, "v2": null}
{"k": 3, "v1": 1777, "v5": null}
{"k": 4, "v2": null, "v5": 5777}
{"k": 5, "v4": 4777, "v1": null, "v3": 3777}
{"k": 7, "v2": null, "v5": 5777, "v3": 3777}
{"k": 6, "v5": null, "v2": 2777, "v3": 3777}
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 0 0 0 0 0 0 2 0
1 1 1 1 1 1 0 2 0
2 2 2 2 2 2 0 2 0
3 3 3 3 3 3 0 2 0
4 4 4 4 4 4 0 2 0
5 5 5 5 5 5 0 2 0

-- !sql --
0 0 0 0 0 0 0 2 0
1 1999 2777 3777 1 5777 0 4 3 1,4,6
2 1777 \N 3999 4777 2999 0 4 3 3,5,6
3 1777 \N 3999 3 \N 0 4 4 2,3,4,6
4 1999 \N 3999 4999 5777 0 4 4 1,3,4,6
5 \N 5 3777 4777 \N 0 4 3 2,5,6
6 1999 2777 3777 1234 \N 0 4 3 1,4,6
7 \N \N 3777 1234 5777 0 4 3 1,4,6
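
Row k=1 in the second block shows the alignment at work: the initial insert wrote all 1s (version 2), test1.json set v1=1999, and test2.json (whose row wins the final version, per the skip bitmap "1,4,6") set v2/v3/v5 while leaving v1 and v4 unset, so publish fills those cells from the previous version. A self-contained check of that row (column names stand in for unique ids, and uid 6 is presumably a hidden column; this is not the Doris API):

#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>

int main() {
    // previous version of k=1 at publish time: the initial insert (all 1s)
    // already patched by test1.json (v1=1999)
    std::map<std::string, int64_t> old_row {
            {"v1", 1999}, {"v2", 1}, {"v3", 1}, {"v4", 1}, {"v5", 1}};
    // test2.json's row for k=1 provides v2, v3 and v5 only
    std::map<std::string, int64_t> update_row {
            {"v2", 2777}, {"v3", 3777}, {"v5", 5777}};
    std::set<std::string> skip_bitmap {"v1", "v4"};  // columns test2 did not set

    // skip-bitmap rule: skipped cells come from the old row, the rest from the update
    for (const auto& [col, old_val] : old_row) {
        std::cout << col << "="
                  << (skip_bitmap.count(col) ? old_val : update_row.at(col)) << " ";
    }
    std::cout << "\n";  // v1=1999 v2=2777 v3=3777 v4=1 v5=5777, matching the row above
    return 0;
}
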

@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.junit.Assert

suite("test_flexible_partial_update_publish_conflict", "nonConcurrent") {

def tableName = "test_flexible_partial_update_publish_conflict"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """ CREATE TABLE ${tableName} (
`k` int(11) NULL,
`v1` BIGINT NULL,
`v2` BIGINT NULL DEFAULT "9876",
`v3` BIGINT NOT NULL,
`v4` BIGINT NOT NULL DEFAULT "1234",
`v5` BIGINT NULL
) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"store_row_column" = "false"); """

sql """insert into ${tableName} select number, number, number, number, number, number from numbers("number" = "6"); """
order_qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,BITMAP_COUNT(__DORIS_SKIP_BITMAP_COL__),BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName};"

def enable_publish_spin_wait = {
if (isCloudMode()) {
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
} else {
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait")
}
}

def enable_block_in_publish = {
if (isCloudMode()) {
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
} else {
GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
}
}

def disable_block_in_publish = {
if (isCloudMode()) {
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
} else {
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block")
}
}

try {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()

// block the partial updates in the publish phase
enable_publish_spin_wait()
enable_block_in_publish()
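// two concurrent flexible partial updates on overlapping keys: both are
// committed against the same base version and then held before publish,
// so whichever publishes second must align its rows with the output of
// the first, exercising the code path added in this commit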
def t1 = Thread.start {
streamLoad {
table "${tableName}"
set 'format', 'json'
set 'read_json_by_line', 'true'
set 'strict_mode', 'false'
set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE'
file "test1.json"
time 1000000
}
}

Thread.sleep(500)

def t2 = Thread.start {
streamLoad {
table "${tableName}"
set 'format', 'json'
set 'read_json_by_line', 'true'
set 'strict_mode', 'false'
set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE'
file "test2.json"
time 1000000
}
}

Thread.sleep(500)

disable_block_in_publish()
t1.join()
t2.join()

order_qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,BITMAP_COUNT(__DORIS_SKIP_BITMAP_COL__),BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName};"

} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
}

sql "DROP TABLE IF EXISTS ${tableName};"
}
@@ -16,10 +16,6 @@
// under the License.

suite('test_flexible_partial_update') {

sql "set global enable_auto_analyze=false;"
sql "set global enable_auto_analyze_internal_catalog=false;"
sql "sync;"

def tableName = "test_flexible_partial_update"
sql """ DROP TABLE IF EXISTS ${tableName} """
