Skip to content

Commit 49d8024

Browse files
committed
Implement EventLoopForIO on Windows.
1 parent 1901174 commit 49d8024

13 files changed

+248
-20
lines changed

amio.ambuild

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ def Configure(binary):
6666
]
6767
elif builder.target_platform == 'windows':
6868
binary.sources += [
69+
'windows/windows-base-poller.cc',
6970
'windows/windows-context.cc',
7071
'windows/windows-errors.cc',
72+
'windows/windows-event-loop.cc',
7173
'windows/windows-file.cc',
7274
'windows/windows-iocp.cc',
7375
'windows/windows-net.cc',

include/amio-eventloop.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class AMIO_LINK TaskQueue
9797
};
9898

9999
// An event loop encapsulates a TaskQueue and optionally other polling systems.
100-
class AMIO_LINK EventLoop : public TaskQueue::Delegate
100+
class AMIO_LINK EventLoop
101101
{
102102
public:
103103
// Event loops should be freed with |delete|.
@@ -118,15 +118,22 @@ class AMIO_LINK EventLoop : public TaskQueue::Delegate
118118
// Polls for new events in a loop. The only way to exit the loop is to post
119119
// a ShouldQuit() message.
120120
virtual void Loop() = 0;
121+
122+
// Shuts down the event loop; this is usually called after Loop() exits. It
123+
// must not be called from within Loop().
124+
virtual void Shutdown() = 0;
121125
};
122126

127+
#if defined(KE_POSIX)
123128
// An EventQueue is a wrapper around a poller. Instead of immediately
124129
// delivering events, it buffers status changes and can deliver them
125130
// incrementally. This is useful for constructing event loops that
126131
// prioritize some tasks over others.
127132
//
128-
// To use a status queue, simply Poll() and then ask the queue to process
129-
// events.
133+
// To use an EventQueue, simply Poll() and then ask the queue to process events.
134+
// EventQueues are not implemented on Windows. A Poller is effectively an
135+
// EventQueue already, and users who wish to have multiple may simply use
136+
// multiple pollers.
130137
//
131138
// EventQueues are not thread-safe. All operations must occur on the same
132139
// thread.
@@ -153,6 +160,7 @@ class EventQueue : public IODispatcher
153160
// not shutdown the underlying poller.
154161
virtual void Shutdown() = 0;
155162
};
163+
#endif
156164

157165
// An event loop for I/O multiplexing. This is essentially a wrapper around
158166
// a Poller and a single EventQueue. Tasks are prioritized over events.

include/amio-windows.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,8 @@ class AMIO_LINK Transport : public ke::RefcountedThreadsafe<Transport>
275275
// discard any events received on closed transports.
276276
//
277277
// If you have allocated data associated with a context, it is best to use
278-
// IUserData so it is always freed.
278+
// IUserData so it is always freed, since any pending IO operations will
279+
// not post back their context.
279280
virtual void Close() = 0;
280281

281282
// Returns true if the handle has been closed.

tests/AMBuild.tests

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ runner.compiler.linkflags += [KE.static_lib.binary]
2626

2727
runner.sources += [
2828
'main.cc',
29-
'common/test-event-queues.cc',
3029
'common/test-event-loops.cc',
3130
'common/test-network.cc',
3231
'common/test-server-client.cc',
@@ -39,6 +38,7 @@ if builder.target_platform == 'windows':
3938
]
4039
else:
4140
runner.sources += [
41+
'posix/test-event-queues.cc',
4242
'posix/test-pipes.cc',
4343
'posix/test-threading.cc',
4444
]

tests/common/test-event-loops.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ class PokeThread : public IRunnable
2828

2929
void Run() override {
3030
me_->Loop();
31-
other_->PostQuit();
3231
Ended = true;
32+
other_->PostQuit();
3333
}
3434

3535
bool Ended;

tests/common/test-server-client.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,19 @@ TestServerClient::Run()
147147
return false;
148148

149149
server->Close();
150+
server = nullptr;
150151

151152
// Try to connect again. We should get an error.
152153
cli_helper = new ClientHelper();
153154
if (!Client::Create(&client, poller_, address, Protocol::TCP, cli_helper)) {
154155
if (!check(client.connection == nullptr, "should not have connected"))
155156
return false;
156157

158+
#if defined(KE_WINDOWS)
159+
// Sleep for one second to give the network stack time to catch up.
160+
Sleep(1000);
161+
#endif
162+
157163
if (!check_error(poller_->Poll(), "poll for error"))
158164
return false;
159165

File renamed without changes.

windows/windows-base-poller.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,11 @@ WinBasePoller::unlink_impl(WinContext *context)
113113
// Undo everything we did in link().
114114
context->detach();
115115
context->Release();
116-
}
116+
}
117+
118+
AlreadyRefed<WinContext>
119+
WinBasePoller::take(WinContext *context)
120+
{
121+
context->detach();
122+
return AlreadyRefed<WinContext>(context);
123+
}

