Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ void StreamLoadAction::handle(HttpRequest* req) {
ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos;

if (!ctx->status.ok() && !ctx->status.is_publish_timeout()) {
if (ctx->need_rollback) {
if (ctx->need_rollback()) {
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
ctx->clear_need_rollback();
}
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status);
Expand Down Expand Up @@ -215,7 +215,8 @@ Status StreamLoadAction::_handle_batch_write(starrocks::HttpRequest* http_req, S
}

int StreamLoadAction::on_header(HttpRequest* req) {
auto* ctx = new StreamLoadContext(_exec_env, &StreamLoadMetrics::instance()->streaming_load_current_processing);
auto* ctx = new StreamLoadContext(_exec_env, _exec_env->load_stream_mgr(),
&StreamLoadMetrics::instance()->streaming_load_current_processing);
ctx->ref();
req->set_handler_ctx(ctx);

Expand Down Expand Up @@ -262,9 +263,9 @@ int StreamLoadAction::on_header(HttpRequest* req) {
auto st = _on_header(req, ctx);
if (!st.ok()) {
ctx->status = st;
if (ctx->need_rollback) {
if (ctx->need_rollback()) {
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
ctx->clear_need_rollback();
}
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(st);
Expand Down
8 changes: 4 additions & 4 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static void _send_reply(HttpRequest* req, const std::string& str) {
}

void TransactionManagerAction::_send_error_reply(HttpRequest* req, const Status& st) {
auto ctx = std::make_unique<StreamLoadContext>(_exec_env);
auto ctx = std::make_unique<StreamLoadContext>(_exec_env, _exec_env->load_stream_mgr());
ctx->label = req->header(HTTP_LABEL_KEY);

auto str = ctx->to_resp_json(req->param(HTTP_TXN_OP_KEY), st);
Expand Down Expand Up @@ -180,7 +180,7 @@ TransactionStreamLoadAction::TransactionStreamLoadAction(ExecEnv* exec_env) : _e
TransactionStreamLoadAction::~TransactionStreamLoadAction() = default;

void TransactionStreamLoadAction::_send_error_reply(HttpRequest* req, const Status& st) {
auto ctx = std::make_unique<StreamLoadContext>(_exec_env);
auto ctx = std::make_unique<StreamLoadContext>(_exec_env, _exec_env->load_stream_mgr());
ctx->label = req->header(HTTP_LABEL_KEY);

auto str = ctx->to_resp_json(TXN_LOAD, st);
Expand Down Expand Up @@ -214,7 +214,7 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {
ctx->last_active_ts = MonotonicNanos();

if (!ctx->status.ok()) {
if (ctx->need_rollback) {
if (ctx->need_rollback()) {
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
auto st = _on_header(req, ctx);
if (!st.ok()) {
ctx->status = st;
if (ctx->need_rollback) {
if (ctx->need_rollback()) {
(void)_exec_env->transaction_mgr()->_rollback_transaction(ctx);
}
auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/batch_write/batch_write_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ StatusOr<StreamLoadContext*> BatchWriteMgr::create_and_register_pipe(
auto pipe = std::make_shared<TimeBoundedStreamLoadPipe>(pipe_name, batch_write_interval_ms,
config::merge_commit_stream_load_pipe_block_wait_us,
config::merge_commit_stream_load_pipe_max_buffered_bytes);
RETURN_IF_ERROR(exec_env->load_stream_mgr()->put(load_id, pipe));
StreamLoadContext* ctx = new StreamLoadContext(exec_env, load_id);
auto* load_stream_mgr = exec_env->load_stream_mgr();
RETURN_IF_ERROR(load_stream_mgr->put(load_id, pipe));
StreamLoadContext* ctx = new StreamLoadContext(exec_env, load_id, load_stream_mgr);
ctx->ref();
ctx->id = load_id;
ctx->db = db;
Expand Down Expand Up @@ -170,7 +171,7 @@ static std::string s_empty;

void BatchWriteMgr::receive_stream_load_rpc(ExecEnv* exec_env, brpc::Controller* cntl,
const PStreamLoadRequest* request, PStreamLoadResponse* response) {
auto* ctx = new StreamLoadContext(exec_env);
auto* ctx = new StreamLoadContext(exec_env, exec_env->load_stream_mgr());
ctx->ref();
DeferOp defer([&]() {
response->set_json_result(ctx->to_json());
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/rejected_record_sync_daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ Status RejectedRecordSyncDaemon::post_to_stream_load(const std::string& payload)
// ROOT, skipping password / INSERT-privilege checks. The placeholder
// user/passwd fields below are kept syntactically valid; FE thrift
// ignores them once the token bypass fires.
StreamLoadContext* ctx = new StreamLoadContext(_env);
StreamLoadContext* ctx = new StreamLoadContext(_env, _env->load_stream_mgr());
ctx->ref();
DeferOp release([&] {
if (ctx->unref()) {
Expand Down
26 changes: 14 additions & 12 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ std::string build_kafka_source_info(const TKafkaLoadInfo& info) {
return std::string(buf.GetString(), buf.GetSize());
}

void set_need_rollback(StreamLoadContext* ctx, ExecEnv* exec_env) {
ctx->set_need_rollback([exec_env](StreamLoadContext* rollback_ctx) {
return exec_env->stream_load_executor()->rollback_txn(rollback_ctx);
});
}

} // namespace

Status RoutineLoadTaskExecutor::init(MetricRegistry* metrics) {
Expand Down Expand Up @@ -127,7 +133,7 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
DCHECK(request.has_kafka_info());

// This context is meaningless, just for unifing the interface
StreamLoadContext ctx(_exec_env);
StreamLoadContext ctx(_exec_env, _exec_env->load_stream_mgr());
ctx.load_type = TLoadType::ROUTINE_LOAD;
ctx.load_src_type = TLoadSourceType::KAFKA;
ctx.label = "NaN";
Expand All @@ -144,7 +150,6 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_meta(const PKafkaMetaProxyRe
t_info.__set_properties(properties);

ctx.kafka_info = std::make_unique<KafkaLoadInfo>(t_info);
ctx.need_rollback = false;

std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
Expand All @@ -170,7 +175,7 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offset(const PKafkaOffsetPro
DCHECK(request.has_kafka_info());

// This context is meaningless, just for unifing the interface
StreamLoadContext ctx(_exec_env);
StreamLoadContext ctx(_exec_env, _exec_env->load_stream_mgr());
ctx.load_type = TLoadType::ROUTINE_LOAD;
ctx.load_src_type = TLoadSourceType::KAFKA;
ctx.label = "NaN";
Expand All @@ -187,7 +192,6 @@ Status RoutineLoadTaskExecutor::get_kafka_partition_offset(const PKafkaOffsetPro
t_info.__set_properties(properties);

ctx.kafka_info = std::make_unique<KafkaLoadInfo>(t_info);
ctx.need_rollback = false;

// convert pb repeated value to vector
std::vector<int32_t> partition_ids;
Expand Down Expand Up @@ -220,7 +224,7 @@ Status RoutineLoadTaskExecutor::get_pulsar_partition_meta(const PPulsarMetaProxy
DCHECK(request.has_pulsar_info());

// This context is meaningless, just for unifing the interface
StreamLoadContext ctx(_exec_env);
StreamLoadContext ctx(_exec_env, _exec_env->load_stream_mgr());
ctx.load_type = TLoadType::ROUTINE_LOAD;
ctx.load_src_type = TLoadSourceType::PULSAR;
ctx.label = "NaN";
Expand All @@ -238,7 +242,6 @@ Status RoutineLoadTaskExecutor::get_pulsar_partition_meta(const PPulsarMetaProxy
t_info.__set_properties(properties);

ctx.pulsar_info = std::make_unique<PulsarLoadInfo>(t_info);
ctx.need_rollback = false;

std::shared_ptr<DataConsumer> consumer;
RETURN_IF_ERROR(_data_consumer_pool.get_consumer(&ctx, &consumer));
Expand All @@ -257,7 +260,7 @@ Status RoutineLoadTaskExecutor::get_pulsar_partition_backlog(const PPulsarBacklo
DCHECK(request.has_pulsar_info());

// This context is meaningless, just for unifing the interface
StreamLoadContext ctx(_exec_env);
StreamLoadContext ctx(_exec_env, _exec_env->load_stream_mgr());
ctx.load_type = TLoadType::ROUTINE_LOAD;
ctx.load_src_type = TLoadSourceType::PULSAR;
ctx.label = "NaN";
Expand All @@ -275,7 +278,6 @@ Status RoutineLoadTaskExecutor::get_pulsar_partition_backlog(const PPulsarBacklo
t_info.__set_properties(properties);

ctx.pulsar_info = std::make_unique<PulsarLoadInfo>(t_info);
ctx.need_rollback = false;

// convert pb repeated value to vector
std::vector<std::string> partitions;
Expand Down Expand Up @@ -314,7 +316,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
}

// create the context
auto* ctx = new StreamLoadContext(_exec_env);
auto* ctx = new StreamLoadContext(_exec_env, _exec_env->load_stream_mgr());
ctx->load_type = TLoadType::ROUTINE_LOAD;
ctx->load_src_type = task.type;
ctx->job_id = task.job_id;
Expand Down Expand Up @@ -348,7 +350,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
}
// the routine load task'txn has alreay began in FE.
// so it need to rollback if encounter error.
ctx->need_rollback = true;
set_need_rollback(ctx, _exec_env);
if (task.__isset.max_filter_ratio) {
ctx->max_filter_ratio = task.max_filter_ratio;
} else {
Expand Down Expand Up @@ -549,9 +551,9 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& st, std::string_view err_msg) {
LOG(WARNING) << err_msg << " " << ctx->brief();
ctx->status = st;
if (ctx->need_rollback) {
if (ctx->need_rollback()) {
(void)_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
ctx->clear_need_rollback();
}
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(st);
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/stream_load/stream_context_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ Status StreamContextMgr::create_channel_context(ExecEnv* exec_env, const std::st
int32_t format, StreamLoadContext*& ctx, const TUniqueId& load_id,
long txn_id) {
auto pipe = std::make_shared<StreamLoadPipe>(true);
RETURN_IF_ERROR(exec_env->load_stream_mgr()->put(load_id, pipe));
ctx = new StreamLoadContext(exec_env, load_id);
auto* load_stream_mgr = exec_env->load_stream_mgr();
RETURN_IF_ERROR(load_stream_mgr->put(load_id, pipe));
ctx = new StreamLoadContext(exec_env, load_id, load_stream_mgr);
if (ctx == nullptr) {
return Status::InternalError("allocate stream load context fail");
}
Expand All @@ -108,7 +109,6 @@ Status StreamContextMgr::create_channel_context(ExecEnv* exec_env, const std::st

ctx->start_nanos = UnixSeconds();
ctx->last_active_ts = ctx->start_nanos;
ctx->need_rollback = false;
ctx->format = static_cast<TFileFormatType::type>(format);

ctx->body_sink = pipe;
Expand Down
41 changes: 35 additions & 6 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,38 @@

#include <fmt/format.h>

#include <utility>

#include "common/logging.h"
#include "common/system/master_info.h"
#include "compute_env/load/load_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_executor.h"

namespace starrocks {

StreamLoadContext::StreamLoadContext(ExecEnv* exec_env, LoadStreamMgr* load_stream_mgr, IntGauge* running_loads)
: StreamLoadContext(exec_env, UniqueId::gen_uid(), load_stream_mgr, running_loads) {}

StreamLoadContext::StreamLoadContext(ExecEnv* exec_env, UniqueId id, LoadStreamMgr* load_stream_mgr,
IntGauge* running_loads)
: id(id), _exec_env(exec_env), _load_stream_mgr(load_stream_mgr), _refs(0), _running_loads(running_loads) {
start_nanos = MonotonicNanos();
if (_running_loads != nullptr) {
_running_loads->increment(1);
}
}

StreamLoadContext::~StreamLoadContext() noexcept {
if (need_rollback) {
(void)_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
if (_need_rollback) {
DCHECK(_rollback_txn_callback);
if (_rollback_txn_callback) {
(void)_rollback_txn_callback(this);
}
clear_need_rollback();
}

_exec_env->load_stream_mgr()->remove(id);
if (_load_stream_mgr != nullptr) {
_load_stream_mgr->remove(id);
}
if (_running_loads != nullptr) {
_running_loads->increment(-1);
}
Expand Down Expand Up @@ -335,6 +353,17 @@ bool StreamLoadContext::check_and_set_http_limiter(ConcurrentLimiter* limiter) {
return _http_limiter_guard->set_limiter(limiter);
}

void StreamLoadContext::set_need_rollback(RollbackTxnCallback callback) {
CHECK(callback);
_need_rollback = true;
_rollback_txn_callback = std::move(callback);
}

void StreamLoadContext::clear_need_rollback() {
_need_rollback = false;
_rollback_txn_callback = nullptr;
}

void StreamLoadContext::release(StreamLoadContext* context) {
if (context != nullptr && context->unref()) {
delete context;
Expand Down
26 changes: 15 additions & 11 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <rapidjson/prettywriter.h>

#include <cstdint>
#include <functional>
#include <future>
#include <vector>

Expand All @@ -58,6 +59,7 @@

namespace starrocks {

class LoadStreamMgr;
class RuntimeProfile;

// kafka related info
Expand Down Expand Up @@ -146,16 +148,12 @@ const std::string DEFAULT_WAREHOUSE = "default_warehouse";

class StreamLoadContext {
public:
explicit StreamLoadContext(ExecEnv* exec_env, IntGauge* running_loads = nullptr)
: StreamLoadContext(exec_env, UniqueId::gen_uid(), running_loads) {}

explicit StreamLoadContext(ExecEnv* exec_env, UniqueId id, IntGauge* running_loads = nullptr)
: id(id), _exec_env(exec_env), _refs(0), _running_loads(running_loads) {
start_nanos = MonotonicNanos();
if (_running_loads != nullptr) {
_running_loads->increment(1);
}
}
using RollbackTxnCallback = std::function<Status(StreamLoadContext*)>;

StreamLoadContext(ExecEnv* exec_env, LoadStreamMgr* load_stream_mgr, IntGauge* running_loads = nullptr);

StreamLoadContext(ExecEnv* exec_env, UniqueId id, LoadStreamMgr* load_stream_mgr,
IntGauge* running_loads = nullptr);

~StreamLoadContext() noexcept;

Expand All @@ -176,6 +174,10 @@ class StreamLoadContext {

bool check_and_set_http_limiter(ConcurrentLimiter* limiter);

void set_need_rollback(RollbackTxnCallback callback);
void clear_need_rollback();
bool need_rollback() const { return _need_rollback; }

static void release(StreamLoadContext* context);

// Returns the Thrift RPC timeout (in milliseconds) shared by the stream-load plan (put)
Expand Down Expand Up @@ -295,7 +297,6 @@ class StreamLoadContext {
std::mutex lock;

std::shared_ptr<MessageBodySink> body_sink;
bool need_rollback = false;
int64_t txn_id = -1;

std::promise<Status> promise;
Expand Down Expand Up @@ -361,8 +362,11 @@ class StreamLoadContext {

private:
ExecEnv* _exec_env;
LoadStreamMgr* _load_stream_mgr;
std::atomic<int> _refs;
IntGauge* _running_loads;
bool _need_rollback = false;
RollbackTxnCallback _rollback_txn_callback;
};

} // namespace starrocks
Loading
Loading