Skip to content

Commit fefe804

Browse files
authored
feat: Expose waitable handles in Windows
This commit allows waitable handles to be polled in Windows. This allows I/O constructs like processes, mutexes and waitable events be registered into the poller and be polled just like anything else. cc #25 Signed-off-by: John Nunley <[email protected]>
1 parent 6499077 commit fefe804

File tree

4 files changed

+285
-13
lines changed

4 files changed

+285
-13
lines changed

examples/windows-command.rs

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//! Runs a command using waitable handles on Windows.
2+
//!
3+
//! Run with:
4+
//!
5+
//! ```
6+
//! cargo run --example windows-command
7+
//! ```
8+
9+
#[cfg(windows)]
10+
fn main() -> std::io::Result<()> {
11+
use async_io::os::windows::Waitable;
12+
use std::process::Command;
13+
14+
futures_lite::future::block_on(async {
15+
// Spawn a process.
16+
let process = Command::new("cmd")
17+
.args(["/C", "echo hello"])
18+
.spawn()
19+
.expect("failed to spawn process");
20+
21+
// Wrap the process in an `Async` object that waits for it to exit.
22+
let process = Waitable::new(process)?;
23+
24+
// Wait for the process to exit.
25+
process.ready().await?;
26+
27+
Ok(())
28+
})
29+
}
30+
31+
#[cfg(not(windows))]
32+
fn main() {
33+
println!("This example is only supported on Windows.");
34+
}

src/os.rs

+3
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,6 @@ pub mod unix;
1414
target_os = "dragonfly",
1515
))]
1616
pub mod kqueue;
17+
18+
#[cfg(windows)]
19+
pub mod windows;

src/os/windows.rs

