Skip to content

Commit c17e79f

Browse files
committed
Merge branch 'master' into mongodb_transport
2 parents 947e00f + eaf9267 commit c17e79f

16 files changed

+1045
-17
lines changed

.travis.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,12 @@ language: php
77

88
matrix:
99
include:
10-
# - php: 5.6
11-
# env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
1210
- php: 7.1
1311
env: SYMFONY_VERSION=3.0.* PHPSTAN=true
1412
- php: 7.1
1513
env: SYMFONY_VERSION=3.0.* PHP_CS_FIXER=true
1614
- php: 7.0
1715
env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
18-
- php: 5.6
19-
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true
2016
- php: 7.0
2117
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true
2218
- php: 7.1

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Change Log
22

3+
## [0.8.27](https://github.com/php-enqueue/enqueue-dev/tree/0.8.27) (2018-05-01)
4+
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.26...0.8.27)
5+
6+
- Kafka symfony transport [\#432](https://github.com/php-enqueue/enqueue-dev/pull/432) ([dheineman](https://github.com/dheineman))
7+
- Drop PHP5 support, Drop Symfony 2.X support. [\#419](https://github.com/php-enqueue/enqueue-dev/pull/419) ([makasim](https://github.com/makasim))
8+
9+
- How can I use the Symfony Bundle with Kafka? [\#428](https://github.com/php-enqueue/enqueue-dev/issues/428)
10+
311
## [0.8.26](https://github.com/php-enqueue/enqueue-dev/tree/0.8.26) (2018-04-19)
412
[Full Changelog](https://github.com/php-enqueue/enqueue-dev/compare/0.8.25...0.8.26)
513

docs/bundle/config_reference.md

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ enqueue:
2121
connection_timeout: 1
2222
buffer_size: 1000
2323
lazy: true
24-
25-
# Should be true if you want to use secure connections. False by default
2624
ssl_on: false
2725
rabbitmq_stomp:
2826
host: localhost
@@ -34,6 +32,7 @@ enqueue:
3432
connection_timeout: 1
3533
buffer_size: 1000
3634
lazy: true
35+
ssl_on: false
3736

3837
# The option tells whether RabbitMQ broker has management plugin installed or not
3938
management_plugin_installed: false
@@ -42,7 +41,7 @@ enqueue:
4241
# The option tells whether RabbitMQ broker has delay plugin installed or not
4342
delay_plugin_installed: false
4443
amqp:
45-
driver: ~ # One of "ext"; "lib"; "bunny"
44+
driver: ~
4645

4746
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
4847
dsn: ~
@@ -106,7 +105,7 @@ enqueue:
106105
# Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.
107106
ssl_key: ~
108107
rabbitmq_amqp:
109-
driver: ~ # One of "ext"; "lib"; "bunny"
108+
driver: ~
110109

111110
# The connection to AMQP broker set as a string. Other parameters could be used as defaults
112111
dsn: ~
@@ -190,18 +189,27 @@ enqueue:
190189
polling_interval: 100
191190
redis:
192191

192+
# The redis connection given as DSN. For example redis://host:port?vendor=predis
193+
dsn: ~
194+
193195
# can be a host, or the path to a unix domain socket
194-
host: ~ # Required
196+
host: ~
195197
port: ~
196198

197199
# The library used internally to interact with Redis server
198-
vendor: ~ # One of "phpredis"; "predis", Required
200+
vendor: ~ # One of "phpredis"; "predis"; "custom"
201+
202+
# A custom redis service id, used with vendor true only
203+
redis: ~
199204

200205
# bool, Whether it use single persisted connection or open a new one for every context
201206
persisted: false
202207

203208
# the connection will be performed as later as possible, if the option set to true
204209
lazy: true
210+
211+
# Database index to select when connected.
212+
database: 0
205213
dbal:
206214

207215
# The Doctrine DBAL DSN. Other parameters are ignored if set
@@ -229,6 +237,7 @@ enqueue:
229237

230238
# the connection will be performed as later as possible, if the option set to true
231239
lazy: true
240+
endpoint: null
232241
gps:
233242

234243
# The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set
@@ -248,8 +257,36 @@ enqueue:
248257

249258
# The connection will be performed as later as possible, if the option set to true
250259
lazy: true
260+
rdkafka:
261+
262+
# The kafka DSN. Other parameters are ignored if set
263+
dsn: ~
264+
265+
# The kafka global configuration properties
266+
global: []
267+
268+
# The kafka topic configuration properties
269+
topic: []
270+
271+
# Delivery report callback
272+
dr_msg_cb: ~
273+
274+
# Error callback
275+
error_cb: ~
276+
277+
# Called after consumer group has been rebalanced
278+
rebalance_cb: ~
279+
280+
# Which partitioner to use
281+
partitioner: ~ # One of "RD_KAFKA_MSG_PARTITIONER_RANDOM"; "RD_KAFKA_MSG_PARTITIONER_CONSISTENT"
282+
283+
# Logging level (syslog(3) levels)
284+
log_level: ~
285+
286+
# Commit asynchronous
287+
commit_async: false
251288
client:
252-
traceable_producer: false
289+
traceable_producer: true
253290
prefix: enqueue
254291
app_name: app
255292
router_topic: default

docs/client/supported_brokers.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,24 @@ Here's the list of transports supported by Enqueue Client:
1313
| Redis | [enqueue/gps](../transport/redis.md) | redis: |
1414
| Amazon SQS | [enqueue/sqs](../transport/sqs.md) | sqs: |
1515
| STOMP, RabbitMQ | [enqueue/stomp](../transport/stomp.md) | stomp: |
16+
| Kafka | [enqueue/stomp](../transport/kafka.md) | kafka: |
1617
| Null | [enqueue/null](../transport/null.md) | null: |
1718

1819
Here's the list of protocols and Client features supported by them
1920

2021
| Protocol | Priority | Delay | Expiration | Setup broker | Message bus |
2122
|:--------------:|:--------:|:--------:|:----------:|:------------:|:-----------:|
2223
| AMQP | No | No | Yes | Yes | Yes |
23-
| RabbitMQ AMQP | Yes | Yes* | Yes | Yes | Yes |
24+
| RabbitMQ AMQP | Yes | Yes | Yes | Yes | Yes |
2425
| STOMP | No | No | Yes | No | Yes** |
25-
| RabbitMQ STOMP | Yes | Yes* | Yes | Yes*** | Yes** |
26+
| RabbitMQ STOMP | Yes | Yes | Yes | Yes*** | Yes** |
2627
| Filesystem | No | No | No | Yes | No |
2728
| Redis | No | No | No | Not needed | No |
2829
| Doctrine DBAL | Yes | Yes | No | Yes | No |
2930
| Amazon SQS | No | Yes | No | Yes | Not impl |
31+
| Kafka | No | No | No | Yes | No |
3032
| Google PubSub | Not impl | Not impl | Not impl | Yes | Not impl |
3133

32-
* \* Possible if a RabbitMQ delay plugin is installed.
3334
* \*\* Possible if topics (exchanges) are configured on broker side manually.
3435
* \*\*\* Possible if RabbitMQ Management Plugin is installed.
3536

pkg/dbal/DbalConsumer.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Doctrine\DBAL\Connection;
66
use Doctrine\DBAL\Types\Type;
7-
use Enqueue\Util\JSON;
87
use Interop\Queue\InvalidMessageException;
98
use Interop\Queue\PsrConsumer;
109
use Interop\Queue\PsrMessage;

pkg/dbal/DbalProducer.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Enqueue\Dbal;
44

55
use Doctrine\DBAL\Types\Type;
6-
use Enqueue\Util\JSON;
76
use Interop\Queue\Exception;
87
use Interop\Queue\InvalidDestinationException;
98
use Interop\Queue\InvalidMessageException;

pkg/dbal/JSON.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\Dbal;
4+
5+
class JSON
6+
{
7+
/**
8+
* @param string $string
9+
*
10+
* @throws \InvalidArgumentException
11+
*
12+
* @return array
13+
*/
14+
public static function decode($string)
15+
{
16+
if (!is_string($string)) {
17+
throw new \InvalidArgumentException(sprintf(
18+
'Accept only string argument but got: "%s"',
19+
is_object($string) ? get_class($string) : gettype($string)
20+
));
21+
}
22+
23+
// PHP7 fix - empty string and null cause syntax error
24+
if (empty($string)) {
25+
return null;
26+
}
27+
28+
$decoded = json_decode($string, true);
29+
if (JSON_ERROR_NONE !== json_last_error()) {
30+
throw new \InvalidArgumentException(sprintf(
31+
'The malformed json given. Error %s and message %s',
32+
json_last_error(),
33+
json_last_error_msg()
34+
));
35+
}
36+
37+
return $decoded;
38+
}
39+
40+
/**
41+
* @param mixed $value
42+
*
43+
* @return string
44+
*/
45+
public static function encode($value)
46+
{
47+
$encoded = json_encode($value, JSON_UNESCAPED_UNICODE);
48+
49+
if (JSON_ERROR_NONE !== json_last_error()) {
50+
throw new \InvalidArgumentException(sprintf(
51+
'Could not encode value into json. Error %s and message %s',
52+
json_last_error(),
53+
json_last_error_msg()
54+
));
55+
}
56+
57+
return $encoded;
58+
}
59+
}

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Enqueue\Fs\Symfony\FsTransportFactory;
2222
use Enqueue\Gps\GpsConnectionFactory;
2323
use Enqueue\Gps\Symfony\GpsTransportFactory;
24+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
25+
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
2426
use Enqueue\Redis\RedisConnectionFactory;
2527
use Enqueue\Redis\Symfony\RedisTransportFactory;
2628
use Enqueue\Sqs\SqsConnectionFactory;
@@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class)
104106
$extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps']));
105107
}
106108

109+
if (class_exists(RdKafkaConnectionFactory::class)) {
110+
$extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka'));
111+
} else {
112+
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
113+
}
114+
107115
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
108116
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
109117
}

pkg/enqueue/Symfony/DefaultTransportFactory.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
use Enqueue\Gps\Symfony\GpsTransportFactory;
1111
use Enqueue\Null\NullConnectionFactory;
1212
use Enqueue\Null\Symfony\NullTransportFactory;
13+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
14+
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
1315
use Enqueue\Redis\RedisConnectionFactory;
1416
use Enqueue\Redis\Symfony\RedisTransportFactory;
1517
use Enqueue\Sqs\SqsConnectionFactory;
@@ -209,6 +211,10 @@ private function findFactory($dsn)
209211
return new StompTransportFactory('default_stomp');
210212
}
211213

214+
if ($factory instanceof RdKafkaConnectionFactory) {
215+
return new RdKafkaTransportFactory('default_kafka');
216+
}
217+
212218
throw new \LogicException(sprintf(
213219
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
214220
get_class($factory),

pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,5 +289,7 @@ public static function provideDSNs()
289289
yield ['redis:', 'default_redis'];
290290

291291
yield ['stomp:', 'default_stomp'];
292+
293+
yield ['kafka:', 'default_kafka'];
292294
}
293295
}

0 commit comments

Comments
 (0)