Skip to content

Commit e327e5c

Browse files
Tang8330arxon31
andauthored
Applying PR 1365 (#6)
* fix: running partition watcher only for leader * fix: setting partitions number in assignTopicPartitions * fixed resetting olda values of partitions per topic * cr fix: removed unnecessary map filling * cr fix: removed running partitionWatcher only by leader * cr fix: fixed autoformatting --------- Co-authored-by: arxon31 <[email protected]>
1 parent eee5c2f commit e327e5c

File tree

2 files changed

+26
-26
lines changed

2 files changed

+26
-26
lines changed

consumergroup.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ func (g *Generation) heartbeatLoop(interval time.Duration) {
497497
// a bad spot and should rebalance. Commonly you will see an error here if there
498498
// is a problem with the connection to the coordinator and a rebalance will
499499
// establish a new connection to the coordinator.
500-
func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
500+
func (g *Generation) partitionWatcher(interval time.Duration, topic string, startPartitions int) {
501501
g.Start(func(ctx context.Context) {
502502
g.log(func(l Logger) {
503503
l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval)
@@ -509,14 +509,6 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
509509
ticker := time.NewTicker(interval)
510510
defer ticker.Stop()
511511

512-
ops, err := g.conn.readPartitions(topic)
513-
if err != nil {
514-
g.logError(func(l Logger) {
515-
l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err)
516-
})
517-
return
518-
}
519-
oParts := len(ops)
520512
for {
521513
select {
522514
case <-ctx.Done():
@@ -525,7 +517,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
525517
ops, err := g.conn.readPartitions(topic)
526518
switch {
527519
case err == nil, errors.Is(err, UnknownTopicOrPartition):
528-
if len(ops) != oParts {
520+
if len(ops) != startPartitions {
529521
g.log(func(l Logger) {
530522
l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID)
531523
})
@@ -651,11 +643,13 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
651643
}
652644

653645
cg := &ConsumerGroup{
654-
config: config,
655-
next: make(chan *Generation),
656-
errs: make(chan error),
657-
done: make(chan struct{}),
646+
config: config,
647+
partitionsPerTopic: make(map[string]int, len(config.Topics)),
648+
next: make(chan *Generation),
649+
errs: make(chan error),
650+
done: make(chan struct{}),
658651
}
652+
659653
cg.wg.Add(1)
660654
go func() {
661655
cg.run()
@@ -670,9 +664,10 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
670664
// Generation is where partition assignments and offset management occur.
671665
// Callers will use Next to get a handle to the Generation.
672666
type ConsumerGroup struct {
673-
config ConsumerGroupConfig
674-
next chan *Generation
675-
errs chan error
667+
config ConsumerGroupConfig
668+
partitionsPerTopic map[string]int
669+
next chan *Generation
670+
errs chan error
676671

677672
closeOnce sync.Once
678673
wg sync.WaitGroup
@@ -789,13 +784,9 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
789784
}
790785
defer conn.Close()
791786

792-
var generationID int32
793-
var groupAssignments GroupMemberAssignments
794-
var assignments map[string][]int32
795-
796787
// join group. this will join the group and prepare assignments if our
797788
// consumer is elected leader. it may also change or assign the member ID.
798-
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
789+
memberID, generationID, groupAssignments, err := cg.joinGroup(conn, memberID)
799790
if err != nil {
800791
cg.withErrorLogger(func(log Logger) {
801792
log.Printf("Failed to join group %s: %v", cg.config.ID, err)
@@ -807,7 +798,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
807798
})
808799

809800
// sync group
810-
assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
801+
assignments, err := cg.syncGroup(conn, memberID, generationID, groupAssignments)
811802
if err != nil {
812803
cg.withErrorLogger(func(log Logger) {
813804
log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
@@ -844,8 +835,8 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
844835
// complete.
845836
gen.heartbeatLoop(cg.config.HeartbeatInterval)
846837
if cg.config.WatchPartitionChanges {
847-
for _, topic := range cg.config.Topics {
848-
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic)
838+
for topic, startPartitions := range cg.partitionsPerTopic {
839+
gen.partitionWatcher(cg.config.PartitionWatchInterval, topic, startPartitions)
849840
}
850841
}
851842

@@ -953,6 +944,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i
953944
})
954945

955946
var assignments GroupMemberAssignments
947+
956948
if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
957949
v, err := cg.assignTopicPartitions(conn, response)
958950
if err != nil {
@@ -1036,6 +1028,14 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup
10361028
return nil, err
10371029
}
10381030

1031+
// resetting old values of partitions per topic
1032+
cg.partitionsPerTopic = make(map[string]int, len(topics))
1033+
1034+
// setting new values of partitions per topic
1035+
for _, partition := range partitions {
1036+
cg.partitionsPerTopic[partition.Topic] += 1
1037+
}
1038+
10391039
cg.withLogger(func(l Logger) {
10401040
l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
10411041
for _, member := range members {

consumergroup_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ func TestGenerationExitsOnPartitionChange(t *testing.T) {
644644

645645
done := make(chan struct{})
646646
go func() {
647-
gen.partitionWatcher(watchTime, "topic-1")
647+
gen.partitionWatcher(watchTime, "topic-1", 1)
648648
close(done)
649649
}()
650650

0 commit comments

Comments
 (0)