Skip to content

Handle EmptyQueryResponse #500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ struct ExtendedQueryStateMachine {
case parameterDescriptionReceived(ExtendedQueryContext)
case rowDescriptionReceived(ExtendedQueryContext, [RowDescription.Column])
case noDataMessageReceived(ExtendedQueryContext)

case emptyQueryResponseReceived

/// A state that is used if a noData message was received before. If a row description was received `bufferingRows` is
/// used after receiving a `bindComplete` message
case bindCompleteReceived(ExtendedQueryContext)
Expand Down Expand Up @@ -122,7 +123,7 @@ struct ExtendedQueryStateMachine {
return .forwardStreamError(.queryCancelled, read: true)
}

case .commandComplete, .error, .drain:
case .commandComplete, .emptyQueryResponseReceived, .error, .drain:
// the stream has already finished.
return .wait

Expand Down Expand Up @@ -229,6 +230,7 @@ struct ExtendedQueryStateMachine {
.messagesSent,
.parseCompleteReceived,
.parameterDescriptionReceived,
.emptyQueryResponseReceived,
.bindCompleteReceived,
.streaming,
.drain,
Expand Down Expand Up @@ -268,6 +270,7 @@ struct ExtendedQueryStateMachine {
.parseCompleteReceived,
.parameterDescriptionReceived,
.noDataMessageReceived,
.emptyQueryResponseReceived,
.rowDescriptionReceived,
.bindCompleteReceived,
.commandComplete,
Expand Down Expand Up @@ -309,6 +312,7 @@ struct ExtendedQueryStateMachine {
.parseCompleteReceived,
.parameterDescriptionReceived,
.noDataMessageReceived,
.emptyQueryResponseReceived,
.rowDescriptionReceived,
.commandComplete,
.error:
Expand All @@ -319,7 +323,22 @@ struct ExtendedQueryStateMachine {
}

mutating func emptyQueryResponseReceived() -> Action {
preconditionFailure("Unimplemented")
guard case .bindCompleteReceived(let queryContext) = self.state else {
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
}

switch queryContext.query {
case .unnamed(_, let eventLoopPromise),
.executeStatement(_, let eventLoopPromise):
return self.avoidingStateMachineCoW { state -> Action in
state = .emptyQueryResponseReceived
let result = QueryResult(value: .emptyResponse, logger: queryContext.logger)
return .succeedQuery(eventLoopPromise, with: result)
}

case .prepareStatement(_, _, _, _):
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
}
}

mutating func errorReceived(_ errorMessage: PostgresBackendMessage.ErrorResponse) -> Action {
Expand All @@ -336,7 +355,7 @@ struct ExtendedQueryStateMachine {
return self.setAndFireError(error)
case .streaming, .drain:
return self.setAndFireError(error)
case .commandComplete:
case .commandComplete, .emptyQueryResponseReceived:
return self.setAndFireError(.unexpectedBackendMessage(.error(errorMessage)))
case .error:
preconditionFailure("""
Expand Down Expand Up @@ -382,6 +401,7 @@ struct ExtendedQueryStateMachine {
.parseCompleteReceived,
.parameterDescriptionReceived,
.noDataMessageReceived,
.emptyQueryResponseReceived,
.rowDescriptionReceived,
.bindCompleteReceived:
preconditionFailure("Requested to consume next row without anything going on.")
Expand All @@ -405,6 +425,7 @@ struct ExtendedQueryStateMachine {
.parseCompleteReceived,
.parameterDescriptionReceived,
.noDataMessageReceived,
.emptyQueryResponseReceived,
.rowDescriptionReceived,
.bindCompleteReceived:
return .wait
Expand Down Expand Up @@ -449,6 +470,7 @@ struct ExtendedQueryStateMachine {
}
case .initialized,
.commandComplete,
.emptyQueryResponseReceived,
.drain,
.error:
// we already have the complete stream received, now we are waiting for a
Expand Down Expand Up @@ -495,7 +517,7 @@ struct ExtendedQueryStateMachine {
return .forwardStreamError(error, read: true)
}

case .commandComplete, .error:
case .commandComplete, .emptyQueryResponseReceived, .error:
preconditionFailure("""
This state must not be reached. If the query `.isComplete`, the
ConnectionStateMachine must not send any further events to the substate machine.
Expand All @@ -518,6 +540,9 @@ struct ExtendedQueryStateMachine {
return false
}

case .emptyQueryResponseReceived:
return true

case .initialized, .messagesSent, .parseCompleteReceived, .parameterDescriptionReceived, .bindCompleteReceived, .streaming, .drain:
return false

Expand Down
67 changes: 51 additions & 16 deletions Sources/PostgresNIO/New/PSQLRowStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Logging

struct QueryResult {
enum Value: Equatable {
case emptyResponse
case noRows(String)
case rowDescription([RowDescription.Column])
}
Expand All @@ -19,6 +20,7 @@ final class PSQLRowStream: @unchecked Sendable {
enum Source {
case stream([RowDescription.Column], PSQLRowsDataSource)
case noRows(Result<String, Error>)
case emptyResponse
}

let eventLoop: EventLoop
Expand All @@ -27,14 +29,20 @@ final class PSQLRowStream: @unchecked Sendable {
private enum BufferState {
case streaming(buffer: CircularBuffer<DataRow>, dataSource: PSQLRowsDataSource)
case finished(buffer: CircularBuffer<DataRow>, commandTag: String)
case empty
case failure(Error)
}


private enum Consumed {
case tag(String)
case emptyResponse
}

private enum DownstreamState {
case waitingForConsumer(BufferState)
case iteratingRows(onRow: (PostgresRow) throws -> (), EventLoopPromise<Void>, PSQLRowsDataSource)
case waitingForAll([PostgresRow], EventLoopPromise<[PostgresRow]>, PSQLRowsDataSource)
case consumed(Result<String, Error>)
case consumed(Result<Consumed, Error>)
case asyncSequence(AsyncSequenceSource, PSQLRowsDataSource, onFinish: @Sendable () -> ())
}

Expand All @@ -58,6 +66,9 @@ final class PSQLRowStream: @unchecked Sendable {
case .noRows(.failure(let error)):
self.rowDescription = []
bufferState = .failure(error)
case .emptyResponse:
self.rowDescription = []
bufferState = .empty
}

self.downstreamState = .waitingForConsumer(bufferState)
Expand Down Expand Up @@ -98,11 +109,16 @@ final class PSQLRowStream: @unchecked Sendable {
self.downstreamState = .asyncSequence(source, dataSource, onFinish: onFinish)
self.executeActionBasedOnYieldResult(yieldResult, source: dataSource)

case .empty:
source.finish()
onFinish()
self.downstreamState = .consumed(.success(.emptyResponse))

case .finished(let buffer, let commandTag):
_ = source.yield(contentsOf: buffer)
source.finish()
onFinish()
self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))

case .failure(let error):
source.finish(error)
Expand Down Expand Up @@ -195,12 +211,16 @@ final class PSQLRowStream: @unchecked Sendable {
PostgresRow(data: $0, lookupTable: self.lookupTable, columns: self.rowDescription)
}

self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))
return self.eventLoop.makeSucceededFuture(rows)

case .failure(let error):
self.downstreamState = .consumed(.failure(error))
return self.eventLoop.makeFailedFuture(error)

case .empty:
self.downstreamState = .consumed(.success(.emptyResponse))
return self.eventLoop.makeSucceededFuture([])
}
}

Expand Down Expand Up @@ -247,7 +267,11 @@ final class PSQLRowStream: @unchecked Sendable {
}

return promise.futureResult


case .empty:
self.downstreamState = .consumed(.success(.emptyResponse))
return self.eventLoop.makeSucceededVoidFuture()

case .finished(let buffer, let commandTag):
do {
for data in buffer {
Expand All @@ -259,7 +283,7 @@ final class PSQLRowStream: @unchecked Sendable {
try onRow(row)
}

self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))
return self.eventLoop.makeSucceededVoidFuture()
} catch {
self.downstreamState = .consumed(.failure(error))
Expand Down Expand Up @@ -290,9 +314,9 @@ final class PSQLRowStream: @unchecked Sendable {
buffer.append(contentsOf: newRows)
self.downstreamState = .waitingForConsumer(.streaming(buffer: buffer, dataSource: dataSource))

case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
preconditionFailure("How can new rows be received, if an end was already signalled?")

case .iteratingRows(let onRow, let promise, let dataSource):
do {
for data in newRows {
Expand Down Expand Up @@ -353,20 +377,23 @@ final class PSQLRowStream: @unchecked Sendable {
preconditionFailure("How can we get another end, if an end was already signalled?")

case .iteratingRows(_, let promise, _):
self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))
promise.succeed(())

case .waitingForAll(let rows, let promise, _):
self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))
promise.succeed(rows)

case .asyncSequence(let source, _, let onFinish):
self.downstreamState = .consumed(.success(commandTag))
self.downstreamState = .consumed(.success(.tag(commandTag)))
source.finish()
onFinish()

case .consumed:
case .consumed(.success(.tag)), .consumed(.failure):
break

case .consumed(.success(.emptyResponse)), .waitingForConsumer(.empty):
preconditionFailure("How can we get an end for empty query response?")
}
}

Expand All @@ -375,7 +402,7 @@ final class PSQLRowStream: @unchecked Sendable {
case .waitingForConsumer(.streaming):
self.downstreamState = .waitingForConsumer(.failure(error))

case .waitingForConsumer(.finished), .waitingForConsumer(.failure):
case .waitingForConsumer(.finished), .waitingForConsumer(.failure), .waitingForConsumer(.empty):
preconditionFailure("How can we get another end, if an end was already signalled?")

case .iteratingRows(_, let promise, _):
Expand All @@ -391,8 +418,11 @@ final class PSQLRowStream: @unchecked Sendable {
consumer.finish(error)
onFinish()

case .consumed:
case .consumed(.success(.tag)), .consumed(.failure):
break

case .consumed(.success(.emptyResponse)):
preconditionFailure("How can we get an error for empty query response?")
}
}

Expand All @@ -413,10 +443,15 @@ final class PSQLRowStream: @unchecked Sendable {
}

var commandTag: String {
guard case .consumed(.success(let commandTag)) = self.downstreamState else {
guard case .consumed(.success(let consumed)) = self.downstreamState else {
preconditionFailure("commandTag may only be called if all rows have been consumed")
}
return commandTag
switch consumed {
case .tag(let tag):
return tag
case .emptyResponse:
return ""
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions Sources/PostgresNIO/New/PostgresChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,13 @@ final class PostgresChannelHandler: ChannelDuplexHandler {
eventLoop: context.channel.eventLoop,
logger: result.logger
)

case .emptyResponse:
rows = PSQLRowStream(
source: .emptyResponse,
eventLoop: context.channel.eventLoop,
logger: result.logger
)
}

promise.succeed(rows)
Expand Down
5 changes: 1 addition & 4 deletions Sources/PostgresNIO/PostgresDatabase+Query.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public struct PostgresQueryMetadata: Sendable {

init?(string: String) {
let parts = string.split(separator: " ")
guard parts.count >= 1 else {
return nil
}
switch parts[0] {
switch parts.first {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it also parses empty command tags.

case "INSERT":
// INSERT oid rows
guard parts.count == 3 else {
Expand Down
19 changes: 19 additions & 0 deletions Tests/IntegrationTests/PSQLIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ final class IntegrationTests: XCTestCase {
XCTAssertEqual(foo, "hello")
}

func testQueryNothing() throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow(try conn?.close().wait()) }

var _result: PostgresQueryResult?
XCTAssertNoThrow(_result = try conn?.query("""
-- Some comments
""", logger: .psqlTest).wait())

let result = try XCTUnwrap(_result)
XCTAssertEqual(result.rows, [])
XCTAssertEqual(result.metadata.command, "")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

command tag will be empty. Maybe that API should change in V2 to reflect that a query can have no command tag / metadata at all.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add an issue for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

func testDecodeIntegers() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,25 @@ class ExtendedQueryStateMachineTests: XCTestCase {
XCTAssertEqual(state.commandCompletedReceived("SELECT 2"), .forwardStreamComplete([row5, row6], commandTag: "SELECT 2"))
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
}


func testExtendedQueryWithNoQuery() {
var state = ConnectionStateMachine.readyForQuery()

let logger = Logger.psqlTest
let promise = EmbeddedEventLoop().makePromise(of: PSQLRowStream.self)
promise.fail(PSQLError.uncleanShutdown) // we don't care about the error at all.
let query: PostgresQuery = "-- some comments"
let queryContext = ExtendedQueryContext(query: query, logger: logger, promise: promise)

XCTAssertEqual(state.enqueue(task: .extendedQuery(queryContext)), .sendParseDescribeBindExecuteSync(query))
XCTAssertEqual(state.parseCompleteReceived(), .wait)
XCTAssertEqual(state.parameterDescriptionReceived(.init(dataTypes: [.int8])), .wait)
XCTAssertEqual(state.noDataReceived(), .wait)
XCTAssertEqual(state.bindCompleteReceived(), .wait)
XCTAssertEqual(state.emptyQueryResponseReceived(), .succeedQuery(promise, with: .init(value: .emptyResponse, logger: logger)))
XCTAssertEqual(state.readyForQueryReceived(.idle), .fireEventReadyForQuery)
}

func testReceiveTotallyUnexpectedMessageInQuery() {
var state = ConnectionStateMachine.readyForQuery()

Expand Down
Loading