diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 146867cab47c4cb..6c04d083a939b9c 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -117,10 +118,18 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _db_id = pschema.db_id(); _table_id = pschema.table_id(); _version = pschema.version(); - _is_fixed_partial_update = pschema.partial_update(); - _is_flexible_partial_update = pschema.is_flexible_partial_update(); + if (pschema.has_unique_key_update_mode()) { + _unique_key_update_mode = pschema.unique_key_update_mode(); + } else { + // for backward compatibility + if (pschema.has_partial_update() && pschema.partial_update()) { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + } else { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT; + } + } _is_strict_mode = pschema.is_strict_mode(); - if (_is_fixed_partial_update) { + if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { _auto_increment_column = pschema.auto_increment_column(); if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) { return Status::InternalError( @@ -153,7 +162,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { index->index_id = p_index.id(); index->schema_hash = p_index.schema_hash(); for (const auto& pcolumn_desc : p_index.columns_desc()) { - if (!_is_fixed_partial_update || + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS || _partial_update_input_columns.contains(pcolumn_desc.name())) { auto it = slots_map.find(std::make_pair( to_lower(pcolumn_desc.name()), @@ -187,13 +196,39 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _db_id = tschema.db_id; _table_id = tschema.table_id; _version = tschema.version; - _is_fixed_partial_update = tschema.is_partial_update; - _is_flexible_partial_update = - tschema.__isset.is_flexible_partial_update && tschema.is_flexible_partial_update; + if (tschema.__isset.unique_key_update_mode) { + switch (tschema.unique_key_update_mode) { + case doris::TUniqueKeyUpdateMode::UPSERT: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT; + break; + } + case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + break; + } + case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; + break; + } + default: { + return Status::InternalError( + "Unknown unique_key_update_mode: {}, should be one of " + "UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS", + tschema.unique_key_update_mode); + } + } + } else { + // for backward compatibility + if (tschema.__isset.is_partial_update && tschema.is_partial_update) { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + } else { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT; + } + } if (tschema.__isset.is_strict_mode) { _is_strict_mode = tschema.is_strict_mode; } - if (_is_fixed_partial_update) { + if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { _auto_increment_column = tschema.auto_increment_column; if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) { return Status::InternalError( @@ -221,7 +256,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { index->index_id = t_index.id; index->schema_hash = t_index.schema_hash; for (const auto& tcolumn_desc : t_index.columns_desc) { - if (!_is_fixed_partial_update || + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS || _partial_update_input_columns.contains(tcolumn_desc.column_name)) { auto it = slots_map.find( std::make_pair(to_lower(tcolumn_desc.column_name), @@ -270,8 +305,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_db_id(_db_id); pschema->set_table_id(_table_id); pschema->set_version(_version); - pschema->set_partial_update(_is_fixed_partial_update); - pschema->set_is_flexible_partial_update(_is_flexible_partial_update); + pschema->set_unique_key_update_mode(_unique_key_update_mode); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 00fffb587ac7c97..70e4d34b5c3b24a 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -88,11 +89,17 @@ class OlapTableSchemaParam { return _proto_schema; } + UniqueKeyUpdateModePB unique_key_update_mode() const { return _unique_key_update_mode; } + bool is_partial_update() const { - return _is_fixed_partial_update || _is_flexible_partial_update; + return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT; + } + bool is_fixed_partial_update() const { + return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + } + bool is_flexible_partial_update() const { + return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; } - bool is_fixed_partial_update() const { return _is_fixed_partial_update; } - bool is_flexible_partial_update() const { return _is_flexible_partial_update; } std::set partial_update_input_columns() const { return _partial_update_input_columns; @@ -115,9 +122,8 @@ class OlapTableSchemaParam { mutable POlapTableSchemaParam* _proto_schema = nullptr; std::vector _indexes; mutable ObjectPool _obj_pool; - bool _is_fixed_partial_update = false; + UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT}; std::set _partial_update_input_columns; - bool _is_flexible_partial_update; bool _is_strict_mode = false; std::string _auto_increment_column; int32_t _auto_increment_column_unique_id; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 2b27d9634627564..bbbdea66779d6a2 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -621,16 +621,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_enable_profile(false); } } + StringCaseMap unique_key_update_mode_map = { {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, - {"FIXED_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FIXED_PARTIAL_UPDATE}, - {"FLEXIBLE_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE}}; + {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, + {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE); auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str); if (iter != unique_key_update_mode_map.end()) { TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second; - if (unique_key_update_mode == TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE) { + if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { // check constraints when flexible partial update is enabled if (ctx->format != TFileFormatType::FORMAT_JSON) { return Status::InvalidArgument( @@ -650,6 +651,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::InvalidArgument( "Don't support flexible partial update when jsonpaths is specified"); } + if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when hidden_columns is " + "specified"); + } } request.__set_unique_key_update_mode(unique_key_update_mode); } else { @@ -663,11 +669,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { // only consider `partial_columns` parameter when `unique_key_update_mode` is not set if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) { + request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS); + // for backward compatibility request.__set_partial_update(true); - } else { - request.__set_partial_update(false); } } + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index d1457165f4e1320..139997eaf144ae8 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -237,8 +237,7 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_fixed_partial_update(), - table_schema_param->is_flexible_partial_update(), + _partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->timezone(), diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index d8e42010b0f55ab..95b8db27341667e 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -31,20 +31,12 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_partial_update, - bool flexible_partial_update, +void PartialUpdateInfo::init(const TabletSchema& tablet_schema, + UniqueKeyUpdateModePB unique_key_update_mode, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version) { - DCHECK(!(fixed_partial_update && flexible_partial_update)) - << "fixed_partial_update and flexible_partial_update can not be set simutanously!"; - if (fixed_partial_update) { - partial_update_mode = PartialUpdateModePB::FIXED; - } else if (flexible_partial_update) { - partial_update_mode = PartialUpdateModePB::FLEXIBLE; - } else { - partial_update_mode = PartialUpdateModePB::NONE; - } + partial_update_mode = unique_key_update_mode; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; @@ -53,7 +45,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_parti update_cids.clear(); for (auto i = 0; i < tablet_schema.num_columns(); ++i) { - if (fixed_partial_update) { + if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { auto tablet_column = tablet_schema.column(i); if (!partial_update_input_columns.contains(tablet_column.name())) { missing_cids.emplace_back(i); @@ -111,9 +103,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { if (!partial_update_info_pb->has_partial_update_mode()) { // for backward compatibility if (partial_update_info_pb->is_partial_update()) { - partial_update_mode = PartialUpdateModePB::FIXED; + partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; } else { - partial_update_mode = PartialUpdateModePB::NONE; + partial_update_mode = UniqueKeyUpdateModePB::UPSERT; } } else { partial_update_mode = partial_update_info_pb->partial_update_mode(); @@ -151,13 +143,13 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { std::string PartialUpdateInfo::summary() const { std::string mode; switch (partial_update_mode) { - case PartialUpdateModePB::NONE: - mode = "none"; + case UniqueKeyUpdateModePB::UPSERT: + mode = "upsert"; break; - case PartialUpdateModePB::FIXED: + case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS: mode = "fixed partial update"; break; - case PartialUpdateModePB::FLEXIBLE: + case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS: mode = "flexible partial update"; break; } diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 0c96e0539cb65f8..25da07ba76a6d3d 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -16,6 +16,8 @@ // under the License. #pragma once +#include + #include #include #include @@ -38,27 +40,27 @@ struct RowsetId; class BitmapValue; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, bool fixed_partial_update, - bool flexible_partial_update, const std::set& partial_update_cols, - bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, + void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); std::string summary() const; - bool is_partial_update() const { return partial_update_mode != PartialUpdateModePB::NONE; } + bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; } bool is_fixed_partial_update() const { - return partial_update_mode == PartialUpdateModePB::FIXED; + return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; } bool is_flexible_partial_update() const { - return partial_update_mode == PartialUpdateModePB::FLEXIBLE; + return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; } private: void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema); public: - PartialUpdateModePB partial_update_mode; + UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 87a36a659b5bc7a..6a817b3a59a2f80 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -408,8 +408,7 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, } // set partial update columns info _partial_update_info = std::make_shared(); - _partial_update_info->init(*_tablet_schema, table_schema_param->is_fixed_partial_update(), - table_schema_param->is_flexible_partial_update(), + _partial_update_info->init(*_tablet_schema, table_schema_param->unique_key_update_mode(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->timezone(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index 6e36420dc22ec06..2e1a888f12ea770 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -43,6 +43,7 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -87,8 +88,7 @@ public enum JobType { // used for stream load, FILE_LOCAL or FILE_STREAM private TFileType fileType; private List hiddenColumns = null; - private boolean isFixedPartialUpdate = false; - private boolean isFlexiblePartialUpdate = false; + private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; // for broker load public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, @@ -110,8 +110,7 @@ public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc b // for stream load public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc, BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode, - TFileType fileType, List hiddenColumns, boolean isFixedPartialUpdate, - boolean isFlexiblePartialUpdate) { + TFileType fileType, List hiddenColumns, TUniqueKeyUpdateMode uniqueKeyUpdateMode) { this.jobType = JobType.STREAM_LOAD; this.loadId = loadId; this.txnId = txnId; @@ -124,8 +123,7 @@ public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc this.strictMode = strictMode; this.fileType = fileType; this.hiddenColumns = hiddenColumns; - this.isFixedPartialUpdate = isFixedPartialUpdate; - this.isFlexiblePartialUpdate = isFlexiblePartialUpdate; + this.uniqueKeyUpdateMode = uniqueKeyUpdateMode; } public Table getTargetTable() { @@ -166,12 +164,16 @@ public List getHiddenColumns() { return hiddenColumns; } + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return uniqueKeyUpdateMode; + } + public boolean isFixedPartialUpdate() { - return isFixedPartialUpdate; + return uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; } public boolean isFlexiblePartialUpdate() { - return isFlexiblePartialUpdate; + return uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; } public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java index 8a2d9114d5d9c18..cf69b24a7376fcd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java @@ -221,7 +221,7 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds, formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(), - fileGroupInfo.isFixedPartialUpdate(), fileGroupInfo.isFlexiblePartialUpdate()); + fileGroupInfo.getUniqueKeyUpdateMode()); int columnCountFromPath = 0; if (context.fileGroup.getColumnNamesFromPath() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 7eab7f5d704787e..bcabc72d79afeb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -73,6 +73,7 @@ import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -260,7 +261,7 @@ public static List getSchemaChangeShadowColumnDesc(Table tbl, public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction) throws UserException { initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, - false, false); + TUniqueKeyUpdateMode.UPSERT); } /* @@ -270,12 +271,12 @@ public static void initColumns(Table tbl, List columnExprs, public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, - List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean isPartialUpdate, - boolean isFlexiblePartialUpdate) + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, + TUniqueKeyUpdateMode uniquekeyUpdateMode) throws UserException { rewriteColumns(columnDescs); initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName, - srcSlotIds, formatType, hiddenColumns, true, isPartialUpdate, isFlexiblePartialUpdate); + srcSlotIds, formatType, hiddenColumns, true, uniquekeyUpdateMode); } /* @@ -290,8 +291,7 @@ private static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, List srcSlotIds, TFileFormatType formatType, List hiddenColumns, - boolean needInitSlotAndAnalyzeExprs, boolean isPartialUpdate, - boolean isFlexiblePartialUpdate) throws UserException { + boolean needInitSlotAndAnalyzeExprs, TUniqueKeyUpdateMode uniquekeyUpdateMode) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. // skip the mapping columns not exist in schema @@ -336,7 +336,7 @@ private static void initColumns(Table tbl, List columnExprs, } copiedColumnExprs.add(columnDesc); } - if (hasSkipBitmapColumn && isFlexiblePartialUpdate) { + if (hasSkipBitmapColumn && uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) { Preconditions.checkArgument(!specifyFileFieldNames); Preconditions.checkArgument(hiddenColumns == null); if (LOG.isDebugEnabled()) { @@ -388,7 +388,7 @@ private static void initColumns(Table tbl, List columnExprs, exprsByName.put(column.getName(), NullLiteral.create(column.getType())); continue; } - if (isPartialUpdate) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { continue; } if (column.isAutoInc()) { @@ -460,7 +460,7 @@ private static void initColumns(Table tbl, List columnExprs, if (formatType == TFileFormatType.FORMAT_ARROW) { slotDesc.setColumn(new Column(realColName, colToType.get(realColName))); } else { - if (isFlexiblePartialUpdate && hasSkipBitmapColumn) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && hasSkipBitmapColumn) { // we store the unique ids of missing columns in skip bitmap column in flexible partial update int colUniqueId = tblColumn.getUniqueId(); if (realColName.equals(Column.SKIP_BITMAP_COL)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 35c84cef11331e6..06991d5fb52fbbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -61,12 +61,6 @@ public int getValue() { } } - public enum UniquekeyUpdateMode { - UPSERT, - FIXED_PARTIAL_UPDATE, - FLEXIBLE_PARTIAL_UPDATE - } - private static final Logger LOG = LogManager.getLogger(LoadTask.class); public static final Comparator COMPARATOR = Comparator.comparing(LoadTask::getPriorityValue) .thenComparingLong(LoadTask::getSignature); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java index 01a8e30cb760f14..f1134cb19285931 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java @@ -49,6 +49,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -108,10 +109,9 @@ public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDes // Only for stream load/routine load job. public void setLoadInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc, BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode, - TFileType fileType, List hiddenColumns, boolean isFixedPartialUpdate, - boolean isFlexiblePartialUpdate) { + TFileType fileType, List hiddenColumns, TUniqueKeyUpdateMode uniquekeyUpdateMode) { FileGroupInfo fileGroupInfo = new FileGroupInfo(loadId, txnId, targetTable, brokerDesc, fileGroup, - fileStatus, strictMode, fileType, hiddenColumns, isFixedPartialUpdate, isFlexiblePartialUpdate); + fileStatus, strictMode, fileType, hiddenColumns, uniquekeyUpdateMode); fileGroupInfos.add(fileGroupInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index b1268b48a8d6b7f..ce291d9043914e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -76,6 +76,7 @@ import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -106,8 +107,7 @@ public class OlapTableSink extends DataSink { // specified partition ids. private List partitionIds; // partial update input columns - private boolean isFixedPartialUpdate = false; - private boolean isFlexiblePartialUpdate = false; + private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; private HashSet partialUpdateInputColumns; // set after init called @@ -179,13 +179,18 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou } } - public void setPartialUpdateInputColumns(boolean isFixedPartialUpdate, HashSet columns) { - this.isFixedPartialUpdate = isFixedPartialUpdate; - this.partialUpdateInputColumns = columns; + public void setPartialUpdateInputColumns(boolean isPartialUpdate, HashSet columns) { + if (isPartialUpdate) { + this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + this.partialUpdateInputColumns = columns; + } } - public void setFlexiblePartialUpdate(boolean isFlexiblePartialUpdate) { - this.isFlexiblePartialUpdate = isFlexiblePartialUpdate; + public void setPartialUpdateInfo(TUniqueKeyUpdateMode uniqueKeyUpdateMode, HashSet columns) { + this.uniqueKeyUpdateMode = uniqueKeyUpdateMode; + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { + this.partialUpdateInputColumns = columns; + } } public void updateLoadId(TUniqueId newLoadId) { @@ -247,10 +252,10 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { } strBuilder.append(prefix + " TUPLE ID: " + tupleDescriptor.getId() + "\n"); strBuilder.append(prefix + " " + DataPartition.RANDOM.getExplainString(explainLevel)); - boolean isPartialUpdate = isFixedPartialUpdate || isFlexiblePartialUpdate; + boolean isPartialUpdate = uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT; strBuilder.append(prefix + " IS_PARTIAL_UPDATE: " + isPartialUpdate); if (isPartialUpdate) { - if (isFixedPartialUpdate) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FIXED_PARTIAL_UPDATE"); } else { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FLEXIBLE_PARTIAL_UPDATE"); @@ -329,9 +334,10 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a indexSchema.setIndexesDesc(indexDesc); schemaParam.addToIndexes(indexSchema); } - schemaParam.setIsPartialUpdate(isFixedPartialUpdate); - schemaParam.setIsFlexiblePartialUpdate(isFlexiblePartialUpdate); - if (isFixedPartialUpdate) { + // for backward compatibility + schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); + schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode); + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { for (String s : partialUpdateInputColumns) { schemaParam.addToPartialUpdateInputColumns(s); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 4f3b4497aeea355..07999b30656bba7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -62,6 +62,7 @@ import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -143,25 +144,21 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); boolean negative = taskInfo.getNegative(); // get partial update related info - boolean isFixedPartialUpdate = taskInfo.isFixedPartialUpdate(); - boolean isFlexiblePartialUpdate = taskInfo.isFlexiblePartialUpdate(); - if ((isFixedPartialUpdate || isFlexiblePartialUpdate) && !destTable.getEnableUniqueKeyMergeOnWrite()) { + TUniqueKeyUpdateMode uniquekeyUpdateMode = taskInfo.getUniqueKeyUpdateMode(); + if (uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPSERT && !destTable.getEnableUniqueKeyMergeOnWrite()) { throw new UserException("Only unique key merge on write support partial update"); } - if (isFixedPartialUpdate && isFlexiblePartialUpdate) { - throw new AnalysisException("isFixedPartialUpdate and isFlexiblePartialUpdate" - + "can't be set to true bothly."); - } - if (isFlexiblePartialUpdate && !destTable.hasSkipBitmapColumn()) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) { throw new UserException("Flexible partial update can only support table with skip bitmap hidden column." + "But table " + destTable.getName() + "doesn't have it"); } - if (isFlexiblePartialUpdate && !destTable.getEnableLightSchemaChange()) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS + && !destTable.getEnableLightSchemaChange()) { throw new UserException("Flexible partial update can only support table with light_schema_change enabled." + "But table " + destTable.getName() + "'s property light_schema_change is false"); } HashSet partialUpdateInputColumns = new HashSet<>(); - if (isFixedPartialUpdate) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { for (Column col : destTable.getFullSchema()) { boolean existInExpr = false; if (col.hasOnUpdateDefaultValue()) { @@ -200,7 +197,8 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { - if (isFixedPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS + && !partialUpdateInputColumns.contains(col.getName())) { continue; } SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc); @@ -265,7 +263,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - isFixedPartialUpdate, isFlexiblePartialUpdate); + uniquekeyUpdateMode); scanNode = fileScanNode; scanNode.init(analyzer); @@ -297,8 +295,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout; olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), txnTimeout); - olapTableSink.setPartialUpdateInputColumns(isFixedPartialUpdate, partialUpdateInputColumns); - olapTableSink.setFlexiblePartialUpdate(isFlexiblePartialUpdate); + olapTableSink.setPartialUpdateInfo(uniquekeyUpdateMode, partialUpdateInputColumns); olapTableSink.complete(analyzer); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 878ce423097cdbf..cf48aad9772f674 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; @@ -110,8 +111,8 @@ default long getFileSize() { boolean isFixedPartialUpdate(); - default LoadTask.UniquekeyUpdateMode getUniquekeyUpdateMode() { - return LoadTask.UniquekeyUpdateMode.UPSERT; + default TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return TUniqueKeyUpdateMode.UPSERT; } default boolean isFlexiblePartialUpdate() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 5652e97ddc866ed..3e08ce264add286 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -35,6 +35,7 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -83,7 +84,7 @@ public class StreamLoadTask implements LoadTaskInfo { private String headerType = ""; private List hiddenColumns; private boolean trimDoubleQuotes = false; - private LoadTask.UniquekeyUpdateMode uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode.UPSERT; + private TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; private int skipLines = 0; private boolean enableProfile = false; @@ -298,17 +299,17 @@ public boolean getEnableProfile() { @Override public boolean isFixedPartialUpdate() { - return uniquekeyUpdateMode == LoadTask.UniquekeyUpdateMode.FIXED_PARTIAL_UPDATE; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; } @Override - public LoadTask.UniquekeyUpdateMode getUniquekeyUpdateMode() { + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { return uniquekeyUpdateMode; } @Override public boolean isFlexiblePartialUpdate() { - return uniquekeyUpdateMode == LoadTask.UniquekeyUpdateMode.FLEXIBLE_PARTIAL_UPDATE; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; } @Override @@ -463,15 +464,17 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws } if (request.isSetUniqueKeyUpdateMode()) { try { - uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode - .valueOf(request.getUniqueKeyUpdateMode().toString()); + uniquekeyUpdateMode = request.getUniqueKeyUpdateMode(); } catch (IllegalArgumentException e) { throw new UserException("unknown unique_key_update_mode: " + request.getUniqueKeyUpdateMode().toString()); } - } - if (!request.isSetUniqueKeyUpdateMode() && request.isSetPartialUpdate()) { - uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode.FIXED_PARTIAL_UPDATE; + } else { + if (request.isSetPartialUpdate() && request.isPartialUpdate()) { + uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } else { + uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + } } if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 6f258bb61bf95d5..6f25a49af8fe1d2 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -66,13 +66,13 @@ message POlapTableSchemaParam { repeated PSlotDescriptor slot_descs = 4; required PTupleDescriptor tuple_desc = 5; repeated POlapTableIndexSchema indexes = 6; - optional bool partial_update = 7 [default = false]; // flag for fixed partial update + optional bool partial_update = 7 [default = false]; // deprecated, use unique_key_update_mode repeated string partial_update_input_columns = 8; optional bool is_strict_mode = 9 [default = false]; optional string auto_increment_column = 10; optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; - optional bool is_flexible_partial_update = 14 [default = false]; + optional UniqueKeyUpdateModePB unique_key_update_mode = 14 [default = UPSERT]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 80f9208bfdf5ea3..4b832b5e3216fb4 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -359,10 +359,10 @@ enum SortType { ZORDER = 1; } -enum PartialUpdateModePB { - NONE = 0; - FIXED = 1; - FLEXIBLE = 2; +enum UniqueKeyUpdateModePB { + UPSERT = 0; + UPDATE_FIXED_COLUMNS = 1; + UPDATE_FLEXIBLE_COLUMNS = 2; } // ATTN: When adding or deleting fields, please update `message TabletSchemaCloudPB` @@ -628,5 +628,5 @@ message PartialUpdateInfoPB { optional bool is_schema_contains_auto_inc_column = 10 [default = false]; repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; - optional PartialUpdateModePB partial_update_mode = 13 [default = NONE]; + optional UniqueKeyUpdateModePB partial_update_mode = 13 [default = UPSERT]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 925c358ceb4383d..ec4f81b29ef71b0 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -247,13 +247,13 @@ struct TOlapTableSchemaParam { 5: required TTupleDescriptor tuple_desc 6: required list indexes 7: optional bool is_dynamic_schema // deprecated - 8: optional bool is_partial_update // flag for fixed partial update + 8: optional bool is_partial_update // deprecated, use unique_key_update_mode 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 - 14: optional bool is_flexible_partial_update = false + 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode = Types.TUniqueKeyUpdateMode.UPSERT } struct TTabletLocation { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index bc7801d7bebff8d..00311dc14ef7a3e 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -719,8 +719,8 @@ enum TMergeType { enum TUniqueKeyUpdateMode { UPSERT, - FIXED_PARTIAL_UPDATE, - FLEXIBLE_PARTIAL_UPDATE + UPDATE_FIXED_COLUMNS, + UPDATE_FLEXIBLE_COLUMNS } enum TSortType { diff --git a/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy b/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy index 47c9742b98df51d..2accbff6ae83f52 100644 --- a/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy +++ b/regression-test/suites/fault_injection_p0/flexible_partial_update/test_flexible_partial_update_publish_conflict.groovy @@ -75,7 +75,7 @@ suite("test_flexible_partial_update_publish_conflict", "nonConcurrent") { set 'format', 'json' set 'read_json_by_line', 'true' set 'strict_mode', 'false' - set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE' + set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' file "test1.json" time 1000000 } @@ -89,7 +89,7 @@ suite("test_flexible_partial_update_publish_conflict", "nonConcurrent") { set 'format', 'json' set 'read_json_by_line', 'true' set 'strict_mode', 'false' - set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE' + set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' file "test2.json" time 1000000 } diff --git a/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy b/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy index 543fa5681d28e2e..4acf1fd39ac1b12 100644 --- a/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy +++ b/regression-test/suites/unique_with_mow_p0/flexible_partial_update/test_flexible_partial_update.groovy @@ -42,7 +42,7 @@ suite('test_flexible_partial_update') { set 'format', 'json' set 'read_json_by_line', 'true' set 'strict_mode', 'false' - set 'unique_key_update_mode', 'FLEXIBLE_PARTIAL_UPDATE' + set 'unique_key_update_mode', 'UPDATE_FLEXIBLE_COLUMNS' file "test1.json" time 1000000 // limit inflight 10s }