Skip to content

Commit

Permalink
use enum and rename unique key update mode
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Aug 23, 2024
1 parent 98c6d9d commit ae9c207
Show file tree
Hide file tree
Showing 22 changed files with 170 additions and 128 deletions.
56 changes: 45 additions & 11 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <algorithm>
Expand Down Expand Up @@ -117,10 +118,18 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_db_id = pschema.db_id();
_table_id = pschema.table_id();
_version = pschema.version();
_is_fixed_partial_update = pschema.partial_update();
_is_flexible_partial_update = pschema.is_flexible_partial_update();
if (pschema.has_unique_key_update_mode()) {
_unique_key_update_mode = pschema.unique_key_update_mode();
} else {
// for backward compatibility
if (pschema.has_partial_update() && pschema.partial_update()) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
_is_strict_mode = pschema.is_strict_mode();
if (_is_fixed_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = pschema.auto_increment_column();
if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -153,7 +162,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (const auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_fixed_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(pcolumn_desc.name())) {
auto it = slots_map.find(std::make_pair(
to_lower(pcolumn_desc.name()),
Expand Down Expand Up @@ -187,13 +196,39 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
_is_fixed_partial_update = tschema.is_partial_update;
_is_flexible_partial_update =
tschema.__isset.is_flexible_partial_update && tschema.is_flexible_partial_update;
if (tschema.__isset.unique_key_update_mode) {
switch (tschema.unique_key_update_mode) {
case doris::TUniqueKeyUpdateMode::UPSERT: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
break;
}
case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
break;
}
case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
break;
}
default: {
return Status::InternalError(
"Unknown unique_key_update_mode: {}, should be one of "
"UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS",
tschema.unique_key_update_mode);
}
}
} else {
// for backward compatibility
if (tschema.__isset.is_partial_update && tschema.is_partial_update) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
if (_is_fixed_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = tschema.auto_increment_column;
if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -221,7 +256,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (const auto& tcolumn_desc : t_index.columns_desc) {
if (!_is_fixed_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(tcolumn_desc.column_name)) {
auto it = slots_map.find(
std::make_pair(to_lower(tcolumn_desc.column_name),
Expand Down Expand Up @@ -270,8 +305,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_db_id(_db_id);
pschema->set_table_id(_table_id);
pschema->set_version(_version);
pschema->set_partial_update(_is_fixed_partial_update);
pschema->set_is_flexible_partial_update(_is_flexible_partial_update);
pschema->set_unique_key_update_mode(_unique_key_update_mode);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
Expand Down
16 changes: 11 additions & 5 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <butil/fast_rand.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -88,11 +89,17 @@ class OlapTableSchemaParam {
return _proto_schema;
}

// Returns the unique-key update mode held by this schema param. The stored
// value defaults to UPSERT and is one of UPSERT, UPDATE_FIXED_COLUMNS or
// UPDATE_FLEXIBLE_COLUMNS.
UniqueKeyUpdateModePB unique_key_update_mode() const { return _unique_key_update_mode; }

bool is_partial_update() const {
return _is_fixed_partial_update || _is_flexible_partial_update;
return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT;
}
// True only when the update mode is UPDATE_FIXED_COLUMNS, i.e. the "fixed"
// flavour of partial update.
bool is_fixed_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
}
// True only when the update mode is UPDATE_FLEXIBLE_COLUMNS, i.e. the
// "flexible" flavour of partial update.
bool is_flexible_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}
bool is_fixed_partial_update() const { return _is_fixed_partial_update; }
bool is_flexible_partial_update() const { return _is_flexible_partial_update; }

std::set<std::string> partial_update_input_columns() const {
return _partial_update_input_columns;
Expand All @@ -115,9 +122,8 @@ class OlapTableSchemaParam {
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
bool _is_fixed_partial_update = false;
UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT};
std::set<std::string> _partial_update_input_columns;
bool _is_flexible_partial_update;
bool _is_strict_mode = false;
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
Expand Down
17 changes: 12 additions & 5 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,16 +621,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_enable_profile(false);
}
}

StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
{"UPSERT", TUniqueKeyUpdateMode::UPSERT},
{"FIXED_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FIXED_PARTIAL_UPDATE},
{"FLEXIBLE_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE}};
{"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
{"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
if (iter != unique_key_update_mode_map.end()) {
TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
if (unique_key_update_mode == TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE) {
if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
// check constraints when flexible partial update is enabled
if (ctx->format != TFileFormatType::FORMAT_JSON) {
return Status::InvalidArgument(
Expand All @@ -650,6 +651,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::InvalidArgument(
"Don't support flexible partial update when jsonpaths is specified");
}
if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
return Status::InvalidArgument(
"Don't support flexible partial update when hidden_columns is "
"specified");
}
}
request.__set_unique_key_update_mode(unique_key_update_mode);
} else {
Expand All @@ -663,11 +669,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
// only consider `partial_columns` parameter when `unique_key_update_mode` is not set
if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
// for backward compatibility
request.__set_partial_update(true);
} else {
request.__set_partial_update(false);
}
}

if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_fixed_partial_update(),
table_schema_param->is_flexible_partial_update(),
_partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
Expand Down
28 changes: 10 additions & 18 deletions be/src/olap/partial_update_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,12 @@

namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_partial_update,
bool flexible_partial_update,
void PartialUpdateInfo::init(const TabletSchema& tablet_schema,
UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version) {
DCHECK(!(fixed_partial_update && flexible_partial_update))
<< "fixed_partial_update and flexible_partial_update can not be set simutanously!";
if (fixed_partial_update) {
partial_update_mode = PartialUpdateModePB::FIXED;
} else if (flexible_partial_update) {
partial_update_mode = PartialUpdateModePB::FLEXIBLE;
} else {
partial_update_mode = PartialUpdateModePB::NONE;
}
partial_update_mode = unique_key_update_mode;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
Expand All @@ -53,7 +45,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_parti
update_cids.clear();

for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
if (fixed_partial_update) {
if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
auto tablet_column = tablet_schema.column(i);
if (!partial_update_input_columns.contains(tablet_column.name())) {
missing_cids.emplace_back(i);
Expand Down Expand Up @@ -111,9 +103,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
if (!partial_update_info_pb->has_partial_update_mode()) {
// for backward compatibility
if (partial_update_info_pb->is_partial_update()) {
partial_update_mode = PartialUpdateModePB::FIXED;
partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
partial_update_mode = PartialUpdateModePB::NONE;
partial_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
} else {
partial_update_mode = partial_update_info_pb->partial_update_mode();
Expand Down Expand Up @@ -151,13 +143,13 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
std::string PartialUpdateInfo::summary() const {
std::string mode;
switch (partial_update_mode) {
case PartialUpdateModePB::NONE:
mode = "none";
case UniqueKeyUpdateModePB::UPSERT:
mode = "upsert";
break;
case PartialUpdateModePB::FIXED:
case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
mode = "fixed partial update";
break;
case PartialUpdateModePB::FLEXIBLE:
case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
mode = "flexible partial update";
break;
}
Expand Down
16 changes: 9 additions & 7 deletions be/src/olap/partial_update_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

#pragma once
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <map>
#include <set>
Expand All @@ -38,27 +40,27 @@ struct RowsetId;
class BitmapValue;

struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool fixed_partial_update,
bool flexible_partial_update, const std::set<std::string>& partial_update_cols,
bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone,
void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
const std::string& auto_increment_column, int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
std::string summary() const;

bool is_partial_update() const { return partial_update_mode != PartialUpdateModePB::NONE; }
bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; }
bool is_fixed_partial_update() const {
return partial_update_mode == PartialUpdateModePB::FIXED;
return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
}
bool is_flexible_partial_update() const {
return partial_update_mode == PartialUpdateModePB::FLEXIBLE;
return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}

private:
void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);

public:
PartialUpdateModePB partial_update_mode;
UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT};
int64_t max_version_in_flush_phase {-1};
std::set<std::string> partial_update_input_columns;
std::vector<uint32_t> missing_cids;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
}
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_fixed_partial_update(),
table_schema_param->is_flexible_partial_update(),
_partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -87,8 +88,7 @@ public enum JobType {
// used for stream load, FILE_LOCAL or FILE_STREAM
private TFileType fileType;
private List<String> hiddenColumns = null;
private boolean isFixedPartialUpdate = false;
private boolean isFlexiblePartialUpdate = false;
private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;

// for broker load
public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc,
Expand All @@ -110,8 +110,7 @@ public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc b
// for stream load
public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc,
BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode,
TFileType fileType, List<String> hiddenColumns, boolean isFixedPartialUpdate,
boolean isFlexiblePartialUpdate) {
TFileType fileType, List<String> hiddenColumns, TUniqueKeyUpdateMode uniqueKeyUpdateMode) {
this.jobType = JobType.STREAM_LOAD;
this.loadId = loadId;
this.txnId = txnId;
Expand All @@ -124,8 +123,7 @@ public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc
this.strictMode = strictMode;
this.fileType = fileType;
this.hiddenColumns = hiddenColumns;
this.isFixedPartialUpdate = isFixedPartialUpdate;
this.isFlexiblePartialUpdate = isFlexiblePartialUpdate;
this.uniqueKeyUpdateMode = uniqueKeyUpdateMode;
}

public Table getTargetTable() {
Expand Down Expand Up @@ -166,12 +164,16 @@ public List<String> getHiddenColumns() {
return hiddenColumns;
}

// Returns the unique-key update mode of this file group. The field is
// initialized to UPSERT and is assigned by the stream-load constructor.
public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
return uniqueKeyUpdateMode;
}

public boolean isFixedPartialUpdate() {
return isFixedPartialUpdate;
return uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}

public boolean isFlexiblePartialUpdate() {
return isFlexiblePartialUpdate;
return uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
}

public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a
Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(),
fileGroupInfo.isFixedPartialUpdate(), fileGroupInfo.isFlexiblePartialUpdate());
fileGroupInfo.getUniqueKeyUpdateMode());

int columnCountFromPath = 0;
if (context.fileGroup.getColumnNamesFromPath() != null) {
Expand Down
Loading

0 comments on commit ae9c207

Please sign in to comment.