Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL Windows)
ShLwApi
WS2_32
WinMM
mincore
synchronization)
endif()

Expand Down
298 changes: 201 additions & 97 deletions src/event/event_windows.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
*/

#include "internal.h"

#if DISPATCH_EVENT_BACKEND_WINDOWS

#define DEBUG_TIMERS 0

static HANDLE hPort = NULL;
enum _dispatch_windows_port {
DISPATCH_PORT_POKE = 0,
DISPATCH_PORT_TIMER_CLOCK_WALL,
DISPATCH_PORT_TIMER_CLOCK_UPTIME,
DISPATCH_PORT_TIMER_CLOCK_MONOTONIC,
DISPATCH_PORT_FILE_HANDLE,
DISPATCH_PORT_PIPE_HANDLE_READ,
DISPATCH_PORT_PIPE_HANDLE_WRITE,
Expand Down Expand Up @@ -740,16 +740,16 @@ _dispatch_event_merge_socket_write(dispatch_muxnote_t dmn,
#pragma mark timers

typedef struct _dispatch_windows_timeout_s {
PTP_TIMER pTimer;
enum _dispatch_windows_port ullIdent;
uint64_t fireTime;
uint64_t leeway;
bool bArmed;
} *dispatch_windows_timeout_t;

#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.pTimer = NULL, \
.ullIdent = DISPATCH_PORT_TIMER_CLOCK_##clock, \
.bArmed = FALSE, \
#define DISPATCH_WINDOWS_TIMEOUT_INITIALIZER(clock) \
[DISPATCH_CLOCK_##clock] = { \
.fireTime = 0, \
.leeway = 0, \
.bArmed = FALSE, \
}

static struct _dispatch_windows_timeout_s _dispatch_windows_timeout[] = {
Expand All @@ -770,58 +770,27 @@ _dispatch_event_merge_timer(dispatch_clock_t clock)
_dispatch_timers_heap[tidx].dth_armed = false;
}

static void CALLBACK
_dispatch_timer_callback(PTP_CALLBACK_INSTANCE Instance, PVOID Context,
PTP_TIMER Timer)
{
BOOL bSuccess;

bSuccess = PostQueuedCompletionStatus(hPort, 0, (ULONG_PTR)Context,
NULL);
if (bSuccess == FALSE) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"PostQueuedCompletionStatus");
}
}

void
_dispatch_event_loop_timer_arm(dispatch_timer_heap_t dth DISPATCH_UNUSED,
uint32_t tidx, dispatch_timer_delay_s range,
dispatch_clock_now_cache_t nows)
{
dispatch_windows_timeout_t timer;
FILETIME ftDueTime;
LARGE_INTEGER liTime;

switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
liTime.QuadPart = range.delay +
_dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);
break;

case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
liTime.QuadPart = -((range.delay + 99) / 100);
break;
}

if (timer->pTimer == NULL) {
timer->pTimer = CreateThreadpoolTimer(_dispatch_timer_callback,
(LPVOID)timer->ullIdent, NULL);
if (timer->pTimer == NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"CreateThreadpoolTimer");
}
}

ftDueTime.dwHighDateTime = liTime.HighPart;
ftDueTime.dwLowDateTime = liTime.LowPart;
uint64_t now = _dispatch_time_now_cached(DISPATCH_TIMER_CLOCK(tidx), nows);

SetThreadpoolTimer(timer->pTimer, &ftDueTime, /*msPeriod=*/0,
/*msWindowLength=*/0);
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
timer->fireTime = range.delay + now;
timer->leeway = range.leeway;
timer->bArmed = TRUE;

#if DEBUG_TIMERS
printf("[%lx] Arming clock %d: fire time %"PRIu64", leeway %"PRIu64"\n",
GetCurrentThreadId(),
DISPATCH_TIMER_CLOCK(tidx),
timer->fireTime,
timer->leeway);
#endif
}

