Skip to content

Commit

Permalink
ws client: handle io errors (connection closed), tweak the return typ…
Browse files Browse the repository at this point in the history
…e of `PodsApi#execStream`, minor refactoring
  • Loading branch information
yurique committed Nov 25, 2023
1 parent f46000e commit 4da98ab
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import com.goyeau.kubernetes.client.util.SslContexts
import com.goyeau.kubernetes.client.util.cache.{AuthorizationParse, ExecToken}
import io.circe.{Decoder, Encoder}
import org.http4s.client.Client
import org.http4s.client.middleware.{RequestLogger, ResponseLogger}
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient}
import org.http4s.client.websocket.{WSClient, WSClientHighLevel, WSConnection, WSConnectionHighLevel, WSRequest}
import org.http4s.client.websocket.WSClient
import org.typelevel.log4cats.Logger

import java.net.http.HttpClient
Expand Down
134 changes: 68 additions & 66 deletions kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.goyeau.kubernetes.client.api

import cats.effect.{Async, Resource}
import cats.effect.syntax.all.*
import cats.syntax.all.*
import com.goyeau.kubernetes.client.KubeConfig
import com.goyeau.kubernetes.client.api.ExecRouting.*
Expand Down Expand Up @@ -142,24 +143,19 @@ private[client] class NamespacedPodsApi[F[_]](
F.ref(List.empty[StdErr]),
F.ref(none[ErrorOrStatus])
).tupled.flatMap { case (stdErr, errorOrStatus) =>
execRequest(podName, Seq("sh", "-c", s"cat ${sourceFile.toString}"), container).flatMap { request =>
wsClient.connectHighLevel(request).use { connection =>
connection.receiveStream
.through(processWebSocketData)
.evalMapFilter[F, Chunk[Byte]] {
case Left(StdOut(data)) => Chunk.array(data).some.pure[F]
case Left(e: StdErr) => stdErr.update(e :: _).as(None)
case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None)
}
.unchunks
.through(Files[F].writeAll(destinationFile))
.compile
.drain
.flatMap { _ =>
(stdErr.get.map(_.reverse), errorOrStatus.get).tupled
}
execStream(podName, container, Seq("sh", "-c", s"cat ${sourceFile.toString}"))
.evalMapFilter[F, Chunk[Byte]] {
case Left(StdOut(data)) => Chunk.array(data).some.pure[F]
case Left(e: StdErr) => stdErr.update(e :: _).as(None)
case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None)
}
.unchunks
.through(Files[F].writeAll(destinationFile))
.compile
.drain
.flatMap { _ =>
(stdErr.get.map(_.reverse), errorOrStatus.get).tupled
}
}
}

@deprecated("Use upload() which uses fs2.io.file.Path", "0.8.2")
Expand All @@ -179,16 +175,7 @@ private[client] class NamespacedPodsApi[F[_]](
): F[(List[StdErr], Option[ErrorOrStatus])] = {
val mkDirResult = destinationFile.parent match {
case Some(dir) =>
execRequest(
podName,
Seq("sh", "-c", s"mkdir -p $dir"),
container
).flatMap { mkDirRequest =>
wsClient
.connectHighLevel(mkDirRequest)
.map(conn => F.delay(conn.receiveStream.through(processWebSocketData)))
.use(_.flatMap(foldErrorStream))
}
foldErrorStream(execStream(podName, container, Seq("sh", "-c", s"mkdir -p $dir")))
case None =>
(List.empty -> None).pure
}
Expand All @@ -201,37 +188,34 @@ private[client] class NamespacedPodsApi[F[_]](
)

val uploadFileResult =
uploadRequest.flatMap { uploadRequest =>
wsClient.connectHighLevel(uploadRequest).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(
processWebSocketData
)
.interruptWhen(signal)
.concurrently(dataStream)

errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors

result.compile.lastOrError.flatten
}
uploadRequest.toResource.flatMap(wsClient.connectHighLevel).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(skipConnectionClosedErrors)
.through(processWebSocketData)
.interruptWhen(signal)
.concurrently(dataStream)

errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors

result.compile.lastOrError.flatten
}

