Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadableFileHandleProtocol.readToEnd can fail to read the complete contents of file sizes less than the single shot read limit #2769

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Sources/NIOFileSystem/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import NIOPosix
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public struct FileChunks: AsyncSequence {
enum ChunkRange {
case entireFile
case partial(Range<Int64>)
/// Read from the current file access offset. Useful for reading from unseekable files.
case current
/// Read from a specific offset.
case specified(Range<Int64>)
}

public typealias Element = ByteBuffer
Expand All @@ -39,13 +41,12 @@ public struct FileChunks: AsyncSequence {
internal init(
handle: SystemFileHandle,
chunkLength: ByteCount,
range: Range<Int64>
range: Range<Int64>?
) {
let chunkRange: ChunkRange
if range.lowerBound == 0, range.upperBound == .max {
chunkRange = .entireFile
let chunkRange: ChunkRange = if let range {
.specified(range)
} else {
chunkRange = .partial(range)
.current
}

// TODO: choose reasonable watermarks; this should likely be at least somewhat dependent
Expand Down Expand Up @@ -96,9 +97,9 @@ extension BufferedStream where Element == ByteBuffer {
) -> BufferedStream<ByteBuffer> {
let state: ProducerState
switch range {
case .entireFile:
case .current:
state = ProducerState(handle: handle, range: nil)
case .partial(let partialRange):
case .specified(let partialRange):
state = ProducerState(handle: handle, range: partialRange)
}
let protectedState = NIOLockedValueBox(state)
Expand Down
4 changes: 2 additions & 2 deletions Sources/NIOFileSystem/FileHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public struct ReadFileHandle: ReadableFileHandleProtocol, _HasFileHandle {
)
}

public func readChunks(in range: Range<Int64>, chunkLength: ByteCount) -> FileChunks {
public func readChunks(in range: Range<Int64>?, chunkLength: ByteCount) -> FileChunks {
self.fileHandle.systemFileHandle.readChunks(in: range, chunkLength: chunkLength)
}

Expand Down Expand Up @@ -265,7 +265,7 @@ public struct ReadWriteFileHandle: ReadableAndWritableFileHandleProtocol, _HasFi
)
}

public func readChunks(in offset: Range<Int64>, chunkLength: ByteCount) -> FileChunks {
public func readChunks(in offset: Range<Int64>?, chunkLength: ByteCount) -> FileChunks {
self.fileHandle.systemFileHandle.readChunks(in: offset, chunkLength: chunkLength)
}

Expand Down
56 changes: 26 additions & 30 deletions Sources/NIOFileSystem/FileHandleProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public protocol ReadableFileHandleProtocol: FileHandleProtocol {
/// - range: The absolute offsets into the file to read.
/// - chunkLength: The maximum length of the chunk to read as a ``ByteCount``.
/// - Returns: A sequence of chunks read from the file.
func readChunks(in range: Range<Int64>, chunkLength: ByteCount) -> FileChunks
func readChunks(in range: Range<Int64>?, chunkLength: ByteCount) -> FileChunks
}

// MARK: - Read chunks with default chunk length
Expand All @@ -227,15 +227,13 @@ extension ReadableFileHandleProtocol {
///
/// - Parameters:
/// - range: A range of offsets in the file to read.
/// - chunkLength: The length of chunks to read, defaults to 128 KiB.
/// - as: Type of chunk to read.
/// - SeeAlso: ``ReadableFileHandleProtocol/readChunks(in:chunkLength:)-2dz6``
/// - Returns: An `AsyncSequence` of chunks read from the file.
public func readChunks(
in range: Range<Int64>,
chunkLength: ByteCount = .kibibytes(128)
in range: Range<Int64>?
) -> FileChunks {
return self.readChunks(in: range, chunkLength: chunkLength)
return self.readChunks(in: range, chunkLength: .kibibytes(128))
}

/// Returns an asynchronous sequence of chunks read from the file.
Expand Down Expand Up @@ -376,33 +374,31 @@ extension ReadableFileHandleProtocol {
forceChunkedRead = true
}

if !forceChunkedRead, readSize <= singleShotReadLimit {
return try await self.readChunk(
fromAbsoluteOffset: offset,
length: .bytes(Int64(readSize))
)
let chunkLength: ByteCount = if !forceChunkedRead, readSize <= singleShotReadLimit {
.bytes(Int64(readSize))
Comment on lines +377 to +378
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still support Swift 5.8 so you can't use this syntax, you'll need to declare let chunkLength: ByteCount and then assign to it.

} else {
var accumulator = ByteBuffer()
accumulator.reserveCapacity(readSize)

for try await chunk in self.readChunks(in: offset..., chunkLength: .mebibytes(8)) {
accumulator.writeImmutableBuffer(chunk)
if accumulator.readableBytes > maximumSizeAllowed.bytes {
throw FileSystemError(
code: .resourceExhausted,
message: """
There are more bytes to read than the maximum size allowed \
(\(maximumSizeAllowed)). Read the file in chunks or increase the maximum size \
allowed.
""",
cause: nil,
location: .here()
)
}
}
.mebibytes(8)
}
var accumulator = ByteBuffer()
accumulator.reserveCapacity(readSize)

return accumulator
for try await chunk in self.readChunks(in: offset..., chunkLength: chunkLength) {
accumulator.writeImmutableBuffer(chunk)
if accumulator.readableBytes > maximumSizeAllowed.bytes {
throw FileSystemError(
code: .resourceExhausted,
message: """
There are more bytes to read than the maximum size allowed \
(\(maximumSizeAllowed)). Read the file in chunks or increase the maximum size \
allowed.
""",
cause: nil,
location: .here()
)
}
}

return accumulator
} else {
guard offset == 0 else {
throw FileSystemError(
Expand All @@ -415,7 +411,7 @@ extension ReadableFileHandleProtocol {
var accumulator = ByteBuffer()
accumulator.reserveCapacity(readSize)

for try await chunk in self.readChunks(in: ..., chunkLength: .mebibytes(8)) {
for try await chunk in self.readChunks(in: nil, chunkLength: .mebibytes(8)) {
accumulator.writeImmutableBuffer(chunk)
if accumulator.readableBytes > maximumSizeAllowed.bytes {
throw FileSystemError(
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOFileSystem/Internal/SystemFileHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ extension SystemFileHandle: ReadableFileHandleProtocol {
}

public func readChunks(
in range: Range<Int64>,
in range: Range<Int64>?,
chunkLength size: ByteCount
) -> FileChunks {
return FileChunks(handle: self, chunkLength: size, range: range)
Expand Down
127 changes: 125 additions & 2 deletions Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,114 @@ import XCTest

@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
final class FileHandleTests: XCTestCase {
private enum MockFileHandleError: Error {
case unimplemented(function: String)
}

private final class MockFileHandle: ReadableFileHandleProtocol {
struct ChunkedByteBufferSequence: AsyncSequence {
struct Interator: AsyncIteratorProtocol {
let buffer: ByteBuffer
private(set) var range: Range<Int64>
let chunkLength: Int
mutating func next() async throws -> ByteBuffer? {
guard let slice = self.buffer.getSlice(at: Int(range.lowerBound), length: self.chunkLength) else {
return nil
}
self.range = self.range.lowerBound+Int64(slice.readableBytes)..<self.range.upperBound
return slice

}
}
let buffer: ByteBuffer
let range: Range<Int64>
let chunkLength: ByteCount
func makeAsyncIterator() -> Interator {
.init(buffer: self.buffer, range: self.range, chunkLength: Int(self.chunkLength.bytes))
}
}
let bytes: ByteBuffer
let size: Int
let chunkSize: ByteCount

init(bytes: ByteBuffer, chunkSize: ByteCount = .bytes(Int64.max)) {
self.bytes = bytes
self.size = bytes.readableBytes // capture initial size since we might be moving the read/write index later
self.chunkSize = chunkSize
}

func readChunk(fromAbsoluteOffset offset: Int64, length: ByteCount) async throws -> ByteBuffer {
self.bytes.getSlice(at: Int(offset), length: Int(min(length.bytes, self.chunkSize.bytes))) ?? .init()
}

func readChunks(in range: Range<Int64>?, chunkLength: ByteCount) -> FileChunks {
guard let range else {
preconditionFailure("Reading from the current offset is not implemented for MockFileHandle")
}
return .init(wrapping: ChunkedByteBufferSequence(buffer: self.bytes, range: range, chunkLength: chunkLength))
}

func info() async throws -> FileInfo {
.init(
type: .regular,
permissions: [.ownerReadWrite, .groupRead, .otherRead],
size: Int64(self.size),
userID: .init(rawValue: 501),
groupID: .init(rawValue: 20),
lastAccessTime: .omit,
lastDataModificationTime: .omit,
lastStatusChangeTime: .omit
)
}

func replacePermissions(_ permissions: FilePermissions) async throws {
throw MockFileHandleError.unimplemented(function: #function)
}

func addPermissions(_ permissions: FilePermissions) async throws -> FilePermissions {
throw MockFileHandleError.unimplemented(function: #function)
}

func removePermissions(_ permissions: FilePermissions) async throws -> FilePermissions {
throw MockFileHandleError.unimplemented(function: #function)
}

func attributeNames() async throws -> [String] {
throw MockFileHandleError.unimplemented(function: #function)
}

func valueForAttribute(_ name: String) async throws -> [UInt8] {
throw MockFileHandleError.unimplemented(function: #function)
}

func updateValueForAttribute(_ bytes: some RandomAccessCollection<UInt8> & Sendable, attribute name: String) async throws {
throw MockFileHandleError.unimplemented(function: #function)
}

func removeValueForAttribute(_ name: String) async throws {
throw MockFileHandleError.unimplemented(function: #function)
}

func synchronize() async throws {
throw MockFileHandleError.unimplemented(function: #function)
}

func withUnsafeDescriptor<R>(_ execute: @escaping @Sendable (FileDescriptor) throws -> R) async throws -> R where R : Sendable {
throw MockFileHandleError.unimplemented(function: #function)
}

func detachUnsafeFileDescriptor() throws -> FileDescriptor {
throw MockFileHandleError.unimplemented(function: #function)
}

func close() async throws {
throw MockFileHandleError.unimplemented(function: #function)
}

func setTimes(lastAccess: FileInfo.Timespec?, lastDataModification: FileInfo.Timespec?) async throws {
throw MockFileHandleError.unimplemented(function: #function)
}
}
static let thisFile = FilePath(#filePath)
static let testData = FilePath(#filePath)
.removingLastComponent() // FileHandleTests.swift
Expand Down Expand Up @@ -303,6 +411,20 @@ final class FileHandleTests: XCTestCase {
}
}

func testReadWholeFilePartialChunk() async throws {
let fileContents = ByteBuffer(string: "the quick brown fox jumped over the lazy dog")
let mockHandle = MockFileHandle(
bytes: fileContents,
chunkSize: .bytes(Int64(fileContents.readableBytes / 2)) // simulate reading a chunk of less than the requested size
)
let contents = try await mockHandle.readToEnd(maximumSizeAllowed: .bytes(Int64.max))
XCTAssertEqual(
contents,
fileContents,
"Contents of mock file differ to what was read by readToEnd"
)
}

func testWriteAndReadUnseekableFile() async throws {
let privateTempDirPath = try await FileSystem.shared.createTemporaryDirectory(template: "test-XXX")
self.addTeardownBlock {
Expand All @@ -317,10 +439,11 @@ final class FileHandleTests: XCTestCase {
try await self.withHandle(forFileAtPath: privateTempDirPath.appending("fifo"), accessMode: .readWrite) {
handle in
let someBytes = ByteBuffer(repeating: 42, count: 1546)

try await handle.write(contentsOf: someBytes.readableBytesView, toAbsoluteOffset: 0)

let readSomeBytes = try await handle.readToEnd(maximumSizeAllowed: .bytes(1546))
XCTAssertEqual(readSomeBytes, someBytes)
let contents = try await handle.readToEnd(maximumSizeAllowed: .bytes(1546))
XCTAssertEqual(contents, someBytes, "Data read back from the fifo should match what was written")
}
}

Expand Down
Loading