Skip to content

Commit 0ea5d4b

Browse files
committed
[amqp] Add ssl connection support.
1 parent 7c730e9 commit 0ea5d4b

12 files changed

+401
-5
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ bin/jp.php
99
bin/php-parse
1010
bin/google-cloud-batch
1111
vendor
12+
var
1213
.php_cs
1314
.php_cs.cache
1415
composer.lock

bin/build-rabbitmq-image.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/usr/bin/env bash
2+
3+
set -e
4+
set -x
5+
6+
(cd docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq:latest" -f Dockerfile.rabbitmq .)
7+
(cd docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
8+
(cd docker && docker push "enqueue/rabbitmq-ssl:latest")

bin/build-rabbitmq-ssl-image.sh

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env bash
2+
3+
4+
set -e
5+
set -x
6+
7+
#mkdir -p /tmp/roboconf
8+
#rm -rf /tmp/roboconf/*
9+
#
10+
#(cd /tmp/roboconf && git clone [email protected]:roboconf/rabbitmq-with-ssl-in-docker.git)
11+
#
12+
#(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq-ssl:latest" .)
13+
#
14+
#(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD")
15+
#(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker push "enqueue/rabbitmq-ssl:latest")
16+
17+
docker run --rm -v "`pwd`/var/rabbitmq_certificates:/enqueue" "enqueue/rabbitmq-ssl:latest" cp /home/testca/cacert.pem /enqueue/cacert.pem
18+
19+
20+

docker-compose.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ services:
66
# build: { context: docker, dockerfile: Dockerfile }
77
depends_on:
88
- rabbitmq
9+
- rabbitmq_ssl
910
- mysql
1011
- redis
1112
- beanstalkd
@@ -17,6 +18,7 @@ services:
1718
- './:/mqdev'
1819
environment:
1920
- AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev
21+
- AMQPS_DSN=amqps://guest:guest@rabbitmq_ssl:5671
2022
- DOCTINE_DSN=mysql://root:rootpass@mysql/mqdev
2123
- SYMFONY__RABBITMQ__HOST=rabbitmq
2224
- SYMFONY__RABBITMQ__USER=guest
@@ -54,8 +56,16 @@ services:
5456
ports:
5557
- "15677:15672"
5658

59+
rabbitmq_ssl:
60+
image: enqueue/rabbitmq-ssl:latest
61+
environment:
62+
- RABBITMQ_DEFAULT_USER=guest
63+
- RABBITMQ_DEFAULT_PASS=guest
64+
volumes:
65+
- './var/rabbitmq_certificates:/home/client'
66+
5767
beanstalkd:
58-
image: 'schickling/beanstalkd'
68+
image: 'jonbaldie/beanstalkd'
5969

6070
gearmand:
6171
image: 'artefactual/gearmand'

pkg/amqp-bunny/AmqpConnectionFactory.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public function getConfig()
8585
*/
8686
private function establishConnection()
8787
{
88+
if ($this->config->isSslOn()) {
89+
throw new \LogicException('The bunny library does not support SSL connections');
90+
}
91+
8892
if (false == $this->client) {
8993
$bunnyConfig = [];
9094
$bunnyConfig['host'] = $this->config->getHost();
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Enqueue\AmqpBunny\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
public function test()
16+
{
17+
$this->expectException(\LogicException::class);
18+
$this->expectExceptionMessage('The bunny library does not support SSL connections');
19+
parent::test();
20+
}
21+
22+
/**
23+
* {@inheritdoc}
24+
*/
25+
protected function createContext()
26+
{
27+
$baseDir = realpath(__DIR__.'/../../../../');
28+
29+
// guard
30+
$this->assertNotEmpty($baseDir);
31+
32+
$certDir = $baseDir.'/var/rabbitmq_certificates';
33+
$this->assertDirectoryExists($certDir);
34+
35+
$factory = new AmqpConnectionFactory([
36+
'dsn' => getenv('AMQPS_DSN'),
37+
'ssl_verify' => false,
38+
'ssl_cacert' => $certDir.'/cacert.pem',
39+
'ssl_cert' => $certDir.'/cert.pem',
40+
'ssl_key' => $certDir.'/key.pem',
41+
]);
42+
43+
return $factory->createContext();
44+
}
45+
46+
/**
47+
* {@inheritdoc}
48+
*
49+
* @param AmqpContext $context
50+
*/
51+
protected function createQueue(PsrContext $context, $queueName)
52+
{
53+
$queue = $context->createQueue($queueName);
54+
$context->declareQueue($queue);
55+
$context->purgeQueue($queue);
56+
57+
return $queue;
58+
}
59+
}

pkg/amqp-ext/AmqpConnectionFactory.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,20 @@ private function establishConnection()
113113
$extConfig['read_timeout'] = $this->config->getReadTimeout();
114114
$extConfig['write_timeout'] = $this->config->getWriteTimeout();
115115
$extConfig['connect_timeout'] = $this->config->getConnectionTimeout();
116+
$extConfig['heartbeat'] = $this->config->getHeartbeat();
117+
118+
if ($this->config->isSslOn()) {
119+
$extConfig['verify'] = $this->config->isSslVerify();
120+
$extConfig['cacert'] = $this->config->getSslCaCert();
121+
$extConfig['cert'] = $this->config->getSslCert();
122+
$extConfig['key'] = $this->config->getSslKey();
123+
}
116124

117125
$this->connection = new \AMQPConnection($extConfig);
118126

119127
$this->config->isPersisted() ? $this->connection->pconnect() : $this->connection->connect();
120128
}
129+
121130
if (false == $this->connection->isConnected()) {
122131
$this->config->isPersisted() ? $this->connection->preconnect() : $this->connection->reconnect();
123132
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpExt\Tests\Spec;
4+
5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$baseDir = realpath(__DIR__.'/../../../../');
21+
22+
// guard
23+
$this->assertNotEmpty($baseDir);
24+
25+
$certDir = $baseDir.'/var/rabbitmq_certificates';
26+
$this->assertDirectoryExists($certDir);
27+
28+
$factory = new AmqpConnectionFactory([
29+
'dsn' => getenv('AMQPS_DSN'),
30+
'ssl_verify' => false,
31+
'ssl_cacert' => $certDir.'/cacert.pem',
32+
'ssl_cert' => $certDir.'/cert.pem',
33+
'ssl_key' => $certDir.'/key.pem',
34+
]);
35+
36+
return $factory->createContext();
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*
42+
* @param AmqpContext $context
43+
*/
44+
protected function createQueue(PsrContext $context, $queueName)
45+
{
46+
$queue = $context->createQueue($queueName);
47+
$context->declareQueue($queue);
48+
$context->purgeQueue($queue);
49+
50+
return $queue;
51+
}
52+
}

pkg/amqp-lib/AmqpConnectionFactory.php

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use PhpAmqpLib\Connection\AMQPLazyConnection;
1111
use PhpAmqpLib\Connection\AMQPLazySocketConnection;
1212
use PhpAmqpLib\Connection\AMQPSocketConnection;
13+
use PhpAmqpLib\Connection\AMQPSSLConnection;
1314
use PhpAmqpLib\Connection\AMQPStreamConnection;
1415

1516
class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
@@ -84,7 +85,32 @@ private function establishConnection()
8485
{
8586
if (false == $this->connection) {
8687
if ($this->config->getOption('stream')) {
87-
if ($this->config->isLazy()) {
88+
if ($this->config->isSslOn()) {
89+
$con = new AMQPSSLConnection(
90+
$this->config->getHost(),
91+
$this->config->getPort(),
92+
$this->config->getUser(),
93+
$this->config->getPass(),
94+
$this->config->getVHost(),
95+
[
96+
'cafile' => $this->config->getSslCaCert(),
97+
'local_cert' => $this->config->getSslCert(),
98+
'local_pk' => $this->config->getSslKey(),
99+
'verify_peer' => $this->config->isSslVerify(),
100+
'verify_peer_name' => $this->config->isSslVerify(),
101+
],
102+
[
103+
'insist' => $this->config->getOption('insist'),
104+
'login_method' => $this->config->getOption('login_method'),
105+
'login_response' => $this->config->getOption('login_response'),
106+
'locale' => $this->config->getOption('locale'),
107+
'connection_timeout' => $this->config->getConnectionTimeout(),
108+
'read_write_timeout' => (int) round(min($this->config->getReadTimeout(), $this->config->getWriteTimeout())),
109+
'keepalive' => $this->config->getOption('keepalive'),
110+
'heartbeat' => (int) round($this->config->getHeartbeat()),
111+
]
112+
);
113+
} elseif ($this->config->isLazy()) {
88114
$con = new AMQPLazyConnection(
89115
$this->config->getHost(),
90116
$this->config->getPort(),
@@ -120,6 +146,10 @@ private function establishConnection()
120146
);
121147
}
122148
} else {
149+
if ($this->config->isSslOn()) {
150+
throw new \LogicException('The socket connection implementation does not support ssl connections.');
151+
}
152+
123153
if ($this->config->isLazy()) {
124154
$con = new AMQPLazySocketConnection(
125155
$this->config->getHost(),
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Spec;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Interop\Queue\PsrContext;
8+
use Interop\Queue\Spec\SendToAndReceiveFromQueueSpec;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class AmqpSslSendToAndReceiveFromQueueTest extends SendToAndReceiveFromQueueSpec
14+
{
15+
/**
16+
* {@inheritdoc}
17+
*/
18+
protected function createContext()
19+
{
20+
$baseDir = realpath(__DIR__.'/../../../../');
21+
22+
// guard
23+
$this->assertNotEmpty($baseDir);
24+
25+
$certDir = $baseDir.'/var/rabbitmq_certificates';
26+
$this->assertDirectoryExists($certDir);
27+
28+
$factory = new AmqpConnectionFactory([
29+
'dsn' => getenv('AMQPS_DSN'),
30+
'ssl_verify' => false,
31+
'ssl_cacert' => $certDir.'/cacert.pem',
32+
'ssl_cert' => $certDir.'/cert.pem',
33+
'ssl_key' => $certDir.'/key.pem',
34+
]);
35+
36+
return $factory->createContext();
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*
42+
* @param AmqpContext $context
43+
*/
44+
protected function createQueue(PsrContext $context, $queueName)
45+
{
46+
$queue = $context->createQueue($queueName);
47+
$context->declareQueue($queue);
48+
$context->purgeQueue($queue);
49+
50+
return $queue;
51+
}
52+
}

0 commit comments

Comments
 (0)