@@ -18,11 +18,8 @@ private enum ShutdownError: Error {
1818 case alreadyShutdown
1919}
2020
21- /// Collects a number of channels that are open at the moment. To prevent races, `ChannelCollector` uses the
22- /// `EventLoop` of the server `Channel` that it gets passed to synchronise. It is important to call the
23- /// `channelAdded` method in the same event loop tick as the `Channel` is actually created.
24- private final class ChannelCollector {
25- enum LifecycleState {
21+ private struct LifecycleStateMachine : ~ Copyable {
22+ enum LifecycleState : ~ Copyable {
2623 case upAndRunning(
2724 openChannels: [ ObjectIdentifier : Channel ] ,
2825 serverChannel: Channel
@@ -34,89 +31,176 @@ private final class ChannelCollector {
3431 case shutdownCompleted
3532 }
3633
37- private var lifecycleState : LifecycleState
38-
39- private let eventLoop : EventLoop
34+ private var state : LifecycleState
4035
41- /// Initializes a `ChannelCollector` for `Channel`s accepted by `serverChannel`.
42- init ( serverChannel: Channel ) {
43- self . eventLoop = serverChannel. eventLoop
44- self . lifecycleState = . upAndRunning( openChannels: [ : ] , serverChannel: serverChannel)
36+ init ( serverChannel: any Channel ) {
37+ self . state = . upAndRunning( openChannels: [ : ] , serverChannel: serverChannel)
4538 }
4639
47- /// Add a channel to the `ChannelCollector`.
48- ///
49- /// - note: This must be called on `serverChannel.eventLoop`.
50- ///
51- /// - parameters:
52- /// - channel: The `Channel` to add to the `ChannelCollector`.
53- func channelAdded( _ channel: Channel ) throws {
54- self . eventLoop. assertInEventLoop ( )
40+ private init ( _ state: consuming LifecycleState ) {
41+ self . state = state
42+ }
5543
56- switch self . lifecycleState {
44+ enum ChannelAddedAction : ~ Copyable {
45+ case fireChannelShouldQuiesce
46+ case closeChannelAndThrowError
47+ }
48+ mutating func channelAdded( _ channel: any Channel ) -> ChannelAddedAction ? {
49+ switch consume self. state {
5750 case . upAndRunning( var openChannels, let serverChannel) :
5851 openChannels [ ObjectIdentifier ( channel) ] = channel
59- self . lifecycleState = . upAndRunning( openChannels: openChannels, serverChannel: serverChannel)
52+ self = . init( . upAndRunning( openChannels: openChannels, serverChannel: serverChannel) )
53+ return nil
6054
6155 case . shuttingDown( var openChannels, let fullyShutdownPromise) :
6256 openChannels [ ObjectIdentifier ( channel) ] = channel
63- channel. eventLoop. execute {
64- channel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
65- }
66- self . lifecycleState = . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise)
57+ self = . init( . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise) )
58+ return . fireChannelShouldQuiesce
6759
6860 case . shutdownCompleted:
69- channel . close ( promise : nil )
70- throw ShutdownError . alreadyShutdown
61+ self = . init ( . shutdownCompleted )
62+ return . closeChannelAndThrowError
7163 }
7264 }
7365
74- private func shutdownCompleted( ) {
75- self . eventLoop. assertInEventLoop ( )
76-
77- switch self . lifecycleState {
66+ enum ShutdownCompletedAction : ~ Copyable {
67+ case succeedShutdownPromise( EventLoopPromise < Void > )
68+ }
69+ mutating func shutdownCompleted( ) -> ShutdownCompletedAction {
70+ switch consume self. state {
7871 case . upAndRunning:
7972 preconditionFailure ( " This can never happen because we transition to shuttingDown first " )
8073
8174 case . shuttingDown( _, let fullyShutdownPromise) :
82- self . lifecycleState = . shutdownCompleted
83- fullyShutdownPromise . succeed ( ( ) )
75+ self = . init ( . shutdownCompleted)
76+ return . succeedShutdownPromise ( fullyShutdownPromise )
8477
8578 case . shutdownCompleted:
8679 preconditionFailure ( " We should only complete the shutdown once " )
8780 }
8881 }
8982
90- private func channelRemoved0( _ channel: Channel ) {
91- self . eventLoop. assertInEventLoop ( )
92-
93- switch self . lifecycleState {
83+ mutating func channelRemoved( _ channel: any Channel ) -> ShutdownCompletedAction ? {
84+ switch consume self. state {
9485 case . upAndRunning( var openChannels, let serverChannel) :
9586 let removedChannel = openChannels. removeValue ( forKey: ObjectIdentifier ( channel) )
9687
97- precondition ( removedChannel != nil , " channel \( channel ) not in ChannelCollector \( openChannels ) " )
88+ precondition ( removedChannel != nil , " channel not in ChannelCollector " )
9889
99- self . lifecycleState = . upAndRunning( openChannels: openChannels, serverChannel: serverChannel)
90+ self = . init( . upAndRunning( openChannels: openChannels, serverChannel: serverChannel) )
91+ return nil
10092
10193 case . shuttingDown( var openChannels, let fullyShutdownPromise) :
10294 let removedChannel = openChannels. removeValue ( forKey: ObjectIdentifier ( channel) )
10395
104- precondition ( removedChannel != nil , " channel \( channel ) not in ChannelCollector \( openChannels ) " )
96+ precondition ( removedChannel != nil , " channel not in ChannelCollector " )
10597
106- if openChannels. isEmpty {
107- self . shutdownCompleted ( )
108- } else {
109- self . lifecycleState = . shuttingDown(
98+ self = . init(
99+ . shuttingDown(
110100 openChannels: openChannels,
111101 fullyShutdownPromise: fullyShutdownPromise
112102 )
103+ )
104+
105+ if openChannels. isEmpty {
106+ return self . shutdownCompleted ( )
107+ } else {
108+ return nil
113109 }
114110
115111 case . shutdownCompleted:
116112 preconditionFailure ( " We should not have channels removed after transitioned to completed " )
117113 }
118114 }
119115
116+ enum InitiateShutdownAction : ~ Copyable {
117+ case fireQuiesceEvents(
118+ serverChannel: any Channel ,
119+ fullyShutdownPromise: EventLoopPromise < Void > ,
120+ openChannels: [ ObjectIdentifier : any Channel ]
121+ )
122+ case cascadePromise( fullyShutdownPromise: EventLoopPromise < Void > , cascadeTo: EventLoopPromise < Void > ? )
123+ case succeedPromise
124+ }
125+ mutating func initiateShutdown( _ promise: consuming EventLoopPromise < Void > ? ) -> InitiateShutdownAction ? {
126+ switch consume self. state {
127+ case . upAndRunning( let openChannels, let serverChannel) :
128+ let fullyShutdownPromise = promise ?? serverChannel. eventLoop. makePromise ( of: Void . self)
129+
130+ self = . init( . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise) )
131+ return . fireQuiesceEvents(
132+ serverChannel: serverChannel,
133+ fullyShutdownPromise: fullyShutdownPromise,
134+ openChannels: openChannels
135+ )
136+
137+ case . shuttingDown( let openChannels, let fullyShutdownPromise) :
138+ self = . init( . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise) )
139+ return . cascadePromise( fullyShutdownPromise: fullyShutdownPromise, cascadeTo: promise)
140+
141+ case . shutdownCompleted:
142+ self = . init( . shutdownCompleted)
143+ return . succeedPromise
144+ }
145+ }
146+ }
147+
148+ /// Collects a number of channels that are open at the moment. To prevent races, `ChannelCollector` uses the
149+ /// `EventLoop` of the server `Channel` that it gets passed to synchronise. It is important to call the
150+ /// `channelAdded` method in the same event loop tick as the `Channel` is actually created.
151+ private final class ChannelCollector {
152+ private var lifecycleState : LifecycleStateMachine
153+
154+ private let eventLoop : EventLoop
155+
156+ /// Initializes a `ChannelCollector` for `Channel`s accepted by `serverChannel`.
157+ init ( serverChannel: Channel ) {
158+ self . eventLoop = serverChannel. eventLoop
159+ self . lifecycleState = LifecycleStateMachine ( serverChannel: serverChannel)
160+ }
161+
162+ /// Add a channel to the `ChannelCollector`.
163+ ///
164+ /// - note: This must be called on `serverChannel.eventLoop`.
165+ ///
166+ /// - parameters:
167+ /// - channel: The `Channel` to add to the `ChannelCollector`.
168+ func channelAdded( _ channel: Channel ) throws {
169+ self . eventLoop. assertInEventLoop ( )
170+
171+ switch self . lifecycleState. channelAdded ( channel) {
172+ case . none:
173+ ( )
174+ case . fireChannelShouldQuiesce:
175+ channel. eventLoop. execute {
176+ channel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
177+ }
178+ case . closeChannelAndThrowError:
179+ channel. close ( promise: nil )
180+ throw ShutdownError . alreadyShutdown
181+ }
182+ }
183+
184+ private func shutdownCompleted( ) {
185+ self . eventLoop. assertInEventLoop ( )
186+
187+ switch self . lifecycleState. shutdownCompleted ( ) {
188+ case . succeedShutdownPromise( let promise) :
189+ promise. succeed ( )
190+ }
191+ }
192+
193+ private func channelRemoved0( _ channel: Channel ) {
194+ self . eventLoop. assertInEventLoop ( )
195+
196+ switch self . lifecycleState. channelRemoved ( channel) {
197+ case . none:
198+ ( )
199+ case . succeedShutdownPromise( let promise) :
200+ promise. succeed ( )
201+ }
202+ }
203+
120204 /// Remove a previously added `Channel` from the `ChannelCollector`.
121205 ///
122206 /// - note: This method can be called from any thread.
@@ -136,12 +220,10 @@ private final class ChannelCollector {
136220 private func initiateShutdown0( promise: EventLoopPromise < Void > ? ) {
137221 self . eventLoop. assertInEventLoop ( )
138222
139- switch self . lifecycleState {
140- case . upAndRunning( let openChannels, let serverChannel) :
141- let fullyShutdownPromise = promise ?? serverChannel. eventLoop. makePromise ( of: Void . self)
142-
143- self . lifecycleState = . shuttingDown( openChannels: openChannels, fullyShutdownPromise: fullyShutdownPromise)
144-
223+ switch self . lifecycleState. initiateShutdown ( promise) {
224+ case . none:
225+ ( )
226+ case . fireQuiesceEvents( let serverChannel, let fullyShutdownPromise, let openChannels) :
145227 serverChannel. pipeline. fireUserInboundEventTriggered ( ChannelShouldQuiesceEvent ( ) )
146228 serverChannel. close ( ) . cascadeFailure ( to: fullyShutdownPromise)
147229
@@ -155,10 +237,10 @@ private final class ChannelCollector {
155237 self . shutdownCompleted ( )
156238 }
157239
158- case . shuttingDown ( _ , let fullyShutdownPromise ) :
159- fullyShutdownPromise. futureResult. cascade ( to: promise )
240+ case . cascadePromise ( let fullyShutdownPromise , let cascadeTo ) :
241+ fullyShutdownPromise. futureResult. cascade ( to: cascadeTo )
160242
161- case . shutdownCompleted :
243+ case . succeedPromise :
162244 promise? . succeed ( ( ) )
163245 }
164246 }
0 commit comments