Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tablet/tablet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NKikimr {

IActor* CreateTabletReqRebuildHistoryGraph(const TActorId &owner, TTabletStorageInfo *info, ui32 blockedGen, NTracing::ITrace *trace, ui64 followerCookie);
IActor* CreateTabletFindLastEntry(const TActorId &owner, bool readBody, TTabletStorageInfo *info, ui32 blockedGen, bool leader);
IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId = {});
IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, TMessageRelevanceWatcher relevance, NWilson::TTraceId traceId = {});
IActor* CreateTabletReqBlockBlobStorage(const TActorId &owner, TTabletStorageInfo *info, ui32 generation, bool blockPrevEntry);
IActor* CreateTabletReqDelete(const TActorId &owner, const TIntrusivePtr<TTabletStorageInfo> &tabletStorageInfo, ui32 generation = std::numeric_limits<ui32>::max());

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tablet/tablet_req_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class TTabletReqReset : public TActorBootstrapped<TTabletReqReset> {
const TIntrusivePtr<TTabletStorageInfo> TabletStorageInfo;
ui32 Generation = 0;
TActorId CurrentLeader;
TMessageRelevanceOwner Relevance = std::make_shared<TMessageRelevanceTracker>();

void ReplyAndDie(NKikimrProto::EReplyStatus status, const TActorContext& ctx) {
ctx.Send(Owner, new TEvTablet::TEvResetTabletResult(status, TabletStorageInfo->TabletID));
Expand Down Expand Up @@ -62,7 +63,7 @@ class TTabletReqReset : public TActorBootstrapped<TTabletReqReset> {
return ReplyAndDie(ev->Get()->Status, ctx);
}

TTablet::ExternalWriteZeroEntry(TabletStorageInfo.Get(), Generation + 1, SelfId());
TTablet::ExternalWriteZeroEntry(TabletStorageInfo.Get(), Generation + 1, SelfId(), Relevance);
Become(&TTabletReqReset::StateWriteZeroEntry);
}

Expand Down
11 changes: 7 additions & 4 deletions ydb/core/tablet/tablet_req_writelog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> {
TVector<ui32> YellowStopChannels;
THashMap<ui32, float> ApproximateFreeSpaceShareByChannel;

TMessageRelevanceWatcher Relevance;

NWilson::TSpan RequestSpan;
TMap<TLogoBlobID, NWilson::TSpan> BlobSpans;

Expand Down Expand Up @@ -143,7 +145,7 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> {
ui64 cookie = RandomNumber<ui64>();
RequestCookies ^= cookie;

SendPutToGroup(ctx, x->GroupID, Info.Get(), MakeHolder<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), handleClass, tactic), cookie, std::move(traceId));
SendPutToGroup(ctx, x->GroupID, Info.Get(), MakeHolder<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), handleClass, tactic, false, Relevance), cookie, std::move(traceId));
}

public:
Expand All @@ -152,13 +154,14 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> {
}

TTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &logid, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs,
TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId)
TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, TMessageRelevanceWatcher relevance, NWilson::TTraceId traceId)
: Owner(owner)
, LogEntryID(logid)
, LogEntry(entry)
, CommitTactic(commitTactic)
, Info(info)
, RepliesToWait(Max<ui32>())
, Relevance(std::move(relevance))
, RequestSpan(TWilsonTablet::TabletDetailed, std::move(traceId), "Tablet.WriteLog")
{
References.swap(refs);
Expand Down Expand Up @@ -215,8 +218,8 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> {
}
};

IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, NWilson::TTraceId traceId) {
return new TTabletReqWriteLog(owner, entryId, entry, refs, commitTactic, info, std::move(traceId));
IActor* CreateTabletReqWriteLog(const TActorId &owner, const TLogoBlobID &entryId, NKikimrTabletBase::TTabletLogEntry *entry, TVector<TEvTablet::TLogEntryReference> &refs, TEvBlobStorage::TEvPut::ETactic commitTactic, TTabletStorageInfo *info, TMessageRelevanceWatcher relevance, NWilson::TTraceId traceId) {
return new TTabletReqWriteLog(owner, entryId, entry, refs, commitTactic, info, std::move(relevance), std::move(traceId));
}

}
12 changes: 6 additions & 6 deletions ydb/core/tablet/tablet_sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void TTablet::WriteZeroEntry(TEvTablet::TDependencyGraph *graph) {

const TLogoBlobID logid(TabletID(), StateStorageInfo.KnownGeneration, 0, 0, 0, 0);
TVector<TEvTablet::TLogEntryReference> refs;
Register(CreateTabletReqWriteLog(SelfId(), logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticMinLatency, Info.Get()));
Register(CreateTabletReqWriteLog(SelfId(), logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticMinLatency, Info.Get(), Relevance));

BLOG_D(" TTablet::WriteZeroEntry. logid# " << logid.ToString(), "TSYS01");

Expand Down Expand Up @@ -1318,7 +1318,7 @@ bool TTablet::HandleNext(TEvTablet::TEvCommit::TPtr &ev) {

entry->StateStorageConfirmed = true; // todo: do real query against state-storage (optionally?)
entry->Task = Register(
CreateTabletReqWriteLog(SelfId(), logid, x.release(), msg->References, msg->CommitTactic, Info.Get(), std::move(ev->TraceId))
CreateTabletReqWriteLog(SelfId(), logid, x.release(), msg->References, msg->CommitTactic, Info.Get(), Relevance, std::move(ev->TraceId))
);

Graph.StepsInFlight += 1;
Expand Down Expand Up @@ -1666,7 +1666,7 @@ void TTablet::ProgressFollowerQueue() {

TVector<TEvTablet::TLogEntryReference> refs;
Register(
CreateTabletReqWriteLog(SelfId(), entryId, entry.Release(), refs, TEvBlobStorage::TEvPut::ETactic::TacticMinLatency, Info.Get())
CreateTabletReqWriteLog(SelfId(), entryId, entry.Release(), refs, TEvBlobStorage::TEvPut::ETactic::TacticMinLatency, Info.Get(), Relevance)
);
}
}
Expand Down Expand Up @@ -2190,7 +2190,7 @@ void TTablet::Handle(TEvTablet::TEvCompleteRecoveryBoot::TPtr& ev) {

const TLogoBlobID logid(TabletID(), StateStorageInfo.KnownGeneration, 0, 0, 0, 0);
TVector<TEvTablet::TLogEntryReference> refs;
Register(CreateTabletReqWriteLog(SelfId(), logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticMinLatency, Info.Get()));
Register(CreateTabletReqWriteLog(SelfId(), logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticMinLatency, Info.Get(), Relevance));

ReportTabletStateChange(TTabletStateInfo::WriteZeroEntry);

Expand Down Expand Up @@ -2334,14 +2334,14 @@ void TTablet::Bootstrap() {
ReportTabletStateChange(TTabletStateInfo::ResolveStateStorage);
}

void TTablet::ExternalWriteZeroEntry(TTabletStorageInfo *info, ui32 gen, TActorIdentity owner) {
void TTablet::ExternalWriteZeroEntry(TTabletStorageInfo *info, ui32 gen, TActorIdentity owner, TMessageRelevanceWatcher relevance) {
THolder<NKikimrTabletBase::TTabletLogEntry> entry = MakeHolder<NKikimrTabletBase::TTabletLogEntry>();
entry->SetSnapshot(MakeGenStepPair(0, 0));
entry->SetZeroConfirmed(MakeGenStepPair(0, 0));
entry->SetZeroTailSz(0);
TLogoBlobID logid(info->TabletID, gen, 0, 0, 0, 0);
TVector<TEvTablet::TLogEntryReference> refs;
TActivationContext::Register(CreateTabletReqWriteLog(owner, logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticDefault, info));
TActivationContext::Register(CreateTabletReqWriteLog(owner, logid, entry.Release(), refs, TEvBlobStorage::TEvPut::TacticDefault, info, std::move(relevance)));
}

TActorId TTabletSetupInfo::Apply(TTabletStorageInfo *info, TActorIdentity owner) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tablet/tablet_sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ class TTablet : public TActor<TTablet> {
THashMap<ui32, TInterconnectPending> InterconnectPending;
ui64 LastInterconnectSubscribeCookie = 0;

TMessageRelevanceOwner Relevance = std::make_shared<TMessageRelevanceTracker>();

ui64 TabletID() const;

void ReportTabletStateChange(ETabletState state);
Expand Down Expand Up @@ -742,7 +744,7 @@ class TTablet : public TActor<TTablet> {
);

TAutoPtr<IEventHandle> AfterRegister(const TActorId &self, const TActorId &parentId) override;
static void ExternalWriteZeroEntry(TTabletStorageInfo *info, ui32 gen, TActorIdentity owner);
static void ExternalWriteZeroEntry(TTabletStorageInfo *info, ui32 gen, TActorIdentity owner, TMessageRelevanceWatcher relevance);
};

}
4 changes: 3 additions & 1 deletion ydb/core/tablet_flat/flat_ops_compact.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ namespace NTabletFlatExecutor {

auto flag = NKikimrBlobStorage::AsyncBlob;
auto *ev = new TEvPut(id.Logo, std::exchange(glob.Data, TString{ }), TInstant::Max(), flag,
TEvBlobStorage::TEvPut::ETactic::TacticMaxThroughput);
TEvBlobStorage::TEvPut::ETactic::TacticMaxThroughput, /* issueKeepFlag */ false,
/* relevance */ RelevanceTracker);
auto ctx = ActorContext();

SendToBSProxy(ctx, id.Group, ev);
Expand All @@ -607,6 +608,7 @@ namespace NTabletFlatExecutor {
private:
const TLogoBlobID Mask;
const TActorId Owner;
TMessageRelevanceOwner RelevanceTracker = std::make_shared<TMessageRelevanceTracker>();
TAutoPtr<NUtil::ILogger> Logger;
IDriver * Driver = nullptr;
THolder<TCompactCfg> Conf;
Expand Down
Loading