Skip to content

Commit

Permalink
Merge pull request #3087 from square/bquenaudon.2024-09-04.trailers
Browse files Browse the repository at this point in the history
Don't throw if reading trailers fail
  • Loading branch information
oldergod authored Sep 9, 2024
2 parents 2116bb1 + 9f5f003 commit b48efe1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,12 @@ private fun Response.checkGrpcResponse() {
internal fun Response.grpcResponseToException(suppressed: IOException? = null): IOException? {
var trailers = headersOf()
var transportException = suppressed
try {
trailers = trailers()
} catch (e: IOException) {
if (transportException == null) transportException = e
if (suppressed == null) {
try {
trailers = trailers()
} catch (e: IOException) {
transportException = e
}
}

val grpcStatus = trailers["grpc-status"] ?: header("grpc-status")
Expand Down
53 changes: 53 additions & 0 deletions wire-grpc-tests/src/test/java/com/squareup/wire/GrpcClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ import okhttp3.ResponseBody
import okhttp3.ResponseBody.Companion.toResponseBody
import okio.Buffer
import okio.ByteString
import okio.ForwardingSource
import okio.IOException
import okio.buffer
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Assert.assertThrows
Expand Down Expand Up @@ -94,13 +96,20 @@ class GrpcClientTest {
override fun intercept(chain: Chain) = chain.proceed(chain.request())
}

private var networkInterceptor: Interceptor = object : Interceptor {
override fun intercept(chain: Chain) = chain.proceed(chain.request())
}

@Before
fun setUp() {
okhttpClient = OkHttpClient.Builder()
.addInterceptor { chain ->
callReference.set(chain.call())
interceptor.intercept(chain)
}
.addNetworkInterceptor { chain ->
networkInterceptor.intercept(chain)
}
.protocols(listOf(H2_PRIOR_KNOWLEDGE))
.callTimeout(okHttpClientTimeout)
.build()
Expand Down Expand Up @@ -555,6 +564,50 @@ class GrpcClientTest {
}
}

/**
* Force an IOException in the gRPC response body stream, and confirm that Wire fails with the
* right exception (IOException). We've had bugs where we incorrectly attempt to read trailers
* while handling an exception.
*/
@Test
fun duplexSuspend_responseBodyThrows() {
mockService.enqueue(ReceiveCall("/routeguide.RouteGuide/RouteChat"))
mockService.enqueueSendNote(message = "marco")

// This interceptor throws an exception at the start of the stream instead of returning -1L.
networkInterceptor = object : Interceptor {
override fun intercept(chain: Chain): Response {
val response = chain.proceed(chain.request())
return response.newBuilder()
.body(object : ResponseBody() {
override fun contentLength() = response.body.contentLength()
override fun contentType() = response.body.contentType()
override fun source() = object : ForwardingSource(response.body.source()) {
override fun read(sink: Buffer, byteCount: Long) = throw IOException("boom!")
}.buffer()
})
.build()
}
}

runBlocking {
val (_, responseChannel) = routeGuideService.RouteChat().executeIn(this)

try {
responseChannel.receive()
fail()
} catch (expected: IOException) {
assertThat(expected)
.hasMessage("gRPC transport failure (HTTP status=200, grpc-status=null, grpc-message=null)")
.cause() // Synthetic exception added by coroutines in StackTraceRecovery.kt.
.cause()
.hasMessage("boom!")
}

assertThat(responseChannel.isClosedForReceive).isTrue()
}
}

@Test
fun duplexSuspend_responseChannelThrowsWhenCallCanceledByClient() {
mockService.enqueue(ReceiveCall("/routeguide.RouteGuide/RouteChat"))
Expand Down

0 comments on commit b48efe1

Please sign in to comment.