diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index f0b123c6e6aec1c..ee4f19d9df38441 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -689,8 +689,15 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, // So here we should read version 5's columns and build a new row, which is // consists of version 6's update columns and version 5's origin columns // here we build 2 read plan for ori values and update values + + // - for fixed partial update, we should read update columns from current load's rowset + // and read missing columns from previous rowsets to create the final block + // - for flexible partial update, we should read all columns from current load's rowset + // and read non sort key columns from previous rowsets to create the final block + // So we only need to record rows to read for both mode partial update read_plan_ori.prepare_to_read(loc, pos); read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos); + rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; ++pos; // delete bitmap will be calculate when memtable flush and @@ -738,7 +745,9 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update, rsid_to_rowset, &block)); } else { - // TODO(bobhan1): add support for flexible partial update + RETURN_IF_ERROR(generate_new_block_for_flexible_partial_update( + rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update, + rsid_to_rowset, &block)); } RETURN_IF_ERROR(sort_block(block, ordered_block)); RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block)); @@ -954,7 +963,6 @@ Status BaseTablet::generate_new_block_for_partial_update( *rowset_schema, missing_cids, partial_update_info->default_values, old_block, default_value_block)); } - auto mutable_default_value_columns = default_value_block.mutate_columns(); CHECK(update_rows >= old_rows); @@ -977,7 +985,7 @@ Status BaseTablet::generate_new_block_for_partial_update( } else if (old_block_delete_signs != nullptr && old_block_delete_signs[read_index_old[idx]] != 0) { if (rs_column.has_default_value()) { - mutable_column->insert_from(*mutable_default_value_columns[i], 0); + mutable_column->insert_from(*default_value_block.get_by_position(i).column, 0); } else if (rs_column.is_nullable()) { assert_cast( mutable_column.get()) @@ -996,6 +1004,106 @@ Status BaseTablet::generate_new_block_for_partial_update( return Status::OK(); } +Status BaseTablet::generate_new_block_for_flexible_partial_update( + TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, + const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update, + const std::map& rsid_to_rowset, + vectorized::Block* output_block) { + CHECK(output_block); + + const auto& non_sort_key_cids = partial_update_info->missing_cids; + std::vector all_cids(rowset_schema->num_columns()); + std::iota(all_cids.begin(), all_cids.end(), 0); + auto old_block = rowset_schema->create_block_by_cids(non_sort_key_cids); + auto update_block = rowset_schema->create_block_by_cids(all_cids); + + // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block + std::map read_index_update; + + // 1. read the current rowset first, if a row in the current rowset has delete sign mark + // we don't need to read values from old block for that row + RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(*rowset_schema, all_cids, rsid_to_rowset, + update_block, &read_index_update)); + size_t update_rows = read_index_update.size(); + + // TODO(bobhan1): add the delete sign optimazation here + // // if there is sequence column in the table, we need to read the sequence column, + // // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results + // const auto* __restrict new_block_delete_signs = + // rowset_schema->has_sequence_col() + // ? nullptr + // : get_delete_sign_column_data(update_block, update_rows); + + // 2. read previous rowsets + // rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block + std::map read_index_old; + RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, non_sort_key_cids, + rsid_to_rowset, old_block, &read_index_old)); + size_t old_rows = read_index_old.size(); + DCHECK(update_rows >= old_rows); + const auto* __restrict old_block_delete_signs = + get_delete_sign_column_data(old_block, old_rows); + DCHECK(old_block_delete_signs != nullptr); + + // 3. build default value block + auto default_value_block = old_block.clone_empty(); + RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, non_sort_key_cids, + partial_update_info->default_values, + old_block, default_value_block)); + + // 4. build the final block + auto full_mutable_columns = output_block->mutate_columns(); + DCHECK(rowset_schema->has_skip_bitmap_col()); + auto skip_bitmap_col_idx = rowset_schema->skip_bitmap_col_idx(); + const std::vector* skip_bitmaps = + &(assert_cast( + update_block.get_by_position(skip_bitmap_col_idx).column->get_ptr().get()) + ->get_data()); + for (std::size_t cid {0}; cid < rowset_schema->num_columns(); cid++) { + if (cid < rowset_schema->num_key_columns()) { + full_mutable_columns[cid] = + std::move(*update_block.get_by_position(cid).column).mutate(); + } else { + const auto& rs_column = rowset_schema->column(cid); + auto col_uid = rs_column.unique_id(); + auto& cur_col = full_mutable_columns[cid]; + for (auto idx = 0; idx < update_rows; ++idx) { + if (skip_bitmaps->at(idx).contains(col_uid)) { + if (old_block_delete_signs != nullptr && + old_block_delete_signs[read_index_old[idx]] != 0) { + if (rs_column.has_default_value()) { + const auto& src_column = + *default_value_block + .get_by_position(cid - + rowset_schema->num_key_columns()) + .column; + cur_col->insert_from(src_column, 0); + } else if (rs_column.is_nullable()) { + assert_cast( + cur_col.get()) + ->insert_null_elements(1); + } else { + cur_col->insert_default(); + } + } else { + const auto& src_column = + *old_block.get_by_position(cid - rowset_schema->num_key_columns()) + .column; + cur_col->insert_from(src_column, idx); + } + } else { + cur_col->insert_from(*update_block.get_by_position(cid).column, idx); + } + } + } + DCHECK(full_mutable_columns[cid]->size() == update_rows); + } + + output_block->set_columns(std::move(full_mutable_columns)); + VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); + return Status::OK(); +} + Status BaseTablet::commit_phase_update_delete_bitmap( const BaseTabletSPtr& tablet, const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 8d7be77e66931ab..4ec88d3f15adf83 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -199,8 +199,13 @@ class BaseTablet { static Status generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, - const FixedReadPlan& read_plan_ori, - const FixedReadPlan& read_plan_update, + const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update, + const std::map& rsid_to_rowset, + vectorized::Block* output_block); + + static Status generate_new_block_for_flexible_partial_update( + TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, + const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update, const std::map& rsid_to_rowset, vectorized::Block* output_block); diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 92683c5faf6fb0c..0c96e0539cb65f8 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -112,13 +112,13 @@ class FlexibleReadPlan { vectorized::Block& old_value_block, std::map>* read_index, const signed char* __restrict skip_map = nullptr) const; - Status fill_non_sort_key_columns(RowsetWriterContext* rowset_ctx, - const std::map& rsid_to_rowset, - const TabletSchema& tablet_schema, vectorized::Block& full_block, - const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const std::size_t segment_start_pos, - const std::size_t block_start_pos, const vectorized::Block* block, - std::vector* skip_bitmaps) const; + Status fill_non_sort_key_columns( + RowsetWriterContext* rowset_ctx, + const std::map& rsid_to_rowset, + const TabletSchema& tablet_schema, vectorized::Block& full_block, + const std::vector& use_default_or_null_flag, bool has_default_or_nullable, + const std::size_t segment_start_pos, const std::size_t block_start_pos, + const vectorized::Block* block, std::vector* skip_bitmaps) const; private: // rowset_id -> segment_id -> column unique id -> mappings diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 46a8b66461d0653..34751d271b5d4ff 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -789,7 +789,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( use_default_or_null_flag, has_default_or_nullable, segment_start_pos, data.row_pos, data.block, skip_bitmaps)); - // TODO(bobhan1): should we replace the skip bitmap column with null literals here to reduce storage occupation? + // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps here to reduce storage occupation? // this column is not needed in read path for merge-on-write table // row column should be filled here diff --git a/regression-test/data/fault_injection_p0/flexible_partial_update/test1.json b/regression-test/data/fault_injection_p0/flexible_partial_update/test1.json new file mode 100644 index 000000000000000..fc1883761b5a132 --- /dev/null +++ b/regression-test/data/fault_injection_p0/flexible_partial_update/test1.json @@ -0,0 +1,6 @@ +{"k": 1, "v1": 1999} +{"k": 2, "v3": 3999, "v5": 2999} +{"k": 3, "v3": 3999, "v2": null} +{"k": 4, "v4": 4999, "v1": 1999, "v3": 3999} +{"k": 5, "v5": null} +{"k": 6, "v1": 1999, "v3": 3999} \ No newline at end of file diff --git a/regression-test/data/fault_injection_p0/flexible_partial_update/test2.json b/regression-test/data/fault_injection_p0/flexible_partial_update/test2.json new file mode 100644 index 000000000000000..fdf6e8dcba9cd07 --- /dev/null +++ b/regression-test/data/fault_injection_p0/flexible_partial_update/test2.json @@ -0,0 +1,7 @@ +{"k": 1, "v5": 5777, "v3": 3777, "v2": 2777} +{"k": 2, "v4": 4777, "v1": 1777, "v2": null} +{"k": 3, "v1": 1777, "v5": null} +{"k": 4, "v2": null, "v5": 5777} +{"k": 5, "v4": 4777, "v1": null, "v3": 3777} +{"k": 7, "v2": null, "v5": 5777, "v3": 3777} +{"k": 6, "v5": null, "v2": 2777, "v3": 3777} \ No newline at end of file diff --git a/regression-test/data/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.out b/regression-test/data/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.out new file mode 100644 index 000000000000000..4aee7da6abc714d --- /dev/null +++ b/regression-test/data/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +0 0 0 0 0 0 0 2 0 +1 1 1 1 1 1 0 2 0 +2 2 2 2 2 2 0 2 0 +3 3 3 3 3 3 0 2 0 +4 4 4 4 4 4 0 2 0 +5 5 5 5 5 5 0 2 0 + +-- !sql -- +0 0 0 0 0 0 0 2 0 +1 1999 2777 3777 1 5777 0 4 3 1,4,6 +2 1777 \N 3999 4777 2999 0 4 3 3,5,6 +3 1777 \N 3999 3 \N 0 4 4 2,3,4,6 +4 1999 \N 3999 4999 5777 0 4 4 1,3,4,6 +5 \N 5 3777 4777 \N 0 4 3 2,5,6 +6 1999 2777 3777 1234 \N 0 4 3 1,4,6 +7 \N \N 3777 1234 5777 0 4 3 1,4,6 + diff --git a/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy b/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy new file mode 100644 index 000000000000000..47c9742b98df51d --- /dev/null +++ b/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert + +suite("test_flexible_partial_update_publish_conflict", "nonConcurrent") { + + def tableName = "test_flexible_partial_update_publish_conflict" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + `k` int(11) NULL, + `v1` BIGINT NULL, + `v2` BIGINT NULL DEFAULT "9876", + `v3` BIGINT NOT NULL, + `v4` BIGINT NOT NULL DEFAULT "1234", + `v5` BIGINT NULL + ) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "store_row_column" = "false"); """ + + sql """insert into ${tableName} select number, number, number, number, number, number from numbers("number" = "6"); """ + order_qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,BITMAP_COUNT(__DORIS_SKIP_BITMAP_COL__),BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName};" + + def enable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait() + enable_block_in_publish() + def t1 = Thread.start { + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE' + file "test1.json" + time 1000000 + } + } + + Thread.sleep(500) + + def t2 = Thread.start { + streamLoad { + table "${tableName}" + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'strict_mode', 'false' + set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE' + file "test2.json" + time 1000000 + } + } + + Thread.sleep(500) + + disable_block_in_publish() + t1.join() + t2.join() + + order_qt_sql "select k,v1,v2,v3,v4,v5,__DORIS_DELETE_SIGN__,__DORIS_VERSION_COL__,BITMAP_COUNT(__DORIS_SKIP_BITMAP_COL__),BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${tableName};" +} diff --git a/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy b/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy index e8074ab4f2b4baa..543fa5681d28e2e 100644 --- a/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy +++ b/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy @@ -16,10 +16,6 @@ // under the License. suite('test_flexible_partial_update') { - - sql "set global enable_auto_analyze=false;" - sql "set global enable_auto_analyze_internal_catalog=false;" - sql "sync;" def tableName = "test_flexible_partial_update" sql """ DROP TABLE IF EXISTS ${tableName} """