Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgraded to the jdk client version 0.10.0 #139

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import cats.data.OptionT
import cats.effect.*
import com.goyeau.kubernetes.client.api.*
import com.goyeau.kubernetes.client.crd.{CrdContext, CustomResource, CustomResourceList}
import com.goyeau.kubernetes.client.operation.*
import com.goyeau.kubernetes.client.util.SslContexts
import com.goyeau.kubernetes.client.util.cache.{AuthorizationParse, ExecToken}
import io.circe.{Decoder, Encoder}
import org.http4s.Request
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient, WSClient}
import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient}
import org.http4s.client.websocket.{WSClient, WSRequest}
import org.typelevel.log4cats.Logger

import java.net.http.HttpClient

class KubernetesClient[F[_]: Async: Logger](
Expand Down Expand Up @@ -72,8 +74,8 @@ object KubernetesClient {
client <- Resource.eval {
Sync[F].delay(HttpClient.newBuilder().sslContext(SslContexts.fromConfig(config)).build())
}
httpClient <- JdkHttpClient[F](client)
wsClient <- JdkWSClient[F](client)
httpClient = JdkHttpClient[F](client)
wsClient = JdkWSClient[F](client)
authorization <- Resource.eval {
OptionT
.fromOption(config.authorization)
Expand Down
151 changes: 77 additions & 74 deletions kubernetes-client/src/com/goyeau/kubernetes/client/api/PodsApi.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.goyeau.kubernetes.client.api

import cats.effect.{Async, Resource}
import cats.effect.Async
import cats.effect.syntax.all.*
import cats.syntax.all.*
import com.goyeau.kubernetes.client.KubeConfig
import com.goyeau.kubernetes.client.api.ExecRouting.*
Expand All @@ -18,13 +19,16 @@ import org.http4s.*
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.implicits.*
import org.http4s.jdkhttpclient.*
import org.http4s.client.websocket.WSClient
import org.http4s.client.websocket.WSRequest
import org.typelevel.ci.CIString
import org.typelevel.log4cats.Logger
import scodec.bits.ByteVector

import java.nio.file.Path as JPath
import scala.concurrent.duration.DurationInt
import org.http4s.client.websocket.WSFrame
import org.http4s.client.websocket.WSDataFrame

private[client] class PodsApi[F[_]: Logger](
val httpClient: Client[F],
Expand Down Expand Up @@ -113,13 +117,11 @@ private[client] class NamespacedPodsApi[F[_]](
("container" -> container) ++?
("command" -> commands)

WSRequest(uri, method = Method.POST)
WSRequest(uri, Headers.empty, Method.POST)
.withOptionalAuthorization(authorization)
.map { r =>
r.copy(
headers = r.headers.put(Header.Raw(CIString("Sec-WebSocket-Protocol"), "v4.channel.k8s.io"))
)
}
.map(r =>
r.withHeaders(headers = r.headers.put(Header.Raw(CIString("Sec-WebSocket-Protocol"), "v4.channel.k8s.io")))
)
}

@deprecated("Use download() which uses fs2.io.file.Path", "0.8.2")
Expand All @@ -141,24 +143,19 @@ private[client] class NamespacedPodsApi[F[_]](
F.ref(List.empty[StdErr]),
F.ref(none[ErrorOrStatus])
).tupled.flatMap { case (stdErr, errorOrStatus) =>
execRequest(podName, Seq("sh", "-c", s"cat ${sourceFile.toString}"), container).flatMap { request =>
wsClient.connectHighLevel(request).use { connection =>
connection.receiveStream
.through(processWebSocketData)
.evalMapFilter[F, Chunk[Byte]] {
case Left(StdOut(data)) => Chunk.array(data).some.pure[F]
case Left(e: StdErr) => stdErr.update(e :: _).as(None)
case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None)
}
.unchunks
.through(Files[F].writeAll(destinationFile))
.compile
.drain
.flatMap { _ =>
(stdErr.get.map(_.reverse), errorOrStatus.get).tupled
}
execStream(podName, container, Seq("sh", "-c", s"cat ${sourceFile.toString}"))
.evalMapFilter[F, Chunk[Byte]] {
case Left(StdOut(data)) => Chunk.array(data).some.pure[F]
case Left(e: StdErr) => stdErr.update(e :: _).as(None)
case Right(statusOrError) => errorOrStatus.update(_.orElse(statusOrError.some)).as(None)
}
.unchunks
.through(Files[F].writeAll(destinationFile))
.compile
.drain
.flatMap { _ =>
(stdErr.get.map(_.reverse), errorOrStatus.get).tupled
}
}
}

@deprecated("Use upload() which uses fs2.io.file.Path", "0.8.2")
Expand All @@ -178,16 +175,7 @@ private[client] class NamespacedPodsApi[F[_]](
): F[(List[StdErr], Option[ErrorOrStatus])] = {
val mkDirResult = destinationFile.parent match {
case Some(dir) =>
execRequest(
podName,
Seq("sh", "-c", s"mkdir -p $dir"),
container
).flatMap { mkDirRequest =>
wsClient
.connectHighLevel(mkDirRequest)
.map(conn => F.delay(conn.receiveStream.through(processWebSocketData)))
.use(_.flatMap(foldErrorStream))
}
foldErrorStream(execStream(podName, container, Seq("sh", "-c", s"mkdir -p $dir")))
case None =>
(List.empty -> None).pure
}
Expand All @@ -200,37 +188,34 @@ private[client] class NamespacedPodsApi[F[_]](
)

val uploadFileResult =
uploadRequest.flatMap { uploadRequest =>
wsClient.connectHighLevel(uploadRequest).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(
processWebSocketData
)
.interruptWhen(signal)
.concurrently(dataStream)

errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors

result.compile.lastOrError.flatten
}
uploadRequest.toResource.flatMap(wsClient.connectHighLevel).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(skipConnectionClosedErrors)
.through(processWebSocketData)
.interruptWhen(signal)
.concurrently(dataStream)

errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors

result.compile.lastOrError.flatten
}

for {
Expand All @@ -248,12 +233,15 @@ private[client] class NamespacedPodsApi[F[_]](
stdout: Boolean = true,
stderr: Boolean = true,
tty: Boolean = false
): Resource[F, F[Stream[F, Either[ExecStream, ErrorOrStatus]]]] =
Resource.eval(execRequest(podName, command, container, stdin, stdout, stderr, tty)).flatMap { request =>
wsClient.connectHighLevel(request).map { connection =>
F.delay(connection.receiveStream.through(processWebSocketData))
): Stream[F, Either[ExecStream, ErrorOrStatus]] =
Stream
.eval(execRequest(podName, command, container, stdin, stdout, stderr, tty))
.flatMap(request => Stream.resource(wsClient.connectHighLevel(request)))
.flatMap { connection =>
connection.receiveStream
.through(skipConnectionClosedErrors)
.through(processWebSocketData)
}
}

def exec(
podName: String,
Expand All @@ -264,11 +252,26 @@ private[client] class NamespacedPodsApi[F[_]](
stderr: Boolean = true,
tty: Boolean = false
): F[(List[ExecStream], Option[ErrorOrStatus])] =
execStream(podName, container, command, stdin, stdout, stderr, tty).use(_.flatMap(foldStream))
foldStream(execStream(podName, container, command, stdin, stdout, stderr, tty))

private def skipConnectionClosedErrors: Pipe[F, WSDataFrame, WSDataFrame] =
_.map(_.some)
.recover {
// Need to handle (and ignore) this exception
//
// Because of the "conflict" between the http4s WS client and
// the underlying JDK WS client (which are both high-level clients)
// an extra "Close" frame gets sent to the server, potentially
// after the TCP connection is closed, which causes this exception.
//
// This will be solved in a later version of the http4s (core or jdk).
case e: java.io.IOException if e.getMessage == "closed output" => none
}
.unNone

private def foldStream(
stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]]
) =
): F[(List[ExecStream], Option[ErrorOrStatus])] =
stdoutStream.compile.fold((List.empty[ExecStream], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) =>
data match {
case Left(event) =>
Expand All @@ -280,7 +283,7 @@ private[client] class NamespacedPodsApi[F[_]](

private def foldErrorStream(
stdoutStream: Stream[F, Either[ExecStream, ErrorOrStatus]]
) =
): F[(List[StdErr], Option[ErrorOrStatus])] =
stdoutStream.compile.fold((List.empty[StdErr], none[ErrorOrStatus])) { case ((accEvents, accStatus), data) =>
data match {
case Left(event) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.goyeau.kubernetes.client.KubeConfig
import com.goyeau.kubernetes.client.operation.*
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.client.websocket.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.{Request, Response}

private[client] class RawApi[F[_]](
Expand All @@ -19,22 +19,17 @@ private[client] class RawApi[F[_]](
def runRequest(
request: Request[F]
): Resource[F, Response[F]] =
Request[F](
method = request.method,
uri = config.server.resolve(request.uri),
httpVersion = request.httpVersion,
headers = request.headers,
body = request.body,
attributes = request.attributes
).withOptionalAuthorization(authorization)
request
.withUri(config.server.resolve(request.uri))
.withOptionalAuthorization(authorization)
.toResource
.flatMap(httpClient.run)

def connectWS(
request: WSRequest
): Resource[F, WSConnectionHighLevel[F]] =
request
.copy(uri = config.server.resolve(request.uri))
.withUri(config.server.resolve(request.uri))
.withOptionalAuthorization(authorization)
.toResource
.flatMap { request =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.syntax.all.*
import cats.{Applicative, FlatMap}
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.WSRequest
import org.http4s.client.websocket.WSRequest
import org.http4s.{EntityDecoder, Request, Response}

package object operation {
Expand All @@ -21,9 +21,7 @@ package object operation {
implicit private[client] class KubernetesWsRequestOps[F[_]: Applicative](request: WSRequest) {
def withOptionalAuthorization(authorization: Option[F[Authorization]]): F[WSRequest] =
authorization.fold(request.pure[F]) { authorization =>
authorization.map { auth =>
request.copy(headers = request.headers.put(auth))
}
authorization.map(auth => request.withHeaders(request.headers.put(auth)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,18 @@ object SslContexts {
val _ = for {
keyStream <- keyDataStream.orElse(keyFileStream)
certStream <- certDataStream.orElse(certFileStream)
} yield {
Security.addProvider(new BouncyCastleProvider())
val pemKeyPair =
new PEMParser(new InputStreamReader(keyStream)).readObject().asInstanceOf[PEMKeyPair]
val privateKey = new JcaPEMKeyConverter().setProvider("BC").getPrivateKey(pemKeyPair.getPrivateKeyInfo)

val certificateFactory = CertificateFactory.getInstance("X509")
val certificate = certificateFactory.generateCertificate(certStream).asInstanceOf[X509Certificate]

defaultKeyStore.setKeyEntry(
certificate.getSubjectX500Principal.getName,
privateKey,
config.clientKeyPass.fold(Array.empty[Char])(_.toCharArray),
Array(certificate)
)
}
_ = Security.addProvider(new BouncyCastleProvider())
pemKeyPair = new PEMParser(new InputStreamReader(keyStream)).readObject().asInstanceOf[PEMKeyPair]
privateKey = new JcaPEMKeyConverter().setProvider("BC").getPrivateKey(pemKeyPair.getPrivateKeyInfo)

certificateFactory = CertificateFactory.getInstance("X509")
certificate = certificateFactory.generateCertificate(certStream).asInstanceOf[X509Certificate]
} yield defaultKeyStore.setKeyEntry(
certificate.getSubjectX500Principal.getName,
privateKey,
config.clientKeyPass.fold(Array.empty[Char])(_.toCharArray),
Array(certificate)
)

val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
keyManagerFactory.init(defaultKeyStore, Array.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.http4s.{Request, Status}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.file.Files as JFiles
import org.http4s.implicits.*

class RawApiTest extends FunSuite with MinikubeClientProvider[IO] with ContextProvider {
Expand Down
Loading