diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index e30534ef6ed6..cc7e20a35281 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -2,6 +2,7 @@ #include "defs.h" #include "blobstorage_pdisk_category.h" +#include "blobstorage_relevance.h" #include "boot_type.h" #include "events.h" #include "tablet_types.h" @@ -39,10 +40,6 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000; static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15); static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12); -struct TMessageRelevanceTracker {}; -using TMessageRelevanceOwner = std::shared_ptr; -using TMessageRelevanceWatcher = std::weak_ptr; - struct TStorageStatusFlags { ui32 Raw = 0; @@ -1076,6 +1073,17 @@ struct TEvBlobStorage { std::vector> ExtraBlockChecks; // (TabletId, Generation) pairs std::optional ExternalRelevanceWatcher; + struct TParameters { + TLogoBlobID BlobId; + TRope Buffer; + TInstant Deadline; + NKikimrBlobStorage::EPutHandleClass HandleClass = NKikimrBlobStorage::TabletLog; + ETactic Tactic = TacticDefault; + bool IssueKeepFlag = false; + bool IgnoreBlock = false; + std::optional ExternalRelevanceWatcher = std::nullopt; + }; + TEvPut(TCloneEventPolicy, const TEvPut& origin) : Id(origin.Id) , Buffer(origin.Buffer) @@ -1088,58 +1096,84 @@ struct TEvBlobStorage { , ExternalRelevanceWatcher(origin.ExternalRelevanceWatcher) {} - TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline, - NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, - ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false, - std::optional externalRelevanceWatcher = std::nullopt) - : Id(id) - , Buffer(std::move(buffer)) - , Deadline(deadline) - , HandleClass(handleClass) - , Tactic(tactic) - , IssueKeepFlag(issueKeepFlag) - , IgnoreBlock(ignoreBlock) - , ExternalRelevanceWatcher(externalRelevanceWatcher) + TEvPut(TParameters parameters) + : Id(parameters.BlobId) + , Buffer(std::move(parameters.Buffer)) + , Deadline(parameters.Deadline) + , HandleClass(parameters.HandleClass) + , Tactic(parameters.Tactic) + , IssueKeepFlag(parameters.IssueKeepFlag) + , IgnoreBlock(parameters.IgnoreBlock) + , ExternalRelevanceWatcher(std::move(parameters.ExternalRelevanceWatcher)) { Y_ABORT_UNLESS(Id, "EvPut invalid: LogoBlobId must have non-zero tablet field, id# %s", Id.ToString().c_str()); Y_ABORT_UNLESS(Buffer.size() < (40 * 1024 * 1024), "EvPut invalid: LogoBlobId# %s buffer.Size# %zu", - id.ToString().data(), Buffer.size()); - Y_ABORT_UNLESS(Buffer.size() == id.BlobSize(), + Id.ToString().data(), Buffer.size()); + Y_ABORT_UNLESS(Buffer.size() == Id.BlobSize(), "EvPut invalid: LogoBlobId# %s buffer.Size# %zu", - id.ToString().data(), Buffer.size()); - REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&id, sizeof(id)); + Id.ToString().data(), Buffer.size()); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Id, sizeof(Id)); REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(Buffer.GetContiguousSpan().Data(), Buffer.size()); - REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&deadline, sizeof(deadline)); - REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&handleClass, sizeof(handleClass)); - REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&tactic, sizeof(tactic)); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Deadline, sizeof(Deadline)); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&HandleClass, sizeof(HandleClass)); + REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(&Tactic, sizeof(Tactic)); } + + TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline, + NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, + ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false) + : TEvPut(TParameters{ + .BlobId = id, + .Buffer = std::move(buffer), + .Deadline = deadline, + .HandleClass = handleClass, + .Tactic = tactic, + .IssueKeepFlag = issueKeepFlag, + .IgnoreBlock = ignoreBlock, + }) + {} + TEvPut(const TLogoBlobID &id, TRcBuf &&buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, - ETactic tactic = TacticDefault, bool issueKeepFlag = false, - std::optional externalRelevanceWatcher = std::nullopt) - : TEvPut(id, TRope(std::move(buffer)), deadline, handleClass, tactic, issueKeepFlag, - /*ignoreBlock=*/false, std::move(externalRelevanceWatcher)) + ETactic tactic = TacticDefault, bool issueKeepFlag = false) + : TEvPut(TParameters{ + .BlobId = id, + .Buffer = TRope(std::move(buffer)), + .Deadline = deadline, + .HandleClass = handleClass, + .Tactic = tactic, + .IssueKeepFlag = issueKeepFlag, + }) {} TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, - ETactic tactic = TacticDefault, bool issueKeepFlag = false, - std::optional externalRelevanceWatcher = std::nullopt) - : TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag, - /*ignoreBlock=*/false, std::move(externalRelevanceWatcher)) + ETactic tactic = TacticDefault, bool issueKeepFlag = false) + : TEvPut(TParameters{ + .BlobId = id, + .Buffer = TRope(buffer), + .Deadline = deadline, + .HandleClass = handleClass, + .Tactic = tactic, + .IssueKeepFlag = issueKeepFlag, + }) {} TEvPut(const TLogoBlobID &id, const TSharedData &buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, - ETactic tactic = TacticDefault, bool issueKeepFlag = false, - std::optional externalRelevanceWatcher = std::nullopt) - : TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag, - /*ignoreBlock=*/false, std::move(externalRelevanceWatcher)) + ETactic tactic = TacticDefault, bool issueKeepFlag = false) + : TEvPut(TParameters{ + .BlobId = id, + .Buffer = TRope(buffer), + .Deadline = deadline, + .HandleClass = handleClass, + .Tactic = tactic, + .IssueKeepFlag = issueKeepFlag, + }) {} - TString Print(bool isFull) const { TStringStream str; str << "TEvPut {Id# " << Id.ToString(); diff --git a/ydb/core/base/blobstorage_relevance.cpp b/ydb/core/base/blobstorage_relevance.cpp new file mode 100644 index 000000000000..33d8a085ce7c --- /dev/null +++ b/ydb/core/base/blobstorage_relevance.cpp @@ -0,0 +1,15 @@ +#include "blobstorage_relevance.h" + +namespace NKikimr { + +TMessageRelevance::TMessageRelevance(const TMessageRelevanceOwner& onwer, + std::optional external) + : InternalWatcher(onwer) + , ExternalWatcher(external) +{} + +bool TMessageRelevance::IsRelevant() const { + return !InternalWatcher.expired() && (!ExternalWatcher || !ExternalWatcher->expired()); +} + +} // namespace NKikimr diff --git a/ydb/core/base/blobstorage_relevance.h b/ydb/core/base/blobstorage_relevance.h new file mode 100644 index 000000000000..1a163728bdca --- /dev/null +++ b/ydb/core/base/blobstorage_relevance.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include + +namespace NKikimr { + +struct TMessageRelevanceTracker {}; +using TMessageRelevanceOwner = std::shared_ptr; +using TMessageRelevanceWatcher = std::weak_ptr; + +class TMessageRelevance { +public: + TMessageRelevance() = default; + TMessageRelevance(const TMessageRelevanceOwner& owner, + std::optional external = std::nullopt); + bool IsRelevant() const; + +private: + // tracks request actor state and cancels request when actor dies + TMessageRelevanceWatcher InternalWatcher; + // can be passed as request parameter to cancel request on demand + std::optional ExternalWatcher; +}; + +} // namespace NKikimr diff --git a/ydb/core/base/ya.make b/ydb/core/base/ya.make index 14bd5c9000ef..771e8639f8a3 100644 --- a/ydb/core/base/ya.make +++ b/ydb/core/base/ya.make @@ -16,6 +16,7 @@ SRCS( blobstorage.h blobstorage.cpp blobstorage_grouptype.cpp + blobstorage_relevance.cpp boot_type.h boot_type.cpp channel_profiles.h diff --git a/ydb/core/blobstorage/backpressure/event.h b/ydb/core/blobstorage/backpressure/event.h index 7400a9ce8850..73f17caecc73 100644 --- a/ydb/core/blobstorage/backpressure/event.h +++ b/ydb/core/blobstorage/backpressure/event.h @@ -40,7 +40,7 @@ class TEventHolder { TIntrusivePtr Buffer; TBSProxyContextPtr BSProxyCtx; std::unique_ptr LocalEvent; - std::optional> Tracker; + std::optional Tracker; public: TEventHolder() @@ -93,7 +93,7 @@ class TEventHolder { } bool Relevant() const { - return !Tracker || !Tracker->expired(); + return !Tracker || Tracker->IsRelevant(); } ui32 GetByteSize() const { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 26c94c9d9687..35b324e7b152 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -221,6 +221,8 @@ class TBlobStorageGroupRequestActor : public TActor()) + , ExternalRelevanceWatcher(std::move(params.Common.ExternalRelevanceWatcher)) , LatencyQueueKind(params.Common.LatencyQueueKind) , RacingDomains(&Info->GetTopology()) , ExecutionRelay(std::move(params.Common.ExecutionRelay)) @@ -240,12 +242,6 @@ class TBlobStorageGroupRequestActor : public TActor(); - } } virtual ~TBlobStorageGroupRequestActor() = default; @@ -326,7 +322,8 @@ class TBlobStorageGroupRequestActor : public TActor Relevance; + TMessageRelevanceOwner RelevanceOwner; + std::optional ExternalRelevanceWatcher; ui32 RequestsInFlight = 0; std::unique_ptr Response; const TMaybe LatencyQueueKind; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 14c23df78ad1..5e88008d2b9f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -1066,8 +1066,7 @@ namespace NKikimr { if constexpr (!std::is_same_v && !std::is_same_v) { - std::visit([&](const auto& relevance) { ev.MessageRelevanceTracker = relevance; }, - Relevance); + ev.MessageRelevanceTracker = TMessageRelevance(RelevanceOwner, ExternalRelevanceWatcher); ui64 cost; if constexpr (std::is_same_v) { bool internalQueue; @@ -1132,17 +1131,11 @@ namespace NKikimr { } bool TBlobStorageGroupRequestActor::CheckForExternalCancellation() { - bool cancelled = false; - std::visit( - TOverloaded{ - [&](const TMessageRelevanceOwner&) { cancelled = false; }, - [&](const TMessageRelevanceWatcher& watcher) { cancelled = watcher.expired(); } - }, Relevance); - - if (cancelled) { + if (ExternalRelevanceWatcher && ExternalRelevanceWatcher->expired()) { ReplyAndDie(NKikimrProto::ERROR); + return true; } - return cancelled; + return false; } void TBlobStorageGroupProxy::Handle(TEvGetQueuesInfo::TPtr ev) { diff --git a/ydb/core/blobstorage/ut_blobstorage/cancellation.cpp b/ydb/core/blobstorage/ut_blobstorage/cancellation.cpp index 482e36fe2fe9..36b1d21cd6b3 100644 --- a/ydb/core/blobstorage/ut_blobstorage/cancellation.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/cancellation.cpp @@ -35,9 +35,14 @@ Y_UNIT_TEST_SUITE(Cancellation) { { TString data = MakeData(10); TLogoBlobID blobId(1, 1, 1, 1, data.size(), 1); - TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max(), - NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault, - false, owner); + TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut( + TEvBlobStorage::TEvPut::TParameters{ + .BlobId = blobId, + .Buffer = TRope(data), + .Deadline = TInstant::Max(), + .ExternalRelevanceWatcher = owner, + } + ); owner.reset(); ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] { SendToBSProxy(ctx.Edge, ctx.GroupId, ev); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 79296998c89c..390619c7745b 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -570,7 +570,7 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct TEventWithRelevanceTracker { - std::optional MessageRelevanceTracker; + std::optional MessageRelevanceTracker; }; struct TEvBlobStorage::TEvVPut