File tree Expand file tree Collapse file tree 2 files changed +2
-2
lines changed Expand file tree Collapse file tree 2 files changed +2
-2
lines changed Original file line number Diff line number Diff line change @@ -254,9 +254,8 @@ def assign_partitions!
254
254
255
255
@pending_message_queue . each do |message |
256
256
partition = message . partition
257
-
257
+ partition_count = @cluster . partitions_for ( message . topic ) . count
258
258
begin
259
- partition_count = @cluster . partitions_for ( message . topic ) . count
260
259
261
260
if partition . nil?
262
261
partition = @partitioner . call ( partition_count , message )
Original file line number Diff line number Diff line change @@ -403,6 +403,7 @@ def write(chunk)
403
403
rescue Kafka ::UnknownTopicOrPartition
404
404
if @use_default_for_unknown_topic && topic != @default_topic
405
405
log . warn "'#{ topic } ' topic not found. Retry with '#{ default_topic } ' topic"
406
+ producer . clear_buffer
406
407
topic = @default_topic
407
408
retry
408
409
end
You can’t perform that action at this time.
0 commit comments