7907: Implement AsyncRead for files with io_uring (#64)
martin-augment wants to merge 5 commits into master
Conversation
Walkthrough
The PR introduces changes across Tokio's file I/O and io-uring subsystems. It refactors file read operations to support io-uring optimization through a new private helper function, replaces OwnedFd with reference-counted ArcFd for descriptor management, introduces a generic buffer abstraction via a ReadBuffer trait supporting both Vec<u8> and Buf types, adds helper methods for buffer preparation and completion, updates cancellation data structures to distinguish buffer types, and includes a comprehensive test suite validating AsyncRead behavior with io-uring on various configurations.
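For orientation, here is a minimal sketch of what the `ReadBuffer` abstraction appears to look like, inferred from the `Vec<u8>` and `Buf` impl blocks quoted later in this review; the actual trait definition in the PR may carry different documentation or additional items.

```rust
// Sketch only: shape of the pub(crate) ReadBuffer trait inferred from the
// impls shown in the diff hunks below, not copied from the PR itself.
pub(crate) trait ReadBuffer {
    /// Returns a pointer into spare capacity and the maximum number of
    /// bytes the kernel may write there (the length placed in the io-uring SQE).
    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32);

    /// Marks `n` bytes as initialized after the kernel completed the read.
    ///
    /// # Safety
    /// The caller must guarantee the kernel actually wrote `n` bytes into
    /// the region returned by `uring_read_prepare`.
    unsafe fn uring_read_complete(&mut self, n: u32);
}
```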
Summary of Changes
Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances Tokio's file read path by adding io_uring support for AsyncRead on tokio::fs::File.
PR Review: Implement AsyncRead for files with io_uring
Summary
This PR implements io_uring support for AsyncRead on tokio::fs::File.
Code Quality & Architecture ⭐
Strengths
Architectural Decisions
Potential Issues 🔍
Critical
None identified - the core implementation appears sound.
Minor Issues
Performance Considerations ⚡
Improvements
Considerations
Security Concerns 🔒
Buffer Safety
Observations
Test Coverage ✅
Excellent Coverage
The new test file (tokio/tests/fs_uring_file_read.rs) is comprehensive:
Minor Gaps
API Changes
Breaking Changes
None - all changes are additive and behind feature flags.
New CancelData Variants
Recommendations
Before Merge
Future Improvements
Conclusion
This is a high-quality implementation that follows Tokio's patterns and includes excellent test coverage. The code is safe, performant, and maintains backward compatibility. The abstraction design is clean and extensible.
Recommendation: Approve with minor suggestions ✅
The minor issues noted are suggestions for improvement rather than blocking concerns. The PR is ready to merge as-is, though addressing the recommendations would further improve code quality.
Great work on this implementation! 🎉
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@tokio/src/io/uring/read.rs`:
- Around line 24-36: The cast of max_len: usize to u32 in impl ReadBuffer for
Vec<u8>::uring_read_prepare is unsafe because it can silently truncate; update
uring_read_prepare to validate the conversion before casting (e.g.,
assert!(max_len <= u32::MAX) or use u32::try_from(max_len).unwrap_or_else(|_|
panic!(...))) so any oversized max_len fails loudly; keep the returned tuple
semantics ((ptr, max_len as u32)) but only perform the as-u32 conversion after
the checked conversion. Ensure the change is applied inside the Vec<u8> impl
ReadBuffer block for the uring_read_prepare method.
- Around line 38-47: The Buf::prepare_uring_read path can silently truncate when
converting lengths to u32; update Buf::prepare_uring_read (called from
ReadBuffer::uring_read_prepare) to explicitly check that the requested max
length fits in a u32 (e.g. use u32::try_from(len).ok() or assert!(len <=
u32::MAX as usize)) and handle the overflow case deterministically (panic with a
clear message or return an error/zero length) instead of performing a blind
cast, so the io-uring length cannot be truncated.
In `@tokio/tests/fs_uring_file_read.rs`:
- Around line 222-243: The test test_file_read_cancel has a race where the
spawned task may finish before handle.abort(), causing unwrap_err() to panic;
fix by ensuring the spawned task is suspended before aborting (e.g., insert a
synchronization point like calling tokio::task::yield_now().await or using a
oneshot/barrier that the task awaits right after opening the file and before
file.read_to_end). Update the task body (the async move closure passed to
tokio::spawn that opens the file and calls file.read_to_end) to await the
synchronization primitive so abort() reliably cancels an in-flight read.
🧹 Nitpick comments (1)
tokio/src/io/blocking.rs (1)
286-293: The `len as u32` cast silently truncates if `max_buf_size > u32::MAX`.
While `DEFAULT_MAX_BUF_SIZE` is 2 MB and the user-facing `set_max_buf_size` doesn't document an upper bound, the io-uring `Read` opcode uses `u32` for length, so this aligns with the kernel interface. However, a debug assertion or saturating cast would make the intent explicit and prevent surprises if someone passes an unreasonably large value.
Optional: add a debug assertion
```diff
 let len = std::cmp::min(spare.len(), max_buf_size);
 let ptr = spare.as_mut_ptr().cast::<u8>();
+debug_assert!(len <= u32::MAX as usize, "uring read length exceeds u32::MAX");
 (ptr, len as u32)
```
```rust
impl ReadBuffer for Vec<u8> {
    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
        assert!(self.spare_capacity_mut().len() >= max_len);
        let ptr = self.spare_capacity_mut().as_mut_ptr().cast();
        (ptr, max_len as u32)
    }

    unsafe fn uring_read_complete(&mut self, n: u32) {
        // SAFETY: the kernel wrote `n` bytes into spare capacity starting
        // at the old self.len(), so self.len() + n bytes are now initialized.
        unsafe { self.set_len(self.len() + n as usize) };
    }
}
```
max_len as u32 truncation is unchecked — could silently produce incorrect reads.
Line 28 casts max_len (a usize) to u32 without any bounds check. While current callers cap this at MAX_READ_SIZE (64 MiB), the ReadBuffer trait is pub(crate) and any future caller passing max_len > u32::MAX would get a silent truncation, leading to a read shorter than the caller expects.
Since this is in a safety-sensitive path (the returned length feeds the io-uring SQE), add an assertion or use try_from:
Proposed fix
```diff
 fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
     assert!(self.spare_capacity_mut().len() >= max_len);
+    let len: u32 = u32::try_from(max_len).expect("max_len must fit in u32 for io-uring SQE");
     let ptr = self.spare_capacity_mut().as_mut_ptr().cast();
-    (ptr, max_len as u32)
+    (ptr, len)
 }
```
```rust
impl ReadBuffer for Buf {
    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
        self.prepare_uring_read(max_len)
    }

    unsafe fn uring_read_complete(&mut self, n: u32) {
        // SAFETY: caller guarantees kernel wrote exactly n bytes.
        unsafe { self.complete_uring_read(n as usize) };
    }
}
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Check Buf::prepare_uring_read and complete_uring_read signatures and implementation
ast-grep --pattern $'impl Buf {
  $$$
  fn prepare_uring_read($$$) {
    $$$
  }
  $$$
}'
echo "---"
rg -n 'prepare_uring_read\|complete_uring_read' --type rust -C5
```
Length of output: 67
🏁 Script executed:
```bash
#!/bin/bash
# Check if file exists and read its contents
git ls-files | grep -E 'read\.rs|uring'
echo "---"
# Check the specific file
if [ -f "tokio/src/io/uring/read.rs" ]; then
  echo "File exists. Reading contents:"
  cat -n "tokio/src/io/uring/read.rs"
else
  echo "File not found at expected path"
  # Search for similar files
  find . -name "*.rs" -path "*/io/*" -path "*/uring/*" 2>/dev/null | head -20
fi
```
Length of output: 5254
🏁 Script executed:
```bash
#!/bin/bash
# Find the blocking module and Buf struct
find . -path "*/io/blocking*" -name "*.rs" 2>/dev/null
echo "---"
# Search for Buf struct definition and its methods
rg -n 'impl Buf|struct Buf|fn prepare_uring_read|fn complete_uring_read' --type rust -C3 tokio/src/io/
```
Length of output: 4607
🏁 Script executed:
```bash
#!/bin/bash
# Read the io_uring specific implementations
sed -n '280,310p' tokio/src/io/blocking.rs
```
Length of output: 1420
Guard against max_buf_size > u32::MAX truncation in Buf::prepare_uring_read.
The prepare_uring_read method at line 286 in tokio/src/io/blocking.rs casts len to u32 without checking for overflow. If max_buf_size or spare.len() exceeds u32::MAX, the cast will silently truncate, returning an incorrect length to the io-uring operation. Add an explicit overflow check or assertion to prevent this.
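As a minimal sketch of one way to make the conversion explicit: the `self.buf` field and the general shape below are taken from the snippet quoted later in this review, but the parameter name and exact method body in the PR may differ.

```rust
// Sketch only: a checked variant of Buf::prepare_uring_read. Assumes the
// buffer is stored in `self.buf: Vec<u8>` as in the snippet quoted below.
fn prepare_uring_read(&mut self, max_buf_size: usize) -> (*mut u8, u32) {
    let spare = self.buf.spare_capacity_mut();
    let len = std::cmp::min(spare.len(), max_buf_size);
    // Fail loudly instead of silently truncating the io-uring SQE length.
    let len = u32::try_from(len).expect("uring read length must fit in u32");
    let ptr = spare.as_mut_ptr().cast::<u8>();
    (ptr, len)
}
```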
```rust
async fn test_file_read_cancel() {
    let data: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
    let (_tmp, path) = create_temp_file(&data);

    let path2 = path.clone();
    let handle = tokio::spawn(async move {
        let mut file = File::open(&path2).await.unwrap();
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await.unwrap();
        buf
    });

    handle.abort();
    let res = handle.await;
    assert!(res.unwrap_err().is_cancelled());

    // Verify runtime still works
    let mut file = File::open(&path).await.unwrap();
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).await.unwrap();
    assert_eq!(buf, data);
}
```
Potential test flakiness in cancel test.
There's a race between the spawned task completing and the abort() call. If the task finishes before abort(), handle.await returns Ok(buf) and unwrap_err() panics. While unlikely in practice (the task must open the file and read 10KB before the immediate abort()), this could be flaky on fast systems or under light load.
Consider adding a synchronization point (e.g., a barrier or tokio::task::yield_now() inside the task before the read) to ensure the task is in-flight when abort() is called.
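One possible shape for a deterministic version, roughly following the suggestion above; the oneshot gate is an illustrative addition, not code from the PR.

```rust
// Sketch only: gate the task behind a oneshot the test never completes, so it
// is guaranteed to be suspended when abort() runs. Note this cancels the task
// before the read is submitted, trading "in-flight read" coverage for
// determinism; a barrier-based variant could keep the read in flight instead.
let (_gate_tx, gate_rx) = tokio::sync::oneshot::channel::<()>();
let path2 = path.clone();
let handle = tokio::spawn(async move {
    let mut file = File::open(&path2).await.unwrap();
    // Suspend here until aborted; the sender is intentionally never used.
    let _ = gate_rx.await;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).await.unwrap();
    buf
});
handle.abort();
assert!(handle.await.unwrap_err().is_cancelled());
```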
Code Review
This pull request introduces io_uring support for AsyncRead on tokio::fs::File, which is a great performance enhancement. The implementation is well-structured, introducing a generic ReadBuffer trait and using ArcFd to handle file descriptors. The changes are gated behind the appropriate feature flags and are accompanied by a comprehensive set of new tests, which is excellent. My main feedback is to address a piece of duplicated logic for handling interrupted syscalls, which could be refactored into a shared helper function to improve maintainability.
```rust
loop {
    let (res, r_fd, r_buf) =
        // u64::MAX to use and advance the file position
        Op::read_at(fd, buf, max_buf_size, u64::MAX).await;
    match res {
        Err(e) if e.kind() == io::ErrorKind::Interrupted => {
            buf = r_buf;
            fd = r_fd;
            continue;
        }
        Err(e) => break (Operation::Read(Err(e)), r_buf),
        Ok(n) => break (Operation::Read(Ok(n as usize)), r_buf),
    }
}
```
This loop to handle io::ErrorKind::Interrupted is duplicated. A very similar loop exists in tokio/src/fs/read_uring.rs in the op_read function (lines 122-137).
To improve maintainability and avoid future inconsistencies, this logic should be extracted into a shared helper function. For example, a function like read_at_uninterruptible could be added to tokio/src/io/uring/read.rs or another shared module.
Example implementation:
```rust
async fn read_at_uninterruptible<B: ReadBuffer + ...>(...) -> (io::Result<u32>, ArcFd, B) {
    loop {
        let (res, r_fd, r_buf) = Op::read_at(fd, buf, max_len, offset).await;
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::Interrupted {
                fd = r_fd;
                buf = r_buf;
                continue;
            }
        }
        return (res, r_fd, r_buf);
    }
}
```
Then, both this file and read_uring.rs could call this helper, simplifying the code in both places.
🤖 Augment PR Summary
Summary: This PR adds an io-uring-backed fast path for AsyncRead on tokio::fs::File.
Changes:
```rust
let spare = self.buf.spare_capacity_mut();
let len = std::cmp::min(spare.len(), max_buf_size);
let ptr = spare.as_mut_ptr().cast::<u8>();
(ptr, len as u32)
```
(ptr, len as u32) relies on len <= u32::MAX; if max_buf_size exceeds that, the truncating cast can make the io-uring read length unexpectedly small (even 0), potentially causing a premature EOF to be reported to AsyncRead callers. Is it worth adding a guard to prevent silent truncation here?
Severity: medium
```rust
for (i, (_tmp, path)) in files.iter().enumerate() {
    let path = path.clone();
    let expected: Vec<u8> = (0..1024).map(|j| ((i as u16 + j) % 256) as u8).collect();
    tracker.spawn(async move {
```
The JoinHandle returned by tracker.spawn(...) is dropped, so panics inside the task (including the assert_eq!) may not reliably fail the test. It may be worth ensuring task failures are observed so this test actually validates concurrent reads.
Severity: low
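A minimal sketch of one way to surface task panics, assuming the test can simply keep the returned JoinHandles; the task body here is an assumption based on the expected data above, not the PR's exact code.

```rust
// Sketch: keep the JoinHandles and await them so a panicking task
// (e.g. a failed assert_eq!) fails the test instead of being silently dropped.
let mut handles = Vec::new();
for (i, (_tmp, path)) in files.iter().enumerate() {
    let path = path.clone();
    let expected: Vec<u8> = (0..1024).map(|j| ((i as u16 + j) % 256) as u8).collect();
    handles.push(tokio::spawn(async move {
        let mut file = File::open(&path).await.unwrap();
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, expected);
    }));
}
for handle in handles {
    // Propagate any panic from the spawned task into the test.
    handle.await.unwrap();
}
```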