for {
Expand All @@ -249,12 +233,15 @@ private[client] class NamespacedPodsApi[F[_]](
stdout: Boolean = true,
stderr: Boolean = true,
tty: Boolean = false
): Resource[F, F[Stream[F, Either[ExecStream, ErrorOrStatus]]]] =
Resource.eval(execRequest(podName, command, container, stdin, stdout, stderr, tty)).flatMap { request =>
wsClient.connectHighLevel(request).map { connection =>
F.delay(connection.receiveStream.through(processWebSocketData))
): Stream[F, Either[ExecStream, ErrorOrStatus]] =
Stream
.eval(execRequest(podName, command, container, stdin, stdout, stderr, tty))
.flatMap(request => Stream.resource(wsClient.connectHighLevel(request)))
.flatMap { connection =>
connection.receiveStream
.through(skipConnectionClosedErrors)
.through(processWebSocketData)
}
}

def exec(
podName: String,
Expand All @@ -265,11 +252,26 @@ private[client] class NamespacedPodsApi[F[_]](
stderr: Boolean = true,
tty: Boolean = false
): F[(List[ExecStream], Option[ErrorOrStatus])] =
execStream(podName, container, command, stdin, stdout, stderr, tty).use(_.flatMap(foldStream))
foldStream(execStream(podName, container, command, stdin, stdout, stderr, tty))

private def skipConnectionClosedErrors: Pipe[F, WSDataFrame, WSDataFrame] =
_.map(_.some)
.recover {
// Need to handle (and ignore) this exception
//
// Because of the "conflict" between the http4s WS client and
// the underlying JDK WS client (which are both high-level clients)
// an extra "Close" frame gets sent to the server, potentially
// after the TCP connection is closed, which causes this exception.
//
// This will be solved in a later version of the http4s (core or jdk).
case e: java.io.IOException if e.getMessage == "closed output" => none
}
.unNone

private def foldStream(
stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]]
) =
): F[(List[ExecStream], Option[ErrorOrStatus])] =
stdoutStream.compile.fold((List.empty[ExecStream], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) =>
data match {
case Left(event) =>
Expand All @@ -281,7 +283,7 @@ private[client] class NamespacedPodsApi[F[_]](

private def foldErrorStream(
stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]]
) =
): F[(List[StdErr], Option[ErrorOrStatus])] =
stdoutStream.compile.fold((List.empty[StdErr], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) =>
data match {
case Left(event) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.goyeau.kubernetes.client.KubeConfig
import com.goyeau.kubernetes.client.operation.*
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.client.websocket.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.{Request, Response}

private[client] class RawApi[F[_]](
Expand All @@ -19,22 +19,17 @@ private[client] class RawApi[F[_]](
def runRequest(
request: Request[F]
): Resource[F, Response[F]] =
Request[F](
method = request.method,
uri = config.server.resolve(request.uri),
httpVersion = request.httpVersion,
headers = request.headers,
body = request.body,
attributes = request.attributes
).withOptionalAuthorization(authorization)
request
.withUri(config.server.resolve(request.uri))
.withOptionalAuthorization(authorization)
.toResource
.flatMap(httpClient.run)

def connectWS(
request: WSRequest
): Resource[F, WSConnectionHighLevel[F]] =
request
.copy(uri = config.server.resolve(request.uri))
.withUri(config.server.resolve(request.uri))
.withOptionalAuthorization(authorization)
.toResource
.flatMap { request =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.file.Files as JFiles
import scala.util.Random
import org.http4s.implicits.*
import org.http4s.jdkhttpclient.WSConnectionHighLevel

class RawApiTest extends FunSuite with MinikubeClientProvider[IO] with ContextProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.goyeau.kubernetes.client.Utils.retry
import com.goyeau.kubernetes.client.api.CustomResourceDefinitionsApiTest
import com.goyeau.kubernetes.client.{EventType, KubernetesClient, WatchEvent}
import fs2.concurrent.SignallingRef
import fs2.{Pipe, Stream}
import io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta
import munit.FunSuite
import org.http4s.Status
Expand Down

0 comments on commit 4da98ab

Please sign in to comment.