@@ -95,15 +95,13 @@ func withCLocaleSetToGerman(_ body: () throws -> Void) throws {
95
95
try body ( )
96
96
}
97
97
98
- class TestHTTPDelegate : HTTPClientResponseDelegate {
98
+ final class TestHTTPDelegate : HTTPClientResponseDelegate {
99
99
typealias Response = Void
100
100
101
101
init ( backpressureEventLoop: EventLoop ? = nil ) {
102
- self . backpressureEventLoop = backpressureEventLoop
102
+ self . state = NIOLockedValueBox ( MutableState ( backpressureEventLoop: backpressureEventLoop ) )
103
103
}
104
104
105
- var backpressureEventLoop : EventLoop ?
106
-
107
105
enum State {
108
106
case idle
109
107
case head( HTTPResponseHead )
@@ -112,77 +110,96 @@ class TestHTTPDelegate: HTTPClientResponseDelegate {
112
110
case error( Error )
113
111
}
114
112
115
- var state = State . idle
113
+ struct MutableState : Sendable {
114
+ var state : State = . idle
115
+ var backpressureEventLoop : EventLoop ?
116
+ }
117
+
118
+ let state : NIOLockedValueBox < MutableState >
116
119
117
120
func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
118
- self . state = . head( head)
119
- return ( self . backpressureEventLoop ?? task. eventLoop) . makeSucceededFuture ( ( ) )
121
+ let eventLoop = self . state. withLockedValue {
122
+ $0. state = . head( head)
123
+ return ( $0. backpressureEventLoop ?? task. eventLoop)
124
+ }
125
+
126
+ return eventLoop. makeSucceededVoidFuture ( )
120
127
}
121
128
122
129
func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
123
- switch self . state {
124
- case . head( let head) :
125
- self . state = . body( head, buffer)
126
- case . body( let head, var body) :
127
- var buffer = buffer
128
- body. writeBuffer ( & buffer)
129
- self . state = . body( head, body)
130
- default :
131
- preconditionFailure ( " expecting head or body " )
130
+ let eventLoop = self . state. withLockedValue {
131
+ switch $0. state {
132
+ case . head( let head) :
133
+ $0. state = . body( head, buffer)
134
+ case . body( let head, var body) :
135
+ var buffer = buffer
136
+ body. writeBuffer ( & buffer)
137
+ $0. state = . body( head, body)
138
+ default :
139
+ preconditionFailure ( " expecting head or body " )
140
+ }
141
+ return ( $0. backpressureEventLoop ?? task. eventLoop)
132
142
}
133
- return ( self . backpressureEventLoop ?? task. eventLoop) . makeSucceededFuture ( ( ) )
143
+
144
+ return eventLoop. makeSucceededVoidFuture ( )
134
145
}
135
146
136
147
func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
137
148
}
138
149
139
- class CountingDelegate : HTTPClientResponseDelegate {
150
+ final class CountingDelegate : HTTPClientResponseDelegate {
140
151
typealias Response = Int
141
152
142
- var count = 0
153
+ private let _count = NIOLockedValueBox ( 0 )
143
154
144
155
func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
145
156
let str = buffer. getString ( at: 0 , length: buffer. readableBytes)
146
157
if str? . starts ( with: " id: " ) ?? false {
147
- self . count += 1
158
+ self . _count . withLockedValue { $0 += 1 }
148
159
}
149
160
return task. eventLoop. makeSucceededFuture ( ( ) )
150
161
}
151
162
152
163
func didFinishRequest( task: HTTPClient . Task < Response > ) throws -> Int {
153
- self . count
164
+ self . _count . withLockedValue { $0 }
154
165
}
155
166
}
156
167
157
- class DelayOnHeadDelegate : HTTPClientResponseDelegate {
168
+ final class DelayOnHeadDelegate : HTTPClientResponseDelegate {
158
169
typealias Response = ByteBuffer
159
170
160
171
let eventLoop : EventLoop
161
- let didReceiveHead : ( HTTPResponseHead , EventLoopPromise < Void > ) -> Void
162
-
163
- private var data : ByteBuffer
172
+ let didReceiveHead : @Sendable ( HTTPResponseHead , EventLoopPromise < Void > ) -> Void
164
173
165
- private var mayReceiveData = false
174
+ struct State : Sendable {
175
+ var data : ByteBuffer
176
+ var mayReceiveData = false
177
+ var expectError = false
178
+ }
166
179
167
- private var expectError = false
180
+ private let state : NIOLockedValueBox < State >
168
181
169
- init ( eventLoop: EventLoop , didReceiveHead: @escaping ( HTTPResponseHead , EventLoopPromise < Void > ) -> Void ) {
182
+ init ( eventLoop: EventLoop , didReceiveHead: @escaping @ Sendable ( HTTPResponseHead , EventLoopPromise < Void > ) -> Void ) {
170
183
self . eventLoop = eventLoop
171
184
self . didReceiveHead = didReceiveHead
172
- self . data = ByteBuffer ( )
185
+ self . state = NIOLockedValueBox ( State ( data : ByteBuffer ( ) ) )
173
186
}
174
187
175
188
func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
176
- XCTAssertFalse ( self . mayReceiveData)
177
- XCTAssertFalse ( self . expectError)
189
+ self . state. withLockedValue {
190
+ XCTAssertFalse ( $0. mayReceiveData)
191
+ XCTAssertFalse ( $0. expectError)
192
+ }
178
193
179
194
let promise = self . eventLoop. makePromise ( of: Void . self)
180
- promise. futureResult. whenComplete {
181
- switch $0 {
182
- case . success:
183
- self . mayReceiveData = true
184
- case . failure:
185
- self . expectError = true
195
+ promise. futureResult. whenComplete { result in
196
+ self . state. withLockedValue { state in
197
+ switch result {
198
+ case . success:
199
+ state. mayReceiveData = true
200
+ case . failure:
201
+ state. expectError = true
202
+ }
186
203
}
187
204
}
188
205
@@ -191,20 +208,26 @@ class DelayOnHeadDelegate: HTTPClientResponseDelegate {
191
208
}
192
209
193
210
func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
194
- XCTAssertTrue ( self . mayReceiveData)
195
- XCTAssertFalse ( self . expectError)
196
- self . data. writeImmutableBuffer ( buffer)
211
+ self . state. withLockedValue {
212
+ XCTAssertTrue ( $0. mayReceiveData)
213
+ XCTAssertFalse ( $0. expectError)
214
+ $0. data. writeImmutableBuffer ( buffer)
215
+ }
197
216
return self . eventLoop. makeSucceededFuture ( ( ) )
198
217
}
199
218
200
219
func didFinishRequest( task: HTTPClient . Task < Response > ) throws -> Response {
201
- XCTAssertTrue ( self . mayReceiveData)
202
- XCTAssertFalse ( self . expectError)
203
- return self . data
220
+ self . state. withLockedValue {
221
+ XCTAssertTrue ( $0. mayReceiveData)
222
+ XCTAssertFalse ( $0. expectError)
223
+ return $0. data
224
+ }
204
225
}
205
226
206
227
func didReceiveError( task: HTTPClient . Task < ByteBuffer > , _ error: Error ) {
207
- XCTAssertTrue ( self . expectError)
228
+ self . state. withLockedValue {
229
+ XCTAssertTrue ( $0. expectError)
230
+ }
208
231
}
209
232
}
210
233
@@ -336,7 +359,7 @@ enum TestTLS {
336
359
)
337
360
}
338
361
339
- internal final class HTTPBin < RequestHandler: ChannelInboundHandler >
362
+ internal final class HTTPBin < RequestHandler: ChannelInboundHandler > : Sendable
340
363
where
341
364
RequestHandler. InboundIn == HTTPServerRequestPart ,
342
365
RequestHandler. OutboundOut == HTTPServerResponsePart
@@ -415,11 +438,15 @@ where
415
438
}
416
439
417
440
var port : Int {
418
- Int ( self . serverChannel. localAddress!. port!)
441
+ self . serverChannel. withLockedValue {
442
+ Int ( $0!. localAddress!. port!)
443
+ }
419
444
}
420
445
421
446
var socketAddress : SocketAddress {
422
- self . serverChannel. localAddress!
447
+ self . serverChannel. withLockedValue {
448
+ $0!. localAddress!
449
+ }
423
450
}
424
451
425
452
var baseURL : String {
@@ -447,17 +474,17 @@ where
447
474
448
475
private let mode : Mode
449
476
private let sslContext : NIOSSLContext ?
450
- private var serverChannel : Channel !
477
+ private let serverChannel = NIOLockedValueBox < Channel ? > ( nil )
451
478
private let isShutdown = ManagedAtomic ( false )
452
- private let handlerFactory : ( Int ) -> ( RequestHandler )
479
+ private let handlerFactory : @ Sendable ( Int ) -> ( RequestHandler )
453
480
454
481
init (
455
482
_ mode: Mode = . http1_1( ssl: false , compress: false ) ,
456
483
proxy: Proxy = . none,
457
484
bindTarget: BindTarget = . localhostIPv4RandomPort,
458
485
reusePort: Bool = false ,
459
486
trafficShapingTargetBytesPerSecond: Int ? = nil ,
460
- handlerFactory: @escaping ( Int ) -> ( RequestHandler )
487
+ handlerFactory: @escaping @ Sendable ( Int ) -> ( RequestHandler )
461
488
) {
462
489
self . mode = mode
463
490
self . sslContext = HTTPBin . sslContext ( for: mode)
@@ -477,14 +504,14 @@ where
477
504
478
505
let connectionIDAtomic = ManagedAtomic ( 0 )
479
506
480
- self . serverChannel = try ! ServerBootstrap ( group: self . group)
507
+ let serverChannel = try ! ServerBootstrap ( group: self . group)
481
508
. serverChannelOption ( ChannelOptions . socket ( SocketOptionLevel ( SOL_SOCKET) , SO_REUSEADDR) , value: 1 )
482
509
. serverChannelOption (
483
510
ChannelOptions . socket ( SocketOptionLevel ( SOL_SOCKET) , SO_REUSEPORT) ,
484
511
value: reusePort ? 1 : 0
485
512
)
486
- . serverChannelInitializer { channel in
487
- channel. pipeline. addHandler ( self . activeConnCounterHandler)
513
+ . serverChannelInitializer { [ activeConnCounterHandler ] channel in
514
+ channel. pipeline. addHandler ( activeConnCounterHandler)
488
515
} . childChannelInitializer { channel in
489
516
if let trafficShapingTargetBytesPerSecond = trafficShapingTargetBytesPerSecond {
490
517
try ! channel. pipeline. syncOperations. addHandler (
@@ -528,6 +555,7 @@ where
528
555
return channel. eventLoop. makeFailedFuture ( error)
529
556
}
530
557
} . bind ( to: socketAddress) . wait ( )
558
+ self . serverChannel. withLockedValue { $0 = serverChannel }
531
559
}
532
560
533
561
private func syncAddHTTPProxyHandlers(
@@ -1092,13 +1120,13 @@ internal final class HTTPBinHandler: ChannelInboundHandler {
1092
1120
)
1093
1121
context. write ( wrapOutboundOut ( . body( . byteBuffer( responseBody) ) ) , promise: nil )
1094
1122
}
1095
- context. eventLoop. scheduleTask ( in: self . delay) {
1123
+ context. eventLoop. assumeIsolated ( ) . scheduleTask ( in: self . delay) {
1096
1124
guard context. channel. isActive else {
1097
1125
context. close ( promise: nil )
1098
1126
return
1099
1127
}
1100
1128
1101
- context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . whenComplete { result in
1129
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . assumeIsolated ( ) . whenComplete { result in
1102
1130
self . isServingRequest = false
1103
1131
switch result {
1104
1132
case . success:
@@ -1133,7 +1161,7 @@ internal final class HTTPBinHandler: ChannelInboundHandler {
1133
1161
}
1134
1162
}
1135
1163
1136
- final class ConnectionsCountHandler : ChannelInboundHandler {
1164
+ final class ConnectionsCountHandler : ChannelInboundHandler , Sendable {
1137
1165
typealias InboundIn = Channel
1138
1166
1139
1167
private let activeConns = ManagedAtomic ( 0 )
@@ -1152,8 +1180,8 @@ final class ConnectionsCountHandler: ChannelInboundHandler {
1152
1180
1153
1181
_ = self . activeConns. loadThenWrappingIncrement ( ordering: . relaxed)
1154
1182
_ = self . createdConns. loadThenWrappingIncrement ( ordering: . relaxed)
1155
- channel. closeFuture. whenComplete { _ in
1156
- _ = self . activeConns. loadThenWrappingDecrement ( ordering: . relaxed)
1183
+ channel. closeFuture. whenComplete { [ activeConns ] _ in
1184
+ _ = activeConns. loadThenWrappingDecrement ( ordering: . relaxed)
1157
1185
}
1158
1186
1159
1187
context. fireChannelRead ( data)
@@ -1173,7 +1201,7 @@ internal final class CloseWithoutClosingServerHandler: ChannelInboundHandler {
1173
1201
1174
1202
func handlerAdded( context: ChannelHandlerContext ) {
1175
1203
self . onClosePromise = context. eventLoop. makePromise ( )
1176
- self . onClosePromise!. futureResult. whenSuccess ( self . callback!)
1204
+ self . onClosePromise!. futureResult. assumeIsolated ( ) . whenSuccess ( self . callback!)
1177
1205
self . callback = nil
1178
1206
}
1179
1207
@@ -1235,7 +1263,7 @@ final class ExpectClosureServerHandler: ChannelInboundHandler {
1235
1263
1236
1264
struct EventLoopFutureTimeoutError : Error { }
1237
1265
1238
- extension EventLoopFuture {
1266
+ extension EventLoopFuture where Value : Sendable {
1239
1267
func timeout( after failDelay: TimeAmount ) -> EventLoopFuture < Value > {
1240
1268
let promise = self . eventLoop. makePromise ( of: Value . self)
1241
1269
@@ -1261,28 +1289,27 @@ struct CollectEverythingLogHandler: LogHandler {
1261
1289
var logLevel : Logger . Level = . info
1262
1290
let logStore : LogStore
1263
1291
1264
- class LogStore {
1292
+ final class LogStore : Sendable {
1265
1293
struct Entry {
1266
1294
var level : Logger . Level
1267
1295
var message : String
1268
1296
var metadata : [ String : String ]
1269
1297
}
1270
1298
1271
- var lock = NIOLock ( )
1272
- var logs : [ Entry ] = [ ]
1299
+ private let logs = NIOLockedValueBox < [ Entry ] > ( [ ] )
1273
1300
1274
1301
var allEntries : [ Entry ] {
1275
1302
get {
1276
- self . lock . withLock { self . logs }
1303
+ self . logs . withLockedValue { $0 }
1277
1304
}
1278
1305
set {
1279
- self . lock . withLock { self . logs = newValue }
1306
+ self . logs . withLockedValue { $0 = newValue }
1280
1307
}
1281
1308
}
1282
1309
1283
1310
func append( level: Logger . Level , message: Logger . Message , metadata: Logger . Metadata ? ) {
1284
- self . lock . withLock {
1285
- self . logs . append (
1311
+ self . logs . withLockedValue {
1312
+ $0 . append (
1286
1313
Entry (
1287
1314
level: level,
1288
1315
message: message. description,
@@ -1301,6 +1328,7 @@ struct CollectEverythingLogHandler: LogHandler {
1301
1328
level: Logger . Level ,
1302
1329
message: Logger . Message ,
1303
1330
metadata: Logger . Metadata ? ,
1331
+ source: String ,
1304
1332
file: String ,
1305
1333
function: String ,
1306
1334
line: UInt
@@ -1322,10 +1350,10 @@ struct CollectEverythingLogHandler: LogHandler {
1322
1350
/// consume the bytes by calling ``next()`` on the delegate.
1323
1351
///
1324
1352
/// The sole purpose of this class is to enable straight-line stream tests.
1325
- class ResponseStreamDelegate : HTTPClientResponseDelegate {
1353
+ final class ResponseStreamDelegate : HTTPClientResponseDelegate {
1326
1354
typealias Response = Void
1327
1355
1328
- enum State {
1356
+ enum State : Sendable {
1329
1357
/// The delegate is in the idle state. There are no http response parts to be buffered
1330
1358
/// and the consumer did not signal a demand. Transitions to all other states are allowed.
1331
1359
case idle
@@ -1343,10 +1371,11 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate {
1343
1371
}
1344
1372
1345
1373
let eventLoop : EventLoop
1346
- private var state : State = . idle
1374
+ private let state : NIOLoopBoundBox < State >
1347
1375
1348
1376
init ( eventLoop: EventLoop ) {
1349
1377
self . eventLoop = eventLoop
1378
+ self . state = . makeBoxSendingValue( . idle, eventLoop: eventLoop)
1350
1379
}
1351
1380
1352
1381
func next( ) -> EventLoopFuture < ByteBuffer ? > {
@@ -1360,25 +1389,25 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate {
1360
1389
}
1361
1390
1362
1391
private func next0( ) -> EventLoopFuture < ByteBuffer ? > {
1363
- switch self . state {
1392
+ switch self . state. value {
1364
1393
case . idle:
1365
1394
let promise = self . eventLoop. makePromise ( of: ByteBuffer ? . self)
1366
- self . state = . waitingForBytes( promise)
1395
+ self . state. value = . waitingForBytes( promise)
1367
1396
return promise. futureResult
1368
1397
1369
1398
case . buffering( let byteBuffer, done: false ) :
1370
- self . state = . idle
1399
+ self . state. value = . idle
1371
1400
return self . eventLoop. makeSucceededFuture ( byteBuffer)
1372
1401
1373
1402
case . buffering( let byteBuffer, done: true ) :
1374
- self . state = . finished
1403
+ self . state. value = . finished
1375
1404
return self . eventLoop. makeSucceededFuture ( byteBuffer)
1376
1405
1377
1406
case . waitingForBytes:
1378
1407
preconditionFailure ( " Don't call `.next` twice " )
1379
1408
1380
1409
case . failed( let error) :
1381
- self . state = . finished
1410
+ self . state. value = . finished
1382
1411
return self . eventLoop. makeFailedFuture ( error)
1383
1412
1384
1413
case . finished:
@@ -1408,16 +1437,16 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate {
1408
1437
func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ buffer: ByteBuffer ) -> EventLoopFuture < Void > {
1409
1438
self . eventLoop. preconditionInEventLoop ( )
1410
1439
1411
- switch self . state {
1440
+ switch self . state. value {
1412
1441
case . idle:
1413
- self . state = . buffering( buffer, done: false )
1442
+ self . state. value = . buffering( buffer, done: false )
1414
1443
case . waitingForBytes( let promise) :
1415
- self . state = . idle
1444
+ self . state. value = . idle
1416
1445
promise. succeed ( buffer)
1417
1446
case . buffering( var byteBuffer, done: false ) :
1418
1447
var buffer = buffer
1419
1448
byteBuffer. writeBuffer ( & buffer)
1420
- self . state = . buffering( byteBuffer, done: false )
1449
+ self . state. value = . buffering( byteBuffer, done: false )
1421
1450
case . buffering( _, done: true ) , . finished, . failed:
1422
1451
preconditionFailure ( " Invalid state: \( self . state) " )
1423
1452
}
@@ -1428,14 +1457,14 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate {
1428
1457
func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) {
1429
1458
self . eventLoop. preconditionInEventLoop ( )
1430
1459
1431
- switch self . state {
1460
+ switch self . state. value {
1432
1461
case . idle:
1433
- self . state = . failed( error)
1462
+ self . state. value = . failed( error)
1434
1463
case . waitingForBytes( let promise) :
1435
- self . state = . finished
1464
+ self . state. value = . finished
1436
1465
promise. fail ( error)
1437
1466
case . buffering( _, done: false ) :
1438
- self . state = . failed( error)
1467
+ self . state. value = . failed( error)
1439
1468
case . buffering( _, done: true ) , . finished, . failed:
1440
1469
preconditionFailure ( " Invalid state: \( self . state) " )
1441
1470
}
@@ -1444,14 +1473,14 @@ class ResponseStreamDelegate: HTTPClientResponseDelegate {
1444
1473
func didFinishRequest( task: HTTPClient . Task < Response > ) throws {
1445
1474
self . eventLoop. preconditionInEventLoop ( )
1446
1475
1447
- switch self . state {
1476
+ switch self . state. value {
1448
1477
case . idle:
1449
- self . state = . finished
1478
+ self . state. value = . finished
1450
1479
case . waitingForBytes( let promise) :
1451
- self . state = . finished
1480
+ self . state. value = . finished
1452
1481
promise. succeed ( nil )
1453
1482
case . buffering( let byteBuffer, done: false ) :
1454
- self . state = . buffering( byteBuffer, done: true )
1483
+ self . state. value = . buffering( byteBuffer, done: true )
1455
1484
case . buffering( _, done: true ) , . finished, . failed:
1456
1485
preconditionFailure ( " Invalid state: \( self . state) " )
1457
1486
}
@@ -1473,7 +1502,7 @@ class HTTPEchoHandler: ChannelInboundHandler {
1473
1502
case . body( let bytes) :
1474
1503
context. writeAndFlush ( self . wrapOutboundOut ( . body( . byteBuffer( bytes) ) ) , promise: nil )
1475
1504
case . end:
1476
- context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . whenSuccess {
1505
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . assumeIsolated ( ) . whenSuccess {
1477
1506
context. close ( promise: nil )
1478
1507
}
1479
1508
}
@@ -1495,7 +1524,7 @@ final class HTTPEchoHeaders: ChannelInboundHandler {
1495
1524
case . body:
1496
1525
break
1497
1526
case . end:
1498
- context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . whenSuccess {
1527
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) ) . assumeIsolated ( ) . whenSuccess {
1499
1528
context. close ( promise: nil )
1500
1529
}
1501
1530
}
@@ -1661,7 +1690,7 @@ final class BasicInboundTrafficShapingHandler: ChannelDuplexHandler {
1661
1690
let buffer = Self . unwrapInboundIn ( data)
1662
1691
let byteCount = buffer. readableBytes
1663
1692
self . currentSecondBytesSeen += byteCount
1664
- context. eventLoop. scheduleTask ( in: . seconds( 1 ) ) {
1693
+ context. eventLoop. assumeIsolated ( ) . scheduleTask ( in: . seconds( 1 ) ) {
1665
1694
self . currentSecondBytesSeen -= byteCount
1666
1695
self . evaluatePause ( context: loopBoundContext. value)
1667
1696
}
0 commit comments