Skip to content

Commit

Permalink
Upgrade http4s to 0.23.30 and http4s-jdk-http-client to 0.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
joan38 committed Feb 3, 2025
1 parent 78f6a48 commit 920a0b5
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import com.goyeau.kubernetes.client.util.cache.{AuthorizationParse, ExecToken}
import io.circe.{Decoder, Encoder}
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient, WSClient}
import org.http4s.jdkhttpclient.{JdkHttpClient, JdkWSClient}
import org.http4s.client.websocket.WSClient
import org.typelevel.log4cats.Logger

import java.net.http.HttpClient
Expand Down Expand Up @@ -72,8 +73,8 @@ object KubernetesClient {
client <- Resource.eval {
Sync[F].delay(HttpClient.newBuilder().sslContext(SslContexts.fromConfig(config)).build())
}
httpClient <- JdkHttpClient[F](client)
wsClient <- JdkWSClient[F](client)
httpClient = JdkHttpClient[F](client)
wsClient = JdkWSClient[F](client)
authorization <- Resource.eval {
OptionT
.fromOption(config.authorization)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import io.k8s.api.core.v1.{Pod, PodList}
import io.k8s.apimachinery.pkg.apis.meta.v1.Status
import org.http4s.*
import org.http4s.client.Client
import org.http4s.client.websocket.{WSClient, WSDataFrame, WSFrame, WSRequest}
import org.http4s.headers.Authorization
import org.http4s.implicits.*
import org.http4s.jdkhttpclient.*
import org.typelevel.ci.CIString
import org.typelevel.log4cats.Logger
import scodec.bits.ByteVector

import java.nio.file.Path as JPath
import scala.concurrent.duration.DurationInt

Expand Down Expand Up @@ -113,13 +112,11 @@ private[client] class NamespacedPodsApi[F[_]](
("container" -> container) ++?
("command" -> commands)

WSRequest(uri, method = Method.POST)
WSRequest(uri, Headers.empty, Method.POST)
.withOptionalAuthorization(authorization)
.map { r =>
r.copy(
headers = r.headers.put(Header.Raw(CIString("Sec-WebSocket-Protocol"), "v4.channel.k8s.io"))
)
}
.map(r =>
r.withHeaders(headers = r.headers.put(Header.Raw(CIString("Sec-WebSocket-Protocol"), "v4.channel.k8s.io")))
)
}

@deprecated("Use download() which uses fs2.io.file.Path", "0.8.2")
Expand Down Expand Up @@ -200,37 +197,34 @@ private[client] class NamespacedPodsApi[F[_]](
)

val uploadFileResult =
uploadRequest.flatMap { uploadRequest =>
wsClient.connectHighLevel(uploadRequest).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(
processWebSocketData
)
.interruptWhen(signal)
.concurrently(dataStream)
uploadRequest.toResource.flatMap(wsClient.connectHighLevel).use { connection =>
val source = Files[F].readAll(sourceFile, 4096, Flags.Read)
val sendData = source
.mapChunks(chunk => Chunk(WSFrame.Binary(ByteVector(chunk.toChain.prepend(StdInId).toVector))))
.through(connection.sendPipe)
val retryAttempts = 5
val sendWithRetry = Stream
.retry(sendData.compile.drain, delay = 500.millis, nextDelay = _ * 2, maxAttempts = retryAttempts)
.onError { case e =>
Stream.eval(Logger[F].error(e)(s"Failed send file data after $retryAttempts attempts"))
}

val result = for {
signal <- Stream.eval(SignallingRef[F, Boolean](false))
dataStream = sendWithRetry *> Stream.eval(signal.set(true))

output = connection.receiveStream
.through(skipConnectionClosedErrors)
.through(processWebSocketData)
.interruptWhen(signal)
.concurrently(dataStream)

errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors
errors = foldErrorStream(
output
).map { case (errors, _) => errors }
} yield errors

result.compile.lastOrError.flatten
}
result.compile.lastOrError.flatten
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import cats.effect.{Async, Resource}
import com.goyeau.kubernetes.client.KubeConfig
import com.goyeau.kubernetes.client.operation.*
import org.http4s.client.Client
import org.http4s.client.websocket.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.{WSClient, WSConnectionHighLevel, WSRequest}
import org.http4s.{Request, Response}

private[client] class RawApi[F[_]](
Expand Down Expand Up @@ -34,7 +34,7 @@ private[client] class RawApi[F[_]](
request: WSRequest
): Resource[F, WSConnectionHighLevel[F]] =
request
.copy(uri = config.server.resolve(request.uri))
.withUri(config.server.resolve(request.uri))
.withOptionalAuthorization(authorization)
.toResource
.flatMap { request =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.syntax.all.*
import cats.{Applicative, FlatMap}
import org.http4s.client.Client
import org.http4s.headers.Authorization
import org.http4s.jdkhttpclient.WSRequest
import org.http4s.client.websocket.WSRequest
import org.http4s.{EntityDecoder, Request, Response}

package object operation {
Expand All @@ -22,7 +22,7 @@ package object operation {
def withOptionalAuthorization(authorization: Option[F[Authorization]]): F[WSRequest] =
authorization.fold(request.pure[F]) { authorization =>
authorization.map { auth =>
request.copy(headers = request.headers.put(auth))
request.withHeaders(request.headers.put(auth))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.sc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ lazy val circe = {
}

lazy val http4s = {
val version = "0.23.24"
val jdkClientVersion = "0.5.0"
val version = "0.23.30"
val jdkClientVersion = "0.10.0"
Agg(
ivy"org.http4s::http4s-dsl:$version",
ivy"org.http4s::http4s-circe:$version",
Expand Down

0 comments on commit 920a0b5

Please sign in to comment.