From 4da98ab3e2c66e5f3a8a43e65deef5ae6119e3f5 Mon Sep 17 00:00:00 2001 From: Iurii Malchenko Date: Sat, 25 Nov 2023 03:31:42 +0100 Subject: [PATCH] ws client: handle io errors (connection closed), tweak the return type of `PodsApi#execStream`, minor refactoring --- .../kubernetes/client/KubernetesClient.scala | 3 +- .../kubernetes/client/api/PodsApi.scala | 134 +++++++++--------- .../goyeau/kubernetes/client/api/RawApi.scala | 15 +- .../kubernetes/client/api/RawApiTest.scala | 2 - .../client/operation/WatchableTests.scala | 1 - 5 files changed, 74 insertions(+), 81 deletions(-) diff --git a/kubernetes-client/src/com/goyeau/kubernetes/client/KubernetesClient.scala b/kubernetes-client/src/com/goyeau/kubernetes/client/KubernetesClient.scala index cfe3b16..4adbb2e 100644 --- a/kubernetes-client/src/com/goyeau/kubernetes/client/KubernetesClient.scala +++ b/kubernetes-client/src/com/goyeau/kubernetes/client/KubernetesClient.scala @@ -9,10 +9,9 @@ import com.goyeau.kubernetes.client.util.SslContexts import com.goyeau.kubernetes.client.util.cache.{AuthorizationParse, ExecToken} import io.circe.{Decoder, Encoder} import org.http4s.client.Client -import org.http4s.client.middleware.{RequestLogger, ResponseLogger} import org.http4s.headers.Authorization import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient} -import org.http4s.client.websocket.{WSClient, WSClientHighLevel, WSConnection, WSConnectionHighLevel, WSRequest} +import org.http4s.client.websocket.WSClient import org.typelevel.log4cats.Logger import java.net.http.HttpClient diff --git a/kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala b/kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala index 766de60..a619065 100644 --- a/kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala +++ b/kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala @@ -1,6 +1,7 @@ package com.goyeau.kubernetes.client.api import cats.effect.{Async, Resource} +import cats.effect.syntax.all.* import cats.syntax.all.* import com.goyeau.kubernetes.client.KubeConfig import com.goyeau.kubernetes.client.api.ExecRouting.* @@ -142,24 +143,19 @@ private[client] class NamespacedPodsApi[F[_]]( F.ref(List.empty[StdErr]), F.ref(none[ErrorOrStatus]) ).tupled.flatMap { case (stdErr, errorOrStatus) => - execRequest(podName, Seq("sh", "-c", s"cat ${sourceFile.toString}"), container).flatMap { request => - wsClient.connectHighLevel(request).use { connection => - connection.receiveStream - .through(processWebSocketData) - .evalMapFilter[F, Chunk[Byte]] { - case Left(StdOut(data)) => Chunk.array(data).some.pure[F] - case Left(e: StdErr) => stdErr.update(e :: _).as(None) - case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None) - } - .unchunks - .through(Files[F].writeAll(destinationFile)) - .compile - .drain - .flatMap { _ => - (stdErr.get.map(_.reverse), errorOrStatus.get).tupled - } + execStream(podName, container, Seq("sh", "-c", s"cat ${sourceFile.toString}")) + .evalMapFilter[F, Chunk[Byte]] { + case Left(StdOut(data)) => Chunk.array(data).some.pure[F] + case Left(e: StdErr) => stdErr.update(e :: _).as(None) + case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None) + } + .unchunks + .through(Files[F].writeAll(destinationFile)) + .compile + .drain + .flatMap { _ => + (stdErr.get.map(_.reverse), errorOrStatus.get).tupled } - } } @deprecated("Use upload() which uses fs2.io.file.Path", "0.8.2") @@ -179,16 +175,7 @@ private[client] class NamespacedPodsApi[F[_]]( ): F[(List[StdErr], Option[ErrorOrStatus])] = { val mkDirResult = destinationFile.parent match { case Some(dir) => - execRequest( - podName, - Seq("sh", "-c", s"mkdir -p $dir"), - container - ).flatMap { mkDirRequest => - wsClient - .connectHighLevel(mkDirRequest) - .map(conn => F.delay(conn.receiveStream.through(processWebSocketData))) - .use(_.flatMap(foldErrorStream)) - } + foldErrorStream(execStream(podName, container, Seq("sh", "-c", s"mkdir -p $dir"))) case None => (List.empty -> None).pure } @@ -201,37 +188,34 @@ private[client] class NamespacedPodsApi[F[_]]( ) val uploadFileResult = - uploadRequest.flatMap { uploadRequest => - wsClient.connectHighLevel(uploadRequest).use { connection => - val source = Files[F].readAll(sourceFile, 4096, Flags.Read) - val sendData = source - .mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector)))) - .through(connection.sendPipe) - val retryAttempts = 5 - val sendWithRetry = Stream - .retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts) - .onError { case e => - Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts")) - } - - val result = for { - signal <- Stream.eval(SignallingRef[F, Boolean](false)) - dataStream = sendWithRetry *> Stream.eval(signal.set(true)) - - output = connection.receiveStream - .through( - processWebSocketData - ) - .interruptWhen(signal) - .concurrently(dataStream) - - errors = foldErrorStream( - output - ).map { case (errors, _) => errors } - } yield errors - - result.compile.lastOrError.flatten - } + uploadRequest.toResource.flatMap(wsClient.connectHighLevel).use { connection => + val source = Files[F].readAll(sourceFile, 4096, Flags.Read) + val sendData = source + .mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector)))) + .through(connection.sendPipe) + val retryAttempts = 5 + val sendWithRetry = Stream + .retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts) + .onError { case e => + Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts")) + } + + val result = for { + signal <- Stream.eval(SignallingRef[F, Boolean](false)) + dataStream = sendWithRetry *> Stream.eval(signal.set(true)) + + output = connection.receiveStream + .through(skipConnectionClosedErrors) + .through(processWebSocketData) + .interruptWhen(signal) + .concurrently(dataStream) + + errors = foldErrorStream( + output + ).map { case (errors, _) => errors } + } yield errors + + result.compile.lastOrError.flatten } for { @@ -249,12 +233,15 @@ private[client] class NamespacedPodsApi[F[_]]( stdout: Boolean = true, stderr: Boolean = true, tty: Boolean = false - ): Resource[F, F[Stream[F, Either[ExecStream, ErrorOrStatus]]]] = - Resource.eval(execRequest(podName, command, container, stdin, stdout, stderr, tty)).flatMap { request => - wsClient.connectHighLevel(request).map { connection => - F.delay(connection.receiveStream.through(processWebSocketData)) + ): Stream[F, Either[ExecStream, ErrorOrStatus]] = + Stream + .eval(execRequest(podName, command, container, stdin, stdout, stderr, tty)) + .flatMap(request => Stream.resource(wsClient.connectHighLevel(request))) + .flatMap { connection => + connection.receiveStream + .through(skipConnectionClosedErrors) + .through(processWebSocketData) } - } def exec( podName: String, @@ -265,11 +252,26 @@ private[client] class NamespacedPodsApi[F[_]]( stderr: Boolean = true, tty: Boolean = false ): F[(List[ExecStream], Option[ErrorOrStatus])] = - execStream(podName, container, command, stdin, stdout, stderr, tty).use(_.flatMap(foldStream)) + foldStream(execStream(podName, container, command, stdin, stdout, stderr, tty)) + + private def skipConnectionClosedErrors: Pipe[F, WSDataFrame, WSDataFrame] = + _.map(_.some) + .recover { + // Need to handle (and ignore) this exception + // + // Because of the "conflict" between the http4s WS client and + // the underlying JDK WS client (which are both high-level clients) + // an extra "Close" frame gets sent to the server, potentially + // after the TCP connection is closed, which causes this exception. + // + // This will be solved in a later version of the http4s (core or jdk). + case e: java.io.IOException if e.getMessage == "closed output" => none + } + .unNone private def foldStream( stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]] - ) = + ): F[(List[ExecStream], Option[ErrorOrStatus])] = stdoutStream.compile.fold((List.empty[ExecStream], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) => data match { case Left(event) => @@ -281,7 +283,7 @@ private[client] class NamespacedPodsApi[F[_]]( private def foldErrorStream( stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]] - ) = + ): F[(List[StdErr], Option[ErrorOrStatus])] = stdoutStream.compile.fold((List.empty[StdErr], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) => data match { case Left(event) => diff --git a/kubernetes-client/src/com/goyeau/kubernetes/client/api/RawApi.scala b/kubernetes-client/src/com/goyeau/kubernetes/client/api/RawApi.scala index ebb6b52..89012b2 100644 --- a/kubernetes-client/src/com/goyeau/kubernetes/client/api/RawApi.scala +++ b/kubernetes-client/src/com/goyeau/kubernetes/client/api/RawApi.scala @@ -6,7 +6,7 @@ import com.goyeau.kubernetes.client.KubeConfig import com.goyeau.kubernetes.client.operation.* import org.http4s.client.Client import org.http4s.headers.Authorization -import org.http4s.jdkhttpclient.{WSClient, WSConnectionHighLevel, WSRequest} +import org.http4s.client.websocket.{WSClient, WSConnectionHighLevel, WSRequest} import org.http4s.{Request, Response} private[client] class RawApi[F[_]]( @@ -19,14 +19,9 @@ private[client] class RawApi[F[_]]( def runRequest( request: Request[F] ): Resource[F, Response[F]] = - Request[F]( - method = request.method, - uri = config.server.resolve(request.uri), - httpVersion = request.httpVersion, - headers = request.headers, - body = request.body, - attributes = request.attributes - ).withOptionalAuthorization(authorization) + request + .withUri(config.server.resolve(request.uri)) + .withOptionalAuthorization(authorization) .toResource .flatMap(httpClient.run) @@ -34,7 +29,7 @@ private[client] class RawApi[F[_]]( request: WSRequest ): Resource[F, WSConnectionHighLevel[F]] = request - .copy(uri = config.server.resolve(request.uri)) + .withUri(config.server.resolve(request.uri)) .withOptionalAuthorization(authorization) .toResource .flatMap { request => diff --git a/kubernetes-client/test/src/com/goyeau/kubernetes/client/api/RawApiTest.scala b/kubernetes-client/test/src/com/goyeau/kubernetes/client/api/RawApiTest.scala index cb082a5..967da58 100644 --- a/kubernetes-client/test/src/com/goyeau/kubernetes/client/api/RawApiTest.scala +++ b/kubernetes-client/test/src/com/goyeau/kubernetes/client/api/RawApiTest.scala @@ -18,9 +18,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import java.nio.file.Files as JFiles -import scala.util.Random import org.http4s.implicits.* -import org.http4s.jdkhttpclient.WSConnectionHighLevel class RawApiTest extends FunSuite with MinikubeClientProvider[IO] with ContextProvider { diff --git a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala index bccbcb8..16b1a2d 100644 --- a/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala +++ b/kubernetes-client/test/src/com/goyeau/kubernetes/client/operation/WatchableTests.scala @@ -7,7 +7,6 @@ import com.goyeau.kubernetes.client.Utils.retry import com.goyeau.kubernetes.client.api.CustomResourceDefinitionsApiTest import com.goyeau.kubernetes.client.{EventType, KubernetesClient, WatchEvent} import fs2.concurrent.SignallingRef -import fs2.{Pipe, Stream} import io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta import munit.FunSuite import org.http4s.Status