Skip to content

Update to tokio 1.37 and various changes #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Rust

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@ include = ["**/*.rs", "Cargo.toml"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", features = ["fs", "blocking", "rt-core"] }
tokio = { version = "1.37", features = [
"fs",
"rt",
"rt-multi-thread",
"io-util",
"sync",
"time",
] }
fs3 = "0.5.0"
futures-lite = "1.11.3"

[dev-dependencies]
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "1.37", features = ["macros"] }
fork = "0.1.18"
tempfile = "3.2.0"
30 changes: 15 additions & 15 deletions benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,49 @@

extern crate test;

use test::Bencher;
use async_file_lock::FileLock;
use tempfile::NamedTempFile;
use test::Bencher;
use tokio::fs::File;
use tokio::prelude::io::AsyncWriteExt;
use async_file_lock::FileLock;
use tokio::io::AsyncWriteExt;

#[bench]
fn tokio_write(b: &mut Bencher) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut file = rt.block_on(async {
File::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap()
File::create(NamedTempFile::new().unwrap().into_temp_path())
.await
.unwrap()
});
b.iter(|| {
rt.block_on(async {
file.write(b"a")
});
rt.block_on(async { file.write(b"a") });
})
}

#[bench]
fn normal_write(b: &mut Bencher) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut file = rt.block_on(async {
let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap();
let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path())
.await
.unwrap();
file.lock_exclusive().await;
file
});
b.iter(|| {
rt.block_on(async {
file.write(b"a")
});
rt.block_on(async { file.write(b"a") });
})
}

#[bench]
fn auto_write(b: &mut Bencher) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut file = rt.block_on(async {
FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap()
FileLock::create(NamedTempFile::new().unwrap().into_temp_path())
.await
.unwrap()
});
b.iter(|| {
rt.block_on(async {
file.write(b"a")
});
rt.block_on(async { file.write(b"a") });
})
}
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"
121 changes: 63 additions & 58 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
#![deny(unused_must_use)]
// #![cfg_attr(test, feature(test))]

use std::task::{Context, Poll};
use std::pin::Pin;
use std::fmt::Formatter;
use futures_lite::{ready, FutureExt};
use std::fmt::Debug;
use std::fmt::Formatter;
use std::future::Future;
use std::io::{Error, Result, SeekFrom, Seek};
use std::io::{Error, Result, Seek, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite};
use tokio::task::{spawn_blocking, JoinHandle};
use futures_lite::{ready, FutureExt};
use fs3::FileExt;
use std::path::Path;
use std::mem::MaybeUninit;
use {fs3::FileExt, tokio::io::ReadBuf};

/// Locks a file asynchronously.
/// Auto locks a file if any read or write methods are called. If [Self::lock_exclusive]
Expand All @@ -40,7 +39,13 @@ impl FileLock {
/// Opens a file in read and write mode that is unlocked.
// This function will create a file if it does not exist, and will truncate it if it does.
pub async fn create(path: impl AsRef<Path>) -> Result<FileLock> {
let file = OpenOptions::new().write(true).read(true).create(true).truncate(true).open(path).await?;
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.truncate(true)
.open(path)
.await?;
Ok(FileLock::new_tokio(file).await)
}

Expand All @@ -61,7 +66,7 @@ impl FileLock {
result: None,
locking_fut: None,
unlocking_fut: None,
seek_fut: None
seek_fut: None,
}
}

Expand All @@ -76,7 +81,7 @@ impl FileLock {
result: None,
locking_fut: None,
unlocking_fut: None,
seek_fut: None
seek_fut: None,
}
}

Expand All @@ -96,11 +101,14 @@ impl FileLock {
panic!("File already locked.");
}
self.is_manually_locked = true;
self.unlocked_file.as_mut().unwrap().try_lock_exclusive().map(|_| {
self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap()));
self.state = State::Locked;

})
self.unlocked_file
.as_mut()
.unwrap()
.try_lock_exclusive()
.map(|_| {
self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap()));
self.state = State::Locked;
})
}

/// Locks the file for reading until [`Self::unlock`] is called.
Expand All @@ -119,11 +127,14 @@ impl FileLock {
panic!("File already locked.");
}
self.is_manually_locked = true;
self.unlocked_file.as_mut().unwrap().try_lock_shared().map(|_| {
self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap()));
self.state = State::Locked;

})
self.unlocked_file
.as_mut()
.unwrap()
.try_lock_shared()
.map(|_| {
self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap()));
self.state = State::Locked;
})
}

