Skip to content

Commit af27b52

Browse files
committed
feat: Make serializer configurable via YAML configuration
- Add support for configuring serializers through YAML in RdKafkaContext - Allow serializer specification as a class name, array with options, or instance
1 parent f5759ec commit af27b52

File tree

2 files changed

+51
-1
lines changed

2 files changed

+51
-1
lines changed

pkg/rdkafka/RdKafkaContext.php

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Interop\Queue\Queue;
1717
use Interop\Queue\SubscriptionConsumer;
1818
use Interop\Queue\Topic;
19+
use InvalidArgumentException;
1920
use RdKafka\Conf;
2021
use RdKafka\KafkaConsumer;
2122
use RdKafka\Producer as VendorProducer;
@@ -54,8 +55,35 @@ public function __construct(array $config)
5455
$this->config = $config;
5556
$this->kafkaConsumers = [];
5657
$this->rdKafkaConsumers = [];
58+
$this->configureSerializer($config);
59+
}
60+
61+
/**
62+
* @param array $config
63+
* @return void
64+
*/
65+
private function configureSerializer(array $config): void
66+
{
67+
if (!isset($config['serializer'])) {
68+
$this->setSerializer(new JsonSerializer());
69+
return;
70+
}
5771

58-
$this->setSerializer(new JsonSerializer());
72+
if (is_string($config['serializer'])) {
73+
$this->setSerializer(new $config['serializer']());
74+
} elseif (is_array($config['serializer']) && isset($config['serializer']['class'])) {
75+
$serializerClass = $config['serializer']['class'];
76+
$serializerOptions = $config['serializer']['options'] ?? [];
77+
if (!empty($serializerOptions)) {
78+
$this->setSerializer(new $serializerClass($serializerOptions));
79+
} else {
80+
$this->setSerializer(new $serializerClass());
81+
}
82+
} elseif ($config['serializer'] instanceof Serializer) {
83+
$this->setSerializer($config['serializer']);
84+
} else {
85+
throw new InvalidArgumentException('Invalid serializer configuration');
86+
}
5987
}
6088

6189
/**

pkg/rdkafka/Tests/RdKafkaContextTest.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Enqueue\RdKafka\Serializer;
99
use Interop\Queue\Exception\InvalidDestinationException;
1010
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
11+
use InvalidArgumentException;
1112
use PHPUnit\Framework\TestCase;
1213

1314
class RdKafkaContextTest extends TestCase
@@ -36,6 +37,27 @@ public function testShouldSetJsonSerializerInConstructor()
3637
$this->assertInstanceOf(JsonSerializer::class, $context->getSerializer());
3738
}
3839

40+
public function testShouldUseStringSerializerClassFromConfig()
41+
{
42+
$mockSerializerClass = get_class($this->createMock(Serializer::class));
43+
44+
$context = new RdKafkaContext([
45+
'serializer' => $mockSerializerClass
46+
]);
47+
48+
$this->assertInstanceOf($mockSerializerClass, $context->getSerializer());
49+
}
50+
51+
public function testShouldThrowExceptionOnInvalidSerializerConfig()
52+
{
53+
$this->expectException(InvalidArgumentException::class);
54+
$this->expectExceptionMessage('Invalid serializer configuration');
55+
56+
new RdKafkaContext([
57+
'serializer' => 123
58+
]);
59+
}
60+
3961
public function testShouldAllowGetPreviouslySetSerializer()
4062
{
4163
$context = new RdKafkaContext([]);

0 commit comments

Comments
 (0)