diff --git a/tokio/src/fs/symlink.rs b/tokio/src/fs/symlink.rs index 5d75046f13c..47596b3d758 100644 --- a/tokio/src/fs/symlink.rs +++ b/tokio/src/fs/symlink.rs @@ -12,5 +12,20 @@ pub async fn symlink(original: impl AsRef, link: impl AsRef) -> io:: let original = original.as_ref().to_owned(); let link = link.as_ref().to_owned(); + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init(io_uring::opcode::SymlinkAt::CODE)? { + return crate::runtime::driver::op::Op::symlink(&original, &link)?.await; + } + } + asyncify(move || std::os::unix::fs::symlink(original, link)).await } diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index facad596f63..47c5870b965 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod open; pub(crate) mod read; +pub(crate) mod symlink; pub(crate) mod utils; pub(crate) mod write; diff --git a/tokio/src/io/uring/symlink.rs b/tokio/src/io/uring/symlink.rs new file mode 100644 index 00000000000..c69418a0cbd --- /dev/null +++ b/tokio/src/io/uring/symlink.rs @@ -0,0 +1,54 @@ +use super::utils::cstr; + +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; + +use io_uring::{opcode, types}; +use std::ffi::CString; +use std::io; +use std::io::Error; +use std::path::Path; + +#[derive(Debug)] +pub(crate) struct Symlink { + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + #[allow(dead_code)] + original: CString, + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + #[allow(dead_code)] + link: CString, +} + +impl Completable for Symlink { + type Output = io::Result<()>; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result.map(|_| ()) + } + + fn complete_with_error(self, err: Error) -> Self::Output { + Err(err) + } +} + +impl Cancellable for Symlink { + fn cancel(self) -> CancelData { + CancelData::Symlink(self) + } +} + +impl Op { + /// Submit a request to create a symbolic link. + pub(crate) fn symlink(original: &Path, link: &Path) -> io::Result { + let original = cstr(original)?; + let link = cstr(link)?; + + let symlink_op = + opcode::SymlinkAt::new(types::Fd(libc::AT_FDCWD), original.as_ptr(), link.as_ptr()) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + Ok(unsafe { Op::new(symlink_op, Symlink { original, link }) }) + } +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index d2b9289ceee..e8001be568c 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,5 +1,6 @@ use crate::io::uring::open::Open; use crate::io::uring::read::Read; +use crate::io::uring::symlink::Symlink; use crate::io::uring::write::Write; use crate::runtime::Handle; @@ -19,6 +20,7 @@ pub(crate) enum CancelData { Open(Open), Write(Write), Read(Read), + Symlink(Symlink), } #[derive(Debug)] diff --git a/tokio/tests/fs_uring_symlink.rs b/tokio/tests/fs_uring_symlink.rs new file mode 100644 index 00000000000..ae95a71fb4b --- /dev/null +++ b/tokio/tests/fs_uring_symlink.rs @@ -0,0 +1,168 @@ +//! Uring symlink operations tests. + +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +use futures::FutureExt; +use std::future::poll_fn; +use std::future::Future; +use std::path::PathBuf; +use std::pin::pin; +use std::sync::mpsc; +use std::task::Poll; +use std::time::Duration; +use tempfile::TempDir; +use tokio::runtime::{Builder, Runtime}; +use tokio::task::JoinSet; +use tokio_test::assert_pending; +use tokio_util::task::TaskTracker; + +fn multi_rt(n: usize) -> Box Runtime> { + Box::new(move || { + Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() + }) +} + +fn current_rt() -> Box Runtime> { + Box::new(|| Builder::new_current_thread().enable_all().build().unwrap()) +} + +fn rt_combinations() -> Vec Runtime>> { + vec![ + current_rt(), + multi_rt(1), + multi_rt(2), + multi_rt(8), + multi_rt(64), + multi_rt(256), + ] +} + +#[test] +fn shutdown_runtime_while_performing_io_uring_ops() { + fn run(rt: Runtime) { + let (done_tx, done_rx) = mpsc::channel(); + let (workdir, target) = create_tmp_dir(); + + rt.spawn(async move { + // spawning a bunch of uring operations. + for i in 0..usize::MAX { + let link = workdir.path().join(format!("{i}")); + let target = target.clone(); + tokio::spawn(async move { + let mut fut = pin!(tokio::fs::symlink(target, &link)); + + poll_fn(|cx| { + assert_pending!(fut.as_mut().poll(cx)); + Poll::<()>::Pending + }) + .await; + + fut.await.unwrap(); + }); + + // Avoid busy looping. + tokio::task::yield_now().await; + } + }); + + std::thread::spawn(move || { + rt.shutdown_timeout(Duration::from_millis(300)); + done_tx.send(()).unwrap(); + }); + + done_rx.recv().unwrap(); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[test] +fn symlink_many_files() { + fn run(rt: Runtime) { + let (workdir, target) = create_tmp_dir(); + + rt.block_on(async move { + const N_LINKS: usize = 10_000; + + let tracker = TaskTracker::new(); + + for i in 0..N_LINKS { + let target = target.clone(); + let link = workdir.path().join(format!("{i}")); + tracker.spawn(async move { + tokio::fs::symlink(&target, &link).await.unwrap(); + }); + } + tracker.close(); + tracker.wait().await; + + let mut resolve_tasks = JoinSet::new(); + for i in 0..N_LINKS { + let link = workdir.path().join(format!("{i}")); + resolve_tasks.spawn(async move { tokio::fs::read_link(&link).await.unwrap() }); + } + + while let Some(resolve_result) = resolve_tasks.join_next().await { + assert_eq!(&resolve_result.unwrap(), &target); + } + }); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[tokio::test] +async fn cancel_op_future() { + let (workdir, target) = create_tmp_dir(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + poll_fn(|cx| { + let link = workdir.path().join("link"); + let fut = tokio::fs::symlink(&target, &link); + + // If io_uring is enabled (and not falling back to the thread pool), + // the first poll should return Pending. + let _pending = pin!(fut).poll_unpin(cx); + + tx.send(()).unwrap(); + + Poll::<()>::Pending + }) + .await; + }); + + // Wait for the first poll + rx.recv().await.unwrap(); + + handle.abort(); + + let res = handle.await.unwrap_err(); + assert!(res.is_cancelled()); +} + +fn create_tmp_dir() -> (TempDir, PathBuf) { + let workdir = tempfile::tempdir().unwrap(); + let target = workdir.path().join("target"); + std::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&target) + .unwrap(); + + (workdir, target) +}