Skip to content

Commit 3e835ba

Browse files
authored
Add external canceller for TEvPut requests (#30553)
1 parent 3c7edea commit 3e835ba

File tree

7 files changed

+116
-13
lines changed

7 files changed

+116
-13
lines changed

ydb/core/base/blobstorage.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ static constexpr ui64 MaxCollectGarbageFlagsPerMessage = 10000;
3939
static constexpr TDuration VDiskCooldownTimeout = TDuration::Seconds(15);
4040
static constexpr TDuration VDiskCooldownTimeoutOnProxy = TDuration::Seconds(12);
4141

42+
struct TMessageRelevanceTracker {};
43+
using TMessageRelevanceOwner = std::shared_ptr<TMessageRelevanceTracker>;
44+
using TMessageRelevanceWatcher = std::weak_ptr<TMessageRelevanceTracker>;
4245

4346
struct TStorageStatusFlags {
4447
ui32 Raw = 0;
@@ -1071,6 +1074,7 @@ struct TEvBlobStorage {
10711074
const bool IgnoreBlock = false;
10721075
mutable NLWTrace::TOrbit Orbit;
10731076
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs
1077+
std::optional<TMessageRelevanceWatcher> ExternalRelevanceWatcher;
10741078

10751079
TEvPut(TCloneEventPolicy, const TEvPut& origin)
10761080
: Id(origin.Id)
@@ -1081,18 +1085,21 @@ struct TEvBlobStorage {
10811085
, IssueKeepFlag(origin.IssueKeepFlag)
10821086
, IgnoreBlock(origin.IgnoreBlock)
10831087
, ExtraBlockChecks(origin.ExtraBlockChecks)
1088+
, ExternalRelevanceWatcher(origin.ExternalRelevanceWatcher)
10841089
{}
10851090

10861091
TEvPut(const TLogoBlobID &id, TRope &&buffer, TInstant deadline,
10871092
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
1088-
ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false)
1093+
ETactic tactic = TacticDefault, bool issueKeepFlag = false, bool ignoreBlock = false,
1094+
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
10891095
: Id(id)
10901096
, Buffer(std::move(buffer))
10911097
, Deadline(deadline)
10921098
, HandleClass(handleClass)
10931099
, Tactic(tactic)
10941100
, IssueKeepFlag(issueKeepFlag)
10951101
, IgnoreBlock(ignoreBlock)
1102+
, ExternalRelevanceWatcher(externalRelevanceWatcher)
10961103
{
10971104
Y_ABORT_UNLESS(Id, "EvPut invalid: LogoBlobId must have non-zero tablet field, id# %s", Id.ToString().c_str());
10981105
Y_ABORT_UNLESS(Buffer.size() < (40 * 1024 * 1024),
@@ -1110,21 +1117,27 @@ struct TEvBlobStorage {
11101117

11111118
TEvPut(const TLogoBlobID &id, TRcBuf &&buffer, TInstant deadline,
11121119
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
1113-
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
1114-
: TEvPut(id, TRope(std::move(buffer)), deadline, handleClass, tactic, issueKeepFlag)
1120+
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
1121+
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
1122+
: TEvPut(id, TRope(std::move(buffer)), deadline, handleClass, tactic, issueKeepFlag,
1123+
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
11151124
{}
11161125

11171126
TEvPut(const TLogoBlobID &id, const TString &buffer, TInstant deadline,
11181127
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
1119-
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
1120-
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag)
1128+
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
1129+
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
1130+
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag,
1131+
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
11211132
{}
11221133

11231134

11241135
TEvPut(const TLogoBlobID &id, const TSharedData &buffer, TInstant deadline,
11251136
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
1126-
ETactic tactic = TacticDefault, bool issueKeepFlag = false)
1127-
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag)
1137+
ETactic tactic = TacticDefault, bool issueKeepFlag = false,
1138+
std::optional<TMessageRelevanceWatcher> externalRelevanceWatcher = std::nullopt)
1139+
: TEvPut(id, TRope(buffer), deadline, handleClass, tactic, issueKeepFlag,
1140+
/*ignoreBlock=*/false, std::move(externalRelevanceWatcher))
11281141
{}
11291142

11301143
TString Print(bool isFull) const {

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;
3838
const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(5);
3939

4040
const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::Seconds(1);
41+
const TDuration DsMaximumDelayBetweenPutWakeups = TDuration::Seconds(60);
4142

4243
const ui64 BufferSizeThreshold = 1 << 20;
4344

@@ -195,6 +196,8 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
195196

196197
std::optional<ui32> ForceGroupGeneration; // work only with this specific group generation and nothing else
197198
bool DoSendDeathNote = true; // unschedules DSProxy timeout on termination, be careful with disabling
199+
200+
std::optional<TMessageRelevanceWatcher> ExternalRelevanceWatcher = std::nullopt;
198201
};
199202

200203
struct TTypeSpecificParameters {
@@ -237,6 +240,12 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
237240
}
238241

239242
Y_ABORT_UNLESS(CostModel);
243+
244+
if (params.Common.ExternalRelevanceWatcher) {
245+
Relevance = std::move(*params.Common.ExternalRelevanceWatcher);
246+
} else {
247+
Relevance = std::make_shared<TMessageRelevanceTracker>();
248+
}
240249
}
241250

242251
virtual ~TBlobStorageGroupRequestActor() = default;
@@ -290,6 +299,8 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
290299
static double GetTotalTimeMs(const NKikimrBlobStorage::TTimestamps& timestamps);
291300
static double GetVDiskTimeMs(const NKikimrBlobStorage::TTimestamps& timestamps);
292301

302+
bool CheckForExternalCancellation();
303+
293304
private:
294305
void CheckPostponedQueue();
295306

@@ -315,7 +326,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
315326
private:
316327
const TActorId Source;
317328
const ui64 Cookie;
318-
std::shared_ptr<TMessageRelevanceTracker> MessageRelevanceTracker = std::make_shared<TMessageRelevanceTracker>();
329+
std::variant<TMessageRelevanceOwner, TMessageRelevanceWatcher> Relevance;
319330
ui32 RequestsInFlight = 0;
320331
std::unique_ptr<IEventBase> Response;
321332
const TMaybe<TGroupStat::EKind> LatencyQueueKind;

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
311311
GetVDiskTimeMs(record.GetTimestamps()));
312312
}
313313

314+
if (CheckForExternalCancellation()) {
315+
return;
316+
}
317+
314318
if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) {
315319
TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId;
316320
TPutImpl::TPutResultVec putResults;
@@ -387,6 +391,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
387391
}
388392
}
389393

394+
if (CheckForExternalCancellation()) {
395+
return;
396+
}
397+
390398
// Handle put results
391399
bool isCauseRegistered = !RootCauseTrack.IsOn;
392400
TPutImpl::TPutResultVec putResults;
@@ -736,6 +744,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
736744
<< " Not answered in "
737745
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");
738746

747+
if (CheckForExternalCancellation()) {
748+
return;
749+
}
750+
739751
const TInstant now = TActivationContext::Now();
740752
while (!PutDeadlineMasks.empty()) {
741753
auto [deadline, mask] = *PutDeadlineMasks.begin();
@@ -841,7 +853,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
841853
}
842854

843855
if (deadline != TInstant::Max()) {
844-
Schedule(deadline, new TKikimrEvents::TEvWakeup);
856+
Schedule(std::min(now + DsMaximumDelayBetweenPutWakeups, deadline), new TKikimrEvents::TEvWakeup);
845857
}
846858
}
847859

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ namespace NKikimr {
245245
.ExecutionRelay = ev->Get()->ExecutionRelay,
246246
.LatencyQueueKind = kind,
247247
.ForceGroupGeneration = ev->Get()->ForceGroupGeneration,
248+
.ExternalRelevanceWatcher = ev->Get()->ExternalRelevanceWatcher,
248249
},
249250
.TimeStatsEnabled = Mon->TimeStats.IsEnabled(),
250251
.Stats = PerDiskStats,
@@ -585,6 +586,7 @@ namespace NKikimr {
585586
.ExecutionRelay = ev->Get()->ExecutionRelay,
586587
.LatencyQueueKind = kind,
587588
.ForceGroupGeneration = forceGroupGeneration,
589+
.ExternalRelevanceWatcher = ev->Get()->ExternalRelevanceWatcher,
588590
},
589591
.TimeStatsEnabled = Mon->TimeStats.IsEnabled(),
590592
.Stats = PerDiskStats,
@@ -1064,7 +1066,8 @@ namespace NKikimr {
10641066

10651067
if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus> &&
10661068
!std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>) {
1067-
ev.MessageRelevanceTracker = MessageRelevanceTracker;
1069+
std::visit([&](const auto& relevance) { ev.MessageRelevanceTracker = relevance; },
1070+
Relevance);
10681071
ui64 cost;
10691072
if constexpr (std::is_same_v<T, TEvBlobStorage::TEvVMultiPut>) {
10701073
bool internalQueue;
@@ -1128,6 +1131,20 @@ namespace NKikimr {
11281131
return true;
11291132
}
11301133

1134+
bool TBlobStorageGroupRequestActor::CheckForExternalCancellation() {
1135+
bool cancelled = false;
1136+
std::visit(
1137+
TOverloaded{
1138+
[&](const TMessageRelevanceOwner&) { cancelled = false; },
1139+
[&](const TMessageRelevanceWatcher& watcher) { cancelled = watcher.expired(); }
1140+
}, Relevance);
1141+
1142+
if (cancelled) {
1143+
ReplyAndDie(NKikimrProto::ERROR);
1144+
}
1145+
return cancelled;
1146+
}
1147+
11311148
void TBlobStorageGroupProxy::Handle(TEvGetQueuesInfo::TPtr ev) {
11321149
ui32 groupSize = Info->GetTotalVDisksNum();
11331150
std::unique_ptr<TEvQueuesInfo> res = std::make_unique<TEvQueuesInfo>(groupSize);
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
3+
4+
#include <util/stream/null.h>
5+
6+
#include <ydb/core/blobstorage/ut_blobstorage/lib/ut_helpers.h>
7+
8+
#define Ctest Cnull
9+
10+
Y_UNIT_TEST_SUITE(Cancellation) {
11+
using TTestCtx = TTestCtxBase;
12+
13+
Y_UNIT_TEST(CancelPut) {
14+
TBlobStorageGroupType erasure = TBlobStorageGroupType::Erasure4Plus2Block;
15+
TTestCtx ctx({
16+
.NodeCount = erasure.BlobSubgroupSize() + 1,
17+
.Erasure = erasure,
18+
.MaxPutTimeoutDSProxy = TDuration::Minutes(10),
19+
});
20+
21+
ctx.Initialize();
22+
ctx.AllocateEdgeActorOnSpecificNode(ctx.NodeCount);
23+
24+
TMessageRelevanceOwner owner = std::make_shared<TMessageRelevanceTracker>();
25+
26+
ctx.Env->Runtime->FilterFunction = [&](ui32, std::unique_ptr<IEventHandle>& ev) -> bool {
27+
if (ev->GetTypeRewrite() == TEvBlobStorage::TEvVPut::EventType &&
28+
ev->Sender.NodeId() != ev->Recipient.NodeId()) {
29+
UNIT_FAIL("Unexpected TEvVPut event# " << ev->Sender << "->" <<
30+
ev->Recipient << " " << ev->ToString());
31+
}
32+
return true;
33+
};
34+
35+
{
36+
TString data = MakeData(10);
37+
TLogoBlobID blobId(1, 1, 1, 1, data.size(), 1);
38+
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max(),
39+
NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault,
40+
false, owner);
41+
owner.reset();
42+
ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] {
43+
SendToBSProxy(ctx.Edge, ctx.GroupId, ev);
44+
});
45+
}
46+
47+
auto res = ctx.Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(ctx.Edge, false,
48+
TAppData::TimeProvider->Now() + TDuration::Seconds(65));
49+
UNIT_ASSERT(res);
50+
}
51+
}

ydb/core/blobstorage/ut_blobstorage/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ SRCS(
1717
backpressure.cpp
1818
block_race.cpp
1919
bsc_cache.cpp
20+
cancellation.cpp
2021
counting_events.cpp
2122
corrupted_reads.cpp
2223
deadlines.cpp

ydb/core/blobstorage/vdisk/common/vdisk_events.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -569,10 +569,8 @@ namespace NKikimr {
569569
// TEvVPut
570570
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
571571

572-
struct TMessageRelevanceTracker {};
573-
574572
struct TEventWithRelevanceTracker {
575-
std::optional<std::weak_ptr<TMessageRelevanceTracker>> MessageRelevanceTracker;
573+
std::optional<TMessageRelevanceWatcher> MessageRelevanceTracker;
576574
};
577575

578576
struct TEvBlobStorage::TEvVPut

0 commit comments

Comments
 (0)