7
7
use PhpAmqpLib \Exception \AMQPTimeoutException ;
8
8
use PhpAmqpLib \Message \AMQPMessage ;
9
9
10
- final class BatchConsumer extends BaseAmqp implements DequeuerInterface
10
+ class BatchConsumer extends BaseAmqp implements DequeuerInterface
11
11
{
12
12
/**
13
13
* @var \Closure|callable
14
14
*/
15
- private $ callback ;
15
+ protected $ callback ;
16
16
17
17
/**
18
18
* @var bool
19
19
*/
20
- private $ forceStop = false ;
20
+ protected $ forceStop = false ;
21
21
22
22
/**
23
23
* @var int
24
24
*/
25
- private $ idleTimeout = 0 ;
25
+ protected $ idleTimeout = 0 ;
26
26
27
27
/**
28
28
* @var bool
@@ -32,32 +32,54 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface
32
32
/**
33
33
* @var int
34
34
*/
35
- private $ idleTimeoutExitCode ;
35
+ protected $ idleTimeoutExitCode ;
36
36
37
37
/**
38
38
* @var int
39
39
*/
40
- private $ memoryLimit = null ;
40
+ protected $ memoryLimit = null ;
41
41
42
42
/**
43
43
* @var int
44
44
*/
45
- private $ prefetchCount ;
45
+ protected $ prefetchCount ;
46
46
47
47
/**
48
48
* @var int
49
49
*/
50
- private $ timeoutWait = 3 ;
50
+ protected $ timeoutWait = 3 ;
51
51
52
52
/**
53
53
* @var array
54
54
*/
55
- private $ messages = array ();
55
+ protected $ messages = array ();
56
56
57
57
/**
58
58
* @var int
59
59
*/
60
- private $ batchCounter = 0 ;
60
+ protected $ batchCounter = 0 ;
61
+
62
+ /**
63
+ * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
64
+ * any currently running consumption will not be interrupted.
65
+ */
66
+ protected $ gracefulMaxExecutionDateTime ;
67
+
68
+ /**
69
+ * @param \DateTime|null $dateTime
70
+ */
71
+ public function setGracefulMaxExecutionDateTime (\DateTime $ dateTime = null )
72
+ {
73
+ $ this ->gracefulMaxExecutionDateTime = $ dateTime ;
74
+ }
75
+
76
+ /**
77
+ * @param int $secondsInTheFuture
78
+ */
79
+ public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture ($ secondsInTheFuture )
80
+ {
81
+ $ this ->setGracefulMaxExecutionDateTime (new \DateTime ("+ {$ secondsInTheFuture } seconds " ));
82
+ }
61
83
62
84
/**
63
85
* @param \Closure|callable $callback
@@ -80,6 +102,7 @@ public function consume()
80
102
$ this ->batchConsume ();
81
103
}
82
104
105
+ $ this ->checkGracefulMaxExecutionDateTime ();
83
106
$ this ->maybeStopConsumer ();
84
107
85
108
$ timeout = $ this ->isEmptyBatch () ? $ this ->getIdleTimeout () : $ this ->getTimeoutWait ();
@@ -530,4 +553,24 @@ public function getMemoryLimit()
530
553
{
531
554
return $ this ->memoryLimit ;
532
555
}
556
+
557
+ /**
558
+ * Check graceful max execution date time and stop if limit is reached
559
+ *
560
+ * @return void
561
+ */
562
+ private function checkGracefulMaxExecutionDateTime ()
563
+ {
564
+ if (!$ this ->gracefulMaxExecutionDateTime ) {
565
+ return ;
566
+ }
567
+
568
+ $ now = new \DateTime ();
569
+
570
+ if ($ this ->gracefulMaxExecutionDateTime > $ now ) {
571
+ return ;
572
+ }
573
+
574
+ $ this ->forceStopConsumer ();
575
+ }
533
576
}
0 commit comments