+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
//! Functionality that is only available on Windows.
2+
3+
use crate::reactor::{Reactor, Readable, Registration};
4+
use crate::Async;
5+
6+
use std::convert::TryFrom;
7+
use std::future::Future;
8+
use std::io::{self, Result};
9+
use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, OwnedHandle, RawHandle};
10+
use std::pin::Pin;
11+
use std::task::{Context, Poll};
12+
13+
/// A waitable handle registered in the reactor.
14+
///
15+
/// Some handles in Windows are “waitable”, which means that they emit a “readiness” signal after some event occurs. This function can be used to wait for such events to occur on a handle. This function can be used in addition to regular socket polling.
16+
///
17+
/// Waitable objects include the following:
18+
///
19+
/// - Console inputs
20+
/// - Waitable events
21+
/// - Mutexes
22+
/// - Processes
23+
/// - Semaphores
24+
/// - Threads
25+
/// - Timer
26+
///
27+
/// This structure can be used to wait for any of these objects to become ready.
28+
///
29+
/// ## Implementation
30+
///
31+
/// The current implementation waits on the handle by registering it in the application-global
32+
/// Win32 threadpool. However, in the futur it may be possible to migrate to an implementation
33+
/// on Windows 10 that uses a mechanism similar to [`MsgWaitForMultipleObjectsEx`].
34+
///
35+
/// [`MsgWaitForMultipleObjectsEx`]: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-msgwaitformultipleobjectsex
36+
///
37+
/// ## Caveats
38+
///
39+
/// Read the documentation for the [`Async`](crate::Async) type for more information regarding the
40+
/// abilities and caveats with using this type.
41+
#[derive(Debug)]
42+
pub struct Waitable<T>(Async<T>);
43+
44+
impl<T> AsRef<T> for Waitable<T> {
45+
fn as_ref(&self) -> &T {
46+
self.0.as_ref()
47+
}
48+
}
49+
50+
impl<T: AsHandle> Waitable<T> {
51+
/// Create a new [`Waitable`] around a waitable handle.
52+
///
53+
/// # Examples
54+
///
55+
/// ```no_run
56+
/// use std::process::Command;
57+
/// use async_io::os::windows::Waitable;
58+
///
59+
/// // Create a new process to wait for.
60+
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
61+
///
62+
/// // Wrap the process in an `Async` object that waits for it to exit.
63+
/// let process = Waitable::new(child).unwrap();
64+
///
65+
/// // Wait for the process to exit.
66+
/// # async_io::block_on(async {
67+
/// process.ready().await.unwrap();
68+
/// # });
69+
/// ```
70+
pub fn new(handle: T) -> Result<Self> {
71+
Ok(Self(Async {
72+
source: Reactor::get()
73+
.insert_io(unsafe { Registration::new_waitable(handle.as_handle()) })?,
74+
io: Some(handle),
75+
}))
76+
}
77+
}
78+
79+
impl<T: AsRawHandle> AsRawHandle for Waitable<T> {
80+
fn as_raw_handle(&self) -> RawHandle {
81+
self.get_ref().as_raw_handle()
82+
}
83+
}
84+
85+
impl<T: AsHandle> AsHandle for Waitable<T> {
86+
fn as_handle(&self) -> BorrowedHandle<'_> {
87+
self.get_ref().as_handle()
88+
}
89+
}
90+
91+
impl<T: AsHandle + From<OwnedHandle>> TryFrom<OwnedHandle> for Waitable<T> {
92+
type Error = io::Error;
93+
94+
fn try_from(handle: OwnedHandle) -> Result<Self> {
95+
Self::new(handle.into())
96+
}
97+
}
98+
99+
impl<T: Into<OwnedHandle>> TryFrom<Waitable<T>> for OwnedHandle {
100+
type Error = io::Error;
101+
102+
fn try_from(value: Waitable<T>) -> std::result::Result<Self, Self::Error> {
103+
value.into_inner().map(|handle| handle.into())
104+
}
105+
}
106+
107+
impl<T> Waitable<T> {
108+
/// Get a reference to the inner handle.
109+
pub fn get_ref(&self) -> &T {
110+
self.0.get_ref()
111+
}
112+
113+
/// Get a mutable reference to the inner handle.
114+
///
115+
/// # Safety
116+
///
117+
/// The underlying I/O source must not be dropped or moved out using this function.
118+
pub unsafe fn get_mut(&mut self) -> &mut T {
119+
self.0.get_mut()
120+
}
121+
122+
/// Consumes the [`Waitable`], returning the inner handle.
123+
pub fn into_inner(self) -> Result<T> {
124+
self.0.into_inner()
125+
}
126+
127+
/// Waits until the [`Waitable`] object is ready.
128+
///
129+
/// This method completes when the underlying [`Waitable`] object has completed. See the documentation
130+
/// for the [`Waitable`] object for more information.
131+
///
132+
/// # Examples
133+
///
134+
/// ```no_run
135+
/// use std::process::Command;
136+
/// use async_io::os::windows::Waitable;
137+
///
138+
/// # futures_lite::future::block_on(async {
139+
/// let child = Command::new("sleep").arg("5").spawn()?;
140+
/// let process = Waitable::new(child)?;
141+
///
142+
/// // Wait for the process to exit.
143+
/// process.ready().await?;
144+
/// # std::io::Result::Ok(()) });
145+
/// ```
146+
pub fn ready(&self) -> Ready<'_, T> {
147+
Ready(self.0.readable())
148+
}
149+
150+
/// Polls the I/O handle for readiness.
151+
///
152+
/// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
153+
/// that the underlying [`Waitable`] object is ready. See the documentation for the [`Waitable`]
154+
/// object for more information.
155+
///
156+
/// # Caveats
157+
///
158+
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
159+
/// will just keep waking each other in turn, thus wasting CPU time.
160+
///
161+
/// # Examples
162+
///
163+
/// ```no_run
164+
/// use std::process::Command;
165+
/// use async_io::os::windows::Waitable;
166+
/// use futures_lite::future;
167+
///
168+
/// # futures_lite::future::block_on(async {
169+
/// let child = Command::new("sleep").arg("5").spawn()?;
170+
/// let process = Waitable::new(child)?;
171+
///
172+
/// // Wait for the process to exit.
173+
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
174+
/// # std::io::Result::Ok(()) });
175+
/// ```
176+
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
177+
self.0.poll_readable(cx)
178+
}
179+
}
180+
181+
/// Future for [`Filter::ready`].
182+
#[must_use = "futures do nothing unless you `.await` or poll them"]
183+
#[derive(Debug)]
184+
pub struct Ready<'a, T>(Readable<'a, T>);
185+
186+
impl<T> Future for Ready<'_, T> {
187+
type Output = Result<()>;
188+
189+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
190+
Pin::new(&mut self.0).poll(cx)
191+
}
192+
}

