diff --git a/src/Client.php b/src/Client.php index 82779b0..3a98dfe 100644 --- a/src/Client.php +++ b/src/Client.php @@ -15,6 +15,8 @@ use support\Log; use Workerman\RedisQueue\Client as RedisClient; +use Workerman\RedisQueue\UnretryableException; +use Workerman\Timer; /** * Class RedisQueue @@ -23,35 +25,169 @@ * Strings methods * @method static void send($queue, $data, $delay=0) */ -class Client +class Client extends RedisClient { + protected string $name = ''; + + protected static int $retry_timer = 0; + /** * @var Client[] */ - protected static $_connections = null; - + protected static array $_connections = []; /** - * @param string $name - * @return RedisClient + * Get redis connection + * + * @param string $name + * @return static */ - public static function connection($name = 'default') { - if (!isset(static::$_connections[$name])) { + public static function connection(string $name = 'default'): static + { + if (! isset(static::$_connections[$name])) { $config = config('redis_queue', config('plugin.webman.redis-queue.redis', [])); - if (!isset($config[$name])) { - throw new \RuntimeException("RedisQueue connection $name not found"); + if (! isset($config[$name])) { + throw new \RuntimeException("RedisQueue connection {$name} not found"); } - $host = $config[$name]['host']; - $options = $config[$name]['options']; - $client = new RedisClient($host, $options); + + $client = new static($config[$name]['host'], $config[$name]['options']); if (method_exists($client, 'logger')) { $client->logger(Log::channel('plugin.webman.redis-queue.default')); } + + $client->name = $name; static::$_connections[$name] = $client; } return static::$_connections[$name]; } + public function close() + { + unset(static::$_connections[$this->name]); + + // 停止同步异步任务的定时器 + Timer::del(static::$retry_timer); + + // 将订阅队列清空,避免后续还有 pull 定时任务被添加 + $this->_subscribeQueues = []; + + // 延时 1.01s 关闭连接,避免关闭之前还有监听的数据到来(pull 的时候定时器延时为 0.000001,阻塞延时为 1,所以要稍大一些,确保最后一次 pull 完成) + Timer::add(1.01, function () { + $this->_redisSubscribe->close(); + $this->_redisSend->close(); + }, [], false); + } + + public function pull() + { + $this->tryToPullDelayQueue(); + if (! $this->_subscribeQueues || ! empty($this->_redisSubscribe->brPoping)) { + return; + } + + $cb = function ($data) use (&$cb) { + // 消费数据 + $this->consume($data); + + // 重新监听 + if ($this->_subscribeQueues) { + $this->_redisSubscribe->brPoping = 1; + Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb], false); + } + + // 处理信号 + function_exists('pcntl_signal_dispatch') && pcntl_signal_dispatch(); + }; + + $this->_redisSubscribe->brPoping = 1; + $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb); + } + + protected function tryToPullDelayQueue() + { + if (static::$retry_timer) { + return; + } + static::$retry_timer = Timer::add(1, function () { + $now = time(); + $options = ['LIMIT', 0, 128]; + $this->_redisSend->zrevrangebyscore($this->_options['prefix'] . static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) { + if ($items === false) { + throw new \RuntimeException($this->_redisSend->error()); + } + foreach ($items as $package_str) { + $this->_redisSend->zRem($this->_options['prefix'] . static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) { + if ($result !== 1) { + return; + } + $package = \json_decode($package_str, true); + if (!$package) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); + return; + } + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $package['queue'], $package_str); + }); + } + }); + }); + } + + protected function consume($data) + { + if (empty($data)) { + return; + } + + $this->_redisSubscribe->brPoping = 0; + + [$redis_key, $package_str] = $data; + if (! $package = json_decode($package_str, true)) { + $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str); + return; + } + + // 取消订阅/未订阅,放回队列 + if (! $callback = $this->_subscribeQueues[$redis_key] ?? null) { + $this->_redisSend->rPush($redis_key, $package_str); + return; + } + + try { + \call_user_func($callback, $package['data']); + } catch (\Throwable $e) { + $this->log((string)$e); + $package['max_attempts'] = $this->_options['max_attempts']; + $package['error'] = $e->getMessage(); + + if ($e instanceof UnretryableException) { + $this->fail($package); + return; + } + + $modified = null; + if ($this->_consumeFailure) { + try { + $modified = \call_user_func($this->_consumeFailure, $e, $package); + } catch (\Throwable $ta) { + $this->log((string)$ta); + } + } + + if (is_array($modified)) { + $package['data'] = $modified['data'] ?? $package['data']; + $package['attempts'] = $modified['attempts'] ?? $package['attempts']; + $package['max_attempts'] = $modified['max_attempts'] ?? $package['max_attempts']; + $package['error'] = $modified['error'] ?? $package['error']; + } + + if (++$package['attempts'] > $package['max_attempts']) { + $this->fail($package); + } else { + $this->retry($package); + } + } + } + /** * @param $name * @param $arguments @@ -59,6 +195,6 @@ public static function connection($name = 'default') { */ public static function __callStatic($name, $arguments) { - return static::connection('default')->{$name}(... $arguments); + return static::connection()->{$name}(... $arguments); } } diff --git a/src/Process/Consumer.php b/src/Process/Consumer.php index 653d64e..8fca438 100644 --- a/src/Process/Consumer.php +++ b/src/Process/Consumer.php @@ -27,62 +27,86 @@ class Consumer /** * @var string */ - protected $_consumerDir = ''; + protected string $_consumerDir = ''; /** * @var array */ - protected $_consumers = []; + protected array $_consumers = []; /** - * StompConsumer constructor. - * @param string $consumer_dir + * @var string[] */ - public function __construct($consumer_dir = '') + protected array $_connections = []; + + /** + * Consumer constructor. + * + * @param string $consumer_dir + */ + public function __construct(string $consumer_dir = '') { $this->_consumerDir = $consumer_dir; } - /** - * onWorkerStart. - */ - public function onWorkerStart() + public function onWorkerStart(): void { - if (!is_dir($this->_consumerDir)) { + if (! is_dir($this->_consumerDir)) { echo "Consumer directory {$this->_consumerDir} not exists\r\n"; return; } - $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir); - $iterator = new \RecursiveIteratorIterator($dir_iterator); - foreach ($iterator as $file) { - if (is_dir($file)) { + + $iterator = new \RecursiveIteratorIterator( + new \RecursiveDirectoryIterator( + $this->_consumerDir, + \FilesystemIterator::CURRENT_AS_FILEINFO|\FilesystemIterator::SKIP_DOTS|\FilesystemIterator::KEY_AS_PATHNAME + ) + ); + + // 对每一个 consumer 设置订阅 + /** @var \SplFileInfo $file */ + foreach ($iterator as $pathname => $file) { + // 文件夹或非 PHP 文件 + if ($file->isDir() || $file->getExtension() !== 'php') { continue; } - $fileinfo = new \SplFileInfo($file); - $ext = $fileinfo->getExtension(); - if ($ext === 'php') { - $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4)); - if (is_a($class, 'Webman\RedisQueue\Consumer', true)) { - $consumer = Container::get($class); - $connection_name = $consumer->connection ?? 'default'; - $queue = $consumer->queue; - if (!$queue) { - echo "Consumer {$class} queue not exists\r\n"; - continue; - } - $this->_consumers[$queue] = $consumer; - $connection = Client::connection($connection_name); - $connection->subscribe($queue, [$consumer, 'consume']); - if (method_exists($connection, 'onConsumeFailure')) { - $connection->onConsumeFailure(function ($exeption, $package) { - $consumer = $this->_consumers[$package['queue']] ?? null; - if ($consumer && method_exists($consumer, 'onConsumeFailure')) { - return call_user_func([$consumer, 'onConsumeFailure'], $exeption, $package); - } - }); + + // 非 Consumer 子类 + $class = str_replace('/', "\\", substr(substr($pathname, strlen(base_path())), 0, -4)); + if (! is_a($class, 'Webman\RedisQueue\Consumer', true)) { + continue; + } + + $consumer = Container::get($class); + if (! $queue = $consumer->queue) { + echo "Consumer {$class} queue not exists\r\n"; + continue; + } + + // 保存链接和队列信息 + $connection_name = $consumer->connection ?? 'default'; + $this->_connections[$connection_name] = true; + $this->_consumers[$queue] = $consumer; + + $connection = Client::connection($connection_name); + $connection->subscribe($queue, [$consumer, 'consume']); + if (method_exists($connection, 'onConsumeFailure')) { + $connection->onConsumeFailure(function ($exception, $package) { + $consumer = $this->_consumers[$package['queue']] ?? null; + if ($consumer && method_exists($consumer, 'onConsumeFailure')) { + return call_user_func([$consumer, 'onConsumeFailure'], $exception, $package); } - } + return $package; + }); } } } + + public function onWorkerReload(): void + { + // 关闭所有订阅的连接 + foreach ($this->_connections as $name => $value) { + Client::connection($name)->close(); + } + } }