Skip to content

Commit a0873e5

Browse files
authored
Merge pull request #26 from scala-native/poll
Implement Poll abstraction
2 parents c6f81d0 + b6fef31 commit a0873e5

File tree

8 files changed

+123
-82
lines changed

8 files changed

+123
-82
lines changed

client/curl.scala

Lines changed: 37 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import scala.scalanative.libc.stdlib._
55
import scala.scalanative.libc.string._
66
import scala.concurrent._
77
import scala.scalanative.runtime.Boxes
8-
import scala.scalanative.runtime.Intrinsics
98

109
case class ResponseState(
1110
var code: Int = 200,
@@ -16,14 +15,11 @@ case class ResponseState(
1615
object Curl {
1716
import LibCurl._
1817
import LibCurlConstants._
19-
import LibUV._
2018
import LibUVConstants._
2119

2220
var serial = 0L
2321

24-
val loop = EventLoop.loop
25-
var multi: MultiCurl = null
26-
val timerHandle: TimerHandle = malloc(uv_handle_size(UV_TIMER_T))
22+
var multi: MultiCurl = null
2723

2824
val requestPromises = mutable.Map[Long, Promise[ResponseState]]()
2925
val requests = mutable.Map[Long, ResponseState]()
@@ -44,7 +40,6 @@ object Curl {
4440
multi_setopt_ptr(multi, TIMERFUNCTION, func_to_ptr(startTimerCB))
4541
println(s"timerCB: $startTimerCB")
4642

47-
check(uv_timer_init(loop, timerHandle), "uv_timer_init")
4843
initialized = true
4944
println("done")
5045
}
@@ -164,60 +159,52 @@ object Curl {
164159
val socketCB = new CurlSocketCallback {
165160
def apply(
166161
curl: Curl,
167-
socket: Ptr[Byte],
162+
socket: Int,
168163
action: Int,
169164
data: Ptr[Byte],
170165
socket_data: Ptr[Byte]
171166
): Int = {
172167
println(s"socketCB called with action ${action}")
173168
val pollHandle = if (socket_data == null) {
174169
println(s"initializing handle for socket ${socket}")
175-
val buf = malloc(uv_handle_size(UV_POLL_T)).asInstanceOf[Ptr[Ptr[Byte]]]
176-
!buf = socket
177-
check(uv_poll_init_socket(loop, buf, socket), "uv_poll_init_socket")
170+
val poll = Poll(socket)
178171
check(
179-
multi_assign(multi, socket, buf.asInstanceOf[Ptr[Byte]]),
172+
multi_assign(multi, socket, poll.ptr),
180173
"multi_assign"
181174
)
182-
buf
175+
poll
183176
} else {
184-
socket_data.asInstanceOf[Ptr[Ptr[Byte]]]
177+
new Poll(socket_data)
185178
}
186179

187-
val events = action match {
188-
case POLL_NONE => None
189-
case POLL_IN => Some(UV_READABLE)
190-
case POLL_OUT => Some(UV_WRITABLE)
191-
case POLL_INOUT => Some(UV_READABLE | UV_WRITABLE)
192-
case POLL_REMOVE => None
193-
}
180+
val readable = action == POLL_IN || action == POLL_INOUT
181+
val writable = action == POLL_OUT || action == POLL_INOUT
194182

195-
events match {
196-
case Some(ev) =>
197-
println(s"starting poll with events $ev")
198-
uv_poll_start(pollHandle, ev, pollCB)
199-
case None =>
200-
println("stopping poll")
201-
uv_poll_stop(pollHandle)
202-
startTimerCB(multi, 1, null)
183+
if (readable || writable) {
184+
println(
185+
s"starting poll with readable = $readable and writable = $writable"
186+
)
187+
pollHandle.start(readable, writable) { (status, readable, writable) =>
188+
println(
189+
s"ready_for_curl fired with status $status and readable = $readable writable = $writable"
190+
)
191+
var actions = 0
192+
if (readable) actions |= 1
193+
if (writable) actions |= 2
194+
val running_handles = stackalloc[Int]
195+
val result =
196+
multi_socket_action(multi, socket, actions, running_handles)
197+
println("multi_socket_action", result)
198+
}
199+
} else {
200+
println("stopping poll")
201+
pollHandle.stop()
202+
startTimerCB(multi, 1, null)
203203
}
204204
0
205205
}
206206
}
207207

208-
val pollCB = new PollCB {
209-
def apply(pollHandle: PollHandle, status: Int, events: Int): Unit = {
210-
println(
211-
s"ready_for_curl fired with status ${status} and events ${events}"
212-
)
213-
val socket = !(pollHandle.asInstanceOf[Ptr[Ptr[Byte]]])
214-
val actions = (events & 1) | (events & 2) // Whoa, nelly!
215-
val running_handles = stackalloc[Int]
216-
val result = multi_socket_action(multi, socket, actions, running_handles)
217-
println("multi_socket_action", result)
218-
}
219-
}
220-
221208
val startTimerCB = new CurlTimerCallback {
222209
def apply(curl: MultiCurl, timeout_ms: Long, data: Ptr[Byte]): Int = {
223210
println(s"start_timer called with timeout ${timeout_ms} ms")
@@ -226,23 +213,19 @@ object Curl {
226213
1
227214
} else timeout_ms
228215
println("starting timer")
229-
check(uv_timer_start(timerHandle, timeoutCB, time, 0), "uv_timer_start")
216+
Timer.timeout(time) { () =>
217+
println("in timeout callback")
218+
val running_handles = stackalloc[Int]
219+
multi_socket_action(multi, -1, 0, running_handles)
220+
println(s"on_timer fired, ${!running_handles} sockets running")
221+
}
230222
println("cleaning up requests")
231223
cleanup_requests()
232224
println("done")
233225
0
234226
}
235227
}
236228

237-
val timeoutCB = new TimerCB {
238-
def apply(handle: TimerHandle): Unit = {
239-
println("in timeout callback")
240-
val running_handles = stackalloc[Int]
241-
multi_socket_action(multi, int_to_ptr(-1), 0, running_handles)
242-
println(s"on_timer fired, ${!running_handles} sockets running")
243-
}
244-
}
245-
246229
def cleanup_requests(): Unit = {
247230
val messages = stackalloc[Int]
248231
val privateDataPtr = stackalloc[Ptr[Long]]
@@ -299,14 +282,6 @@ object Curl {
299282
Boxes.boxToPtr[Byte](Boxes.unboxToCFuncRawPtr(f))
300283
}
301284

302-
def int_to_ptr(i: Int): Ptr[Byte] = {
303-
Boxes.boxToPtr[Byte](Intrinsics.castIntToRawPtr(i))
304-
}
305-
306-
def long_to_ptr(l: Long): Ptr[Byte] = {
307-
Boxes.boxToPtr[Byte](Intrinsics.castLongToRawPtr(l))
308-
}
309-
310285
}
311286

312287
object LibCurlConstants {
@@ -349,7 +324,7 @@ object LibCurlConstants {
349324

350325
type CurlDataCallback = CFuncPtr4[Ptr[Byte], CSize, CSize, Ptr[Byte], CSize]
351326
type CurlSocketCallback =
352-
CFuncPtr5[Curl, Ptr[Byte], CInt, Ptr[Byte], Ptr[Byte], CInt]
327+
CFuncPtr5[Curl, CInt, CInt, Ptr[Byte], Ptr[Byte], CInt]
353328
type CurlTimerCallback = CFuncPtr3[MultiCurl, Long, Ptr[Byte], CInt]
354329

355330
@name("curl_global_init")
@@ -408,14 +383,14 @@ object LibCurlConstants {
408383
@name("curl_multi_assign")
409384
def multi_assign(
410385
multi: MultiCurl,
411-
socket: Ptr[Byte],
386+
socket: Int,
412387
socket_data: Ptr[Byte]
413388
): Int = extern
414389

415390
@name("curl_multi_socket_action")
416391
def multi_socket_action(
417392
multi: MultiCurl,
418-
socket: Ptr[Byte],
393+
socket: Int,
419394
events: Int,
420395
numhandles: Ptr[Int]
421396
): Int = extern

core/loop.scala renamed to core/src/main/scala/scala/scalanative/loop/Eventloop.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ object EventLoop {
3030
@tailrec
3131
def runUv(): Unit = {
3232
val res = uv_run(loop, UV_RUN_ONCE)
33-
if(res != 0) runUv()
33+
if (res != 0) runUv()
3434
}
3535

3636
scala.scalanative.runtime.loop()
@@ -49,7 +49,7 @@ object EventLoop {
4949
object LibUV {
5050
type UVHandle = Ptr[Byte]
5151
type PipeHandle = Ptr[Byte]
52-
type PollHandle = Ptr[Ptr[Byte]]
52+
type PollHandle = Ptr[Byte]
5353
type TCPHandle = Ptr[Byte]
5454
type PrepareHandle = Ptr[Byte]
5555
type TimerHandle = Ptr[Byte]
@@ -100,6 +100,8 @@ object LibUV {
100100
def uv_pipe_open(handle: PipeHandle, fd: Int): Int = extern
101101
def uv_pipe_bind(handle: PipeHandle, socketName: CString): Int = extern
102102

103+
def uv_poll_init(loop: Loop, handle: PollHandle, fd: Int): Int =
104+
extern
103105
def uv_poll_init_socket(
104106
loop: Loop,
105107
handle: PollHandle,
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package scala.scalanative.loop
2+
3+
import scala.scalanative.libc.stdlib
4+
import LibUV._, LibUVConstants._
5+
import scala.scalanative.unsafe.Ptr
6+
import internals.HandleUtils
7+
8+
@inline class Poll(val ptr: Ptr[Byte]) extends AnyVal {
9+
def start(in: Boolean, out: Boolean)(
10+
callback: (Int, Boolean, Boolean) => Unit
11+
): Unit = {
12+
HandleUtils.setData(ptr, callback)
13+
var events = 0
14+
if (out) events |= UV_WRITABLE
15+
if (in) events |= UV_READABLE
16+
uv_poll_start(ptr, events, Poll.pollCB)
17+
}
18+
19+
def stop(): Unit = {
20+
uv_poll_stop(ptr)
21+
HandleUtils.close(ptr)
22+
}
23+
}
24+
25+
object Poll {
26+
private val pollCB = new PollCB {
27+
def apply(handle: PollHandle, status: Int, events: Int): Unit = {
28+
val callback =
29+
HandleUtils.getData[(Int, Boolean, Boolean) => Unit](handle)
30+
callback.apply(
31+
status,
32+
(events & UV_READABLE) != 0,
33+
(events & UV_WRITABLE) != 0
34+
)
35+
}
36+
}
37+
38+
private lazy val size = uv_handle_size(UV_POLL_T)
39+
40+
def apply(fd: Int): Poll = {
41+
val pollHandle = stdlib.malloc(size)
42+
uv_poll_init(EventLoop.loop, pollHandle, fd)
43+
new Poll(pollHandle)
44+
}
45+
}

core/src/main/scala/scala/scalanative/loop/Timer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ object Timer {
2929
}
3030
}
3131
@inline
32-
private def startTimer(timeout: Long, repeat: Long, callback: () => Unit): Timer = {
32+
private def startTimer(
33+
timeout: Long,
34+
repeat: Long,
35+
callback: () => Unit
36+
): Timer = {
3337
val timerHandle = stdlib.malloc(uv_handle_size(UV_TIMER_T))
3438
uv_timer_init(EventLoop.loop, timerHandle)
3539
HandleUtils.setData(timerHandle, callback)

core/src/main/scala/scala/scalanative/loop/internals/HandleUtils.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@ import scala.scalanative.libc.stdlib
77
import scala.collection.mutable
88
import LibUV._
99

10-
private [loop] object HandleUtils {
10+
private[loop] object HandleUtils {
1111
private val references = mutable.Map.empty[Long, Int]
1212

1313
@inline def getData[T <: Object](handle: Ptr[Byte]): T = {
14-
val data = LibUV.uv_handle_get_data(handle)
14+
val data = LibUV.uv_handle_get_data(handle)
1515
val rawptr = castLongToRawPtr(data)
1616
castRawPtrToObject(rawptr).asInstanceOf[T]
1717
}
1818
@inline def setData[T <: Object](handle: Ptr[Byte], function: T): Unit = {
1919
val rawptr = castObjectToRawPtr(function)
20-
val data = castRawPtrToLong(rawptr)
21-
if(references.contains(data)) references(data) += 1
20+
val data = castRawPtrToLong(rawptr)
21+
if (references.contains(data)) references(data) += 1
2222
else references(data) = 1
2323
LibUV.uv_handle_set_data(handle, data)
2424
}
@@ -29,9 +29,9 @@ private [loop] object HandleUtils {
2929
}
3030
@inline def close(handle: Ptr[Byte]): Unit = {
3131
uv_close(handle, onCloseCB)
32-
val data = LibUV.uv_handle_get_data(handle)
32+
val data = LibUV.uv_handle_get_data(handle)
3333
val current = references(data)
34-
if(current > 1) references(data) -= 1
34+
if (current > 1) references(data) -= 1
3535
else references.remove(data)
3636
}
3737
}

core/src/test/scala/scala/scalanative/loop/TimerTests.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import scala.concurrent.Promise
88
object TimerTests extends TestSuite {
99
val tests = Tests {
1010
def now(): Duration = System.currentTimeMillis().millis
11-
val d = 200.millis
11+
val d = 200.millis
1212
test("delay") {
13-
var i = 0
13+
var i = 0
1414
val startTime = now()
1515
def checkDelay(time: Int) = {
1616
i += 1
@@ -25,13 +25,13 @@ object TimerTests extends TestSuite {
2525
} yield ()
2626
}
2727
test("repeat") {
28-
var i = 0
29-
val startTime = now()
30-
val times = 3
31-
val p = Promise[Unit]()
28+
var i = 0
29+
val startTime = now()
30+
val times = 3
31+
val p = Promise[Unit]()
3232
var timer: Timer = null.asInstanceOf[Timer]
3333
timer = Timer.repeat(d.toMillis) { () =>
34-
if(i == times) {
34+
if (i == times) {
3535
p.success(())
3636
timer.clear()
3737
} else i += 1
@@ -42,5 +42,14 @@ object TimerTests extends TestSuite {
4242
assert(took >= d * 3)
4343
}
4444
}
45+
test("clear timeout") {
46+
val handle = Timer.timeout(d.toMillis) { () =>
47+
throw new Exception("This timeout should have not triggered")
48+
}
49+
handle.clear()
50+
for {
51+
() <- Timer.delay(d * 2)
52+
} yield ()
53+
}
4554
}
4655
}

scalajs-compat/RawTimers.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ object RawTimers {
3131
* @return A handle that can be used to cancel the timeout by passing it
3232
* to [[clearTimeout]].
3333
*/
34-
@inline def setTimeout(handler: () => Unit, interval: Double): SetTimeoutHandle =
34+
@inline def setTimeout(
35+
handler: () => Unit,
36+
interval: Double
37+
): SetTimeoutHandle =
3538
Timer.timeout(interval.toLong)(handler)
3639

3740
/** Schedule `handler` for repeated execution every `interval`
@@ -42,6 +45,9 @@ object RawTimers {
4245
* @return A handle that can be used to cancel the interval by passing it
4346
* to [[clearInterval]].
4447
*/
45-
@inline def setInterval(handler: () => Unit, interval: Double): SetIntervalHandle =
48+
@inline def setInterval(
49+
handler: () => Unit,
50+
interval: Double
51+
): SetIntervalHandle =
4652
Timer.repeat(interval.toLong)(handler)
4753
}

scalajs-compat/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ package object timers {
2626

2727
import scala.scalanative.loop.Timer
2828

29-
type SetTimeoutHandle = Timer
30-
type SetIntervalHandle = Timer
29+
type SetTimeoutHandle = Timer
30+
type SetIntervalHandle = Timer
3131

3232
/** Schedule something for execution in `interval` milliseconds.
3333
*

0 commit comments

Comments
 (0)