/// Unlocks the file.
Expand Down Expand Up @@ -154,11 +165,11 @@ impl FileLock {
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
/// use tokio::io::AsyncWriteExt;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// file.write(b"hello, world!").await?;
/// file.sync_all().await?;
/// # Ok(())
/// # }
Expand All @@ -173,9 +184,7 @@ impl FileLock {
return file.sync_all().await;
}
let file = self.unlocked_file.take().unwrap();
let (result, file) = spawn_blocking(|| {
(file.sync_all(), file)
}).await.unwrap();
let (result, file) = spawn_blocking(|| (file.sync_all(), file)).await.unwrap();
self.unlocked_file = Some(file);
result
}
Expand All @@ -193,11 +202,11 @@ impl FileLock {
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
/// use tokio::io::AsyncWriteExt;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// file.write_all(b"hello, world!").await?;
/// file.write(b"hello, world!").await?;
/// file.sync_data().await?;
/// # Ok(())
/// # }
Expand All @@ -212,9 +221,7 @@ impl FileLock {
return file.sync_data().await;
}
let file = self.unlocked_file.take().unwrap();
let (result, file) = spawn_blocking(|| {
(file.sync_data(), file)
}).await.unwrap();
let (result, file) = spawn_blocking(|| (file.sync_data(), file)).await.unwrap();
self.unlocked_file = Some(file);
result
}
Expand Down Expand Up @@ -316,15 +323,15 @@ macro_rules! poll_loop {
SeekFrom::Current(0) => $self.state = State::Working,
_ => {
let mode = $self.mode;
$self.as_mut().start_seek($cx, mode);
$self.as_mut().start_seek(mode);
$self.state = State::Seeking;
}
},
State::Working => {
// println!("working");
$working
// println!("worked");
},
}
State::Locking => {
if let Err(e) = ready!($self.$lock($cx)) {
return Poll::Ready(Err(e));
Expand Down Expand Up @@ -403,17 +410,19 @@ impl AsyncWrite for FileLock {
}
}

impl AsyncRead for FileLock {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
false
}
// impl FileLock {
// unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
// false
// }
// }

impl AsyncRead for FileLock {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
poll_loop! {self, cx, |x| x as usize, poll_shared_lock,
buf: &mut ReadBuf,
) -> Poll<Result<()>> {
poll_loop! {self, cx, /* |x| x as usize */|_| (), poll_shared_lock,
State::Working => {
let result = ready!(Pin::new(self.locked_file.as_mut().unwrap())
.as_mut()
Expand All @@ -423,36 +432,33 @@ impl AsyncRead for FileLock {
return Poll::Ready(result);
} else {
self.state = State::Unlocking;
self.result = Some(result.map(|x| x as u64));
self.result = Some(result.map(|_| 0u64/* x as u64 */));
}
}
};
}
}

impl AsyncSeek for FileLock {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<Result<()>> {
fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<()> {
if let Some(ref mut locked_file) = self.locked_file {
return Pin::new(locked_file)
.as_mut()
.start_seek(cx, position);
return Pin::new(locked_file).as_mut().start_seek(position);
}
let mut file = self.unlocked_file.take().expect("Cannot seek while in the process of locking/unlocking/seeking");
self.seek_fut = Some(spawn_blocking(move || {
(file.seek(position), file)
}));
return Poll::Ready(Ok(()));
let mut file = self
.unlocked_file
.take()
.expect("Cannot seek while in the process of locking/unlocking/seeking");
self.seek_fut = Some(spawn_blocking(move || (file.seek(position), file)));
return Ok(());
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<u64>> {
if let Some(ref mut locked_file) = self.locked_file {
return Pin::new(locked_file)
.as_mut()
.poll_complete(cx)
return Pin::new(locked_file).as_mut().poll_complete(cx);
}
// NOTE: calling this without calling start_seek might return the same result
if let None = self.seek_fut {
return Poll::Ready(Ok(0)); // but we return 0
}
let (result, file) = ready!(Pin::new(self.seek_fut.as_mut().unwrap()).poll(cx)).unwrap();
self.seek_fut = None;
Expand Down Expand Up @@ -547,4 +553,3 @@ impl<'a> Future for UnlockFuture<'a> {
self.file_lock.poll_unlock(cx)
}
}

Loading