diff --git a/android-test/build.gradle.kts b/android-test/build.gradle.kts index 8b97915e931b..d945f83dd3a1 100644 --- a/android-test/build.gradle.kts +++ b/android-test/build.gradle.kts @@ -20,7 +20,7 @@ android { testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner" testInstrumentationRunnerArguments += mapOf( "runnerBuilder" to "de.mannodermaus.junit5.AndroidJUnit5Builder", - "notPackage" to "org.bouncycastle", + "notPackage" to "org.bouncycastle,com.google.common", "configurationParameters" to "junit.jupiter.extensions.autodetection.enabled=true" ) } @@ -65,6 +65,7 @@ dependencies { "friendsImplementation"(projects.okhttpDnsoverhttps) testImplementation(projects.okhttp) + testImplementation(projects.okhttpCoroutines) testImplementation(libs.junit) testImplementation(libs.junit.ktx) testImplementation(libs.assertk) @@ -104,6 +105,8 @@ dependencies { androidTestImplementation(libs.squareup.moshi) androidTestImplementation(libs.squareup.moshi.kotlin) androidTestImplementation(libs.squareup.okio.fakefilesystem) + androidTestImplementation(libs.kotlinx.coroutines.test) + androidTestImplementation(projects.okhttpCoroutines) androidTestImplementation(libs.androidx.test.runner) androidTestImplementation(libs.junit.jupiter.api) diff --git a/android-test/src/androidTest/java/okhttp/android/test/HttpEngineBridgeTest.kt b/android-test/src/androidTest/java/okhttp/android/test/HttpEngineBridgeTest.kt new file mode 100644 index 000000000000..78b58fc21863 --- /dev/null +++ b/android-test/src/androidTest/java/okhttp/android/test/HttpEngineBridgeTest.kt @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2025 Block, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp.android.test + +import android.content.Context +import android.net.http.ConnectionMigrationOptions +import android.net.http.ConnectionMigrationOptions.MIGRATION_OPTION_ENABLED +import android.net.http.DnsOptions +import android.net.http.DnsOptions.DNS_OPTION_ENABLED +import android.net.http.HttpEngine +import android.net.http.QuicOptions +import androidx.test.core.app.ApplicationProvider +import androidx.test.filters.SdkSuppress +import kotlinx.coroutines.test.runTest +import okhttp3.Cache +import okhttp3.HttpUrl.Companion.toHttpUrl +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.android.httpengine.HttpEngineCallDecorator.Companion.callDecorator +import okhttp3.coroutines.executeAsync +import okio.Path.Companion.toPath +import okio.fakefilesystem.FakeFileSystem +import org.junit.Test + +@SdkSuppress(minSdkVersion = 34) +class HttpEngineBridgeTest { + val context = ApplicationProvider.getApplicationContext() + + val httpEngine = + HttpEngine + .Builder(context) + .setStoragePath( + context.filesDir + .resolve("httpEngine") + .apply { + mkdirs() + }.path, + ).setConnectionMigrationOptions( + ConnectionMigrationOptions + .Builder() + .setAllowNonDefaultNetworkUsage(MIGRATION_OPTION_ENABLED) + .setDefaultNetworkMigration(MIGRATION_OPTION_ENABLED) + .setPathDegradationMigration(MIGRATION_OPTION_ENABLED) + .build(), + ).addQuicHint("www.google.com", 443, 443) + .addQuicHint("google.com", 443, 443) + .setDnsOptions( + DnsOptions + .Builder() + .setPersistHostCache(DNS_OPTION_ENABLED) + .setPreestablishConnectionsToStaleDnsResults(DNS_OPTION_ENABLED) + .setUseHttpStackDnsResolver(DNS_OPTION_ENABLED) + .setStaleDnsOptions( + DnsOptions.StaleDnsOptions + .Builder() + .setUseStaleOnNameNotResolved(DNS_OPTION_ENABLED) + .build(), + ).build(), + ).setEnableQuic(true) + .setQuicOptions( + QuicOptions + .Builder() + .addAllowedQuicHost("www.google.com") + .addAllowedQuicHost("google.com") + .build(), + ).build() + + var client = + OkHttpClient + .Builder() + .addCallDecorator(httpEngine.callDecorator) + .build() + + val imageUrls = + listOf( + "https://storage.googleapis.com/cronet/sun.jpg", + "https://storage.googleapis.com/cronet/flower.jpg", + "https://storage.googleapis.com/cronet/chair.jpg", + "https://storage.googleapis.com/cronet/white.jpg", + "https://storage.googleapis.com/cronet/moka.jpg", + "https://storage.googleapis.com/cronet/walnut.jpg", + ).map { it.toHttpUrl() } + + @Test + fun testNewCall() = + runTest { + val call = client.newCall(Request("https://google.com/robots.txt".toHttpUrl())) + + val response = call.executeAsync() + + println(response.body.string().take(40)) + + val call2 = client.newCall(Request("https://google.com/robots.txt".toHttpUrl())) + + val response2 = call2.executeAsync() + + println(response2.body.string().take(40)) + println(response2.protocol) + } + + @Test + fun testWithCache() = + runTest { + client = + client + .newBuilder() + .cache(Cache(FakeFileSystem(), "/cache".toPath(), 100_000_000)) + .build() + + repeat(10) { + imageUrls.forEach { + val call = client.newCall(Request(it)) + + val response = call.executeAsync() + + println( + "${response.request.url} cached=${response.cacheResponse != null} " + + response.body + .byteString() + .md5() + .hex(), + ) + } + } + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2db72c4bde91..77fb8524c10d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,6 +6,7 @@ com-squareup-moshi = "1.15.2" com-squareup-okio = "3.16.0" de-mannodermaus-junit5 = "1.8.0" graalvm = "24.2.2" +guava = "33.4.8-android" #noinspection UnusedVersionCatalogEntry junit-platform = "1.13.4" kotlinx-serialization = "1.9.0" @@ -62,6 +63,7 @@ gradlePlugin-mavenPublish = "com.vanniktech:gradle-maven-publish-plugin:0.34.0" gradlePlugin-mavenSympathy = "io.github.usefulness.maven-sympathy:io.github.usefulness.maven-sympathy.gradle.plugin:0.3.0" gradlePlugin-shadow = "com.gradleup.shadow:shadow-gradle-plugin:9.0.2" gradlePlugin-spotless = "com.diffplug.spotless:spotless-plugin-gradle:7.2.1" +guava = { module = "com.google.guava:guava", version.ref = "guava" } hamcrestLibrary = "org.hamcrest:hamcrest-library:3.0" httpClient5 = "org.apache.httpcomponents.client5:httpclient5:5.5" #noinspection NewerVersionAvailable diff --git a/okhttp/api/android/okhttp.api b/okhttp/api/android/okhttp.api index 1f0b9839ac73..be34b850346e 100644 --- a/okhttp/api/android/okhttp.api +++ b/okhttp/api/android/okhttp.api @@ -129,6 +129,16 @@ public abstract interface class okhttp3/Call : java/lang/Cloneable { public abstract fun timeout ()Lokio/Timeout; } +public abstract interface class okhttp3/Call$Chain { + public abstract fun getClient ()Lokhttp3/OkHttpClient; + public abstract fun getRequest ()Lokhttp3/Request; + public abstract fun proceed (Lokhttp3/Request;)Lokhttp3/Call; +} + +public abstract interface class okhttp3/Call$Decorator { + public abstract fun newCall (Lokhttp3/Call$Chain;)Lokhttp3/Call; +} + public abstract interface class okhttp3/Call$Factory { public abstract fun newCall (Lokhttp3/Request;)Lokhttp3/Call; } @@ -902,6 +912,7 @@ public class okhttp3/OkHttpClient : okhttp3/Call$Factory, okhttp3/WebSocket$Fact public final fun fastFallback ()Z public final fun followRedirects ()Z public final fun followSslRedirects ()Z + public final fun getCallDecorators ()Ljava/util/List; public final fun hostnameVerifier ()Ljavax/net/ssl/HostnameVerifier; public final fun interceptors ()Ljava/util/List; public final fun minWebSocketMessageToCompress ()J @@ -927,6 +938,7 @@ public final class okhttp3/OkHttpClient$Builder { public final fun -addInterceptor (Lkotlin/jvm/functions/Function1;)Lokhttp3/OkHttpClient$Builder; public final fun -addNetworkInterceptor (Lkotlin/jvm/functions/Function1;)Lokhttp3/OkHttpClient$Builder; public fun ()V + public final fun addCallDecorator (Lokhttp3/Call$Decorator;)Lokhttp3/OkHttpClient$Builder; public final fun addInterceptor (Lokhttp3/Interceptor;)Lokhttp3/OkHttpClient$Builder; public final fun addNetworkInterceptor (Lokhttp3/Interceptor;)Lokhttp3/OkHttpClient$Builder; public final fun authenticator (Lokhttp3/Authenticator;)Lokhttp3/OkHttpClient$Builder; @@ -1274,3 +1286,33 @@ public abstract class okhttp3/WebSocketListener { public fun onOpen (Lokhttp3/WebSocket;Lokhttp3/Response;)V } +public final class okhttp3/android/httpengine/HttpEngineCallDecorator : okhttp3/Call$Decorator { + public static final field Companion Lokhttp3/android/httpengine/HttpEngineCallDecorator$Companion; + public fun (Landroid/net/http/HttpEngine;Lkotlin/jvm/functions/Function1;)V + public synthetic fun (Landroid/net/http/HttpEngine;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun newCall (Lokhttp3/Call$Chain;)Lokhttp3/Call; +} + +public final class okhttp3/android/httpengine/HttpEngineCallDecorator$Companion { + public final fun getCallDecorator (Landroid/net/http/HttpEngine;)Lokhttp3/android/httpengine/HttpEngineCallDecorator; +} + +public final class okhttp3/android/httpengine/HttpEngineCallDecorator$HttpEngineCall : okhttp3/Call { + public fun (Lokhttp3/android/httpengine/HttpEngineCallDecorator;Lokhttp3/Call;)V + public fun cancel ()V + public synthetic fun clone ()Ljava/lang/Object; + public fun clone ()Lokhttp3/Call; + public fun enqueue (Lokhttp3/Callback;)V + public fun execute ()Lokhttp3/Response; + public final fun getHttpEngine ()Landroid/net/http/HttpEngine; + public final fun getRealCall ()Lokhttp3/Call; + public fun isCanceled ()Z + public fun isExecuted ()Z + public fun request ()Lokhttp3/Request; + public fun timeout ()Lokio/Timeout; +} + +public final class okhttp3/android/httpengine/HttpEngineTimeoutException : java/io/IOException { + public fun ()V +} + diff --git a/okhttp/api/jvm/okhttp.api b/okhttp/api/jvm/okhttp.api index ca4df1afdcfa..d4a6d7b9d50d 100644 --- a/okhttp/api/jvm/okhttp.api +++ b/okhttp/api/jvm/okhttp.api @@ -129,6 +129,16 @@ public abstract interface class okhttp3/Call : java/lang/Cloneable { public abstract fun timeout ()Lokio/Timeout; } +public abstract interface class okhttp3/Call$Chain { + public abstract fun getClient ()Lokhttp3/OkHttpClient; + public abstract fun getRequest ()Lokhttp3/Request; + public abstract fun proceed (Lokhttp3/Request;)Lokhttp3/Call; +} + +public abstract interface class okhttp3/Call$Decorator { + public abstract fun newCall (Lokhttp3/Call$Chain;)Lokhttp3/Call; +} + public abstract interface class okhttp3/Call$Factory { public abstract fun newCall (Lokhttp3/Request;)Lokhttp3/Call; } @@ -901,6 +911,7 @@ public class okhttp3/OkHttpClient : okhttp3/Call$Factory, okhttp3/WebSocket$Fact public final fun fastFallback ()Z public final fun followRedirects ()Z public final fun followSslRedirects ()Z + public final fun getCallDecorators ()Ljava/util/List; public final fun hostnameVerifier ()Ljavax/net/ssl/HostnameVerifier; public final fun interceptors ()Ljava/util/List; public final fun minWebSocketMessageToCompress ()J @@ -926,6 +937,7 @@ public final class okhttp3/OkHttpClient$Builder { public final fun -addInterceptor (Lkotlin/jvm/functions/Function1;)Lokhttp3/OkHttpClient$Builder; public final fun -addNetworkInterceptor (Lkotlin/jvm/functions/Function1;)Lokhttp3/OkHttpClient$Builder; public fun ()V + public final fun addCallDecorator (Lokhttp3/Call$Decorator;)Lokhttp3/OkHttpClient$Builder; public final fun addInterceptor (Lokhttp3/Interceptor;)Lokhttp3/OkHttpClient$Builder; public final fun addNetworkInterceptor (Lokhttp3/Interceptor;)Lokhttp3/OkHttpClient$Builder; public final fun authenticator (Lokhttp3/Authenticator;)Lokhttp3/OkHttpClient$Builder; diff --git a/okhttp/build.gradle.kts b/okhttp/build.gradle.kts index 3198289c19f4..e9c0903cae63 100644 --- a/okhttp/build.gradle.kts +++ b/okhttp/build.gradle.kts @@ -95,6 +95,7 @@ kotlin { compileOnly(libs.conscrypt.openjdk) implementation(libs.androidx.annotation) implementation(libs.androidx.startup.runtime) + implementation(libs.guava) } } diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineCallDecorator.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineCallDecorator.kt new file mode 100644 index 000000000000..4aff68781681 --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineCallDecorator.kt @@ -0,0 +1,87 @@ +package okhttp3.android.httpengine + +import android.annotation.SuppressLint +import android.net.http.HttpEngine +import android.os.Build +import android.os.ext.SdkExtensions +import androidx.annotation.RequiresExtension +import okhttp3.Call +import okhttp3.Interceptor +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.internal.SuppressSignatureCheck +import okhttp3.internal.cache.CacheInterceptor +import okhttp3.internal.http.BridgeInterceptor +import okhttp3.internal.http.RetryAndFollowUpInterceptor + +@SuppressSignatureCheck +class HttpEngineCallDecorator( + internal val httpEngine: HttpEngine, + private val useHttpEngine: (Request) -> Boolean = { isHttpEngineSupported() }, +) : Call.Decorator { + // TODO make this work with forked clients + internal lateinit var client: OkHttpClient + + @SuppressLint("NewApi") + private val httpEngineInterceptor = HttpEngineInterceptor(this) + + override fun newCall(chain: Call.Chain): Call { + val call = httpEngineCall(chain) + + return call ?: chain.proceed(chain.request) + } + + @SuppressLint("NewApi") + @Synchronized + private fun httpEngineCall(chain: Call.Chain): Call? { + if (!useHttpEngine(chain.request)) { + return null + } + + if (!::client.isInitialized) { + val originalClient = chain.client + client = + originalClient + .newBuilder() + .apply { + networkInterceptors.clear() + + // TODO refactor RetryAndFollowUpInterceptor to not require the Client directly + interceptors += RetryAndFollowUpInterceptor(originalClient) + interceptors += BridgeInterceptor(originalClient.cookieJar) + interceptors += CacheInterceptor(originalClient.cache) + interceptors += httpEngineInterceptor + interceptors += + Interceptor { + throw IllegalStateException("Shouldn't attempt to connect with OkHttp") + } + + // Keep decorators after this one in the new client + callDecorators.subList(0, callDecorators.indexOf(this@HttpEngineCallDecorator) + 1).clear() + }.build() + } + + return HttpEngineCall(client.newCall(chain.request)) + } + + @RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) + inner class HttpEngineCall( + val realCall: Call, + ) : Call by realCall { + val httpEngine: HttpEngine + get() = this@HttpEngineCallDecorator.httpEngine + + override fun cancel() { + realCall.cancel() + httpEngineInterceptor.cancelCall(realCall) + } + } + + companion object { + val HttpEngine.callDecorator + get() = HttpEngineCallDecorator(this) + } +} + +private fun isHttpEngineSupported(): Boolean = + Build.VERSION.SDK_INT >= Build.VERSION_CODES.R && SdkExtensions.getExtensionVersion(Build.VERSION_CODES.S) >= 7 diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineInterceptor.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineInterceptor.kt new file mode 100644 index 000000000000..617594ef61cd --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineInterceptor.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.UrlRequest +import android.os.Build +import androidx.annotation.RequiresExtension +import java.io.IOException +import java.util.concurrent.ConcurrentHashMap +import okhttp3.Call +import okhttp3.Interceptor +import okhttp3.Response +import okhttp3.ResponseBody +import okhttp3.internal.SuppressSignatureCheck + +/** + * An OkHttp interceptor that redirects HTTP traffic to use Cronet instead of using the OkHttp + * network stack. + * + * The interceptor should be used as the last application interceptor to ensure that all other + * interceptors are visited before sending the request on wire and after a response is returned. + * + * The interceptor is a plug-and-play replacement for the OkHttp stack for the most part, + * however, there are some caveats to keep in mind: + * + * 1. The majority of OkHttp core is bypassed. This includes all network + * interceptors. + * 2. Some response fields are not being populated due to mismatches between Cronet's and + * OkHttp's architecture. + */ +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class HttpEngineInterceptor( + httpEngineDecorator: HttpEngineCallDecorator, +) : Interceptor { + private val converter: RequestResponseConverter = RequestResponseConverter.build(httpEngineDecorator) + private val activeCalls: MutableMap = ConcurrentHashMap() + + @Throws(IOException::class) + override fun intercept(chain: Interceptor.Chain): Response { + if (chain.call().isCanceled()) { + throw IOException("Canceled") + } + + val request = chain.request() + + val requestAndOkHttpResponse: RequestResponseConverter.CronetRequestAndOkHttpResponse = + converter.convert(request) + + activeCalls[chain.call()] = requestAndOkHttpResponse.request + + try { + requestAndOkHttpResponse.request.start() + return toInterceptorResponse(requestAndOkHttpResponse.response, chain.call()) + } catch (e: RuntimeException) { + // If the response is retrieved successfully the caller is responsible for closing + // the response, which will remove it from the active calls map. + activeCalls.remove(chain.call()) + throw e + } catch (e: IOException) { + activeCalls.remove(chain.call()) + throw e + } + } + + private fun toInterceptorResponse( + response: Response, + call: Call, + ): Response { + checkNotNull(response.body) + + if (response.body is HttpEngineInterceptorResponseBody) { + return response + } + + return response + .newBuilder() + .body(HttpEngineInterceptorResponseBody(response.body, call)) + .build() + } + + fun cancelCall(call: Call) { + activeCalls.remove(call) + } + + private inner class HttpEngineInterceptorResponseBody( + delegate: ResponseBody, + private val call: Call, + ) : HttpEngineTransportResponseBody(delegate) { + override fun customCloseHook() { + activeCalls.remove(call) + } + } +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTimeoutException.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTimeoutException.kt new file mode 100644 index 000000000000..3d2326d839ff --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTimeoutException.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import java.io.IOException + +class HttpEngineTimeoutException : IOException() diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTransportResponseBody.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTransportResponseBody.kt new file mode 100644 index 000000000000..05418cc4c36c --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/HttpEngineTransportResponseBody.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.os.Build +import androidx.annotation.RequiresExtension +import okhttp3.MediaType +import okhttp3.ResponseBody +import okhttp3.internal.SuppressSignatureCheck +import okio.BufferedSource + +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal abstract class HttpEngineTransportResponseBody protected constructor( + private val delegate: ResponseBody, +) : ResponseBody() { + override fun contentType(): MediaType? = delegate.contentType() + + override fun contentLength(): Long = delegate.contentLength() + + override fun source(): BufferedSource = delegate.source() + + override fun close() { + delegate.close() + customCloseHook() + } + + abstract fun customCloseHook() +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/OkHttpBridgeRequestCallback.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/OkHttpBridgeRequestCallback.kt new file mode 100644 index 000000000000..c3fe2bfac760 --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/OkHttpBridgeRequestCallback.kt @@ -0,0 +1,281 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.HttpException +import android.net.http.UrlRequest +import android.net.http.UrlResponseInfo +import android.os.Build +import androidx.annotation.RequiresExtension +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import java.io.IOException +import java.nio.ByteBuffer +import java.util.Collections +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.concurrent.Volatile +import okhttp3.OkHttpClient +import okhttp3.internal.SuppressSignatureCheck +import okio.Buffer +import okio.Source +import okio.Timeout + +/** + * An implementation of Cronet's callback. This is the heart of the bridge and deals with most of + * the async-sync paradigm translation. + * + * + * Translating the UrlResponseInfo is relatively straightforward as the entire object is + * available immediately and is relatively small, so it can easily fit in memory. + * + * + * Translating the body is a bit more tricky because of the mismatch between OkHttp and Cronet + * designs. We invoke Cronet's read and wait for the result using synchronization primitives (see + * BodySource implementation). The implementation is assuming that there's always at most one read() + * request in flight (which is safe to assume), and relies on reasonable fairness of thread + * scheduling, especially when handling cancellations. + */ +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class OkHttpBridgeRequestCallback( + private val client: OkHttpClient, +) : UrlRequest.Callback { + /** A bridge between Cronet's asynchronous callbacks and OkHttp's blocking stream-like reads. */ + private val bodySourceFuture: SettableFuture = SettableFuture.create() + + /** Signal whether the request is finished and the response has been fully read. */ + private val finished = AtomicBoolean(false) + + /** Signal whether the request was canceled. */ + private val canceled = AtomicBoolean(false) + + /** + * An internal, blocking, thread safe way of passing data between the callback methods and [ ][.bodySourceFuture]. + * + * + * Has a capacity of 2 - at most one slot for a read result and at most 1 slot for cancellation + * signal, this guarantees that all inserts are non-blocking. + */ + private val callbackResults: BlockingQueue = ArrayBlockingQueue(2) + + /** The response headers. */ + private val headersFuture: SettableFuture = SettableFuture.create() + + /** The previous responses as reported to [.onRedirectReceived], from oldest to newest. * */ + private val urlResponseInfoChain: MutableList = ArrayList() + + /** The request being processed. Set when the request is first seen by the callback. */ + @Volatile + private var request: UrlRequest? = null + + val urlResponseInfo: ListenableFuture + /** Returns the [UrlResponseInfo] for the request associated with this callback. */ + get() = headersFuture + + val bodySource: ListenableFuture + /** + * Returns the OkHttp [Source] for the request associated with this callback. + * + * Note that retrieving data from the `Source` instance might block further as the + * response body is streamed. + */ + get() = bodySourceFuture + + fun getUrlResponseInfoChain(): MutableList = Collections.unmodifiableList(urlResponseInfoChain) + + override fun onRedirectReceived( + urlRequest: UrlRequest, + urlResponseInfo: UrlResponseInfo, + nextUrl: String, + ) { + check(headersFuture.set(urlResponseInfo)) + // Note: This might not match the content length headers but we have no way of accessing + // the actual body with current Cronet's APIs (see RedirectStrategy). + check(bodySourceFuture.set(Buffer())) + urlRequest.cancel() + } + + override fun onResponseStarted( + urlRequest: UrlRequest, + urlResponseInfo: UrlResponseInfo, + ) { + request = urlRequest + + check(headersFuture.set(urlResponseInfo)) + check(bodySourceFuture.set(CronetBodySource())) + } + + override fun onReadCompleted( + urlRequest: UrlRequest, + urlResponseInfo: UrlResponseInfo, + byteBuffer: ByteBuffer, + ) { + callbackResults.add(CallbackResult(CallbackStep.ON_READ_COMPLETED, byteBuffer, null)) + } + + override fun onSucceeded( + urlRequest: UrlRequest, + urlResponseInfo: UrlResponseInfo, + ) { + callbackResults.add(CallbackResult(CallbackStep.ON_SUCCESS, null, null)) + } + + override fun onFailed( + urlRequest: UrlRequest, + urlResponseInfo: UrlResponseInfo?, + e: HttpException, + ) { + // If this was called before we start reading the body, the exception will + // propagate in the future providing headers and the body wrapper. + if (headersFuture.setException(e) && bodySourceFuture.setException(e)) { + return + } + + // If this was called as a reaction to a read() call, the read result will propagate + // the exception. + callbackResults.add(CallbackResult(CallbackStep.ON_FAILED, null, e)) + } + + override fun onCanceled( + urlRequest: UrlRequest, + responseInfo: UrlResponseInfo?, + ) { + canceled.set(true) + callbackResults.add(CallbackResult(CallbackStep.ON_CANCELED, null, null)) + + // If there's nobody listening it's possible that the cancellation happened before we even + // received anything from the server. In that case inform the thread that's awaiting server + // response about the cancellation as well. This becomes a no-op if the futures + // were already set. + val e = IOException("The request was canceled!") + headersFuture.setException(e) + bodySourceFuture.setException(e) + } + + private inner class CronetBodySource : Source { + private var buffer: ByteBuffer? = ByteBuffer.allocateDirect(CRONET_BYTE_BUFFER_CAPACITY) + + /** Whether the close() method has been called. */ + @Volatile + private var closed = false + + @Throws(IOException::class) + override fun read( + sink: Buffer, + byteCount: Long, + ): Long { + if (canceled.get()) { + throw IOException("The request was canceled!") + } + + // Using IAE instead of NPE (checkNotNull) for okio.RealBufferedSource consistency + check(byteCount >= 0) { "byteCount < 0: $byteCount" } + check(!closed) { "closed" } + + if (finished.get()) { + return -1 + } + + if (byteCount < buffer!!.limit()) { + buffer!!.limit(byteCount.toInt()) + } + + request!!.read(buffer!!) + + var result: CallbackResult? + try { + result = callbackResults.poll(client.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS) + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + result = null + } + + if (result == null) { + // Either readResult.poll() was interrupted or it timed out. + request!!.cancel() + throw HttpEngineTimeoutException() + } + + when (result.callbackStep) { + CallbackStep.ON_FAILED -> { + finished.set(true) + buffer = null + throw IOException(result.exception) + } + + CallbackStep.ON_SUCCESS -> { + finished.set(true) + buffer = null + return -1 + } + + CallbackStep.ON_CANCELED -> { + // The canceled flag is already set by the onCanceled method + // so not setting it here. + buffer = null + throw IOException("The request was canceled!") + } + + CallbackStep.ON_READ_COMPLETED -> { + val resultBuffer = result.buffer!! + resultBuffer.flip() + val bytesWritten = sink.write(result.buffer) + result.buffer.clear() + return bytesWritten.toLong() + } + } + } + + override fun timeout(): Timeout { + // TODO(danstahr): This should likely respect the OkHttp timeout somehow + return Timeout.NONE + } + + override fun close() { + if (closed) { + return + } + closed = true + if (!finished.get()) { + request!!.cancel() + } + } + } + + private class CallbackResult( + val callbackStep: CallbackStep, + val buffer: ByteBuffer?, + val exception: HttpException?, + ) + + private enum class CallbackStep { + ON_READ_COMPLETED, + ON_SUCCESS, + ON_FAILED, + ON_CANCELED, + } + + companion object { + /** + * The byte buffer capacity for reading Cronet response bodies. Each response callback will + * allocate its own buffer of this size once the response starts being processed. + */ + private val CRONET_BYTE_BUFFER_CAPACITY = 32 * 1024 + } +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverter.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverter.kt new file mode 100644 index 000000000000..051a255ffdd8 --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverter.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.UploadDataProvider +import okhttp3.RequestBody + +/** An interface for classes converting from OkHttp to Cronet request bodies. */ +internal interface RequestBodyConverter { + fun convertRequestBody( + requestBody: RequestBody, + writeTimeoutMillis: Int, + ): UploadDataProvider +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverterImpl.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverterImpl.kt new file mode 100644 index 000000000000..8bdef964187e --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestBodyConverterImpl.kt @@ -0,0 +1,349 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.UploadDataProvider +import android.net.http.UploadDataSink +import android.os.Build +import androidx.annotation.RequiresExtension +import androidx.annotation.VisibleForTesting +import com.google.common.base.Verify +import com.google.common.util.concurrent.FutureCallback +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.ListeningExecutorService +import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.Uninterruptibles +import java.io.IOException +import java.nio.ByteBuffer +import java.util.concurrent.Callable +import java.util.concurrent.ExecutionException +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import kotlin.concurrent.Volatile +import okhttp3.RequestBody +import okhttp3.android.httpengine.UploadBodyDataBroker.ReadResult +import okhttp3.internal.SuppressSignatureCheck +import okio.Buffer +import okio.BufferedSink +import okio.buffer + +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class RequestBodyConverterImpl( + private val inMemoryRequestBodyConverter: InMemoryRequestBodyConverter, + private val streamingRequestBodyConverter: StreamingRequestBodyConverter, +) : RequestBodyConverter { + @Throws(IOException::class) + override fun convertRequestBody( + requestBody: RequestBody, + writeTimeoutMillis: Int, + ): UploadDataProvider { + val contentLength = requestBody.contentLength() + if (contentLength == -1L || contentLength > IN_MEMORY_BODY_LENGTH_THRESHOLD_BYTES) { + return streamingRequestBodyConverter.convertRequestBody(requestBody, writeTimeoutMillis) + } else { + return inMemoryRequestBodyConverter.convertRequestBody(requestBody, writeTimeoutMillis) + } + } + + /** + * Implementation of [RequestBodyConverter] that doesn't need to hold the entire request + * body in memory. + * + * + * + * + * + * 1. [RequestBody.writeTo] is invoked on the body, but the sink doesn't + * accept any data + * 1. A call to [UploadDataProvider.read] unblocks the sink, + * which accepts a part of the body (size depends on the buffer's capacity), then blocks + * again. Buffer is sent to Cronet. + * + * + * This is repeated until the entire body has been read. + */ + @VisibleForTesting + internal class StreamingRequestBodyConverter( + private val httpEngineCallDecorator: HttpEngineCallDecorator, + ) : RequestBodyConverter { + @RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) + override fun convertRequestBody( + requestBody: RequestBody, + writeTimeoutMillis: Int, + ): UploadDataProvider = + StreamingUploadDataProvider( + requestBody, + UploadBodyDataBroker(), + httpEngineCallDecorator, + writeTimeoutMillis.toLong(), + ) + + @RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) + private class StreamingUploadDataProvider( + private val okHttpRequestBody: RequestBody, + private val broker: UploadBodyDataBroker, + httpEngineCallDecorator: HttpEngineCallDecorator, + writeTimeoutMillis: Long, + ) : UploadDataProvider() { + private val readTaskExecutor: ListeningExecutorService by lazy { + MoreExecutors.listeningDecorator(httpEngineCallDecorator.client.dispatcher.executorService) + } + + // So that we don't have to special case infinity. Int.MAX_VALUE is ~infinity for all + // practical use cases. + private val writeTimeoutMillis: Long = + if (writeTimeoutMillis == 0L) Int.MAX_VALUE.toLong() else writeTimeoutMillis + + /** The future for the task that reads the OkHttp request body in the background. */ + private var readTaskFuture: ListenableFuture? = null + + /** The number of bytes we read from the OkHttp body thus far. */ + private var totalBytesReadFromOkHttp: Long = 0 + + @Throws(IOException::class) + override fun getLength(): Long = okHttpRequestBody.contentLength() + + @Throws(IOException::class) + override fun read( + uploadDataSink: UploadDataSink, + byteBuffer: ByteBuffer, + ) { + ensureReadTaskStarted() + + if (length == -1L) { + readUnknownBodyLength(uploadDataSink, byteBuffer) + } else { + readKnownBodyLength(uploadDataSink, byteBuffer) + } + } + + @Throws(IOException::class) + fun readKnownBodyLength( + uploadDataSink: UploadDataSink, + byteBuffer: ByteBuffer, + ) { + try { + val readResult: ReadResult = readFromOkHttp(byteBuffer) + + if (totalBytesReadFromOkHttp > length) { + throw prepareBodyTooLongException(length, totalBytesReadFromOkHttp) + } + + if (totalBytesReadFromOkHttp < length) { + when (readResult) { + ReadResult.SUCCESS -> uploadDataSink.onReadSucceeded(false) + ReadResult.END_OF_BODY -> throw IOException("The source has been exhausted but we expected more data!") + } + return + } + // Else we're handling what's supposed to be the last chunk + handleLastBodyRead(uploadDataSink, byteBuffer) + } catch (e: TimeoutException) { + readTaskFuture!!.cancel(true) + uploadDataSink.onReadError(IOException(e)) + } catch (e: ExecutionException) { + readTaskFuture!!.cancel(true) + uploadDataSink.onReadError(IOException(e)) + } + } + + /** + * The last body read is special for fixed length bodies - if Cronet receives exactly the + * right amount of data it won't ask for more, even if there is more data in the stream. As a + * result, when we read the advertised number of bytes, we need to make sure that the stream + * is indeed finished. + */ + fun handleLastBodyRead( + uploadDataSink: UploadDataSink, + filledByteBuffer: ByteBuffer, + ) { + // We reuse the same buffer for the END_OF_DATA read (it should be non-destructive and if + // it overwrites what's in there we don't mind as that's an error anyway). We just need + // to make sure we restore the original position afterwards. We don't use mark() / reset() + // as the mark position can be invalidated by limit manipulation. + val bufferPosition = filledByteBuffer.position() + filledByteBuffer.position(0) + + val readResult: ReadResult = readFromOkHttp(filledByteBuffer) + + if (readResult != ReadResult.END_OF_BODY) { + throw prepareBodyTooLongException(length, totalBytesReadFromOkHttp) + } + + Verify.verify( + filledByteBuffer.position() == 0, + "END_OF_BODY reads shouldn't write anything to the buffer", + ) + + // revert the position change + filledByteBuffer.position(bufferPosition) + + uploadDataSink.onReadSucceeded(false) + } + + fun readUnknownBodyLength( + uploadDataSink: UploadDataSink, + byteBuffer: ByteBuffer, + ) { + try { + val readResult: ReadResult = readFromOkHttp(byteBuffer) + uploadDataSink.onReadSucceeded(readResult == ReadResult.END_OF_BODY) + } catch (e: TimeoutException) { + readTaskFuture!!.cancel(true) + uploadDataSink.onReadError(IOException(e)) + } catch (e: ExecutionException) { + readTaskFuture!!.cancel(true) + uploadDataSink.onReadError(IOException(e)) + } + } + + fun ensureReadTaskStarted() { + // We don't expect concurrent calls so a simple flag is sufficient + if (readTaskFuture == null) { + readTaskFuture = + readTaskExecutor.submit( + Callable { + val bufferedSink: BufferedSink = broker.buffer() + okHttpRequestBody.writeTo(bufferedSink) + bufferedSink.flush() + broker.handleEndOfStreamSignal() + }, + ) + + Futures.addCallback( + readTaskFuture!!, + object : FutureCallback { + override fun onSuccess(result: Unit) {} + + override fun onFailure(t: Throwable) { + broker.setBackgroundReadError(t) + } + }, + MoreExecutors.directExecutor(), + ) + } + } + + @Throws(TimeoutException::class, ExecutionException::class) + fun readFromOkHttp(byteBuffer: ByteBuffer): ReadResult { + val positionBeforeRead = byteBuffer.position() + val readResult: ReadResult = + Uninterruptibles.getUninterruptibly( + broker.enqueueBodyRead(byteBuffer), + writeTimeoutMillis, + TimeUnit.MILLISECONDS, + ) + val bytesRead = byteBuffer.position() - positionBeforeRead + totalBytesReadFromOkHttp += bytesRead.toLong() + return readResult + } + + override fun rewind(uploadDataSink: UploadDataSink) { + // TODO(danstahr): OkHttp 4 can use isOneShot flag here and rewind safely. + uploadDataSink.onRewindError(UnsupportedOperationException("Rewind is not supported!")) + } + + companion object { + private fun prepareBodyTooLongException( + expectedLength: Long, + minActualLength: Long, + ): IOException = + IOException( + "Expected " + expectedLength + " bytes but got at least " + minActualLength, + ) + } + } + } + + /** + * Converts OkHttp's [RequestBody] to Cronet's [UploadDataProvider] by materializing + * the body in memory first. + * + * + * This strategy shouldn't be used for large requests (and for requests with uncapped length) + * to avoid OOM issues. + */ + @RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) + @VisibleForTesting + internal class InMemoryRequestBodyConverter : RequestBodyConverter { + @Throws(IOException::class) + override fun convertRequestBody( + requestBody: RequestBody, + writeTimeoutMillis: Int, + ): UploadDataProvider { + // content length is immutable by contract + + val length = requestBody.contentLength() + if (length < 0 || length > IN_MEMORY_BODY_LENGTH_THRESHOLD_BYTES) { + throw IOException( + ( + "Expected definite length less than " + + IN_MEMORY_BODY_LENGTH_THRESHOLD_BYTES + + "but got " + + length + ), + ) + } + + return object : UploadDataProvider() { + @Volatile + private var isMaterialized = false + private val materializedBody = Buffer() + + override fun getLength(): Long = length + + @Throws(IOException::class) + override fun read( + uploadDataSink: UploadDataSink, + byteBuffer: ByteBuffer, + ) { + // We're not expecting any concurrent calls here so a simple flag should be sufficient. + if (!isMaterialized) { + requestBody.writeTo(materializedBody) + materializedBody.flush() + isMaterialized = true + val reportedLength = getLength() + val actualLength = materializedBody.size + if (actualLength != reportedLength) { + throw IOException( + "Expected " + reportedLength + " bytes but got " + actualLength, + ) + } + } + check(materializedBody.read(byteBuffer) != -1) { "The source has been exhausted but we expected more!" } + uploadDataSink.onReadSucceeded(false) + } + + override fun rewind(uploadDataSink: UploadDataSink) { + // TODO(danstahr): OkHttp 4 can use isOneShot flag here and rewind safely. + uploadDataSink.onRewindError(UnsupportedOperationException()) + } + } + } + } + + companion object { + private val IN_MEMORY_BODY_LENGTH_THRESHOLD_BYTES = (1024 * 1024).toLong() + + fun create(httpEngineCallDecorator: HttpEngineCallDecorator): RequestBodyConverterImpl = + RequestBodyConverterImpl( + InMemoryRequestBodyConverter(), + StreamingRequestBodyConverter(httpEngineCallDecorator), + ) + } +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestResponseConverter.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestResponseConverter.kt new file mode 100644 index 000000000000..ac8c539ea0fb --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/RequestResponseConverter.kt @@ -0,0 +1,159 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.UrlRequest +import android.os.Build +import androidx.annotation.RequiresExtension +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import java.io.IOException +import okhttp3.Request +import okhttp3.Response +import okhttp3.internal.SuppressSignatureCheck + +@SuppressSignatureCheck +/** Converts OkHttp requests to Cronet requests. */ +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class RequestResponseConverter( + private val httpEngineDecorator: HttpEngineCallDecorator, + private val requestBodyConverter: RequestBodyConverter, + private val responseConverter: ResponseConverter, +) { + private val executor by lazy { + httpEngineDecorator.client.dispatcher.executorService + } + + /** + * Converts OkHttp's [Request] to a corresponding Cronet's [UrlRequest]. + * + * + * Since Cronet doesn't have a notion of a Response, which is handled entirely from the + * callbacks, this method also returns a [Future] like object the + * caller should use to obtain the matching [Response] for the given request. For example: + * + *
+   * RequestResponseConverter converter = ...
+   * CronetRequestAndOkHttpResponse reqResp = converter.convert(okHttpRequest);
+   * reqResp.getRequest.start();
+   *
+   * // Will block until status code, headers... are available
+   * Response okHttpResponse = reqResp.getResponse();
+   *
+   * // use OkHttp Response as usual
+   
* + */ + @RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) + @Throws(IOException::class) + fun convert(okHttpRequest: Request): CronetRequestAndOkHttpResponse { + val callback = + OkHttpBridgeRequestCallback(httpEngineDecorator.client) + + // The OkHttp request callback methods are lightweight, the heavy lifting is done by OkHttp / + // app owned threads. Use a direct executor to avoid extra thread hops. + val builder: UrlRequest.Builder = + httpEngineDecorator.httpEngine + .newUrlRequestBuilder( + okHttpRequest.url.toString(), + MoreExecutors.directExecutor(), + callback, + ).setDirectExecutorAllowed(true) + + builder.setHttpMethod(okHttpRequest.method) + + for (i in 0.. by lazy { + responseConverter.toResponseAsync( + request, + callback, + ) + } + } + + /** A [Future] like holder for OkHttp's [Response]. */ + internal interface ResponseSupplier { + @get:Throws(IOException::class) + val response: Response + + val responseFuture: ListenableFuture + } + + /** A simple data class for bundling Cronet request and OkHttp response. */ + internal class CronetRequestAndOkHttpResponse( + val request: UrlRequest, + private val responseSupplier: ResponseSupplier, + ) { + val response: Response + get() = responseSupplier.response + + val responseAsync: ListenableFuture + get() = responseSupplier.responseFuture + } + + companion object { + private const val CONTENT_LENGTH_HEADER_NAME = "Content-Length" + private const val CONTENT_TYPE_HEADER_NAME = "Content-Type" + private const val CONTENT_TYPE_HEADER_DEFAULT_VALUE = "application/octet-stream" + + fun build(httpEngineDecorator: HttpEngineCallDecorator): RequestResponseConverter = + RequestResponseConverter( + httpEngineDecorator, + // There must always be enough executors to blocking-read the OkHttp request bodies + // otherwise deadlocks can occur. + RequestBodyConverterImpl.create(httpEngineDecorator), + ResponseConverter(), + ) + } +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/ResponseConverter.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/ResponseConverter.kt new file mode 100644 index 000000000000..c9a3b7f8ef6f --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/ResponseConverter.kt @@ -0,0 +1,278 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.net.http.UrlResponseInfo +import android.os.Build +import androidx.annotation.RequiresExtension +import com.google.common.base.Ascii +import com.google.common.base.Splitter +import com.google.common.collect.ImmutableSet +import com.google.common.collect.Iterables +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import com.google.common.util.concurrent.Uninterruptibles +import java.io.IOException +import java.net.ProtocolException +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import okhttp3.CipherSuite +import okhttp3.Handshake +import okhttp3.MediaType.Companion.toMediaTypeOrNull +import okhttp3.Protocol +import okhttp3.Request +import okhttp3.Response +import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.asResponseBody +import okhttp3.TlsVersion +import okhttp3.internal.SuppressSignatureCheck +import okio.Source +import okio.buffer + +/** + * Converts Cronet's responses (or, more precisely, its chunks as they come from Cronet's [ ]), to OkHttp's [Response]. + */ +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class ResponseConverter { + /** + * Creates an OkHttp's Response from the OkHttp-Cronet bridging callback. + * + * + * As long as the callback's `UrlResponseInfo` is available this method is non-blocking. + * However, this method doesn't fetch the entire body response. As a result, subsequent calls to + * the result's [Response.body] methods might block further. + */ + @Throws(IOException::class) + fun toResponse( + request: Request, + callback: OkHttpBridgeRequestCallback, + ): Response { + val cronetResponseInfo: UrlResponseInfo = + getFutureValue(callback.urlResponseInfo) + val responseBuilder: Response.Builder = + createResponse(request, cronetResponseInfo, getFutureValue(callback.bodySource)) + + val redirectResponseInfos = callback.getUrlResponseInfoChain() + val urlChain = cronetResponseInfo.urlChain + + if (!redirectResponseInfos.isEmpty()) { + check( + urlChain.size == redirectResponseInfos.size + 1, + ) { + "The number of redirects should be consistent across URLs and headers!" + } + + var priorResponse: Response? = null + for (i in redirectResponseInfos.indices) { + val redirectedRequest = request.newBuilder().url(urlChain[i]).build() + priorResponse = + createResponse(redirectedRequest, redirectResponseInfos[i], null) + .priorResponse(priorResponse) + .build() + } + + responseBuilder + .request(request.newBuilder().url(Iterables.getLast(urlChain)).build()) + .priorResponse(priorResponse) + } + + return responseBuilder.build() + } + + fun toResponseAsync( + request: Request, + callback: OkHttpBridgeRequestCallback, + ): ListenableFuture = + Futures + .whenAllComplete(callback.urlResponseInfo, callback.bodySource) + .call({ toResponse(request, callback) }, MoreExecutors.directExecutor()) + + companion object { + private const val CONTENT_LENGTH_HEADER_NAME = "Content-Length" + private const val CONTENT_TYPE_HEADER_NAME = "Content-Type" + private const val CONTENT_ENCODING_HEADER_NAME = "Content-Encoding" + + // https://source.chromium.org/search?q=symbol:FilterSourceStream::ParseEncodingType%20f:cc + private val ENCODINGS_HANDLED_BY_CRONET: ImmutableSet = + ImmutableSet.of("br", "deflate", "gzip", "x-gzip") + + private val COMMA_SPLITTER: Splitter = Splitter.on(',').trimResults().omitEmptyStrings() + + @Throws(IOException::class) + private fun createResponse( + request: Request, + cronetResponseInfo: UrlResponseInfo, + bodySource: Source?, + ): Response.Builder { + val responseBuilder = Response.Builder() + + val contentType: String? = getLastHeaderValue(CONTENT_TYPE_HEADER_NAME, cronetResponseInfo) + + // If all content encodings are those known to Cronet natively, Cronet decodes the body stream. + // Otherwise, it's sent to the callbacks verbatim. For consistency with OkHttp, we only leave + // the Content-Encoding headers if Cronet didn't decode the request. Similarly, for consistency, + // we strip the Content-Length header of decoded responses. + var contentLengthString: String? = null + + // Theoretically, the content encodings can be scattered across multiple comma separated + // Content-Encoding headers. This list contains individual encodings. + val contentEncodingItems: List = + buildList { + val headerMap = cronetResponseInfo.headers.asMap + headerMap[CONTENT_ENCODING_HEADER_NAME]?.forEach { + addAll(COMMA_SPLITTER.split(it)) + } + } + + val keepEncodingAffectedHeaders = + contentEncodingItems.isEmpty() || + !ENCODINGS_HANDLED_BY_CRONET.containsAll(contentEncodingItems) + + if (keepEncodingAffectedHeaders) { + contentLengthString = getLastHeaderValue(CONTENT_LENGTH_HEADER_NAME, cronetResponseInfo) + } + + var responseBody: ResponseBody? = null + if (bodySource != null) { + responseBody = + createResponseBody( + request, + cronetResponseInfo.httpStatusCode, + contentType, + contentLengthString, + bodySource, + ) + } + + responseBuilder + .request(request) + .code(cronetResponseInfo.httpStatusCode) + .message(cronetResponseInfo.httpStatusText) + .protocol(convertProtocol(cronetResponseInfo.negotiatedProtocol)) + // TODO don't fake this + .handshake( + Handshake(TlsVersion.TLS_1_3, CipherSuite.TLS_AES_128_CCM_8_SHA256, listOf(), { listOf() }), + ).apply { + if (responseBody != null) { + body(responseBody) + } + } + + for (header in cronetResponseInfo.headers.asList) { + var copyHeader = true + if (!keepEncodingAffectedHeaders) { + if (Ascii.equalsIgnoreCase(header.key, CONTENT_LENGTH_HEADER_NAME) || + Ascii.equalsIgnoreCase(header.key, CONTENT_ENCODING_HEADER_NAME) + ) { + copyHeader = false + } + } + if (copyHeader) { + responseBuilder.addHeader(header.key, header.value) + } + } + + return responseBuilder + } + + /** + * Creates an OkHttp's ResponseBody from the OkHttp-Cronet bridging callback. + * + * + * As long as the callback's `UrlResponseInfo` is available this method is non-blocking. + * However, this method doesn't fetch the entire body response. As a result, subsequent calls to + * [ResponseBody] methods might block further to fetch parts of the body. + */ + @Throws(IOException::class) + private fun createResponseBody( + request: Request, + httpStatusCode: Int, + contentType: String?, + contentLengthString: String?, + bodySource: Source, + ): ResponseBody { + // Ignore content-length header for HEAD requests (consistency with OkHttp) + val contentLength: Long = + if (request.method == "HEAD") { + 0 + } else { + try { + contentLengthString?.toLong() ?: -1 + } catch (e: NumberFormatException) { + // TODO(danstahr): add logging + -1 + } + } + + // Check for absence of body in No Content / Reset Content responses (OkHttp consistency) + if ((httpStatusCode == 204 || httpStatusCode == 205) && contentLength > 0) { + throw ProtocolException( + "HTTP $httpStatusCode had non-zero Content-Length: $contentLengthString", + ) + } + + return bodySource + .buffer() + .asResponseBody( + contentType?.toMediaTypeOrNull(), + contentLength, + ) + } + + /** Converts Cronet's negotiated protocol string to OkHttp's [Protocol]. */ + private fun convertProtocol(negotiatedProtocol: String): Protocol { + // See + // https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids + if (negotiatedProtocol.contains("quic")) { + return Protocol.QUIC + } else if (negotiatedProtocol.contains("h3")) { + // TODO(danstahr): Should be h3 for newer OkHttp + return Protocol.QUIC + } else if (negotiatedProtocol.contains("spdy")) { + return Protocol.HTTP_2 + } else if (negotiatedProtocol.contains("h2")) { + return Protocol.HTTP_2 + } else if (negotiatedProtocol.contains("http/1.1")) { + return Protocol.HTTP_1_1 + } + + return Protocol.HTTP_1_0 + } + + /** Returns the last header value for the given name, or null if the header isn't present. */ + private fun getLastHeaderValue( + name: String?, + responseInfo: UrlResponseInfo, + ): String? { + val headers = responseInfo.headers.asMap[name] + if (headers == null || headers.isEmpty()) { + return null + } + return Iterables.getLast(headers) + } + + @Throws(IOException::class) + private fun getFutureValue(future: Future): T { + try { + return Uninterruptibles.getUninterruptibly(future) + } catch (e: ExecutionException) { + throw IOException(e) + } + } + } +} diff --git a/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/UploadBodyDataBroker.kt b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/UploadBodyDataBroker.kt new file mode 100644 index 000000000000..bb4dfa39789a --- /dev/null +++ b/okhttp/src/androidMain/kotlin/okhttp3/android/httpengine/UploadBodyDataBroker.kt @@ -0,0 +1,174 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okhttp3.android.httpengine + +import android.os.Build +import androidx.annotation.RequiresExtension +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.SettableFuture +import java.io.IOException +import java.nio.ByteBuffer +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.BlockingQueue +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.math.min +import okhttp3.internal.SuppressSignatureCheck +import okio.Buffer +import okio.Sink +import okio.Timeout + +@SuppressSignatureCheck +@RequiresExtension(extension = Build.VERSION_CODES.S, version = 7) +internal class UploadBodyDataBroker : Sink { + /** + * The read request calls to [android.net.http.UploadDataProvider.read] associated with this broker that we haven't started handling. + * + * + * We don't expect more than one parallel read call for a single request body provider. + */ + private val pendingRead: BlockingQueue>> = + ArrayBlockingQueue>>(1) + + /** + * Whether the sink has been closed. + * + * + * Calling close() has no practical use but we check that nobody tries to write to the sink + * after closing it, which is an indication of misuse. + */ + private val isClosed = AtomicBoolean() + + /** + * The exception thrown by the body reading background thread, if any. The exception will be + * rethrown every time someone attempts to continue reading the body. + */ + private val backgroundReadThrowable = AtomicReference() + + /** + * Indicates that Cronet is ready to receive another body part. + * + * + * This method is executed by Cronet's upload data provider. + */ + fun enqueueBodyRead(readBuffer: ByteBuffer): Future { + backgroundReadThrowable.get()?.let { + return Futures.immediateFailedFuture(it) + } + val future: SettableFuture = SettableFuture.create() + pendingRead.add(Pair(readBuffer, future)) + + // Properly handle interleaving handleBackgroundReadError / enqueueBodyRead calls. + backgroundReadThrowable.get()?.let { + future.setException(it) + } + return future + } + + /** + * Signals that reading the OkHttp body failed with the given throwable. + * + * This method is executed by the background OkHttp body reading thread. + */ + fun setBackgroundReadError(t: Throwable) { + backgroundReadThrowable.set(t) + pendingRead.poll()?.second?.setException(t) + } + + /** + * Signals that reading the body has ended and no future bytes will be sent. + * + * + * This method is executed by the background OkHttp body reading thread. + */ + @Throws(IOException::class) + fun handleEndOfStreamSignal() { + check(!isClosed.getAndSet(true)) { "Already closed" } + + this.pendingCronetRead.second.set(ReadResult.END_OF_BODY) + } + + /** + * {@inheritDoc} + * + * This method is executed by the background OkHttp body reading thread. + */ + override fun write( + source: Buffer, + byteCount: Long, + ) { + // This is just a safeguard, close() is a no-op if the body length contract is honored. + check(!isClosed.get()) + + var bytesRemaining = byteCount + + while (bytesRemaining != 0L) { + val payload: Pair> = + this.pendingCronetRead + + val readBuffer = payload.first + val future: SettableFuture = payload.second + + val originalBufferLimit = readBuffer.limit() + val bytesToDrain = min(originalBufferLimit.toLong(), bytesRemaining).toInt() + + readBuffer.limit(bytesToDrain) + + try { + val bytesRead = source.read(readBuffer).toLong() + if (bytesRead == -1L) { + val e = IOException("The source has been exhausted but we expected more!") + future.setException(e) + throw e + } + bytesRemaining -= bytesRead + readBuffer.limit(originalBufferLimit) + future.set(ReadResult.SUCCESS) + } catch (e: IOException) { + future.setException(e) + throw e + } + } + } + + private val pendingCronetRead: Pair> + get() { + try { + return pendingRead.take() + } catch (e: InterruptedException) { + Thread.currentThread().interrupt() + throw IOException("Interrupted while waiting for a read to finish!") + } + } + + override fun close() { + isClosed.set(true) + } + + override fun flush() { + // Not necessary, we "flush" by sending the data to Cronet straight away when write() is called. + // Note that this class is wrapped with a okio buffer so writes to the outer layer won't be + // seen by this class immediately. + } + + override fun timeout(): Timeout = Timeout.NONE + + internal enum class ReadResult { + SUCCESS, + END_OF_BODY, + } +} diff --git a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/Call.kt b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/Call.kt index fdd3d3da294e..371bd4c715e2 100644 --- a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/Call.kt +++ b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/Call.kt @@ -96,4 +96,33 @@ interface Call : Cloneable { fun interface Factory { fun newCall(request: Request): Call } + + /** + * The equivalent of an Interceptor for [Call.Factory], but supported directly within [OkHttpClient] newCall. + * + * An [Interceptor] forms a chain as part of execution of a Call. Instead, Call.Decorator intercepts + * [Call.Factory.newCall] with similar flexibility to Application [OkHttpClient.interceptors]. + * + * That is, it may do any of + * - Modify the request such as adding Tracing Context + * - Wrap the [Call] returned + * - Return some [Call] implementation that will immediately fail avoiding network calls based on network or + * authentication state. + * - Redirect the [Call], such as using an alternative [Call.Factory]. + * - Defer execution, something not safe in an Interceptor. + * + * It should not throw an exception, instead it should return a Call that will fail on [Call.execute]. + * + * A Decorator that changes the OkHttpClient should typically retain later decorators in the new client. + */ + fun interface Decorator { + fun newCall(chain: Chain): Call + } + + interface Chain { + val client: OkHttpClient + val request: Request + + fun proceed(request: Request): Call + } } diff --git a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/OkHttpClient.kt b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/OkHttpClient.kt index de3e75d5e701..39739f422061 100644 --- a/okhttp/src/commonJvmAndroid/kotlin/okhttp3/OkHttpClient.kt +++ b/okhttp/src/commonJvmAndroid/kotlin/okhttp3/OkHttpClient.kt @@ -145,6 +145,14 @@ open class OkHttpClient internal constructor( val interceptors: List = builder.interceptors.toImmutableList() + /** + * Returns an immutable list of Call decorators that have a chance to return a different, likely + * decorating, implementation of Call. This allows functionality such as fail fast without normal Call + * execution based on network conditions, or setting Tracing context on the calling thread. + */ + val callDecorators: List = + builder.callDecorators.toImmutableList() + /** * Returns an immutable list of interceptors that observe a single network request and response. * These interceptors must call [Interceptor.Chain.proceed] exactly once: it is an error for @@ -265,6 +273,27 @@ open class OkHttpClient internal constructor( internal val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase() internal val taskRunner: TaskRunner = builder.taskRunner ?: TaskRunner.INSTANCE + private val decoratedCallFactory = + callDecorators.foldRight( + Call.Factory { request -> + RealCall(client = this, originalRequest = request, forWebSocket = false) + }, + ) { callDecorator, next -> + Call.Factory { request -> + callDecorator.newCall( + object : Call.Chain { + override val client: OkHttpClient + get() = this@OkHttpClient + + override val request: Request + get() = request + + override fun proceed(request: Request): Call = next.newCall(request) + }, + ) + } + } + @get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool ?: ConnectionPool( @@ -359,7 +388,7 @@ open class OkHttpClient internal constructor( } /** Prepares the [request] to be executed at some point in the future. */ - override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false) + override fun newCall(request: Request): Call = decoratedCallFactory.newCall(request) /** Uses [request] to connect a new web socket. */ override fun newWebSocket( @@ -596,6 +625,7 @@ open class OkHttpClient internal constructor( internal var dispatcher: Dispatcher = Dispatcher() internal var connectionPool: ConnectionPool? = null internal val interceptors: MutableList = mutableListOf() + internal val callDecorators: MutableList = mutableListOf() internal val networkInterceptors: MutableList = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var retryOnConnectionFailure = true @@ -631,6 +661,7 @@ open class OkHttpClient internal constructor( this.dispatcher = okHttpClient.dispatcher this.connectionPool = okHttpClient.connectionPool this.interceptors += okHttpClient.interceptors + this.callDecorators += okHttpClient.callDecorators this.networkInterceptors += okHttpClient.networkInterceptors this.eventListenerFactory = okHttpClient.eventListenerFactory this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure @@ -735,6 +766,11 @@ open class OkHttpClient internal constructor( this.eventListenerFactory = eventListenerFactory } + fun addCallDecorator(decorator: Call.Decorator) = + apply { + callDecorators += decorator + } + /** * Configure this client to retry or not when a connectivity problem is encountered. By default, * this client silently recovers from the following problems: