Skip to content

Use ConnectionLease to return connection #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Sources/ConnectionPoolModule/ConnectionLease.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
public struct ConnectionLease<Connection: PooledConnection>: Sendable {
public var connection: Connection

@usableFromInline
let _release: @Sendable (Connection) -> ()

@inlinable
public init(connection: Connection, release: @escaping @Sendable (Connection) -> Void) {
self.connection = connection
self._release = release
}

@inlinable
public func release() {
self._release(self.connection)
}
}
7 changes: 5 additions & 2 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public protocol ConnectionRequestProtocol: Sendable {

/// A function that is called with a connection or a
/// `PoolError`.
func complete(with: Result<Connection, ConnectionPoolError>)
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
Expand Down Expand Up @@ -402,8 +402,11 @@ public final class ConnectionPool<
/*private*/ func runRequestAction(_ action: StateMachine.RequestAction) {
switch action {
case .leaseConnection(let requests, let connection):
let lease = ConnectionLease(connection: connection) { connection in
self.releaseConnection(connection)
}
for request in requests {
request.complete(with: .success(connection))
request.complete(with: .success(lease))
}

case .failRequest(let request, let error):
Expand Down
16 changes: 8 additions & 8 deletions Sources/ConnectionPoolModule/ConnectionRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
public var id: ID

@usableFromInline
private(set) var continuation: CheckedContinuation<Connection, any Error>
private(set) var continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>

@inlinable
init(
id: Int,
continuation: CheckedContinuation<Connection, any Error>
continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
) {
self.id = id
self.continuation = continuation
}

public func complete(with result: Result<Connection, ConnectionPoolError>) {
public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
self.continuation.resume(with: result)
}
}
Expand Down Expand Up @@ -46,15 +46,15 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
}

@inlinable
public func leaseConnection() async throws -> Connection {
public func leaseConnection() async throws -> ConnectionLease<Connection> {
let requestID = requestIDGenerator.next()

let connection = try await withTaskCancellationHandler {
if Task.isCancelled {
throw CancellationError()
}

return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<Connection>, Error>) in
let request = Request(
id: requestID,
continuation: continuation
Expand All @@ -71,8 +71,8 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {

@inlinable
public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
let connection = try await self.leaseConnection()
defer { self.releaseConnection(connection) }
return try await closure(connection)
let lease = try await self.leaseConnection()
defer { lease.release() }
return try await closure(lease.connection)
}
}
8 changes: 3 additions & 5 deletions Sources/ConnectionPoolTestUtils/MockRequest.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import _ConnectionPoolModule

public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
public typealias Connection = MockConnection

public final class MockRequest<Connection: PooledConnection>: ConnectionRequestProtocol, Hashable, Sendable {
public struct ID: Hashable, Sendable {
var objectID: ObjectIdentifier

Expand All @@ -11,7 +9,7 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
}
}

public init() {}
public init(connectionType: Connection.Type = Connection.self) {}

public var id: ID { ID(self) }

Expand All @@ -23,7 +21,7 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
hasher.combine(self.id)
}

public func complete(with: Result<Connection, ConnectionPoolError>) {
public func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>) {

}
}
28 changes: 15 additions & 13 deletions Sources/PostgresNIO/Pool/PostgresClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
/// - Returns: The closure's return value.
@_disfavoredOverload
public func withConnection<Result>(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result {
let connection = try await self.leaseConnection()
let lease = try await self.leaseConnection()

defer { self.pool.releaseConnection(connection) }
defer { lease.release() }

return try await closure(connection)
return try await closure(lease.connection)
}

#if compiler(>=6.0)
Expand All @@ -319,11 +319,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
// https://github.com/swiftlang/swift/issues/79285
_ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
let connection = try await self.leaseConnection()
let lease = try await self.leaseConnection()

defer { self.pool.releaseConnection(connection) }
defer { lease.release() }

return try await closure(connection)
return try await closure(lease.connection)
}

/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
Expand Down Expand Up @@ -404,7 +404,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
}

let connection = try await self.leaseConnection()
let lease = try await self.leaseConnection()
let connection = lease.connection

var logger = logger
logger[postgresMetadataKey: .connectionID] = "\(connection.id)"
Expand All @@ -419,12 +420,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)

promise.futureResult.whenFailure { _ in
self.pool.releaseConnection(connection)
lease.release()
}

return try await promise.futureResult.map {
$0.asyncSequence(onFinish: {
self.pool.releaseConnection(connection)
lease.release()
})
}.get()
} catch var error as PSQLError {
Expand All @@ -446,7 +447,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
let logger = logger ?? Self.loggingDisabled

do {
let connection = try await self.leaseConnection()
let lease = try await self.leaseConnection()
let connection = lease.connection

let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
let task = HandlerTask.executePreparedStatement(.init(
Expand All @@ -460,11 +462,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
connection.channel.write(task, promise: nil)

promise.futureResult.whenFailure { _ in
self.pool.releaseConnection(connection)
lease.release()
}

return try await promise.futureResult
.map { $0.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }) }
.map { $0.asyncSequence(onFinish: { lease.release() }) }
.get()
.map { try preparedStatement.decodeRow($0) }
} catch var error as PSQLError {
Expand Down Expand Up @@ -504,7 +506,7 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {

// MARK: - Private Methods -

private func leaseConnection() async throws -> PostgresConnection {
private func leaseConnection() async throws -> ConnectionLease<PostgresConnection> {
if !self.runningAtomic.load(ordering: .relaxed) {
self.backgroundLogger.warning("Trying to lease connection from `PostgresClient`, but `PostgresClient.run()` hasn't been called yet.")
}
Expand Down
Loading
Loading