void
Expand All @@ -830,20 +799,142 @@ _dispatch_event_loop_timer_delete(dispatch_timer_heap_t dth DISPATCH_UNUSED,
{
dispatch_windows_timeout_t timer;

switch (DISPATCH_TIMER_CLOCK(tidx)) {
case DISPATCH_CLOCK_WALL:
timer = &_dispatch_windows_timeout[DISPATCH_CLOCK_WALL];
break;
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
#if DEBUG_TIMERS
BOOL wasArmed = timer->bArmed;
#endif
timer->bArmed = FALSE;

case DISPATCH_CLOCK_UPTIME:
case DISPATCH_CLOCK_MONOTONIC:
timer = &_dispatch_windows_timeout[DISPATCH_TIMER_CLOCK(tidx)];
break;
#if DEBUG_TIMERS
if (wasArmed) {
printf("[%lx] Disarming clock %d\n",
GetCurrentThreadId(),
DISPATCH_TIMER_CLOCK(tidx));
}
#endif
}

SetThreadpoolTimer(timer->pTimer, NULL, /*msPeriod=*/0,
/*msWindowLength=*/0);
timer->bArmed = FALSE;
static uint32_t
_dispatch_service_event_loop_timers(dispatch_clock_now_cache_t nows,
BOOL shouldWait) {
int nextClock = -1;
uint64_t nextDelay = ~(uint64_t)0;
uint64_t minDelayWithLeeway = ~(uint64_t)0;
BOOL didFireTimer = FALSE;

#if DEBUG_TIMERS
printf("[%lx] Runnimg timers\n", GetCurrentThreadId());
#endif

// Fire any timer events that have passed, and work out the
// minimum delay until the next one we need to deal with, taking
// leeway into account (e.g. a timer that needs to fire in 10ms
// with 0ms leeway will take priority over a timer that needs to
// fire in 1ms with 100ms leeway).
for (int clock = 0; clock < DISPATCH_CLOCK_COUNT; ++clock) {
#if DEBUG_TIMERS
printf("Clock %d: ", clock);
#endif

if (!_dispatch_windows_timeout[clock].bArmed) {
#if DEBUG_TIMERS
printf("not armed\n");
#endif
continue;
}

uint64_t now = _dispatch_time_now_cached(clock, nows);
#if DEBUG_TIMERS
printf("current time %"PRIu64", ", now);
#endif
if (_dispatch_windows_timeout[clock].fireTime <= now) {
#if DEBUG_TIMERS
uint64_t lateness = now - _dispatch_windows_timeout[clock].fireTime;
printf("firing timer (late by %"PRIu64")\n", lateness);
#endif
didFireTimer = TRUE;
_dispatch_event_merge_timer(clock);
continue;
}

uint64_t delay = _dispatch_windows_timeout[clock].fireTime - now;
uint64_t leeway = _dispatch_windows_timeout[clock].leeway;
uint64_t delayWithLeeway;

#if DEBUG_TIMERS
printf("delay %"PRIu64", leeway %"PRIu64"\n", delay, leeway);
#endif

// Use saturating addition here to avoid wrapping
if (~(uint64_t)0 - delay < leeway)
delayWithLeeway = ~(uint64_t)0;
else
delayWithLeeway = delay + leeway;

if (delayWithLeeway < minDelayWithLeeway) {
nextClock = clock;
nextDelay = delay;
minDelayWithLeeway = delayWithLeeway;
}
}

// If we fired a timer, we mustn't wait; the timer code might need to
// run in order to set up another timer.
if (didFireTimer) {
#if DEBUG_TIMERS
printf("Timer fired, so not waiting\n");
#endif
return 0;
}

// If we aren't waiting for a timer, we want to wait forever on the
// completion port.
if (nextClock == -1) {
#if DEBUG_TIMERS
printf("Not waiting\n");
#endif
return INFINITE;
}

#if DEBUG_TIMERS
printf("Waiting for clock %d\n", nextClock);
#endif

// Calculate the number of milliseconds we should wait in an ideal world.
// Windows can only actually wait multiples of its current tick length,
// which defaults to 1/64s, but may vary depending on other programs
// that are executing.
uint32_t msToWait = nextDelay / 1000000;

#if DEBUG_TIMERS
printf("msToWait = %"PRIu32"\n", msToWait);
#endif

// Check if we have at least one tick (15ms) of leeway remaining;
// if so, use it up. This is an approximation, but prevents us from
// overrunning because of multiple passes through the loop.
if (_dispatch_windows_timeout[nextClock].leeway >= 15000000) {
#if DEBUG_TIMERS
printf("In leeway\n");
#endif
if (shouldWait)
_dispatch_windows_timeout[nextClock].leeway -= 15000000;
} else {
// Otherwise, adjust msToWait down by one tick (15ms), so that we
// spin the event loop for any wait shorter than that; this means
// that we can accurately wait for 1ms (for instance), rather than
// always waiting for exact multiples of 15ms.
if (msToWait < 15)
msToWait = 0;
else
msToWait -= 15;

#if DEBUG_TIMERS
printf("Adjusted msToWait = %"PRIu32"\n", msToWait);
#endif
}

return msToWait;
}

#pragma mark dispatch_loop
Expand Down Expand Up @@ -881,30 +972,51 @@ DISPATCH_NOINLINE
void
_dispatch_event_loop_drain(uint32_t flags)
{
DWORD dwNumberOfBytesTransferred;
ULONG_PTR ulCompletionKey;
LPOVERLAPPED pOV;
BOOL bSuccess;
BOOL shouldWait = !(flags & KEVENT_FLAG_IMMEDIATE);
OVERLAPPED_ENTRY entries[64];
ULONG ulEntryCount = 0;

pOV = (LPOVERLAPPED)&pOV;
bSuccess = GetQueuedCompletionStatus(hPort, &dwNumberOfBytesTransferred,
&ulCompletionKey, &pOV,
(flags & KEVENT_FLAG_IMMEDIATE) ? 0 : INFINITE);
while (bSuccess) {
switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
break;
do {
dispatch_clock_now_cache_s nows = { };

// Run the timers first, calculating the number of milliseconds until
// the next time we need to wake up
DWORD dwMsToWait = _dispatch_service_event_loop_timers(&nows,
shouldWait);

// Read entries from the IO completion port
BOOL bSuccess = GetQueuedCompletionStatusEx(
hPort,
entries,
sizeof(entries) / sizeof(entries[0]),
&ulEntryCount,
shouldWait ? dwMsToWait : 0,
TRUE
);

case DISPATCH_PORT_TIMER_CLOCK_WALL:
_dispatch_event_merge_timer(DISPATCH_CLOCK_WALL);
break;
if (!bSuccess) {
DWORD dwErr = GetLastError();

case DISPATCH_PORT_TIMER_CLOCK_UPTIME:
_dispatch_event_merge_timer(DISPATCH_CLOCK_UPTIME);
break;
// If the port has been closed, or we timed-out, we're done.
if (dwErr == ERROR_ABANDONED_WAIT_0 || dwErr == WAIT_TIMEOUT)
break;

case DISPATCH_PORT_TIMER_CLOCK_MONOTONIC:
_dispatch_event_merge_timer(DISPATCH_CLOCK_MONOTONIC);
// If an APC occurred, go around again (we still want to wait).
if (dwErr == WAIT_IO_COMPLETION)
continue;

DISPATCH_INTERNAL_CRASH(dwErr, "GetQueuedCompletionStatus");
}
} while (false);

for (ULONG ulEntry = 0; ulEntry < ulEntryCount; ++ulEntry) {
ULONG_PTR ulCompletionKey = entries[ulEntry].lpCompletionKey;
LPOVERLAPPED pOV = entries[ulEntry].lpOverlapped;
DWORD dwNumberOfBytesTransferred =
entries[ulEntry].dwNumberOfBytesTransferred;

switch (ulCompletionKey) {
case DISPATCH_PORT_POKE:
break;

case DISPATCH_PORT_FILE_HANDLE:
Expand All @@ -913,36 +1025,28 @@ _dispatch_event_loop_drain(uint32_t flags)

case DISPATCH_PORT_PIPE_HANDLE_READ:
_dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_PIPE_HANDLE_WRITE:
_dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_READ:
_dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

case DISPATCH_PORT_SOCKET_WRITE:
_dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV,
dwNumberOfBytesTransferred);
dwNumberOfBytesTransferred);
break;

default:
DISPATCH_INTERNAL_CRASH(ulCompletionKey,
"unsupported completion key");
"unsupported completion key");
}

bSuccess = GetQueuedCompletionStatus(hPort,
&dwNumberOfBytesTransferred, &ulCompletionKey, &pOV, 0);
}

if (bSuccess == FALSE && pOV != NULL) {
DISPATCH_INTERNAL_CRASH(GetLastError(),
"GetQueuedCompletionStatus");
}
}

Expand Down
Loading