diff --git a/README.md b/README.md index 384ed5c2..33de35c3 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,12 @@ [![License](https://poser.pugx.org/workerman/workerman/license)](https://packagist.org/packages/workerman/workerman) ## What is it -Workerman is an asynchronous event-driven PHP framework with high performance to build fast and scalable network applications. -Workerman supports HTTP, Websocket, SSL and other custom protocols. -Workerman supports event extension. +Workerman is an asynchronous event-driven PHP framework with high performance to build fast and scalable network applications. It supports HTTP, WebSocket, custom protocols, coroutines, and connection pools, making it ideal for handling high-concurrency scenarios efficiently. ## Requires A POSIX compatible operating system (Linux, OSX, BSD) POSIX and PCNTL extensions required -Event extension recommended for better performance +Event/Swoole/Swow extension recommended for better performance ## Installation @@ -153,90 +151,6 @@ $ws_worker->onMessage = function ($connection, $data) { Worker::runAll(); ``` -### Custom protocol -Protocols/MyTextProtocol.php -```php - -namespace Protocols; - -/** - * User defined protocol - * Format Text+"\n" - */ -class MyTextProtocol -{ - public static function input($recv_buffer) - { - // Find the position of the first occurrence of "\n" - $pos = strpos($recv_buffer, "\n"); - - // Not a complete package. Return 0 because the length of package can not be calculated - if ($pos === false) { - return 0; - } - - // Return length of the package - return $pos + 1; - } - - public static function decode($recv_buffer) - { - return trim($recv_buffer); - } - - public static function encode($data) - { - return $data . "\n"; - } -} -``` - -```php -use Workerman\Worker; - -require_once __DIR__ . '/vendor/autoload.php'; - -// #### MyTextProtocol worker #### -$text_worker = new Worker('MyTextProtocol://0.0.0.0:5678'); - -$text_worker->onConnect = function ($connection) { - echo "New connection\n"; -}; - -$text_worker->onMessage = function ($connection, $data) { - // Send data to client - $connection->send("Hello world\n"); -}; - -$text_worker->onClose = function ($connection) { - echo "Connection closed\n"; -}; - -// Run all workers -Worker::runAll(); -``` - -### Timer -```php - -use Workerman\Worker; -use Workerman\Timer; - -require_once __DIR__ . '/vendor/autoload.php'; - -$task = new Worker(); -$task->onWorkerStart = function ($task) { - // 2.5 seconds - $time_interval = 2.5; - $timer_id = Timer::add($time_interval, function () { - echo "Timer run\n"; - }); -}; - -// Run all workers -Worker::runAll(); -``` - ### AsyncTcpConnection (tcp/ws/text/frame etc...) ```php @@ -267,112 +181,267 @@ $worker->onWorkerStart = function () { Worker::runAll(); ``` +### Coroutine - -#### Use HTTP proxy +Coroutine is used to create coroutines, enabling the execution of asynchronous tasks to improve concurrency performance. ```php onWorkerStart = function($worker){ - echo '开始链接' . PHP_EOL; - $url = 'ws://stream.binance.com:9443/ws'; - $con = new AsyncTcpConnection($url); - $con->transport = 'ssl'; -// $con->proxySocks5 = '127.0.0.1:1080'; - $con->proxyHttp = '127.0.0.1:25378'; - - $con->onConnect = function(AsyncTcpConnection $con) { - $ww = [ - 'id' => 1, - 'method' => 'SUBSCRIBE', - 'params' => [ - "btcusdt@aggTrade", - "btcusdt@depth" - ] - ]; - echo '链接成功'; - $con->send(json_encode($ww)); - echo 'ok'; - }; +$worker->eventLoop = Swoole::class; // Or Swow::class or Fiber::class - $con->onMessage = function(AsyncTcpConnection $con, $data) { - echo $data; - }; +$worker->onMessage = function (TcpConnection $connection, Request $request) { + Coroutine::create(function () { + echo file_get_contents("https://www.example.com/event/notify"); + }); + $connection->send('ok'); +}; - $con->onClose = function (AsyncTcpConnection $con) { - echo 'onClose' . PHP_EOL; - }; +Worker::runAll(); +``` - $con->onError = function (AsyncTcpConnection $con, $code, $msg) { - echo "error [ $code ] $msg\n"; - }; +> Note: Coroutine require Swoole extension or Swow extension or [Fiber revolt/event-loop](https://github.com/revoltphp/event-loop), and the same applies below + +### Barrier +Barrier is used to manage concurrency and synchronization in coroutines. It allows tasks to run concurrently and waits until all tasks are completed, ensuring process synchronization. + +```php +connect(); +// Http Server +$worker = new Worker('http://0.0.0.0:8001'); +$worker->eventLoop = Swoole::class; // Or Swow::class or Fiber::class +$worker->onMessage = function (TcpConnection $connection, Request $request) { + $barrier = Barrier::create(); + for ($i=1; $i<5; $i++) { + Coroutine::create(function () use ($barrier, $i) { + file_get_contents("http://127.0.0.1:8002?task_id=$i"); + }); + } + // Wait all coroutine done + Barrier::wait($barrier); + $connection->send('All Task Done'); }; -\Workerman\Worker::runAll(); + +// Task Server +$task = new Worker('http://0.0.0.0:8002'); +$task->onMessage = function (TcpConnection $connection, Request $request) { + $task_id = $request->get('task_id'); + $message = "Task $task_id Done"; + echo $message . PHP_EOL; + $connection->close($message); +}; + +Worker::runAll(); ``` +### Parallel +Parallel executes multiple tasks concurrently and collects results. Use add to add tasks and wait to wait for completion and get results. Unlike Barrier, Parallel directly returns the results of each task. +```php +eventLoop = Swoole::class; // Or Swow::class or Fiber::class +$worker->onMessage = function (TcpConnection $connection, Request $request) { + $parallel = new Parallel(); + for ($i=1; $i<5; $i++) { + $parallel->add(function () use ($i) { + return file_get_contents("http://127.0.0.1:8002?task_id=$i"); + }); + } + $results = $parallel->wait(); + $connection->send(json_encode($results)); // Response: ["Task 1 Done","Task 2 Done","Task 3 Done","Task 4 Done"] +}; -#### Use Socks5 proxy +// Task Server +$task = new Worker('http://0.0.0.0:8002'); +$task->onMessage = function (TcpConnection $connection, Request $request) { + $task_id = $request->get('task_id'); + $message = "Task $task_id Done"; + $connection->close($message); +}; + +Worker::runAll(); +``` + +### Channel + +Channel is a mechanism for communication between coroutines. One coroutine can push data into the channel, while another can pop data from it, enabling synchronization and data sharing between coroutines. ```php eventLoop = Swoole::class; // Or Swow::class or Fiber::class +$worker->onMessage = function (TcpConnection $connection, Request $request) { + $channel = new Channel(2); + Coroutine::create(function () use ($channel) { + $channel->push('Task 1 Done'); + }); + Coroutine::create(function () use ($channel) { + $channel->push('Task 2 Done'); + }); + $result = []; + for ($i = 0; $i < 2; $i++) { + $result[] = $channel->pop(); + } + $connection->send(json_encode($result)); // Response: ["Task 1 Done","Task 2 Done"] +}; +Worker::runAll(); +``` -use Workerman\Connection\AsyncTcpConnection; -$worker = new \Workerman\Worker(); -$worker->onWorkerStart = function($worker){ - echo '开始链接' . PHP_EOL; - $url = 'ws://stream.binance.com:9443/ws'; - $con = new AsyncTcpConnection($url); - $con->transport = 'ssl'; - $con->proxySocks5 = '127.0.0.1:1080'; -// $con->proxyHttp = '127.0.0.1:25378'; - - $con->onConnect = function(AsyncTcpConnection $con) { - $ww = [ - 'id' => 1, - 'method' => 'SUBSCRIBE', - 'params' => [ - "btcusdt@aggTrade", - "btcusdt@depth" - ] - ]; - echo '链接成功'; - $con->send(json_encode($ww)); - echo 'ok'; - }; +### Pool - $con->onMessage = function(AsyncTcpConnection $con, $data) { - echo $data; - }; +Pool is used to manage connection or resource pools, improving performance by reusing resources (e.g., database connections). It supports acquiring, returning, creating, and destroying resources. - $con->onClose = function (AsyncTcpConnection $con) { - echo 'onClose' . PHP_EOL; - }; +```php +onError = function (AsyncTcpConnection $con, $code, $msg) { - echo "error [ $code ] $msg\n"; - }; +class RedisPool +{ + private Pool $pool; + public function __construct($host, $port, $max_connections = 10) + { + $pool = new Pool($max_connections); + $pool->setConnectionCreator(function () use ($host, $port) { + $redis = new \Redis(); + $redis->connect($host, $port); + return $redis; + }); + $pool->setConnectionCloser(function ($redis) { + $redis->close(); + }); + $pool->setHeartbeatChecker(function ($redis) { + $redis->ping(); + }); + $this->pool = $pool; + } + public function get(): \Redis + { + return $this->pool->get(); + } + public function put($redis): void + { + $this->pool->put($redis); + } +} - $con->connect(); +// Http Server +$worker = new Worker('http://0.0.0.0:8001'); +$worker->eventLoop = Swoole::class; // Or Swow::class or Fiber::class +$worker->onMessage = function (TcpConnection $connection, Request $request) { + static $pool; + if (!$pool) { + $pool = new RedisPool('127.0.0.1', 6379, 10); + } + $redis = $pool->get(); + $redis->set('key', 'hello'); + $value = $redis->get('key'); + $pool->put($redis); + $connection->send($value); }; -\Workerman\Worker::runAll(); +Worker::runAll(); ``` +### Pool for automatic acquisition and release + +```php +get(); + Context::set('pdo', $pdo); + // When the coroutine is destroyed, return the connection to the pool + Coroutine::defer(function () use ($pdo) { + self::$pool->put($pdo); + }); + } + return call_user_func_array([$pdo, $name], $arguments); + } + private static function initializePool(): void + { + self::$pool = new Pool(10); + self::$pool->setConnectionCreator(function () { + return new \PDO('mysql:host=127.0.0.1;dbname=your_database', 'your_username', 'your_password'); + }); + self::$pool->setConnectionCloser(function ($pdo) { + $pdo = null; + }); + self::$pool->setHeartbeatChecker(function ($pdo) { + $pdo->query('SELECT 1'); + }); + } +} +// Http Server +$worker = new Worker('http://0.0.0.0:8001'); +$worker->eventLoop = Swoole::class; // Or Swow::class or Fiber::class +$worker->onMessage = function (TcpConnection $connection, Request $request) { + $value = Db::query('SELECT NOW() as now')->fetchAll(); + $connection->send(json_encode($value)); +}; +Worker::runAll(); +``` ## Available commands ```php start.php start ``` @@ -390,7 +459,6 @@ proxy supports TLS1.3, no Sniproxy channel https://www.techempower.com/benchmarks/#section=data-r19&hw=ph&test=plaintext&l=zik073-1r - ### Supported by [![JetBrains logo.](https://resources.jetbrains.com/storage/products/company/brand/logos/jetbrains.svg)](https://jb.gg/OpenSourceSupport)