Skip to content

Commit 5b49367

Browse files
committed
fix amqp orpahned message exception.
1 parent 1f3cb57 commit 5b49367

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

src/SubscriptionConsumerConsumeUntilUnsubscribedSpec.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Queue\PsrContext;
77
use Interop\Queue\PsrMessage;
88
use Interop\Queue\PsrQueue;
9+
use Interop\Queue\PsrSubscriptionConsumer;
910
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
1011
use PHPUnit\Framework\TestCase;
1112

@@ -19,6 +20,11 @@ abstract class SubscriptionConsumerConsumeUntilUnsubscribedSpec extends TestCase
1920
*/
2021
private $context;
2122

23+
/**
24+
* @var PsrSubscriptionConsumer
25+
*/
26+
protected $subscriptionConsumer;
27+
2228
protected function tearDown()
2329
{
2430
if ($this->context) {
@@ -50,19 +56,20 @@ public function test()
5056
return true;
5157
};
5258

53-
$subscriptionConsumer = $context->createSubscriptionConsumer();
59+
$this->subscriptionConsumer = $subscriptionConsumer = $context->createSubscriptionConsumer();
5460
$subscriptionConsumer->subscribe($fooConsumer, $callback);
5561
$subscriptionConsumer->subscribe($barConsumer, $callback);
5662

5763
$subscriptionConsumer->consume(1000);
5864

5965
$this->assertEquals(2, $consumedMessages);
6066

67+
$subscriptionConsumer->unsubscribe($fooConsumer);
68+
6169
$context->createProducer()->send($fooQueue, $context->createMessage());
6270
$context->createProducer()->send($barQueue, $context->createMessage());
6371

6472
$consumedMessages = 0;
65-
$subscriptionConsumer->unsubscribe($fooConsumer);
6673
$subscriptionConsumer->consume(1000);
6774

6875
$this->assertEquals(1, $consumedMessages);

0 commit comments

Comments
 (0)