diff --git a/pkg/repository/scheduler_queue.go b/pkg/repository/scheduler_queue.go index f03f3ee1c6..923c8c460a 100644 --- a/pkg/repository/scheduler_queue.go +++ b/pkg/repository/scheduler_queue.go @@ -286,6 +286,7 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId taskIds := make([]int64, 0, len(r.Assigned)) taskInsertedAts := make([]pgtype.Timestamptz, 0, len(r.Assigned)) workerIds := make([]uuid.UUID, 0, len(r.Assigned)) + seenTaskIds := make(map[int64]*sqlcv1.V1QueueItem) var minTaskInsertedAt pgtype.Timestamptz @@ -293,6 +294,19 @@ func (d *sharedRepository) markQueueItemsProcessed(ctx context.Context, tenantId // deleted from the v1_queue_items table, so we should not assign them for id, assignedItem := range queueItemIdsToAssignedItem { if _, ok := queuedItemsMap[id]; ok { + if qi, seen := seenTaskIds[assignedItem.QueueItem.TaskID]; seen { + d.l.Error(). + Int64("task_id", assignedItem.QueueItem.TaskID). + Str("task_inserted_at", qi.TaskInsertedAt.Time.String()). + Int64("new_retry_count", int64(qi.RetryCount)). + Int64("old_retry_count", int64(assignedItem.QueueItem.RetryCount)). + Msg("duplicate task id seen when preparing queue items for `UpdateTasksToAssigned`") + } + + seenTaskIds[assignedItem.QueueItem.TaskID] = assignedItem.QueueItem + // todo: add this back if we remove the error log above for deduping + // taskIdToAssignedItem[assignedItem.QueueItem.TaskID] = assignedItem + taskIds = append(taskIds, assignedItem.QueueItem.TaskID) taskInsertedAts = append(taskInsertedAts, assignedItem.QueueItem.TaskInsertedAt) workerIds = append(workerIds, assignedItem.WorkerId)