Skip to content

Commit 344d30b

Browse files
sebstofabianfettadam-fowler
authored
[core] Only one LambdaRuntime.run() can be called at a time (fix #507) (#508)
This is a proposal to fix issue #507 **changes** - `LambdaRuntime.init()` uses a `Mutex<Bool>` to make sure only one instance is created - `LambdaRuntime.init()` can now throw an error in case an instance already exists (I did not use `fatalError()` to make it easier to test) - All `convenience init()` methods catch possible errors instead of re-throwing it to a void breaking the user-facing API - Renamed existing `LambdaRuntimeError` to `LambdaRuntimeClientError` - Introduced a new type `LambdaRuntimeError` to represent the double initialization error --------- Co-authored-by: Fabian Fett <[email protected]> Co-authored-by: Adam Fowler <[email protected]>
1 parent bf2385a commit 344d30b

File tree

8 files changed

+173
-32
lines changed

8 files changed

+173
-32
lines changed

Examples/HelloJSON/Package.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ let package = Package(
1515
// during CI, the dependency on local version of swift-aws-lambda-runtime is added dynamically below
1616
.package(
1717
url: "https://github.com/swift-server/swift-aws-lambda-runtime.git",
18-
branch: "ff-package-traits",
19-
traits: [
20-
.trait(name: "FoundationJSONSupport")
21-
]
18+
branch: "main"
2219
)
2320
],
2421
targets: [

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ extension Lambda {
4848
@usableFromInline
4949
static func withLocalServer(
5050
invocationEndpoint: String? = nil,
51+
logger: Logger,
5152
_ body: sending @escaping () async throws -> Void
5253
) async throws {
53-
var logger = Logger(label: "LocalServer")
54-
logger.logLevel = Lambda.env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
55-
5654
try await LambdaHTTPServer.withLocalServer(
5755
invocationEndpoint: invocationEndpoint,
5856
logger: logger
@@ -93,7 +91,7 @@ internal struct LambdaHTTPServer {
9391
case serverReturned(Swift.Result<Void, any Error>)
9492
}
9593

96-
struct UnsafeTransferBox<Value>: @unchecked Sendable {
94+
fileprivate struct UnsafeTransferBox<Value>: @unchecked Sendable {
9795
let value: Value
9896

9997
init(value: sending Value) {
@@ -133,6 +131,7 @@ internal struct LambdaHTTPServer {
133131
}
134132
}
135133

134+
// it's ok to keep this at `info` level because it is only used for local testing and unit tests
136135
logger.info(
137136
"Server started and listening",
138137
metadata: [
@@ -202,12 +201,18 @@ internal struct LambdaHTTPServer {
202201
return result
203202

204203
case .serverReturned(let result):
205-
logger.error(
206-
"Server shutdown before closure completed",
207-
metadata: [
208-
"error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")"
209-
]
210-
)
204+
205+
if result.maybeError is CancellationError {
206+
logger.trace("Server's task cancelled")
207+
} else {
208+
logger.error(
209+
"Server shutdown before closure completed",
210+
metadata: [
211+
"error": "\(result.maybeError != nil ? "\(result.maybeError!)" : "none")"
212+
]
213+
)
214+
}
215+
211216
switch await group.next()! {
212217
case .closureResult(let result):
213218
return result
@@ -265,9 +270,12 @@ internal struct LambdaHTTPServer {
265270
}
266271
}
267272
}
273+
} catch let error as CancellationError {
274+
logger.trace("The task was cancelled", metadata: ["error": "\(error)"])
268275
} catch {
269276
logger.error("Hit error: \(error)")
270277
}
278+
271279
} onCancel: {
272280
channel.channel.close(promise: nil)
273281
}

Sources/AWSLambdaRuntime/LambdaRuntime.swift

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,23 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Logging
16-
import NIOConcurrencyHelpers
1716
import NIOCore
17+
import Synchronization
1818

1919
#if canImport(FoundationEssentials)
2020
import FoundationEssentials
2121
#else
2222
import Foundation
2323
#endif
2424

25-
// We need `@unchecked` Sendable here, as `NIOLockedValueBox` does not understand `sending` today.
26-
// We don't want to use `NIOLockedValueBox` here anyway. We would love to use Mutex here, but this
27-
// sadly crashes the compiler today.
28-
public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: StreamingLambdaHandler {
29-
// TODO: We want to change this to Mutex as soon as this doesn't crash the Swift compiler on Linux anymore
25+
// This is our guardian to ensure only one LambdaRuntime is running at the time
26+
// We use an Atomic here to ensure thread safety
27+
private let _isRunning = Atomic<Bool>(false)
28+
29+
public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLambdaHandler {
3030
@usableFromInline
31-
let handlerMutex: NIOLockedValueBox<Handler?>
31+
/// we protect the handler behind a Mutex to ensure that we only ever have one copy of it
32+
let handlerStorage: SendingStorage<Handler>
3233
@usableFromInline
3334
let logger: Logger
3435
@usableFromInline
@@ -39,7 +40,7 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
3940
eventLoop: EventLoop = Lambda.defaultEventLoop,
4041
logger: Logger = Logger(label: "LambdaRuntime")
4142
) {
42-
self.handlerMutex = NIOLockedValueBox(handler)
43+
self.handlerStorage = SendingStorage(handler)
4344
self.eventLoop = eventLoop
4445

4546
// by setting the log level here, we understand it can not be changed dynamically at runtime
@@ -62,14 +63,24 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
6263
}
6364
#endif
6465

65-
@inlinable
66+
/// Make sure only one run() is called at a time
67+
// @inlinable
6668
internal func _run() async throws {
67-
let handler = self.handlerMutex.withLockedValue { handler in
68-
let result = handler
69-
handler = nil
70-
return result
69+
70+
// we use an atomic global variable to ensure only one LambdaRuntime is running at the time
71+
let (_, original) = _isRunning.compareExchange(expected: false, desired: true, ordering: .acquiringAndReleasing)
72+
73+
// if the original value was already true, run() is already running
74+
if original {
75+
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
76+
}
77+
78+
defer {
79+
_isRunning.store(false, ordering: .releasing)
7180
}
7281

82+
// The handler can be non-sendable, we want to ensure we only ever have one copy of it
83+
let handler = try? self.handlerStorage.get()
7384
guard let handler else {
7485
throw LambdaRuntimeError(code: .runtimeCanOnlyBeStartedOnce)
7586
}
@@ -100,8 +111,10 @@ public final class LambdaRuntime<Handler>: @unchecked Sendable where Handler: St
100111
#if LocalServerSupport
101112
// we're not running on Lambda and we're compiled in DEBUG mode,
102113
// let's start a local server for testing
103-
try await Lambda.withLocalServer(invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"))
104-
{
114+
try await Lambda.withLocalServer(
115+
invocationEndpoint: Lambda.env("LOCAL_LAMBDA_SERVER_INVOCATION_ENDPOINT"),
116+
logger: self.logger
117+
) {
105118

106119
try await LambdaRuntimeClient.withRuntimeClient(
107120
configuration: .init(ip: "127.0.0.1", port: 7000),

Sources/AWSLambdaRuntime/LambdaRuntimeError.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package struct LambdaRuntimeError: Error {
1717
@usableFromInline
1818
package enum Code: Sendable {
19+
/// internal error codes for LambdaRuntimeClient
1920
case closingRuntimeClient
2021

2122
case connectionToControlPlaneLost

Sources/AWSLambdaRuntime/Utils.swift

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Dispatch
16+
import NIOConcurrencyHelpers
1617
import NIOPosix
1718

19+
// import Synchronization
20+
1821
enum Consts {
1922
static let apiPrefix = "/2018-06-01"
2023
static let invocationURLPrefix = "\(apiPrefix)/runtime/invocation"
@@ -132,3 +135,34 @@ extension AmazonHeaders {
132135
return "\(version)-\(datePadding)\(dateValue)-\(identifier)"
133136
}
134137
}
138+
139+
/// Temporary storage for value being sent from one isolation domain to another
140+
// use NIOLockedValueBox instead of Mutex to avoid compiler crashes on 6.0
141+
// see https://github.com/swiftlang/swift/issues/78048
142+
@usableFromInline
143+
struct SendingStorage<Value>: ~Copyable, @unchecked Sendable {
144+
@usableFromInline
145+
struct ValueAlreadySentError: Error {
146+
@usableFromInline
147+
init() {}
148+
}
149+
150+
@usableFromInline
151+
// let storage: Mutex<Value?>
152+
let storage: NIOLockedValueBox<Value?>
153+
154+
@inlinable
155+
init(_ value: sending Value) {
156+
self.storage = .init(value)
157+
}
158+
159+
@inlinable
160+
func get() throws -> Value {
161+
// try self.storage.withLock {
162+
try self.storage.withLockedValue {
163+
guard let value = $0 else { throw ValueAlreadySentError() }
164+
$0 = nil
165+
return value
166+
}
167+
}
168+
}

Sources/MockServer/MockHTTPServer.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ struct HttpServer {
123123
}
124124
}
125125
}
126+
// it's ok to keep this at `info` level because it is only used for local testing and unit tests
126127
logger.info("Server shutting down")
127128
}
128129

Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,11 @@ struct LambdaRuntimeClientTests {
147147
(event: String, context: LambdaContext) in
148148
"Hello \(event)"
149149
}
150-
var logger = Logger(label: "LambdaRuntime")
151-
logger.logLevel = .debug
150+
152151
let serviceGroup = ServiceGroup(
153152
services: [runtime],
154153
gracefulShutdownSignals: [.sigterm, .sigint],
155-
logger: logger
154+
logger: Logger(label: "TestLambdaRuntimeGracefulShutdown")
156155
)
157156
try await withThrowingTaskGroup(of: Void.self) { group in
158157
group.addTask {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftAWSLambdaRuntime open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
import Logging
17+
import NIOCore
18+
import Synchronization
19+
import Testing
20+
21+
@testable import AWSLambdaRuntime
22+
23+
@Suite("LambdaRuntimeTests")
24+
struct LambdaRuntimeTests {
25+
26+
@Test("LambdaRuntime can only be run once")
27+
func testLambdaRuntimerunOnce() async throws {
28+
29+
// First runtime
30+
let runtime1 = LambdaRuntime(
31+
handler: MockHandler(),
32+
eventLoop: Lambda.defaultEventLoop,
33+
logger: Logger(label: "LambdaRuntimeTests.Runtime1")
34+
)
35+
36+
// Second runtime
37+
let runtime2 = LambdaRuntime(
38+
handler: MockHandler(),
39+
eventLoop: Lambda.defaultEventLoop,
40+
logger: Logger(label: "LambdaRuntimeTests.Runtime2")
41+
)
42+
43+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
44+
// start the first runtime
45+
taskGroup.addTask {
46+
// ChannelError will be thrown when we cancel the task group
47+
await #expect(throws: ChannelError.self) {
48+
try await runtime1.run()
49+
}
50+
}
51+
52+
// wait a small amount to ensure runtime1 task is started
53+
try await Task.sleep(for: .seconds(1))
54+
55+
// Running the second runtime should trigger LambdaRuntimeError
56+
await #expect(throws: LambdaRuntimeError.self) {
57+
try await runtime2.run()
58+
}
59+
60+
// cancel runtime 1 / task 1
61+
taskGroup.cancelAll()
62+
}
63+
64+
// Running the second runtime should work now
65+
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
66+
taskGroup.addTask {
67+
// ChannelError will be thrown when we cancel the task group
68+
await #expect(throws: ChannelError.self) {
69+
try await runtime2.run()
70+
}
71+
}
72+
73+
// Set timeout and cancel the runtime 2
74+
try await Task.sleep(for: .seconds(2))
75+
taskGroup.cancelAll()
76+
}
77+
}
78+
}
79+
80+
struct MockHandler: StreamingLambdaHandler {
81+
mutating func handle(
82+
_ event: NIOCore.ByteBuffer,
83+
responseWriter: some AWSLambdaRuntime.LambdaResponseStreamWriter,
84+
context: AWSLambdaRuntime.LambdaContext
85+
) async throws {
86+
87+
}
88+
}

0 commit comments

Comments
 (0)