Skip to content

Commit a0b0985

Browse files
authored
[HTTP2StateMachine] test and fix HTTP2 go away (#452)
1 parent a57c4b3 commit a0b0985

File tree

4 files changed

+148
-18
lines changed

4 files changed

+148
-18
lines changed

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -371,19 +371,20 @@ extension HTTPConnectionPool {
371371
/// This will put the connection into the idle state.
372372
///
373373
/// - Parameter connection: The new established connection.
374-
/// - Returns: An index and an ``AvailableConnectionContext`` to determine the next action for the now idle connection.
374+
/// - Returns: An index and an ``EstablishedConnectionContext`` to determine the next action for the now idle connection.
375375
/// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after
376376
/// this.
377-
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, AvailableConnectionContext) {
377+
mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, EstablishedConnectionContext) {
378378
guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else {
379379
preconditionFailure("There is a new connection that we didn't request!")
380380
}
381381
precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL")
382382
let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams)
383-
let context = AvailableConnectionContext(
383+
let context = EstablishedConnectionContext(
384384
availableStreams: availableStreams,
385385
eventLoop: connection.eventLoop,
386-
isIdle: self.connections[index].isIdle
386+
isIdle: self.connections[index].isIdle,
387+
connectionID: connection.id
387388
)
388389
return (index, context)
389390
}
@@ -419,20 +420,21 @@ extension HTTPConnectionPool {
419420
/// - Parameters:
420421
/// - connectionID: The connectionID for which we received new settings
421422
/// - newMaxStreams: new maximum concurrent streams
422-
/// - Returns: index of the connection and new number of available streams in the `AvailableConnectionContext`
423+
/// - Returns: index of the connection and new number of available streams in the `EstablishedConnectionContext`
423424
/// - Precondition: Connections must be in the `.active` or `.draining` state.
424425
mutating func newHTTP2MaxConcurrentStreamsReceived(
425426
_ connectionID: Connection.ID,
426427
newMaxStreams: Int
427-
) -> (Int, AvailableConnectionContext) {
428+
) -> (Int, EstablishedConnectionContext) {
428429
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
429430
preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists")
430431
}
431432
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
432-
let context = AvailableConnectionContext(
433+
let context = EstablishedConnectionContext(
433434
availableStreams: availableStreams,
434435
eventLoop: self.connections[index].eventLoop,
435-
isIdle: self.connections[index].isIdle
436+
isIdle: self.connections[index].isIdle,
437+
connectionID: connectionID
436438
)
437439
return (index, context)
438440
}
@@ -471,25 +473,27 @@ extension HTTPConnectionPool {
471473
/// lease `count` streams after connections establishment
472474
/// - Parameters:
473475
/// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)`
474-
/// - count: number of streams you want to lease. You get the current available streams from the `AvailableConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
476+
/// - count: number of streams you want to lease. You get the current available streams from the `EstablishedConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns
475477
/// - Returns: connection to execute `count` requests on
476-
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *0* and not execeed the number of available streams.
478+
/// - precondition: `index` needs to be valid. `count` must be greater than or equal to *1* and not exceed the number of available streams.
477479
mutating func leaseStreams(at index: Int, count: Int) -> (Connection, LeasedStreamContext) {
480+
precondition(count >= 1, "stream lease count must be greater than or equal to 1")
478481
let isIdle = self.connections[index].isIdle
479482
let connection = self.connections[index].lease(count)
480483
let context = LeasedStreamContext(wasIdle: isIdle)
481484
return (connection, context)
482485
}
483486

484-
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext) {
487+
mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, EstablishedConnectionContext) {
485488
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
486489
preconditionFailure("We tried to release a connection we do not know anything about")
487490
}
488491
let availableStreams = self.connections[index].release()
489-
let context = AvailableConnectionContext(
492+
let context = EstablishedConnectionContext(
490493
availableStreams: availableStreams,
491494
eventLoop: self.connections[index].eventLoop,
492-
isIdle: self.connections[index].isIdle
495+
isIdle: self.connections[index].isIdle,
496+
connectionID: connectionID
493497
)
494498
return (index, context)
495499
}
@@ -567,14 +571,16 @@ extension HTTPConnectionPool {
567571

568572
// MARK: Result structs
569573

570-
/// Information around an available connection
571-
struct AvailableConnectionContext {
574+
/// Information around a connection which is either in the .active or .draining state.
575+
struct EstablishedConnectionContext {
572576
/// number of streams which can be leased
573577
var availableStreams: Int
574578
/// The eventLoop the connection is running on.
575579
var eventLoop: EventLoop
576580
/// true if no stream is leased
577581
var isIdle: Bool
582+
/// id of the connection
583+
var connectionID: Connection.ID
578584
}
579585

580586
struct LeasedStreamContext {

Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ extension HTTPConnectionPool {
184184

185185
private mutating func nextActionForAvailableConnection(
186186
at index: Int,
187-
context: HTTP2Connections.AvailableConnectionContext
187+
context: HTTP2Connections.EstablishedConnectionContext
188188
) -> Action {
189189
switch self.state {
190190
case .running:
@@ -196,19 +196,22 @@ extension HTTPConnectionPool {
196196
let remainingAvailableStreams = context.availableStreams - requestsToExecute.count
197197
// use the remaining available streams for requests without a required event loop
198198
requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil)
199-
let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count)
200199

201200
let requestAction = { () -> RequestAction in
202201
if requestsToExecute.isEmpty {
203202
return .none
204203
} else {
204+
// we can only lease streams if the connection has available streams.
205+
// Otherwise we might crash even if we try to lease zero streams,
206+
// because the connection might already be in the draining state.
207+
let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count)
205208
return .executeRequestsAndCancelTimeouts(requestsToExecute, connection)
206209
}
207210
}()
208211

209212
let connectionAction = { () -> ConnectionAction in
210213
if context.isIdle, requestsToExecute.isEmpty {
211-
return .scheduleTimeoutTimer(connection.id, on: context.eventLoop)
214+
return .scheduleTimeoutTimer(context.connectionID, on: context.eventLoop)
212215
} else {
213216
return .none
214217
}

Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
3333
("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout),
3434
("testConnectionTimeout", testConnectionTimeout),
3535
("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure),
36+
("testGoAwayOnIdleConnection", testGoAwayOnIdleConnection),
37+
("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream),
38+
("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection),
3639
]
3740
}
3841
}

Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,4 +435,122 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
435435
}
436436
XCTAssertEqual(eventLoop.id, el1.id)
437437
}
438+
439+
func testGoAwayOnIdleConnection() {
440+
let elg = EmbeddedEventLoopGroup(loops: 1)
441+
let el1 = elg.next()
442+
443+
// establish one idle http2 connection
444+
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
445+
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
446+
let conn1ID = http1Conns.createNewConnection(on: el1)
447+
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
448+
let migrationAction = state.migrateConnectionsFromHTTP1(
449+
connections: http1Conns,
450+
requests: HTTPConnectionPool.RequestQueue()
451+
)
452+
XCTAssertEqual(migrationAction, .none)
453+
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
454+
let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100)
455+
XCTAssertEqual(connectAction.request, .none)
456+
XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1))
457+
458+
let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
459+
XCTAssertEqual(goAwayAction.request, .none)
460+
XCTAssertEqual(goAwayAction.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")
461+
}
462+
463+
func testGoAwayWithLeasedStream() {
464+
let elg = EmbeddedEventLoopGroup(loops: 1)
465+
let el1 = elg.next()
466+
467+
// establish one idle http2 connection
468+
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
469+
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
470+
let conn1ID = http1Conns.createNewConnection(on: el1)
471+
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
472+
let migrationAction = state.migrateConnectionsFromHTTP1(
473+
connections: http1Conns,
474+
requests: HTTPConnectionPool.RequestQueue()
475+
)
476+
XCTAssertEqual(migrationAction, .none)
477+
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
478+
let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100)
479+
XCTAssertEqual(connectAction.request, .none)
480+
XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1))
481+
482+
// execute request on idle connection
483+
let mockRequest1 = MockHTTPRequest(eventLoop: el1)
484+
let request1 = HTTPConnectionPool.Request(mockRequest1)
485+
let request1Action = state.executeRequest(request1)
486+
XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false))
487+
XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID))
488+
489+
let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
490+
XCTAssertEqual(goAwayAction.request, .none)
491+
XCTAssertEqual(goAwayAction.connection, .none)
492+
493+
// close stream
494+
let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID)
495+
XCTAssertEqual(closeStream1Action.request, .none)
496+
XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")
497+
}
498+
499+
func testGoAwayWithPendingRequestsStartsNewConnection() {
500+
let elg = EmbeddedEventLoopGroup(loops: 1)
501+
let el1 = elg.next()
502+
503+
// establish one idle http2 connection
504+
let idGenerator = HTTPConnectionPool.Connection.ID.Generator()
505+
var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator)
506+
let conn1ID = http1Conns.createNewConnection(on: el1)
507+
var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator)
508+
let migrationAction = state.migrateConnectionsFromHTTP1(
509+
connections: http1Conns,
510+
requests: HTTPConnectionPool.RequestQueue()
511+
)
512+
XCTAssertEqual(migrationAction, .none)
513+
let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1)
514+
let connectAction1 = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1)
515+
XCTAssertEqual(connectAction1.request, .none)
516+
XCTAssertEqual(connectAction1.connection, .scheduleTimeoutTimer(conn1ID, on: el1))
517+
518+
// execute request
519+
let mockRequest1 = MockHTTPRequest(eventLoop: el1)
520+
let request1 = HTTPConnectionPool.Request(mockRequest1)
521+
let request1Action = state.executeRequest(request1)
522+
XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false))
523+
XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID))
524+
525+
// queue request
526+
let mockRequest2 = MockHTTPRequest(eventLoop: el1)
527+
let request2 = HTTPConnectionPool.Request(mockRequest2)
528+
let request2Action = state.executeRequest(request2)
529+
XCTAssertEqual(request2Action.request, .scheduleRequestTimeout(for: request2, on: el1))
530+
XCTAssertEqual(request2Action.connection, .none)
531+
532+
// go away should create a new connection
533+
let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID)
534+
XCTAssertEqual(goAwayAction.request, .none)
535+
guard case .createConnection(let conn2ID, let eventLoop) = goAwayAction.connection else {
536+
return XCTFail("unexpected connection action \(goAwayAction.connection)")
537+
}
538+
XCTAssertEqual(el1.id, eventLoop.id)
539+
540+
// new connection should execute pending request
541+
let conn2 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn2ID, eventLoop: el1)
542+
let connectAction2 = state.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 1)
543+
XCTAssertEqual(connectAction2.request, .executeRequestsAndCancelTimeouts([request2], conn2))
544+
XCTAssertEqual(connectAction2.connection, .none)
545+
546+
// close stream from conn1
547+
let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID)
548+
XCTAssertEqual(closeStream1Action.request, .none)
549+
XCTAssertEqual(closeStream1Action.connection, .none, "Connection is automatically closed by HTTP2IdleHandler")
550+
551+
// close stream from conn2
552+
let closeStream2Action = state.http2ConnectionStreamClosed(conn2ID)
553+
XCTAssertEqual(closeStream2Action.request, .none)
554+
XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn2ID, on: el1))
555+
}
438556
}

0 commit comments

Comments
 (0)