Skip to content

Commit

Permalink
Revamp the logging in the redpanda_migrator_offsets output
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Feb 26, 2025
1 parent a91f7b2 commit 90fb453
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,24 +273,26 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
// timestamps of all the records in the topic. It also sets the timestamp of the returned offset to -1 in this case.
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, offsetCommitTimestamp, topic)
if err != nil {
return fmt.Errorf("failed to translate consumer offsets: %s", err)
return fmt.Errorf("failed to list offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

w.mgr.Logger().Tracef("Listed offsets for topic %q and timestamp %d: %+v", topic, offsetCommitTimestamp, listedOffsets)

if err := listedOffsets.Error(); err != nil {
return fmt.Errorf("listed offsets error: %s", err)
return fmt.Errorf("failed to read offsets for topic %q and timestamp %d: %s", topic, offsetCommitTimestamp, err)
}

offset, ok := listedOffsets.Lookup(topic, partition)
if !ok {
// This should never happen, but we check just in case.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic: lookup failed", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d: lookup failed", offsetCommitTimestamp, topic, partition)
}

if !isHighWatermark && offset.Timestamp == -1 {
// This can happen if we received an offset update, but the record which was read from the source cluster to
// trigger it has not been replicated to the destination cluster yet. In this case, we raise an error so the
// operation is retried.
return fmt.Errorf("committed offset not yet replicated to the destination %q topic", topic)
return fmt.Errorf("record for timestamp %d not yet replicated to the destination topic %q partition %d", offsetCommitTimestamp, topic, partition)
}

// This is an optimisation to try and avoid unnecessary duplicates in the common case when the received offset
Expand All @@ -303,12 +305,12 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
if isHighWatermark && offset.Timestamp != -1 {
offsets, err := w.client.ListEndOffsets(ctx, topic)
if err != nil {
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to list the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

highWatermark, ok := offsets.Lookup(topic, partition)
if !ok {
return fmt.Errorf("failed to find the high watermark for topic %q and partition %q: %s", topic, partition, err)
return fmt.Errorf("failed to read the high watermark for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}
if highWatermark.Offset == offset.Offset+1 {
offset.Offset = highWatermark.Offset
Expand All @@ -326,11 +328,11 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

offsetResponses, err := w.client.CommitOffsets(ctx, group, offsets)
if err != nil {
return fmt.Errorf("failed to commit consumer offsets: %s", err)
return fmt.Errorf("failed to commit consumer offsets for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

if err := offsetResponses.Error(); err != nil {
return fmt.Errorf("committed consumer offsets returned an error: %s", err)
return fmt.Errorf("committed consumer offsets returned an error for topic %q and partition %q (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

return nil
Expand All @@ -349,7 +351,7 @@ func (w *redpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.

wait := backOff.NextBackOff()
if wait == backoff.Stop {
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d: %s", topic, partition, err)
return fmt.Errorf("failed to update consumer offsets for topic %q and partition %d (timestamp %d): %s", topic, partition, offsetCommitTimestamp, err)
}

time.Sleep(wait)
Expand Down

0 comments on commit 90fb453

Please sign in to comment.