Skip to content

Commit 7f02463

Browse files
committed
Backport transaction() and subscribe() support from the 2.x branch
Signed-off-by: Cy Rossignol <[email protected]>
1 parent 1267b25 commit 7f02463

File tree

2 files changed

+364
-9
lines changed

2 files changed

+364
-9
lines changed

src/PredisConnection.php

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
<?php
2+
3+
namespace Monospice\LaravelRedisSentinel;
4+
5+
use Closure;
6+
use InvalidArgumentException;
7+
use Predis\Client;
8+
use Predis\CommunicationException;
9+
use Predis\PubSub\Consumer as PubSub;
10+
use RuntimeException;
11+
12+
/**
13+
* Executes Redis commands using the Predis client.
14+
*
15+
* This package extends the Predis client to work around issues experienced
16+
* when using the it to send commands over "aggregate" connections (in this
17+
* case, Sentinel connections).
18+
*
19+
* Unlike in the 2.x branch, this PredisConnection implementation extends the
20+
* Predis client instead of consuming it as a dependency so we don't break
21+
* backward compatibility in Laravel 5.3 and below.
22+
*
23+
* @category Package
24+
* @package Monospice\LaravelRedisSentinel
25+
* @author Cy Rossignol <[email protected]>
26+
* @license See LICENSE file
27+
* @link https://github.com/monospice/laravel-redis-sentinel-drivers
28+
*/
29+
class PredisConnection extends Client
30+
{
31+
/**
32+
* The number of times the client attempts to retry a command when it fails
33+
* to connect to a Redis instance behind Sentinel.
34+
*
35+
* @var int
36+
*/
37+
protected $retryLimit = 20;
38+
39+
/**
40+
* The time in milliseconds to wait before the client retries a failed
41+
* command.
42+
*
43+
* @var int
44+
*/
45+
protected $retryWait = 1000;
46+
47+
/**
48+
* Set the default amount of time to wait before determining that a
49+
* connection attempt to a Sentinel server failed.
50+
*
51+
* @param float $seconds The timeout value in seconds.
52+
*
53+
* @return $this The current instance for method chaining.
54+
*/
55+
public function setSentinelTimeout($seconds)
56+
{
57+
$this->getConnection()->setSentinelTimeout($seconds);
58+
59+
return $this;
60+
}
61+
62+
/**
63+
* Set the default number of attempts to retry a command when the client
64+
* fails to connect to a Redis instance behind Sentinel.
65+
*
66+
* @param int $attempts With a value of 0, throw an exception after the
67+
* first failed attempt. Pass a value of -1 to retry connections forever.
68+
*
69+
* @return $this The current instance for method chaining.
70+
*/
71+
public function setRetryLimit($attempts)
72+
{
73+
$this->retryLimit = (int) $attempts;
74+
$this->getConnection()->setRetryLimit($attempts);
75+
76+
return $this;
77+
}
78+
79+
/**
80+
* Set the time to wait before retrying a command after a connection
81+
* attempt failed.
82+
*
83+
* @param int $milliseconds The wait time in milliseconds. When 0, retry
84+
* a failed command immediately.
85+
*
86+
* @return $this The current instance for method chaining.
87+
*/
88+
public function setRetryWait($milliseconds)
89+
{
90+
$this->retryWait = (int) $milliseconds;
91+
$this->getConnection()->setRetryWait($milliseconds);
92+
93+
return $this;
94+
}
95+
96+
/**
97+
* Set whether the client should update the list of known Sentinels each
98+
* time it needs to connect to a Redis server behind Sentinel.
99+
*
100+
* @param bool $enable If TRUE, fetch the updated Sentinel list.
101+
*
102+
* @return $this The current instance for method chaining.
103+
*/
104+
public function setUpdateSentinels($enable)
105+
{
106+
$this->getConnection()->setUpdateSentinels($enable);
107+
108+
return $this;
109+
}
110+
111+
/**
112+
* Subscribe to a set of given channels for messages.
113+
*
114+
* @param array|string $channels The names of the channels to subscribe to.
115+
* @param Closure $callback Executed for each message. Receives the
116+
* message string in the first argument and the message channel as the
117+
* second argument. Return FALSE to unsubscribe.
118+
* @param string $method The subscription command ("subscribe" or
119+
* "psubscribe").
120+
*
121+
* @return void
122+
*/
123+
public function createSubscription(
124+
$channels,
125+
Closure $callback,
126+
$method = 'subscribe'
127+
) {
128+
$this->retryOnFailure(function () use ($method, $channels, $callback) {
129+
$loop = $this->pubSubLoop([ $method => (array) $channels ]);
130+
131+
if ($method === 'psubscribe') {
132+
$messageKind = 'pmessage';
133+
} else {
134+
$messageKind = 'message';
135+
}
136+
137+
$this->consumeMessages($loop, $messageKind, $callback);
138+
139+
unset($loop);
140+
});
141+
}
142+
143+
/**
144+
* Create a new PUB/SUB subscriber and pass messages to the callback if
145+
* provided.
146+
*
147+
* WARNING: Consumers created using this method are not monitored for
148+
* connection failures. For Sentinel support, use one of the methods
149+
* provided by the Laravel API instead (subscribe() and psubscribe()).
150+
*
151+
* @param array|null $options Configures the channel(s) to subscribe to.
152+
* @param callable $callback Optional callback executed for each message
153+
* published to the configured channel(s).
154+
*
155+
* @return \Predis\PubSub\Consumer|null A PUB/SUB context used to create
156+
* a subscription loop if no callback provided.
157+
*/
158+
public function pubSubLoop($options = null, $callback = null)
159+
{
160+
// Messages published to the master propagate to each of the slaves. We
161+
// pick a random slave to distribute load away from the master:
162+
return $this->getRandomSlave()->pubSubLoop($options, $callback);
163+
}
164+
165+
/**
166+
* Execute commands in a transaction.
167+
*
168+
* This package overrides the transaction() method to work around a
169+
* limitation in the Predis API that disallows transactions on "aggregate"
170+
* connections like Sentinel. Note that transactions execute on the Redis
171+
* master instance.
172+
*
173+
* @param array|callable|null $options Predis transaction settings OR a
174+
* callback to execute. When passing a callback for the first argument, do
175+
* not supply a second argument.
176+
* @param callable|null $callback Contains the Redis commands to
177+
* execute in the transaction. The callback receives a
178+
* Predis\Transaction\MultiExec transaction abstraction as the only
179+
* argument. We use this object to execute Redis commands by calling its
180+
* methods just like we would with the Laravel Redis service.
181+
*
182+
* @return array|Predis\Transaction\MultiExec An array containing the
183+
* result for each command executed during the transaction. If no callback
184+
* provided, returns an instance of the Predis transaction abstraction.
185+
*/
186+
public function transaction($options = null, callable $callback = null)
187+
{
188+
return $this->retryOnFailure(function () use ($options, $callback) {
189+
if ($callback !== null) {
190+
return $this->getMaster()->transaction($options, $callback);
191+
}
192+
193+
return $this->getMaster()->transaction($options);
194+
});
195+
}
196+
197+
/**
198+
* Creates a new client instance for the specified connection ID or alias,
199+
* only when working with an aggregate connection (cluster, replication).
200+
* The new client instances uses the same options of the original one.
201+
*
202+
* @param string $connectionID Identifier of a connection.
203+
*
204+
* @return Client A Predis client instance for the specified connection.
205+
*
206+
* @throws InvalidArgumentException When the aggregate connection does not
207+
* contain a node that matches the specified ID.
208+
*/
209+
public function getClientFor($connectionID)
210+
{
211+
if (! $connection = $this->getConnectionById($connectionID)) {
212+
throw new InvalidArgumentException(
213+
"Invalid connection ID: $connectionID."
214+
);
215+
}
216+
217+
// Because this class extends the Predis client, we need to return an
218+
// instance of the base client class so that clients created for the
219+
// Redis servers behind Sentinel don't implement the overrides here.
220+
return new parent($connection, $this->options);
221+
}
222+
223+
/**
224+
* Attempt to retry the provided operation when the client fails to connect
225+
* to a Redis server.
226+
*
227+
* We adapt Predis' Sentinel connection failure handling logic here to
228+
* reproduce the high-availability mode provided by the actual client. To
229+
* work around "aggregate" connection limitations in Predis, this class
230+
* provides methods that don't use the high-level Sentinel connection API
231+
* of Predis directly, so it needs to handle connection failures itself.
232+
*
233+
* @param callable $callback The operation to execute.
234+
*
235+
* @return mixed The result of the first successful attempt.
236+
*
237+
* @throws CommunicationException After exhausting the allowed number of
238+
* attempts to reconnect.
239+
*/
240+
protected function retryOnFailure(callable $callback)
241+
{
242+
$attempts = 0;
243+
244+
do {
245+
try {
246+
return $callback();
247+
} catch (CommunicationException $exception) {
248+
$exception->getConnection()->disconnect();
249+
$this->getConnection()->querySentinel();
250+
251+
usleep($this->retryWait * 1000);
252+
253+
$attempts++;
254+
}
255+
} while ($attempts <= $this->retryLimit);
256+
257+
throw $exception;
258+
}
259+
260+
/**
261+
* Execute the provided callback for each message read by the PUB/SUB
262+
* consumer.
263+
*
264+
* @param PubSub $loop Reads the messages published to a channel.
265+
* @param string $kind The subscribed message type ([p]message).
266+
* @param Closure $callback Executed for each message.
267+
*
268+
* @return void
269+
*/
270+
protected function consumeMessages(PubSub $loop, $kind, Closure $callback)
271+
{
272+
foreach ($loop as $message) {
273+
if ($message->kind === $kind) {
274+
if ($callback($message->payload, $message->channel) === false) {
275+
return;
276+
}
277+
}
278+
}
279+
}
280+
281+
/**
282+
* Get a Predis client instance for the master.
283+
*
284+
* @return Client The client instance for the current master.
285+
*/
286+
protected function getMaster()
287+
{
288+
return $this->getClientFor('master');
289+
}
290+
291+
/**
292+
* Get a Predis client instance for a random slave.
293+
*
294+
* @param bool $fallbackToMaster If TRUE, return a client for the master
295+
* if the connection does not include any slaves.
296+
*
297+
* @return Client The client instance for the selected slave.
298+
*
299+
* @throws RuntimeException When the client cannot reach any replicas
300+
* (and the master if $fallbackToMaster is TRUE).
301+
*/
302+
protected function getRandomSlave($fallbackToMaster = true)
303+
{
304+
$slaves = $this->getConnection()->getSlaves();
305+
306+
if (count($slaves) > 0) {
307+
$slave = $slaves[rand(1, count($slaves)) - 1];
308+
309+
return $this->getClientFor($slave->getParameters()->alias);
310+
}
311+
312+
if ($fallbackToMaster) {
313+
return $this->getMaster();
314+
}
315+
316+
throw new RuntimeException('No slave present on connection.');
317+
}
318+
}

0 commit comments

Comments
 (0)