Skip to content

Commit 1b0b3c4

Browse files
committed
限制Worker进程异常退出后,10分钟内最多10次重新拉起,超过限制后不再重新拉起;Master进程名显示当前Master的启动文件名。
1 parent 7884873 commit 1b0b3c4

12 files changed

+70
-29
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
## HEAD (Unreleased)
44
_(none)_
55

6+
## 0.8.0 (2017-08-13)
7+
8+
* 限制Worker进程异常退出后,10分钟内最多10次重新拉起,超过限制后不再重新拉起。
9+
* Master进程名显示当前Master的启动文件名。
10+
611
## 0.7.5 (2017-08-11)
712

813
* 修复 ForawrdWorker 未设 Logger 报错的问题。

bin/plumber

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ if (!$config instanceof \Pimple\Container ) {
4545
exit(2);
4646
}
4747

48-
$plumber = new Plumber($config);
48+
$plumber = new Plumber($config, $configFile);
4949
$plumber->main($cmd);

composer.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
"ext-swoole": ">=1.7.17",
2424
"pimple/pimple": "^3",
2525
"docopt/docopt": "^1",
26-
"codeages/beanstalk-client" : "^0.1.4",
27-
"monolog/monolog": "^1.22.0"
26+
"codeages/beanstalk-client" : "^0.1",
27+
"codeages/rate-limiter": "^0.2",
28+
"monolog/monolog": "^1.22"
2829
},
2930
"bin": ["bin/plumber"],
3031
"require-dev": {

example/Example2Worker.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ class Example2Worker implements IWorker
1010

1111
public function execute($data)
1212
{
13-
$this->logger->info("I'm example 2 worker.");
13+
sleep(1);
14+
throw new \RuntimeException("exception from example2 worker.");
1415

1516
return array('code' => IWorker::FINISH);
1617
}

example/put_message.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
$beanstalk = new BeanstalkClient($config);
1414

1515
$beanstalk->connect();
16-
$beanstalk->useTube('ExampleForward');
16+
$beanstalk->useTube('Example2');
1717

1818
$i=0;
1919

src/ContainerAwareTrait.php

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<?php
2+
23
namespace Codeages\Plumber;
34

45
use Pimple\Container;

src/ForwardWorker.php

-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Codeages\Plumber;
44

5-
use Psr\Log\LoggerInterface;
65
use Codeages\Beanstalk\Client as BeanstalkClient;
76
use Codeages\Beanstalk\ClientProxy as BeanstalkClientProxy;
87

src/IWorker.php

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<?php
2+
23
namespace Codeages\Plumber;
34

45
use Pimple\Container;

src/Plumber.php

+47-20
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
namespace Codeages\Plumber;
44

5+
use swoole_process;
6+
use Psr\Log\LoggerInterface;
57
use Pimple\Container;
68
use Monolog\Logger;
79
use Monolog\Handler\StreamHandler;
810
use Monolog\ErrorHandler;
9-
use Psr\Log\LoggerInterface;
10-
use swoole_process;
11+
use Codeages\RateLimiter\RateLimiter;
1112

1213
class Plumber
1314
{
@@ -16,8 +17,10 @@ class Plumber
1617
*/
1718
protected $container;
1819

20+
protected $configFilePath;
21+
1922
/**
20-
* @var boolean
23+
* @var bool
2124
*/
2225
protected $daemon;
2326

@@ -28,15 +31,27 @@ class Plumber
2831
*/
2932
protected $logger;
3033

34+
/**
35+
* @var RateLimiter
36+
*/
37+
protected $limiter;
38+
3139
const ALREADY_RUNNING_ERROR = 1;
3240

3341
const LOCK_PROCESS_ERROR = 2;
3442

35-
public function __construct(Container $container)
43+
public function __construct(Container $container, $configFilePath)
3644
{
3745
$container['run_flag'] = new SharedRunFlag();
3846
$this->locker = new ProcessLocker($container['pid_path']);
47+
$this->limiter = new RateLimiter(
48+
'process_recreate',
49+
10,
50+
600,
51+
new \Codeages\RateLimiter\Storage\ArrayStorage()
52+
);
3953
$this->container = $container;
54+
$this->configFilePath = $configFilePath;
4055
}
4156

4257
public function main($op)
@@ -70,7 +85,7 @@ protected function start($daemon = true)
7085
exit(self::LOCK_PROCESS_ERROR);
7186
}
7287

73-
swoole_set_process_name('plumber: master');
88+
swoole_set_process_name(sprintf('plumber: master (%s)', $this->configFilePath));
7489

7590
$logger = new Logger('plumber');
7691
if ($daemon) {
@@ -81,12 +96,12 @@ protected function start($daemon = true)
8196
ErrorHandler::register($logger);
8297
$this->container['logger'] = $this->logger = $logger;
8398

84-
$this->workers = $this->createWorkers();
99+
$this->workers = $this->startWorkers();
85100
$this->registerSignal();
86101

87102
foreach ($this->workers as $worker) {
88-
swoole_event_add($worker->pipe, function ($pipe) use ($worker, $logger) {
89-
$logger->info("read from worker:".$worker->read());
103+
swoole_event_add($worker['process']->pipe, function ($pipe) use ($worker, $logger) {
104+
$logger->info('read from worker:'.$worker['process']->read());
90105
});
91106
}
92107

@@ -106,6 +121,7 @@ protected function stop()
106121
$pid = $this->locker->getId();
107122
if (empty($pid)) {
108123
echo "plumber is not running.\n";
124+
109125
return;
110126
}
111127

@@ -132,20 +148,23 @@ protected function restart()
132148
/**
133149
* 创建队列的监听器.
134150
*/
135-
private function createWorkers()
151+
private function startWorkers()
136152
{
137153
$workers = [];
138154
foreach ($this->container['tubes'] as $name => $options) {
139155
for ($i = 0; $i < $options['worker_num']; ++$i) {
140-
$worker = $this->createWorker($name);
141-
$workers[$worker->pid] = $worker;
156+
$worker = $this->startWorker($name);
157+
$workers[$worker->pid] = array(
158+
'tube' => $name,
159+
'process' => $worker,
160+
);
142161
}
143162
}
144163

145164
return $workers;
146165
}
147166

148-
private function createWorker($queueName)
167+
private function startWorker($queueName)
149168
{
150169
$worker = new \swoole_process(function ($process) use ($queueName) {
151170
$process->name("plumber: queue `{$queueName}` worker");
@@ -172,18 +191,26 @@ private function registerSignal()
172191
}
173192

174193
if ($this->container['run_flag']->isRuning()) {
175-
$newPid = $this->workers[$ret['pid']]->start();
176-
$this->workers[$newPid] = $this->workers[$ret['pid']];
194+
$worker = $this->workers[$ret['pid']];
177195
unset($this->workers[$ret['pid']]);
178-
$this->logger->notice("process #{$ret['pid']} exited, #{$newPid} is recreated.", $ret);
196+
197+
$remainTimes = $this->limiter->check($worker['tube']);
198+
if ($remainTimes > 0) {
199+
$newPid = $worker['process']->start();
200+
$this->workers[$newPid] = $worker;
201+
$this->logger->notice("tube {$worker['tube']} process #{$ret['pid']} exited, #{$newPid} is recreated, remain {$remainTimes} recreated times. .", $ret);
202+
} else {
203+
$this->logger->notice("tube {$worker['tube']} process #{$ret['pid']} exited, reached max recreated times.", $ret);
204+
}
179205
} else {
180206
unset($this->workers[$ret['pid']]);
181207
$this->logger->info("process #{$ret['pid']} exited.", $ret);
182-
if (empty($this->workers)) {
183-
$this->locker->release();
184-
$this->logger->info("stoped.");
185-
swoole_event_exit();
186-
}
208+
}
209+
210+
if (empty($this->workers)) {
211+
$this->locker->release();
212+
$this->logger->info('stoped.');
213+
swoole_event_exit();
187214
}
188215
}
189216
});

src/ProcessLocker.php

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public function lock($id)
2020
}
2121
$id = intval($id);
2222
file_put_contents($this->path, $id);
23+
2324
return true;
2425
}
2526

src/SharedRunFlag.php

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
<?php
2+
23
namespace Codeages\Plumber;
34

45
use swoole_atomic;

src/WorkerProcess.php

+7-3
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public function run()
5252
try {
5353
$result = $executor->execute($job);
5454
} catch (\Exception $e) {
55-
$message = sprintf('tube({$tubeName}, #%d): execute job #%d exception, `%s`', $process->pid, $job['id'], $e->getMessage());
55+
$message = sprintf('tube(%s, #%d): execute job #%d exception, `%s`', $this->tubeName, $process->pid, $job['id'], $e->getMessage());
5656
$logger->error($message, $job);
57-
continue;
57+
throw $e;
5858
}
5959

6060
$code = is_array($result) ? $result['code'] : $result;
@@ -170,13 +170,15 @@ private function retryJob($job, $result)
170170
$stats = $queue->statsJob($job['id']);
171171
if ($stats === false) {
172172
$logger->error("tube({$tubeName}, #{$process->pid}): job #{$job['id']} get stats failed, in retry executed.", $job);
173+
173174
return;
174175
}
175176

176177
$logger->info("tube({$tubeName}, #{$process->pid}): job #{$job['id']} retry {$message['__retry']} times.");
177178
$deleted = $queue->delete($job['id']);
178179
if (!$deleted) {
179180
$logger->error("tube({$tubeName}, #{$process->pid}): job #{$job['id']} delete failed, in retry executed.", $job);
181+
180182
return;
181183
}
182184

@@ -204,16 +206,18 @@ private function buryJob($job, $result)
204206
$stats = $queue->statsJob($job['id']);
205207
if ($stats === false) {
206208
$logger->error("tube({$tubeName}, #{$process->pid}): job #{$job['id']} get stats failed, in bury executed.", $job);
209+
207210
return;
208211
}
209212

210213
$pri = isset($result['pri']) ? $result['pri'] : $stats['pri'];
211214
$burried = $queue->bury($job['id'], $pri);
212215
if ($burried === false) {
213216
$logger->error("tube({$tubeName}, #{$process->pid}): job #{$job['id']} bury failed", $job);
217+
214218
return;
215219
}
216220

217-
$logger->info("tube({$tubeName}, #{$process->pid}): job #{$job['id']} buried.");
221+
$logger->notice("tube({$tubeName}, #{$process->pid}): job #{$job['id']} buried.");
218222
}
219223
}

0 commit comments

Comments
 (0)