8383import java .util .Locale ;
8484import java .util .Map ;
8585import java .util .Random ;
86+ import java .util .concurrent .BrokenBarrierException ;
8687import java .util .concurrent .CountDownLatch ;
88+ import java .util .concurrent .CyclicBarrier ;
8789import java .util .concurrent .Executor ;
8890import java .util .concurrent .ScheduledExecutorService ;
91+ import java .util .concurrent .TimeUnit ;
92+ import java .util .concurrent .TimeoutException ;
8993import java .util .logging .Level ;
9094import java .util .logging .Logger ;
9195import javax .annotation .Nullable ;
@@ -499,8 +503,15 @@ public Runnable start(Listener listener) {
499503 outboundFlow = new OutboundFlowController (this , frameWriter );
500504 }
501505 final CountDownLatch latch = new CountDownLatch (1 );
506+ final CountDownLatch latchForExtraThread = new CountDownLatch (1 );
507+ // The transport needs up to two threads to function once started,
508+ // but only needs one during handshaking. Start another thread during handshaking
509+ // to make sure there's still a free thread available. If the number of threads is exhausted,
510+ // it is better to kill the transport than for all the transports to hang unable to send.
511+ CyclicBarrier barrier = new CyclicBarrier (2 );
502512 // Connecting in the serializingExecutor, so that some stream operations like synStream
503513 // will be executed after connected.
514+
504515 serializingExecutor .execute (new Runnable () {
505516 @ Override
506517 public void run () {
@@ -510,8 +521,14 @@ public void run() {
510521 // initial preface.
511522 try {
512523 latch .await ();
524+ barrier .await (1000 , TimeUnit .MILLISECONDS );
513525 } catch (InterruptedException e ) {
514526 Thread .currentThread ().interrupt ();
527+ } catch (TimeoutException | BrokenBarrierException e ) {
528+ startGoAway (0 , ErrorCode .INTERNAL_ERROR , Status .UNAVAILABLE
529+ .withDescription ("Timed out waiting for second handshake thread. "
530+ + "The transport executor pool may have run out of threads" ));
531+ return ;
515532 }
516533 // Use closed source on failure so that the reader immediately shuts down.
517534 BufferedSource source = Okio .buffer (new Source () {
@@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
575592 return ;
576593 } finally {
577594 clientFrameHandler = new ClientFrameHandler (variant .newReader (source , true ));
595+ latchForExtraThread .countDown ();
578596 }
579597 synchronized (lock ) {
580598 socket = Preconditions .checkNotNull (sock , "socket" );
@@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
584602 }
585603 }
586604 });
605+
606+ executor .execute (new Runnable () {
607+ @ Override
608+ public void run () {
609+ try {
610+ barrier .await (1000 , TimeUnit .MILLISECONDS );
611+ latchForExtraThread .await ();
612+ } catch (BrokenBarrierException | TimeoutException e ) {
613+ // Something bad happened, maybe too few threads available!
614+ // This will be handled in the handshake thread.
615+ } catch (InterruptedException e ) {
616+ Thread .currentThread ().interrupt ();
617+ }
618+ }
619+ });
587620 // Schedule to send connection preface & settings before any other write.
588621 try {
589622 sendConnectionPrefaceAndSettings ();
0 commit comments