Skip to content

Commit

Permalink
process sequence map col
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Aug 23, 2024
1 parent ae9c207 commit 92da59e
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 10 deletions.
7 changes: 7 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_version = pschema.version();
if (pschema.has_unique_key_update_mode()) {
_unique_key_update_mode = pschema.unique_key_update_mode();
if (pschema.has_sequence_map_col_unique_id()) {
_sequence_map_col_uid = pschema.sequence_map_col_unique_id();
}
} else {
// for backward compatibility
if (pschema.has_partial_update() && pschema.partial_update()) {
Expand Down Expand Up @@ -217,6 +220,9 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
tschema.unique_key_update_mode);
}
}
if (tschema.__isset.sequence_map_col_unique_id) {
_sequence_map_col_uid = tschema.sequence_map_col_unique_id;
}
} else {
// for backward compatibility
if (tschema.__isset.is_partial_update && tschema.is_partial_update) {
Expand Down Expand Up @@ -311,6 +317,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_sequence_map_col_unique_id(_sequence_map_col_uid);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class OlapTableSchemaParam {
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
int32_t sequence_map_col_uid() const { return _sequence_map_col_uid; }
std::string debug_string() const;

private:
Expand All @@ -129,6 +130,7 @@ class OlapTableSchemaParam {
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
std::string _timezone;
int32_t _sequence_map_col_uid {-1};
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
const std::string& auto_increment_column, int32_t sequence_map_col_uid,
int64_t cur_max_version) {
partial_update_mode = unique_key_update_mode;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
sequence_map_col_unqiue_id = sequence_map_col_uid;
this->timestamp_ms = timestamp_ms;
this->timezone = timezone;
missing_cids.clear();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1);
const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
std::string summary() const;
Expand All @@ -55,6 +56,7 @@ struct PartialUpdateInfo {
bool is_flexible_partial_update() const {
return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}
int32_t sequence_map_col_uid() const { return sequence_map_col_unqiue_id; }

private:
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);
Expand All @@ -76,6 +78,8 @@ struct PartialUpdateInfo {

// default values for missing cids
std::vector<std::string> default_values;

int32_t sequence_map_col_unqiue_id {-1};
};

// used in mow partial update
Expand Down
11 changes: 9 additions & 2 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
// a valid sequence column to encode the key with seq col.
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
int32_t seq_col_unique_id = -1;
int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid();
bool schema_has_sequence_col = _tablet_schema->has_sequence_col();
if (schema_has_sequence_col) {
auto seq_col_idx = _tablet_schema->sequence_col_idx();
Expand Down Expand Up @@ -691,8 +692,14 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
size_t delta_pos = block_pos - data.row_pos;
size_t segment_pos = segment_start_pos + delta_pos;
auto& skip_bitmap = skip_bitmaps->at(block_pos);
// always need to read delete sign column from old rows
skip_bitmap.add(_tablet_schema->delete_sign_idx());

// the hidden sequence column should have the same mark with sequence map column
if (seq_map_col_unique_id != -1) {
DCHECK(schema_has_sequence_col);
if (skip_bitmap.contains(seq_map_col_unique_id)) {
skip_bitmap.add(seq_col_unique_id);
}
}

std::string key = _full_encode_keys(key_columns, delta_pos);
_maybe_invalid_row_cache(key);
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,12 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
table_schema_param->auto_increment_coulumn(),
_max_version_in_flush_phase);
_partial_update_info->init(
*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->timezone(), table_schema_param->auto_increment_coulumn(),
table_schema_param->sequence_map_col_uid(), _max_version_in_flush_phase);
}

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a
// for backward compatibility
schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode);
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) {
Column seqMapCol = table.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst().get();
schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId());
}
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
for (String s : partialUpdateInputColumns) {
schemaParam.addToPartialUpdateInputColumns(s);
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/descriptors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ message POlapTableSchemaParam {
optional string timezone = 12;
optional int32 auto_increment_column_unique_id = 13 [default = -1];
optional UniqueKeyUpdateModePB unique_key_update_mode = 14 [default = UPSERT];
optional int32 sequence_map_col_unique_id = 15 [default = -1];
};

1 change: 1 addition & 0 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ struct TOlapTableSchemaParam {
12: optional i32 auto_increment_column_unique_id = -1
13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1
14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode = Types.TUniqueKeyUpdateMode.UPSERT
15: optional i32 sequence_map_col_unique_id = -1
}

struct TTabletLocation {
Expand Down

0 comments on commit 92da59e

Please sign in to comment.