Skip to content

Commit 10e247c

Browse files
committed
feat: Raise an error when trying to use a consumer after it has been closed
1 parent 0d13659 commit 10e247c

3 files changed

Lines changed: 61 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [Unreleased]
44
### Changed
5+
- Raise an error when trying to use a consumer after it has been closed
56
- Add `Consumer#stop` method to stop an `each` loop
67
- Add crystal versions 1.15.1 and 1.16.3 to test matrix
78

spec/kafka/consumer_spec.cr

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@ describe Kafka::Consumer do
2525
consumer.subscribe("foo", "foo")
2626
end
2727
end
28+
29+
it "raises an exception when called after the consumer is closed" do
30+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
31+
consumer.close
32+
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
33+
consumer.subscribe("foo")
34+
end
35+
end
36+
end
37+
38+
describe "#poll" do
39+
it "raises an exception when called after the consumer is closed" do
40+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
41+
consumer.close
42+
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
43+
consumer.poll(250)
44+
end
45+
end
2846
end
2947

3048
describe "#each" do
@@ -42,5 +60,33 @@ describe Kafka::Consumer do
4260
consumer.each(timeout: 10) { }
4361
end
4462
end
63+
64+
it "raises an exception when called after the consumer is closed" do
65+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
66+
consumer.close
67+
expect_raises(Kafka::ConsumerException, "librdkafka error - Consumer closed") do
68+
consumer.each { }
69+
end
70+
end
71+
end
72+
73+
describe "#open?" do
74+
it "returns true after creation" do
75+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
76+
consumer.open?.should be_true
77+
end
78+
79+
it "returns false after closing" do
80+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
81+
consumer.close
82+
consumer.open?.should be_false
83+
end
84+
end
85+
86+
describe "#close" do
87+
it "does not raise an exception when called multiple times" do
88+
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9094", "group.id" => "foo", "broker.address.family" => "v4"})
89+
2.times { consumer.close }
90+
end
4591
end
4692
end

src/kafka/consumer.cr

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ module Kafka
2929
#
3030
# Calls the `rd_kafka_subscribe` C function.
3131
def subscribe(*topics)
32+
verify_handle_open!
3233
tpl = LibRdKafka.topic_partition_list_new(topics.size)
3334
topics.each do |topic|
3435
LibRdKafka.topic_partition_list_add(tpl, topic, -1)
@@ -43,6 +44,7 @@ module Kafka
4344
#
4445
# Calls the `rd_kafka_consumer_poll` C function.
4546
def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message?
47+
verify_handle_open!
4648
message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms)
4749
return if message_ptr.null?
4850

@@ -59,6 +61,7 @@ module Kafka
5961
#
6062
# At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run.
6163
def each(timeout = 250, raise_on_error = true, &)
64+
verify_handle_open!
6265
@running = true
6366
while @running
6467
Fiber.yield
@@ -80,13 +83,24 @@ module Kafka
8083
@running = false
8184
end
8285

86+
# Returns whether the consumer is open.
87+
def open?
88+
!@handle.null?
89+
end
90+
8391
# Close the consumer and destroy the Kafka handle.
8492
#
8593
# Calls the `rd_kafka_consumer_close` and `rd_kafka_destroy` C functions.
8694
def close
95+
return if @handle.null?
96+
8797
LibRdKafka.consumer_close(@handle)
8898
LibRdKafka.kafka_destroy(@handle)
8999
@handle = LibRdKafka::KafkaHandle.null
90100
end
101+
102+
private def verify_handle_open!
103+
raise ConsumerException.new("Consumer closed") if @handle.null?
104+
end
91105
end
92106
end

0 commit comments

Comments
 (0)