windows/windows-base-poller.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,18 @@ class WinBasePoller
5050
void link(WinContext *context, const T &object, RequestType type) {
5151
if (object)
5252
object->AddRef();
53-
5453
this->link_impl(context, type);
5554
}
5655
template <typename T>
5756
void unlink(WinContext *context, const T &object) {
5857
this->unlink_impl(context);
59-
6058
if (object)
6159
object->Release();
6260
}
6361

62+
// Special version just for Poll().
63+
AlreadyRefed<WinContext> take(WinContext *context);
64+
6465
private:
6566
void link_impl(WinContext *context, RequestType type);
6667
void unlink_impl(WinContext *context);

windows/windows-event-loop.cc

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// vim: set ts=2 sw=2 tw=99 et:
2+
//
3+
// Copyright (C) 2014 David Anderson
4+
//
5+
// This file is part of the AlliedModders I/O Library.
6+
//
7+
// The AlliedModders I/O library is licensed under the GNU General Public
8+
// License, version 3 or higher. For more information, see LICENSE.txt
9+
//
10+
#include "windows-event-loop.h"
11+
#include <stdio.h>
12+
13+
using namespace ke;
14+
using namespace amio;
15+
16+
PassRef<IOError>
17+
EventLoopForIO::Create(Ref<EventLoopForIO> *outp, Ref<Poller> poller)
18+
{
19+
if (!poller) {
20+
if (Ref<IOError> error = PollerFactory::Create(&poller))
21+
return error;
22+
}
23+
24+
*outp = new WindowsEventLoopForIO(poller);
25+
return nullptr;
26+
}
27+
28+
WindowsEventLoopForIO::WindowsEventLoopForIO(Ref<Poller> poller)
29+
: poller_(poller),
30+
tasks_(this),
31+
wakeup_(new Wakeup()),
32+
received_wakeup_(false)
33+
{
34+
}
35+
36+
void
37+
WindowsEventLoopForIO::Shutdown()
38+
{
39+
wakeup_ = nullptr;
40+
poller_ = nullptr;
41+
}
42+
43+
WindowsEventLoopForIO::Wakeup::Wakeup()
44+
{
45+
context_ = IOContext::New();
46+
}
47+
48+
void
49+
WindowsEventLoopForIO::Wakeup::OnCompleted(IOResult &r)
50+
{
51+
// Just take the context back.
52+
context_ = r.context;
53+
}
54+
55+
void
56+
WindowsEventLoopForIO::Wakeup::Signal(Ref<Poller> poller)
57+
{
58+
// If the context is already in the completion port, then we'll always be
59+
// able to dequeue something, so we don't need to post a wakeup message.
60+
if (!context_)
61+
return;
62+
63+
if (Ref<IOError> error = poller->Post(context_, this)) {
64+
fprintf(stderr, "could not post wakeup: %s\n", error->Message());
65+
return;
66+
}
67+
68+
context_ = nullptr;
69+
}
70+
71+
PassRef<Poller>
72+
WindowsEventLoopForIO::GetPoller()
73+
{
74+
return poller_;
75+
}
76+
77+
PassRef<IOError>
78+
WindowsEventLoopForIO::Attach(Ref<Transport> transport, Ref<IOListener> listener)
79+
{
80+
return poller_->Attach(transport, listener);
81+
}
82+
83+
void
84+
WindowsEventLoopForIO::PostTask(Task *task)
85+
{
86+
tasks_.PostTask(task);
87+
}
88+
89+
void
90+
WindowsEventLoopForIO::PostQuit()
91+
{
92+
tasks_.PostQuit();
93+
}
94+
95+
bool
96+
WindowsEventLoopForIO::ShouldQuit()
97+
{
98+
return tasks_.ShouldQuit();
99+
}
100+
101+
void
102+
WindowsEventLoopForIO::Loop()
103+
{
104+
while (!ShouldQuit()) {
105+
if (tasks_.ProcessNextTask())
106+
continue;
107+
108+
received_wakeup_ = false;
109+
110+
if (Ref<IOError> error = poller_->PollOne()) {
111+
fprintf(stderr, "Could not poll: %s\n", error->Message());
112+
continue;
113+
}
114+
115+
if (received_wakeup_)
116+
continue;
117+
}
118+
}
119+
120+
void
121+
WindowsEventLoopForIO::NotifyTask()
122+
{
123+
wakeup_->Signal(poller_);
124+
}
125+
126+
void
127+
WindowsEventLoopForIO::NotifyQuit()
128+
{
129+
wakeup_->Signal(poller_);
130+
}

