@@ -323,6 +323,7 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
323323 HFuncTraced (TEvConsole::TEvConfigSubscriptionNotification, Handle);
324324 HFuncTraced (TEvents::TEvPoisonPill, Handle);
325325 HFuncTraced (TEvents::TEvUndelivered, Handle);
326+ HFuncTraced (TEvents::TEvWakeup, Handle);
326327 HFuncTraced (TEvInterconnect::TEvNodeDisconnected, Handle);
327328 IgnoreFunc (TEvInterconnect::TEvNodeConnected);
328329
@@ -351,6 +352,13 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
351352 Die (ctx);
352353 }
353354
355+ void Handle (TEvents::TEvWakeup::TPtr &/* ev*/ , const TActorContext &ctx)
356+ {
357+ LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS,
358+ " TSubscriptionClientSender(" << Subscription->Subscriber .ToString () << " ) received wake up" );
359+ Send (OwnerId, new TConfigsProvider::TEvPrivate::TEvWorkerCoolDown (Subscription));
360+ }
361+
354362 void Handle (TEvInterconnect::TEvNodeDisconnected::TPtr &/* ev*/ , const TActorContext &ctx)
355363 {
356364 LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -370,6 +378,8 @@ class TSubscriptionClientSender : public TActorBootstrapped<TSubscriptionClientS
370378 " TSubscriptionClientSender(" << Subscription->Subscriber .ToString () << " ) send TEvConfigSubscriptionNotificationRequest: "
371379 << notification.Get ()->Record .ShortDebugString ());
372380
381+ const float mbytes = notification.Get ()->GetCachedByteSize () / 1'000'000 .f ;
382+ Schedule (TDuration::MilliSeconds (100 ) * mbytes, new TEvents::TEvWakeup ());
373383 Send (Subscription->Subscriber , notification.Release (), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
374384 }
375385
@@ -522,7 +532,33 @@ void TConfigsProvider::CheckSubscriptions(const TInMemorySubscriptionSet &subscr
522532 const TActorContext &ctx)
523533{
524534 for (auto &subscription : subscriptions)
525- CheckSubscription (subscription, ctx);
535+ ScheduledUpdates[subscription->Subscriber ] = EUpdate::All;
536+ ProcessScheduledUpdates (ctx);
537+ }
538+
539+ void TConfigsProvider::ProcessScheduledUpdates (const TActorContext &ctx)
540+ {
541+ while (!ScheduledUpdates.empty () && InflightUpdates.size () < MAX_INFLIGHT_UPDATES) {
542+ auto it = ScheduledUpdates.begin ();
543+ if (auto subscription = InMemoryIndex.GetSubscription (it->first )) {
544+ switch (it->second ) {
545+ case EUpdate::All:
546+ if (CheckSubscription (subscription, ctx)) {
547+ InflightUpdates.insert (subscription->Subscriber );
548+ }
549+ break ;
550+ case EUpdate::Yaml:
551+ if (UpdateConfig (subscription, ctx)) {
552+ InflightUpdates.insert (subscription->Subscriber );
553+ }
554+ break ;
555+ }
556+ }
557+ ScheduledUpdates.erase (it);
558+ }
559+
560+ *Counters.ScheduledConfigUpdates = ScheduledUpdates.size ();
561+ *Counters.InflightConfigUpdates = InflightUpdates.size ();
526562}
527563
528564void TConfigsProvider::CheckSubscription (TSubscription::TPtr subscription,
@@ -583,7 +619,7 @@ void TConfigsProvider::CheckSubscription(TSubscription::TPtr subscription,
583619 subscription->Worker = ctx.RegisterWithSameMailbox (worker);
584620}
585621
586- void TConfigsProvider::CheckSubscription (TInMemorySubscription::TPtr subscription,
622+ bool TConfigsProvider::CheckSubscription (TInMemorySubscription::TPtr subscription,
587623 const TActorContext &ctx)
588624{
589625 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -665,7 +701,7 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
665701 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
666702 " TConfigsProvider: no changes found for subscription"
667703 << " " << subscription->Subscriber .ToString () << " :" << subscription->Generation );
668- return ;
704+ return false ;
669705 }
670706
671707 LOG_TRACE_S (ctx, NKikimrServices::CMS_CONFIGS,
@@ -693,13 +729,6 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
693729 for (auto &[id, hash] : VolatileYamlConfigHashes) {
694730 auto *volatileConfig = request->Record .AddVolatileConfigs ();
695731 volatileConfig->SetId (id);
696- auto hashes = subscription->VolatileYamlConfigHashes .size ();
697- Y_UNUSED (hashes);
698- auto itt = subscription->VolatileYamlConfigHashes .find (id);
699- if (itt != subscription->VolatileYamlConfigHashes .end ()) {
700- auto tmp = itt->second ;
701- Y_UNUSED (tmp);
702- }
703732 if (auto it = subscription->VolatileYamlConfigHashes .find (id); it != subscription->VolatileYamlConfigHashes .end () && it->second == hash) {
704733 volatileConfig->SetNotChanged (true );
705734 } else {
@@ -720,8 +749,9 @@ void TConfigsProvider::CheckSubscription(TInMemorySubscription::TPtr subscriptio
720749 }
721750
722751 ctx.Send (subscription->Worker , request.Release ());
723-
724752 subscription->FirstUpdateSent = true ;
753+
754+ return true ;
725755}
726756
727757void TConfigsProvider::DumpStateHTML (IOutputStream &os) const {
@@ -837,7 +867,8 @@ void TConfigsProvider::Handle(TEvConsole::TEvConfigSubscriptionRequest::TPtr &ev
837867
838868 subscription->Worker = RegisterWithSameMailbox (new TSubscriptionClientSender (subscription, SelfId ()));
839869
840- CheckSubscription (subscription, ctx);
870+ ScheduledUpdates[subscription->Subscriber ] = EUpdate::All;
871+ ProcessScheduledUpdates (ctx);
841872}
842873
843874void TConfigsProvider::Handle (TEvConsole::TEvConfigSubscriptionCanceled::TPtr &ev, const TActorContext &ctx)
@@ -865,11 +896,27 @@ void TConfigsProvider::Handle(TEvPrivate::TEvWorkerDisconnected::TPtr &ev, const
865896 auto existing = InMemoryIndex.GetSubscription (subscription->Subscriber );
866897 if (existing == subscription) {
867898 InMemoryIndex.RemoveSubscription (subscription->Subscriber );
899+ ScheduledUpdates.erase (subscription->Subscriber );
900+ InflightUpdates.erase (subscription->Subscriber );
901+
868902 Send (subscription->Subscriber , new TEvConsole::TEvConfigSubscriptionCanceled (subscription->Generation ));
869903
870904 LOG_DEBUG_S (ctx, NKikimrServices::CMS_CONFIGS, " TConfigsProvider removed subscription "
871905 << subscription->Subscriber << " :" << subscription->Generation << " (subscription worker died)" );
872906 }
907+
908+ ProcessScheduledUpdates (ctx);
909+ }
910+
911+ void TConfigsProvider::Handle (TEvPrivate::TEvWorkerCoolDown::TPtr &ev, const TActorContext &ctx)
912+ {
913+ auto subscription = ev->Get ()->Subscription ;
914+ auto existing = InMemoryIndex.GetSubscription (subscription->Subscriber );
915+ if (existing == subscription) {
916+ InflightUpdates.erase (subscription->Subscriber );
917+ }
918+
919+ ProcessScheduledUpdates (ctx);
873920}
874921
875922void TConfigsProvider::Handle (TEvConsole::TEvCheckConfigUpdatesRequest::TPtr &ev, const TActorContext &ctx)
@@ -1259,19 +1306,21 @@ void TConfigsProvider::Handle(TEvPrivate::TEvUpdateYamlConfig::TPtr &ev, const T
12591306 }
12601307
12611308 for (auto &[_, subscription] : InMemoryIndex.GetSubscriptions ()) {
1262- UpdateConfig (subscription, ctx );
1309+ ScheduledUpdates. emplace (subscription-> Subscriber , EUpdate::Yaml );
12631310 }
12641311 } else {
12651312 const auto * subs = InMemoryIndex.GetSubscriptions (ev->Get ()->ChangedDatabase );
12661313 if (subs) {
12671314 for (auto &subscription : *subs) {
1268- UpdateConfig (subscription, ctx );
1315+ ScheduledUpdates. emplace (subscription-> Subscriber , EUpdate::Yaml );
12691316 }
12701317 }
12711318 }
1319+
1320+ ProcessScheduledUpdates (ctx);
12721321}
12731322
1274- void TConfigsProvider::UpdateConfig (TInMemorySubscription::TPtr subscription,
1323+ bool TConfigsProvider::UpdateConfig (TInMemorySubscription::TPtr subscription,
12751324 const TActorContext &ctx)
12761325{
12771326 if (subscription->ServeYaml ) {
@@ -1309,7 +1358,10 @@ void TConfigsProvider::UpdateConfig(TInMemorySubscription::TPtr subscription,
13091358 }
13101359
13111360 ctx.Send (subscription->Worker , request.Release ());
1361+ return true ;
13121362 }
1363+
1364+ return false ;
13131365}
13141366
13151367} // namespace NKikimr::NConsole
0 commit comments