src/reactor/windows.rs

+56-13
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,42 @@
11
// SPDX-License-Identifier: MIT OR Apache-2.0
22

3-
use polling::{Event, Poller};
3+
use polling::os::iocp::PollerIocpExt;
4+
use polling::{Event, PollMode, Poller};
45
use std::fmt;
56
use std::io::Result;
6-
use std::os::windows::io::{AsRawSocket, BorrowedSocket, RawSocket};
7+
use std::os::windows::io::{
8+
AsRawHandle, AsRawSocket, BorrowedHandle, BorrowedSocket, RawHandle, RawSocket,
9+
};
710

811
/// The raw registration into the reactor.
912
#[doc(hidden)]
10-
pub struct Registration {
13+
pub enum Registration {
1114
/// Raw socket handle on Windows.
1215
///
1316
/// # Invariant
1417
///
1518
/// This describes a valid socket that has not been `close`d. It will not be
1619
/// closed while this object is alive.
17-
raw: RawSocket,
20+
Socket(RawSocket),
21+
22+
/// Waitable handle for Windows.
23+
///
24+
/// # Invariant
25+
///
26+
/// This describes a valid waitable handle that has not been `close`d. It will not be
27+
/// closed while this object is alive.
28+
Handle(RawHandle),
1829
}
1930

31+
unsafe impl Send for Registration {}
32+
unsafe impl Sync for Registration {}
33+
2034
impl fmt::Debug for Registration {
2135
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
22-
fmt::Debug::fmt(&self.raw, f)
36+
match self {
37+
Self::Socket(raw) => fmt::Debug::fmt(raw, f),
38+
Self::Handle(handle) => fmt::Debug::fmt(handle, f),
39+
}
2340
}
2441
}
2542

@@ -30,31 +47,57 @@ impl Registration {
3047
///
3148
/// The provided file descriptor must be valid and not be closed while this object is alive.
3249
pub(crate) unsafe fn new(f: BorrowedSocket<'_>) -> Self {
33-
Self {
34-
raw: f.as_raw_socket(),
35-
}
50+
Self::Socket(f.as_raw_socket())
51+
}
52+
53+
/// Create a new [`Registration`] around a waitable handle.
54+
///
55+
/// # Safety
56+
///
57+
/// The provided handle must be valid and not be closed while this object is alive.
58+
pub(crate) unsafe fn new_waitable(f: BorrowedHandle<'_>) -> Self {
59+
Self::Handle(f.as_raw_handle())
3660
}
3761

3862
/// Registers the object into the reactor.
3963
#[inline]
4064
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
4165
// SAFETY: This object's existence validates the invariants of Poller::add
42-
unsafe { poller.add(self.raw, Event::none(token)) }
66+
unsafe {
67+
match self {
68+
Self::Socket(raw) => poller.add(*raw, Event::none(token)),
69+
Self::Handle(handle) => {
70+
poller.add_waitable(*handle, Event::none(token), PollMode::Oneshot)
71+
}
72+
}
73+
}
4374
}
4475

4576
/// Re-registers the object into the reactor.
4677
#[inline]
4778
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
4879
// SAFETY: self.raw is a valid file descriptor
49-
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
50-
poller.modify(fd, interest)
80+
match self {
81+
Self::Socket(raw) => {
82+
poller.modify(unsafe { BorrowedSocket::borrow_raw(*raw) }, interest)
83+
}
84+
Self::Handle(handle) => poller.modify_waitable(
85+
unsafe { BorrowedHandle::borrow_raw(*handle) },
86+
interest,
87+
PollMode::Oneshot,
88+
),
89+
}
5190
}
5291

5392
/// Deregisters the object from the reactor.
5493
#[inline]
5594
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
5695
// SAFETY: self.raw is a valid file descriptor
57-
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
58-
poller.delete(fd)
96+
match self {
97+
Self::Socket(raw) => poller.delete(unsafe { BorrowedSocket::borrow_raw(*raw) }),
98+
Self::Handle(handle) => {
99+
poller.remove_waitable(unsafe { BorrowedHandle::borrow_raw(*handle) })
100+
}
101+
}
59102
}
60103
}

0 commit comments

Comments
 (0)