clustering traffic admission control #2970
base: main
Conversation
Force-pushed from 76ddbff to 2bb4e5b.
I want to rename this file to cluster.go and rename the current cluster.go to service.go, as it makes more sense, but it messes up the diff a lot, so I will leave this for later.
It's ready for feedback; I just have a few small things to address in the meantime.
In cases with very large k8s clusters, for example, where discovery is immense and costly, I think we would want to wait to even do discovery until after the cluster has converged. Do you agree? Should there be an additional option to wait for the cluster to converge before doing any work?
The discovery is the same for each instance, regardless of the number of instances in the cluster, so stopping them from doing work will not improve things. Every instance needs to be able to handle the entire cluster discovery in the current architecture. This PR follows the design that I shared here, where we explicitly said that
I think scaling discovery is a separate problem to address (and some plans on what to do are here), and once we have it sharded in some way we can definitely apply the min cluster requirements to it in the future.
I will still need to add the following, as per design:
I would like to do this in a follow-up PR.
Force-pushed from fa2747c to 50df532.
@@ -269,62 +299,16 @@ func (s *Service) Run(ctx context.Context, host service.Host) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	limiter := rate.NewLimiter(rate.Every(stateUpdateMinInterval), 1)
	s.node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
		tracer := s.tracer.Tracer("")
This logic was moved below to a goroutine that handles dispatching cluster change notifications. This is because when the minimum cluster size timer expires, we also need to dispatch cluster change notifications even though the peers in ckit didn't change.
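Roughly, that dispatch pattern can look like the following sketch. This is not the PR's actual code; the peerUpdates channel, waitTimeout, and notify names are illustrative assumptions.

```go
package main

import (
	"context"
	"time"
)

// runDispatcher forwards cluster-change notifications coming from two sources:
// ckit peer updates and the expiry of the minimum-cluster-size wait timer.
func runDispatcher(ctx context.Context, peerUpdates <-chan struct{}, waitTimeout time.Duration, notify func()) {
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-peerUpdates:
			// The set of peers reported by ckit changed.
			notify()
		case <-timer.C:
			// The minimum-cluster-size wait expired: components must be
			// notified even though the peer set itself did not change.
			notify()
		}
	}
}
```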
internal/service/cluster/cluster.go
// Set the gauge to the configured minimum cluster size
minClusterSizeGauge.Set(float64(opts.MinimumClusterSize))
Adding this static metric so we can clearly show on dashboards when we're below the minimum. Will make dashboard changes in a follow-up PR.
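For illustration, a minimal sketch of such a static gauge using the standard Prometheus client. The metric name, help text, and registration details here are assumptions; only the Set(float64(opts.MinimumClusterSize)) call mirrors the diff.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// minClusterSizeGauge exposes the configured minimum cluster size as a static
// value so dashboards can compare the live peer count against it.
var minClusterSizeGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "cluster_minimum_size", // metric name assumed for illustration
	Help: "Minimum cluster size required before clustered components admit traffic.",
})

func registerMinClusterSizeMetric(reg prometheus.Registerer, minimumClusterSize int) {
	reg.MustRegister(minClusterSizeGauge)
	// Set once at startup; the value only changes with configuration.
	minClusterSizeGauge.Set(float64(minimumClusterSize))
}
```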
Force-pushed from 643510b to dcc03cd.
// slow components can currently lead to timeouts and communication errors
// TODO: consider decoupling cluster operations from runtime/components performance
I actually have done this by putting the dispatching of cluster updates on a separate goroutine. TODO: remove this comment.
_, subSpan := tracer.Start(spanCtx, "NotifyClusterChange", trace.WithSpanKind(trace.SpanKindInternal))
subSpan.SetAttributes(attribute.String("component_id", comp.ID.String()))

clusterComponent.NotifyClusterChange()
Should this fire in a goroutine? To ensure all components get notified close to simultaneously?
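A sketch of the alternative this question suggests: fanning the notification out so one slow component does not delay the others. The interface name below is an illustrative stand-in, not the exact one in the codebase.

```go
package main

import "sync"

// clusterComponent is an illustrative stand-in for the interface implemented
// by cluster-aware components.
type clusterComponent interface {
	NotifyClusterChange()
}

// notifyAll delivers the notification to every component in its own goroutine,
// so they are all told about the change at roughly the same time, then waits
// for every call to return.
func notifyAll(components []clusterComponent) {
	var wg sync.WaitGroup
	for _, c := range components {
		wg.Add(1)
		go func(c clusterComponent) {
			defer wg.Done()
			c.NotifyClusterChange()
		}(c)
	}
	wg.Wait()
}
```

Whether the final wg.Wait() is wanted depends on whether the caller needs to know when every component has finished reacting, or only that the notifications were fired.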
"minimum_cluster_size", c.opts.MinimumClusterSize, | ||
"peers_count", len(c.sharder.Peers()), | ||
) | ||
c.clusterChangeCallback() |
Feels off that the callback is called before releasing the lock, but maybe I'm thinking about it wrong.
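One common way to address that concern, sketched here with assumed type and field names: capture what the callback needs while holding the lock, release the lock, and only then invoke the callback.

```go
package main

import "sync"

// admission is an illustrative stand-in; the field names are assumptions.
type admission struct {
	mut                   sync.Mutex
	minimumSizeReached    bool
	clusterChangeCallback func()
}

// markMinimumSizeReached updates state under the lock, but invokes the callback
// only after the lock is released, so the callback can never block other lock
// holders or re-enter the same mutex.
func (a *admission) markMinimumSizeReached() {
	a.mut.Lock()
	a.minimumSizeReached = true
	cb := a.clusterChangeCallback // capture while holding the lock
	a.mut.Unlock()

	if cb != nil {
		cb() // invoked outside the critical section
	}
}
```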
span.SetAttributes(attribute.Int("minimum_cluster_size", s.opts.MinimumClusterSize))

// Notify all components about the clustering change.
components := component.GetAllComponents(host, component.InfoOptions{})
I'm trying to think about this change:
- We get a new callback on node.Observe() from ckit. This triggers a notification.
- The notification calls all cluster-aware components' NotifyClusterChange(). This triggers a Ready() call in most/all cluster-aware components.
- The first cluster-aware component (dependent on the limiter) triggers a relevant stateChange if there is one.
- The state change triggers a notification, which calls all cluster-aware components' NotifyClusterChange().
- All(?) cluster-aware components hit the limiter.
Does this sound right? Something here feels off, if I'm understanding it correctly.
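As a point of reference for the limiter step in the flow above, this is roughly what a golang.org/x/time/rate limiter does when placed in front of notification dispatch. The interval constant and the changes channel are assumptions, not the PR's actual wiring.

```go
package main

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// throttleNotifications spaces out cluster-change dispatches so that bursts of
// observer callbacks do not produce more than one notification per interval.
func throttleNotifications(ctx context.Context, changes <-chan struct{}, notify func()) {
	const stateUpdateMinInterval = 5 * time.Second // assumed value
	limiter := rate.NewLimiter(rate.Every(stateUpdateMinInterval), 1)

	for {
		select {
		case <-ctx.Done():
			return
		case <-changes:
			// Wait blocks until a token is available, enforcing the minimum
			// interval between consecutive notifications.
			if err := limiter.Wait(ctx); err != nil {
				return
			}
			notify()
		}
	}
}
```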
PR Description
Added --cluster.wait-for-size and --cluster.wait-timeout flags, which allow specifying the minimum cluster size required before components that use clustering begin processing traffic, to ensure adequate cluster capacity is available.
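Purely as an illustration of what these flags configure (the actual flag wiring in the repository may differ), a sketch of the corresponding options and their registration on a standard FlagSet; the struct and field names are assumptions.

```go
package main

import (
	"flag"
	"time"
)

// clusterOptions mirrors what the two new flags configure; the struct and
// field names here are illustrative assumptions.
type clusterOptions struct {
	MinimumClusterSize     int           // --cluster.wait-for-size
	MinimumSizeWaitTimeout time.Duration // --cluster.wait-timeout
}

// registerClusterFlags wires the flags to the options on a flag.FlagSet.
func registerClusterFlags(fs *flag.FlagSet) *clusterOptions {
	opts := &clusterOptions{}
	fs.IntVar(&opts.MinimumClusterSize, "cluster.wait-for-size", 0,
		"Minimum number of peers required before clustered components begin processing traffic.")
	fs.DurationVar(&opts.MinimumSizeWaitTimeout, "cluster.wait-timeout", 0,
		"Maximum time to wait for the minimum cluster size before processing traffic anyway.")
	return opts
}
```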
Extended the existing tests, including the e2e tests added previously.
Which issue(s) this PR fixes
Fixes #201
Notes to the Reviewer
PR Checklist