Skip to content

Commit 1f3cb57

Browse files
committed
add subscription consumer specs
1 parent ac5bb05 commit 1f3cb57

3 files changed

+255
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec;
4+
5+
use Interop\Queue\PsrConsumer;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
9+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
10+
use PHPUnit\Framework\TestCase;
11+
12+
/**
13+
* @group functional
14+
*/
15+
abstract class SubscriptionConsumerConsumeFromAllSubscribedQueuesSpec extends TestCase
16+
{
17+
/**
18+
* @var PsrContext
19+
*/
20+
private $context;
21+
22+
protected function tearDown()
23+
{
24+
if ($this->context) {
25+
$this->context->close();
26+
}
27+
28+
parent::tearDown();
29+
}
30+
31+
public function test()
32+
{
33+
$this->context = $context = $this->createContext();
34+
35+
$fooQueue = $this->createQueue($context, 'foo_subscription_consumer_consume_from_all_subscribed_queues_spec');
36+
$barQueue = $this->createQueue($context, 'bar_subscription_consumer_consume_from_all_subscribed_queues_spec');
37+
38+
$expectedFooBody = 'fooBody';
39+
$expectedBarBody = 'barBody';
40+
41+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
42+
$context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody));
43+
44+
$fooConsumer = $context->createConsumer($fooQueue);
45+
$barConsumer = $context->createConsumer($barQueue);
46+
47+
$actualBodies = [];
48+
$actualQueues = [];
49+
$callback = function(PsrMessage $message, PsrConsumer $consumer) use (&$actualBodies, &$actualQueues) {
50+
$actualBodies[] = $message->getBody();
51+
$actualQueues[] = $consumer->getQueue()->getQueueName();
52+
53+
$consumer->acknowledge($message);
54+
55+
return true;
56+
};
57+
58+
$subscriptionConsumer = $context->createSubscriptionConsumer();
59+
$subscriptionConsumer->subscribe($fooConsumer, $callback);
60+
$subscriptionConsumer->subscribe($barConsumer, $callback);
61+
62+
$subscriptionConsumer->consume(1000);
63+
64+
$this->assertEquals([$expectedFooBody, $expectedBarBody], $actualBodies);
65+
$this->assertEquals(
66+
[
67+
'foo_subscription_consumer_consume_from_all_subscribed_queues_spec',
68+
'bar_subscription_consumer_consume_from_all_subscribed_queues_spec'
69+
],
70+
$actualQueues
71+
);
72+
}
73+
74+
/**
75+
* @return PsrContext|PsrSubscriptionConsumerAwareContext
76+
*/
77+
abstract protected function createContext();
78+
79+
/**
80+
* @param PsrContext $context
81+
* @param string $queueName
82+
*
83+
* @return PsrQueue
84+
*/
85+
protected function createQueue(PsrContext $context, $queueName)
86+
{
87+
return $context->createQueue($queueName);
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec;
4+
5+
use Interop\Queue\PsrConsumer;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
9+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
10+
use PHPUnit\Framework\TestCase;
11+
12+
/**
13+
* @group functional
14+
*/
15+
abstract class SubscriptionConsumerConsumeUntilUnsubscribedSpec extends TestCase
16+
{
17+
/**
18+
* @var PsrContext
19+
*/
20+
private $context;
21+
22+
protected function tearDown()
23+
{
24+
if ($this->context) {
25+
$this->context->close();
26+
}
27+
28+
parent::tearDown();
29+
}
30+
31+
public function test()
32+
{
33+
$this->context = $context = $this->createContext();
34+
35+
$fooQueue = $this->createQueue($context, 'foo_subscription_consumer_consume_until_unsubscribed_spec');
36+
$barQueue = $this->createQueue($context, 'bar_subscription_consumer_consume_until_unsubscribed_spec');
37+
38+
$context->createProducer()->send($fooQueue, $context->createMessage());
39+
$context->createProducer()->send($barQueue, $context->createMessage());
40+
41+
$fooConsumer = $context->createConsumer($fooQueue);
42+
$barConsumer = $context->createConsumer($barQueue);
43+
44+
$consumedMessages = 0;
45+
$callback = function(PsrMessage $message, PsrConsumer $consumer) use (&$consumedMessages) {
46+
$consumedMessages++;
47+
48+
$consumer->acknowledge($message);
49+
50+
return true;
51+
};
52+
53+
$subscriptionConsumer = $context->createSubscriptionConsumer();
54+
$subscriptionConsumer->subscribe($fooConsumer, $callback);
55+
$subscriptionConsumer->subscribe($barConsumer, $callback);
56+
57+
$subscriptionConsumer->consume(1000);
58+
59+
$this->assertEquals(2, $consumedMessages);
60+
61+
$context->createProducer()->send($fooQueue, $context->createMessage());
62+
$context->createProducer()->send($barQueue, $context->createMessage());
63+
64+
$consumedMessages = 0;
65+
$subscriptionConsumer->unsubscribe($fooConsumer);
66+
$subscriptionConsumer->consume(1000);
67+
68+
$this->assertEquals(1, $consumedMessages);
69+
}
70+
71+
/**
72+
* @return PsrContext|PsrSubscriptionConsumerAwareContext
73+
*/
74+
abstract protected function createContext();
75+
76+
/**
77+
* @param PsrContext $context
78+
* @param string $queueName
79+
*
80+
* @return PsrQueue
81+
*/
82+
protected function createQueue(PsrContext $context, $queueName)
83+
{
84+
return $context->createQueue($queueName);
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec;
4+
5+
use Interop\Queue\PsrConsumer;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrQueue;
9+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
10+
use PHPUnit\Framework\TestCase;
11+
12+
/**
13+
* @group functional
14+
*/
15+
abstract class SubscriptionConsumerStopOnFalseSpec extends TestCase
16+
{
17+
/**
18+
* @var PsrContext
19+
*/
20+
private $context;
21+
22+
protected function tearDown()
23+
{
24+
if ($this->context) {
25+
$this->context->close();
26+
}
27+
28+
parent::tearDown();
29+
}
30+
31+
public function test()
32+
{
33+
$this->context = $context = $this->createContext();
34+
35+
$fooQueue = $this->createQueue($context, 'foo_subscription_consumer_stop_on_false_spec');
36+
$barQueue = $this->createQueue($context, 'bar_subscription_consumer_stop_on_false_spec');
37+
38+
$expectedFooBody = __CLASS__.'foo'.time();
39+
$expectedBarBody = __CLASS__.'bar'.time();
40+
41+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
42+
$context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody));
43+
44+
$consumedMessages = 0;
45+
$callback = function(PsrMessage $message, PsrConsumer $consumer) use (&$consumedMessages) {
46+
$consumedMessages++;
47+
48+
$consumer->acknowledge($message);
49+
50+
return false;
51+
};
52+
53+
$fooConsumer = $context->createConsumer($fooQueue);
54+
$barConsumer = $context->createConsumer($barQueue);
55+
56+
$subscriptionConsumer = $context->createSubscriptionConsumer();
57+
$subscriptionConsumer->subscribe($fooConsumer, $callback);
58+
$subscriptionConsumer->subscribe($barConsumer, $callback);
59+
60+
$subscriptionConsumer->consume(1000);
61+
62+
$this->assertEquals(1, $consumedMessages);
63+
}
64+
65+
/**
66+
* @return PsrContext|PsrSubscriptionConsumerAwareContext
67+
*/
68+
abstract protected function createContext();
69+
70+
/**
71+
* @param PsrContext $context
72+
* @param string $queueName
73+
*
74+
* @return PsrQueue
75+
*/
76+
protected function createQueue(PsrContext $context, $queueName)
77+
{
78+
return $context->createQueue($queueName);
79+
}
80+
}

0 commit comments

Comments
 (0)