Skip to content

Commit f528c70

Browse files
committed
netty: Associate netty stream eagerly to avoid client hang
In grpc#12185, RPCs were randomly hanging. In grpc#12207 this was tracked down to the headers promise completing successfully, but the netty stream was null. This was because the headers write hadn't completed but stream.close() had been called by goingAway().
1 parent 6dfa03c commit f528c70

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,19 @@ public void operationComplete(ChannelFuture future) throws Exception {
768768
}
769769
}
770770
});
771+
// When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
772+
// StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
773+
// are delayed because the OS may have too much buffered and isn't accepting the write. The
774+
// write promise is also delayed until flush(). However, we need to associate the netty stream
775+
// with the transport state so that goingAway() and forcefulClose() and able to notify the
776+
// stream of failures.
777+
//
778+
// This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
779+
// it is better than nothing.
780+
Http2Stream http2Stream = connection().stream(streamId);
781+
if (http2Stream != null) {
782+
http2Stream.setProperty(streamKey, stream);
783+
}
771784
}
772785

773786
/**

netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,26 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
453453
assertTrue(future.isDone());
454454
}
455455

456+
@Test
457+
public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception {
458+
// Purposefully avoid flush(), since we want the write to not actually complete.
459+
// EmbeddedChannel doesn't support flow control, so this is the next closest approximation.
460+
ChannelFuture future = channel().write(
461+
newCreateStreamCommand(grpcHeaders, streamTransportState));
462+
// Read a GOAWAY that indicates our stream can't be sent
463+
channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8)));
464+
465+
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
466+
verify(streamListener).closed(captor.capture(), same(REFUSED),
467+
ArgumentMatchers.<Metadata>notNull());
468+
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
469+
assertEquals(
470+
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, "
471+
+ "debug data: this is a test",
472+
captor.getValue().getDescription());
473+
assertTrue(future.isDone());
474+
}
475+
456476
@Test
457477
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
458478
throws Exception {

0 commit comments

Comments
 (0)