diff --git a/src/Client/SyncClient.php b/src/Client/SyncClient.php index 2cb0d8e..4ce5da5 100644 --- a/src/Client/SyncClient.php +++ b/src/Client/SyncClient.php @@ -18,10 +18,6 @@ use longlang\phpkafka\Protocol\KafkaRequest; use longlang\phpkafka\Protocol\RequestHeader\RequestHeader; use longlang\phpkafka\Protocol\ResponseHeader\ResponseHeader; -use longlang\phpkafka\Protocol\SaslAuthenticate\SaslAuthenticateRequest; -use longlang\phpkafka\Protocol\SaslAuthenticate\SaslAuthenticateResponse; -use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest; -use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse; use longlang\phpkafka\Protocol\Type\Int32; use longlang\phpkafka\Sasl\SaslInterface; use longlang\phpkafka\Socket\SocketInterface; @@ -203,22 +199,10 @@ protected function sendAuthInfo(): void if (!isset($config['type']) || empty($config['type'])) { return; } - $class = new $config['type']($this->getConfig()); + $class = new $config['type']($this, $this->getConfig()); if (!$class instanceof SaslInterface) { return; } - $handshakeRequest = new SaslHandshakeRequest(); - $handshakeRequest->setMechanism($class->getName()); - $correlationId = $this->send($handshakeRequest); - /** @var SaslHandshakeResponse $handshakeResponse */ - $handshakeResponse = $this->recv($correlationId); - ErrorCode::check($handshakeResponse->getErrorCode()); - - $authenticateRequest = new SaslAuthenticateRequest(); - $authenticateRequest->setAuthBytes($class->getAuthBytes()); - $correlationId = $this->send($authenticateRequest); - /** @var SaslAuthenticateResponse $authenticateResponse */ - $authenticateResponse = $this->recv($correlationId); - ErrorCode::check($authenticateResponse->getErrorCode()); + $class->auth(); } } diff --git a/src/Sasl/PlainSasl.php b/src/Sasl/PlainSasl.php index 5f7c98b..4121177 100644 --- a/src/Sasl/PlainSasl.php +++ b/src/Sasl/PlainSasl.php @@ -4,8 +4,14 @@ namespace longlang\phpkafka\Sasl; +use longlang\phpkafka\Client\ClientInterface; use longlang\phpkafka\Config\CommonConfig; use longlang\phpkafka\Exception\KafkaErrorException; +use longlang\phpkafka\Protocol\ErrorCode; +use longlang\phpkafka\Protocol\SaslAuthenticate\SaslAuthenticateRequest; +use longlang\phpkafka\Protocol\SaslAuthenticate\SaslAuthenticateResponse; +use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest; +use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse; class PlainSasl implements SaslInterface { @@ -14,15 +20,37 @@ class PlainSasl implements SaslInterface */ protected $config; - public function __construct(CommonConfig $config) + /** + * @var ClientInterface + */ + protected $client; + + public function __construct(ClientInterface $client, CommonConfig $config) { + $this->client = $client; $this->config = $config; } + public function auth(): void { + $handshakeRequest = new SaslHandshakeRequest(); + $handshakeRequest->setMechanism($this->getName()); + $correlationId = $this->client->send($handshakeRequest); + /** @var SaslHandshakeResponse $handshakeResponse */ + $handshakeResponse = $this->client->recv($correlationId); + ErrorCode::check($handshakeResponse->getErrorCode()); + + $authenticateRequest = new SaslAuthenticateRequest(); + $authenticateRequest->setAuthBytes($this->getAuthBytes()); + $correlationId = $this->client->send($authenticateRequest); + /** @var SaslAuthenticateResponse $authenticateResponse */ + $authenticateResponse = $this->client->recv($correlationId); + ErrorCode::check($authenticateResponse->getErrorCode()); + } + /** * 授权模式. */ - public function getName(): string + protected function getName(): string { return 'PLAIN'; } @@ -30,7 +58,7 @@ public function getName(): string /** * 获得加密串. */ - public function getAuthBytes(): string + protected function getAuthBytes(): string { $config = $this->config->getSasl(); if (empty($config['username']) || empty($config['password'])) { diff --git a/src/Sasl/SaslInterface.php b/src/Sasl/SaslInterface.php index 68265a0..51bf689 100644 --- a/src/Sasl/SaslInterface.php +++ b/src/Sasl/SaslInterface.php @@ -4,19 +4,12 @@ namespace longlang\phpkafka\Sasl; +use longlang\phpkafka\Client\ClientInterface; use longlang\phpkafka\Config\CommonConfig; interface SaslInterface { - public function __construct(CommonConfig $config); + public function __construct(ClientInterface $client, CommonConfig $config); - /** - * 获得授权名称. - */ - public function getName(): string; - - /** - * 返回授权信息. - */ - public function getAuthBytes(): string; + public function auth(): void; }