Skip to content

Commit 9609321

Browse files
authored
test: add produce consume e2e test (#202)
1 parent 571bbc4 commit 9609321

File tree

4 files changed

+74
-4
lines changed

4 files changed

+74
-4
lines changed

infection.json.dist

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@
1616
},
1717
"text": "infection-log.txt"
1818
},
19-
"minMsi": 40,
19+
"minMsi": 35,
2020
"minCoveredMsi": 50
2121
}

tests/Clients/Consumer/KafkaBatchConsumerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public function testMaxBatchSize(): void
2222
{
2323
$testProducer = new TestProducer();
2424
for ($i = 0; $i < 100; $i++) {
25-
$testProducer->run(self::PAYLOAD);
25+
$testProducer->run(self::TOPIC, self::PAYLOAD);
2626
}
2727

2828
$consumer = new KafkaConsumer($this->getConfig());

tests/Clients/Consumer/KafkaTest.php

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Clients\Consumer;
6+
7+
use Exception;
8+
use PHPUnit\Framework\TestCase;
9+
use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
10+
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
11+
use SimPod\Kafka\Tests\Clients\Consumer\TestProducer;
12+
13+
use function mt_rand;
14+
use function Safe\gethostname;
15+
16+
use const RD_KAFKA_RESP_ERR__PARTITION_EOF;
17+
use const RD_KAFKA_RESP_ERR__TIMED_OUT;
18+
use const RD_KAFKA_RESP_ERR_NO_ERROR;
19+
20+
final class KafkaTest extends TestCase
21+
{
22+
public function testProduceConsume(): void
23+
{
24+
$topic = 'produce-consume';
25+
$headers = ['key' => 'value'];
26+
27+
$testProducer = new TestProducer();
28+
$testProducer->run($topic, 'test', $headers);
29+
30+
unset($testProducer);
31+
32+
$consumer = new KafkaConsumer($this->getConfig());
33+
$consumer->subscribe([$topic]);
34+
35+
while (true) {
36+
$message = $consumer->consume(5000);
37+
switch ($message->err) {
38+
case RD_KAFKA_RESP_ERR_NO_ERROR:
39+
self::assertSame($message->headers, $headers);
40+
41+
break 2;
42+
// phpcs:ignore PSR2.ControlStructures.SwitchDeclaration.TerminatingComment
43+
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
44+
self::fail('No more messages; will wait for more');
45+
// phpcs:ignore PSR2.ControlStructures.SwitchDeclaration.TerminatingComment
46+
case RD_KAFKA_RESP_ERR__TIMED_OUT:
47+
self::fail('Timed out');
48+
default:
49+
throw new Exception($message->errstr(), $message->err);
50+
}
51+
}
52+
}
53+
54+
private function getConfig(): ConsumerConfig
55+
{
56+
$consumerConfig = new ConsumerConfig();
57+
$consumerConfig->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092');
58+
$consumerConfig->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname());
59+
$consumerConfig->set(ConsumerConfig::GROUP_ID_CONFIG, mt_rand());
60+
$consumerConfig->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
61+
62+
return $consumerConfig;
63+
}
64+
}

tests/Clients/Consumer/TestProducer.php

+8-2
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,15 @@ static function (KafkaProducer $producer): void {
2323
);
2424
}
2525

26-
public function run(string $payload): void
26+
/** @param array<string, string>|null $headers */
27+
public function run(string $topic, string $payload, array|null $headers = null): void
2728
{
28-
$this->producer->produce(KafkaBatchConsumerTest::TOPIC, null, $payload);
29+
$this->producer->produce(
30+
$topic,
31+
null,
32+
$payload,
33+
headers: $headers,
34+
);
2935
}
3036

3137
private function getConfig(): ProducerConfig

0 commit comments

Comments
 (0)