Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[env](compile) open compile check in runtime file #42499

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions be/src/vec/runtime/ip_address_cidr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@

#include "util/sse_util.hpp"
#include "vec/common/format_ip.h"
#include "vec/common/ipv6_to_binary.h"
namespace doris {
#include "common/compile_check_begin.h"

namespace vectorized {
static inline std::pair<UInt32, UInt32> apply_cidr_mask(UInt32 src, UInt8 bits_to_keep) {
Expand All @@ -46,7 +48,7 @@ static inline void apply_cidr_mask(const char* __restrict src, char* __restrict

for (int8_t i = IPV6_BINARY_LENGTH - 1; i >= 0; --i) {
dst_lower[i] = src[i] & mask[i];
dst_upper[i] = dst_lower[i] | ~mask[i];
dst_upper[i] = char(dst_lower[i] | ~mask[i]);
}
}
} // namespace vectorized
Expand Down Expand Up @@ -96,14 +98,14 @@ struct IPAddressCIDR {
};

bool match_ipv4_subnet(uint32_t addr, uint32_t cidr_addr, uint8_t prefix) {
uint32_t mask = (prefix >= 32) ? 0xffffffffu : ~(0xffffffffu >> prefix);
uint32_t mask = (prefix >= 32) ? 0xffffffffU : ~(0xffffffffU >> prefix);
return (addr & mask) == (cidr_addr & mask);
}

#if defined(__SSE2__) || defined(__aarch64__)

bool match_ipv6_subnet(const uint8_t* addr, const uint8_t* cidr_addr, uint8_t prefix) {
uint16_t mask = _mm_movemask_epi8(
uint16_t mask = (uint16_t)_mm_movemask_epi8(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto]

Suggested change
uint16_t mask = (uint16_t)_mm_movemask_epi8(
auto mask = (uint16_t)_mm_movemask_epi8(

_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i*>(addr)),
_mm_loadu_si128(reinterpret_cast<const __m128i*>(cidr_addr))));
mask = ~mask;
Expand Down Expand Up @@ -191,4 +193,5 @@ inline bool is_address_in_range(const IPAddressVariant& address, const IPAddress
return false;
}

} // namespace doris
} // namespace doris
#include "common/compile_check_end.h"
3 changes: 3 additions & 0 deletions be/src/vec/runtime/ipv4_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "vec/common/string_ref.h"

namespace doris {
#include "common/compile_check_begin.h"

class IPv4Value {
public:
Expand Down Expand Up @@ -101,3 +102,5 @@ class IPv4Value {
};

} // namespace doris

#include "common/compile_check_end.h"
3 changes: 3 additions & 0 deletions be/src/vec/runtime/ipv6_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "vec/data_types/data_type_number_base.h"

namespace doris {
#include "common/compile_check_begin.h"

class IPv6Value {
public:
Expand Down Expand Up @@ -98,3 +99,5 @@ class IPv6Value {
};

} // namespace doris

#include "common/compile_check_end.h"
12 changes: 7 additions & 5 deletions be/src/vec/runtime/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@

#include "partitioner.h"

#include "common/cast_set.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "runtime/thread_context.h"
#include "vec/columns/column_const.h"
#include "vec/sink/vdata_stream_sender.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

template <typename ChannelIds>
Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const {
int rows = block->rows();
size_t rows = block->rows();

if (rows > 0) {
auto column_to_keep = block->columns();

int result_size = _partition_expr_ctxs.size();
int result_size = cast_set<int>(_partition_expr_ctxs.size());
std::vector<int> result(result_size);

_hash_vals.resize(rows);
Expand All @@ -42,7 +44,7 @@ Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Bl
_do_hash(unpack_if_const(block->get_by_position(result[j]).column).first, hashes, j);
}

for (int i = 0; i < rows; i++) {
for (size_t i = 0; i < rows; i++) {
hashes[i] = ChannelIds()(hashes[i], _partition_count);
}

Expand All @@ -55,13 +57,13 @@ template <typename ChannelIds>
void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
uint32_t* __restrict result, int idx) const {
column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type,
column->size());
cast_set<uint32_t>(column->size()));
}

template <typename ChannelIds>
Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count);
auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(cast_set<int>(_partition_count));
partitioner.reset(new_partitioner);
new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "vec/exprs/vexpr_context.h"

namespace doris {
#include "common/compile_check_begin.h"
class MemTracker;

namespace vectorized {
Expand Down Expand Up @@ -111,3 +112,5 @@ struct SpillPartitionChannelIds {

} // namespace vectorized
} // namespace doris

#include "common/compile_check_end.h"
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "pipeline/exec/hashjoin_build_sink.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int node_id) {
// Only need to set builder and consumers with pipeline engine enabled.
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "vec/core/block.h"

namespace doris {
#include "common/compile_check_begin.h"

class RuntimeState;
class MinMaxFuncBase;
Expand Down Expand Up @@ -101,3 +102,5 @@ class SharedHashTableController {

} // namespace vectorized
} // namespace doris

#include "common/compile_check_end.h"
7 changes: 6 additions & 1 deletion be/src/vec/runtime/time_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "vec/data_types/data_type_time.h"

namespace doris {
#include "common/compile_check_begin.h"

/// TODO: Due to the "Time type is not supported for OLAP table" issue, a lot of basic content is missing.It will be supplemented later.
class TimeValue {
Expand Down Expand Up @@ -58,7 +59,9 @@ class TimeValue {
static std::string to_string(TimeType time, int scale) {
return timev2_to_buffer_from_double(time, scale);
}
static int32_t hour(TimeType time) { return check_over_max_time(time) / ONE_HOUR_MICROSECONDS; }
static int32_t hour(TimeType time) {
return static_cast<int32_t>(check_over_max_time(time) / ONE_HOUR_MICROSECONDS);
}

static int32_t minute(TimeType time) {
return (check_over_max_time(time) % ONE_HOUR_MICROSECONDS) / ONE_MINUTE_MICROSECONDS;
Expand All @@ -70,3 +73,5 @@ class TimeValue {
};

} // namespace doris

#include "common/compile_check_end.h"
8 changes: 5 additions & 3 deletions be/src/vec/runtime/vcsv_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cstdlib>
#include <cstring>

#include "common/cast_set.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "runtime/primitive_type.h"
Expand All @@ -36,6 +37,7 @@
#include "vec/exprs/vexpr_context.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

static const unsigned char bom[] = {0xEF, 0xBB, 0xBF};

Expand Down Expand Up @@ -70,7 +72,7 @@ VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* fil
_options.escape_char = hive_serde_properties->escape_char[0];
}
_options.null_format = hive_serde_properties->null_format.data();
_options.null_len = hive_serde_properties->null_format.length();
_options.null_len = cast_set<int>(hive_serde_properties->null_format.length());
// The list of separators + escapeChar are the bytes required to be escaped.
if (_options.escape_char != 0) {
_options.need_escape[_options.escape_char & 0xff] = true;
Expand Down Expand Up @@ -111,7 +113,7 @@ Status VCSVTransformer::write(const Block& block) {
auto ser_col = ColumnString::create();
ser_col->reserve(block.columns());
VectorBufferWriter buffer_writer(*ser_col.get());
for (size_t i = 0; i < block.rows(); i++) {
for (int i = 0; i < block.rows(); i++) {
for (size_t col_id = 0; col_id < block.columns(); col_id++) {
if (col_id != 0) {
buffer_writer.write(_column_separator.data(), _column_separator.size());
Expand Down Expand Up @@ -158,7 +160,7 @@ Status VCSVTransformer::_flush_plain_text_outstream(ColumnString& ser_col) {

std::string VCSVTransformer::_gen_csv_header_types() {
std::string types;
int num_columns = _output_vexpr_ctxs.size();
int num_columns = (int)_output_vexpr_ctxs.size();
for (int i = 0; i < num_columns; ++i) {
types += type_to_string(_output_vexpr_ctxs[i]->root()->type().type);
if (i < num_columns - 1) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vcsv_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "vfile_format_transformer.h"

namespace doris {
#include "common/compile_check_begin.h"
namespace io {
class FileWriter;
} // namespace io
Expand Down Expand Up @@ -83,3 +84,5 @@ class VCSVTransformer final : public VFileFormatTransformer {
};

} // namespace doris::vectorized

#include "common/compile_check_end.h"
5 changes: 3 additions & 2 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "vec/runtime/vdata_stream_recvr.h"

namespace doris {
#include "common/compile_check_begin.h"
namespace vectorized {

VDataStreamMgr::VDataStreamMgr() {
Expand Down Expand Up @@ -81,7 +82,7 @@ Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNod
std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
<< ", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
// Create lock guard and not own lock currently and will lock conditionally
std::unique_lock recvr_lock(_lock, std::defer_lock);
if (acquire_lock) {
Expand Down Expand Up @@ -157,7 +158,7 @@ Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, P
std::shared_ptr<VDataStreamRecvr> targert_recvr;
VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
<< ", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
{
std::lock_guard<std::mutex> l(_lock);
auto range = _receiver_map.equal_range(hash_value);
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vdata_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "common/status.h"

namespace google {
#include "common/compile_check_begin.h"
namespace protobuf {
class Closure;
}
Expand Down Expand Up @@ -96,3 +97,5 @@ class VDataStreamMgr {
};
} // namespace vectorized
} // namespace doris

#include "common/compile_check_end.h"
1 change: 1 addition & 0 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "vec/runtime/vsorted_run_merger.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

VDataStreamRecvr::SenderQueue::SenderQueue(
VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile,
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
#include "common/compile_check_begin.h"
class MemTracker;
class PBlock;
class MemTrackerLimiter;
Expand Down Expand Up @@ -273,3 +274,5 @@ class VDataStreamRecvr::SenderQueue {

} // namespace vectorized
} // namespace doris

#include "common/compile_check_end.h"
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vfile_format_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "vec/exprs/vexpr_fwd.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

class VFileFormatTransformer {
public:
Expand Down Expand Up @@ -61,3 +62,5 @@ class VFileFormatTransformer {
vectorized::DataTypeSerDe::FormatOptions _options;
};
} // namespace doris::vectorized

#include "common/compile_check_end.h"
4 changes: 3 additions & 1 deletion be/src/vec/runtime/vorc_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <exception>
#include <ostream>

#include "common/cast_set.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "orc/Int128.hh"
Expand Down Expand Up @@ -59,6 +60,7 @@
#include "vec/runtime/vdatetime_value.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
VOrcOutputStream::VOrcOutputStream(doris::io::FileWriter* file_writer)
: _file_writer(file_writer), _cur_pos(0), _written_len(0), _name("VOrcOutputStream") {}

Expand Down Expand Up @@ -344,7 +346,7 @@ Status VOrcTransformer::write(const Block& block) {
}
}};

size_t sz = block.rows();
int sz = cast_set<int>(block.rows());
auto row_batch = _create_row_batch(sz);
auto* root = dynamic_cast<orc::StructVectorBatch*>(row_batch.get());
try {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vorc_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "vec/runtime/vparquet_transformer.h"

namespace doris {
#include "common/compile_check_begin.h"
namespace io {
class FileWriter;
} // namespace io
Expand Down Expand Up @@ -131,3 +132,5 @@ class VOrcTransformer final : public VFileFormatTransformer {
};

} // namespace doris::vectorized

#include "common/compile_check_end.h"
1 change: 1 addition & 0 deletions be/src/vec/runtime/vparquet_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "vec/runtime/vdatetime_value.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer)
: _file_writer(file_writer), _cur_pos(0), _written_len(0) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vparquet_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "vfile_format_transformer.h"

namespace doris {
#include "common/compile_check_begin.h"
namespace io {
class FileWriter;
} // namespace io
Expand Down Expand Up @@ -134,3 +135,5 @@ class VParquetTransformer final : public VFileFormatTransformer {
};

} // namespace doris::vectorized

#include "common/compile_check_end.h"
1 change: 1 addition & 0 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "vec/utils/util.hpp"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
const std::vector<bool>& is_asc_order,
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/runtime/vsorted_run_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "vec/exprs/vexpr_fwd.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

// VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted
// sequence of blocks, which are fetched from a BlockSupplier function object.
Expand Down Expand Up @@ -98,3 +99,5 @@ class VSortedRunMerger {
};

} // namespace doris::vectorized

#include "common/compile_check_end.h"
Loading