windows/windows-event-loop.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// vim: set ts=2 sw=2 tw=99 et:
2+
//
3+
// Copyright (C) 2014 David Anderson
4+
//
5+
// This file is part of the AlliedModders I/O Library.
6+
//
7+
// The AlliedModders I/O library is licensed under the GNU General Public
8+
// License, version 3 or higher. For more information, see LICENSE.txt
9+
//
10+
#ifndef _include_amio_windows_event_loop_h_
11+
#define _include_amio_windows_event_loop_h_
12+
13+
#include <amio.h>
14+
#include <amio-eventloop.h>
15+
#include "../shared/shared-task-queue.h"
16+
17+
namespace amio {
18+
19+
using namespace ke;
20+
21+
class WindowsEventLoopForIO
22+
: public EventLoopForIO,
23+
public TaskQueue::Delegate,
24+
public RefcountedThreadsafe<WindowsEventLoopForIO>
25+
{
26+
public:
27+
WindowsEventLoopForIO(Ref<Poller> poller);
28+
29+
KE_IMPL_REFCOUNTING_TS(WindowsEventLoopForIO);
30+
31+
PassRef<Poller> GetPoller() override;
32+
33+
PassRef<IOError> Attach(Ref<Transport> transport, Ref<IOListener> listener) override;
34+
35+
void PostTask(Task *task) override;
36+
void PostQuit() override;
37+
bool ShouldQuit() override;
38+
void Loop() override;
39+
void Shutdown() override;
40+
41+
void NotifyTask() override;
42+
void NotifyQuit() override;
43+
44+
private:
45+
class Wakeup
46+
: public IOListener,
47+
public RefcountedThreadsafe<Wakeup>
48+
{
49+
public:
50+
Wakeup();
51+
KE_IMPL_REFCOUNTING_TS(Wakeup);
52+
53+
void OnCompleted(IOResult &r);
54+
void Signal(Ref<Poller> poller);
55+
56+
private:
57+
Ref<IOContext> context_;
58+
};
59+
60+
private:
61+
Ref<Poller> poller_;
62+
TaskQueueImpl tasks_;
63+
Ref<Wakeup> wakeup_;
64+
bool received_wakeup_;
65+
};
66+
67+
} // namespace amio
68+
69+
#endif // _include_amio_windows_event_loop_h_

windows/windows-iocp.cc

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ CompletionPort::Dispatch(WinContext *context, OVERLAPPED_ENTRY &entry, DWORD err
144144
{
145145
Ref<IOListener> listener;
146146

147+
IOResult result;
148+
result.bytes = entry.dwNumberOfBytesTransferred;
149+
result.completed = true;
150+
147151
// We take the lock at this point - since we could race with changeListener
148152
// or WinContext::cancel().
149153
RequestType request;
@@ -160,8 +164,8 @@ CompletionPort::Dispatch(WinContext *context, OVERLAPPED_ENTRY &entry, DWORD err
160164

161165
// If we have a transport and it's closed - or the IO operation was
162166
// cancelled - just leave now. Note we always grab the transport even
163-
// if the state was Cancelled, since we have to free the ref.
164-
if (request == RequestType::Cancelled)
167+
// if the state was Cancelled, since we have to acquire its ref.
168+
if (transport->Closed() || request == RequestType::Cancelled)
165169
return false;
166170

167171
// Would be much nicer if we could use RtlNtStatusToDosError() here, but
@@ -181,15 +185,11 @@ CompletionPort::Dispatch(WinContext *context, OVERLAPPED_ENTRY &entry, DWORD err
181185
}
182186

183187
// Detach the context. After this point, we can return whenever we want.
184-
// Note that the object has already been refed above, so we always pass
185-
// nullptr here.
186-
WinBasePoller::unlink(context, static_cast<IRefcounted *>(nullptr));
188+
// Note that the object may have exactly one ref at this point, so it's
189+
// important that it goes right into a Refed location.
190+
result.context = WinBasePoller::take(context);
187191
}
188192

189-
IOResult result;
190-
result.bytes = entry.dwNumberOfBytesTransferred;
191-
result.completed = true;
192-
result.context = context;
193193
if (error) {
194194
if (error == ERROR_HANDLE_EOF)
195195
result.ended = true;

0 commit comments

Comments
 (0)