Skip to content

Commit 032ad98

Browse files
committed
Rework Scheduler
1 parent 7c058d5 commit 032ad98

15 files changed

+136
-93
lines changed

composer.json

+3-6
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,19 @@
2121
],
2222
"require": {
2323
"php": "~7.0",
24-
"async-interop/promise": "^0.3",
25-
"async-interop/event-loop": "^0.4",
26-
"async-interop/event-loop-implementation": "^0.4"
24+
"async-interop/promise": "^0.3"
2725
},
2826
"require-dev": {
29-
"wyrihaximus/react-async-interop-loop": "^0.2.1",
3027
"satooshi/php-coveralls": "~1.0",
3128
"phpunit/phpcov": "^3.1",
32-
"phpunit/phpunit": "^5.5"
29+
"phpunit/phpunit": "^5.5",
30+
"react/event-loop": "^0.4.2"
3331
},
3432
"autoload": {
3533
"psr-4": { "Rx\\": "src" }
3634
},
3735
"autoload-dev": {
3836
"files": [
39-
"test/loop-auto-start.php",
4037
"test/helper-functions.php"
4138
],
4239
"psr-4": {

demo/bootstrap.php

+9-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
* file that was distributed with this source code.
1010
*/
1111

12+
use React\EventLoop\Factory;
13+
use Rx\Scheduler;
14+
1215
if (file_exists($file = __DIR__.'/../vendor/autoload.php')) {
1316
$autoload = require_once $file;
1417
$autoload->addPsr4('Vendor\\Rx\\Operator\\', __DIR__ . '/custom-operator');
@@ -33,5 +36,10 @@ function () use ($prefix) { echo $prefix . "Complete!\n"; }
3336
);
3437
};
3538

36-
3739
$stdoutObserver = $createStdoutObserver();
40+
41+
$loop = Factory::create();
42+
Scheduler::setDefault(new Scheduler\EventLoopScheduler($loop));
43+
register_shutdown_function(function () use ($loop) {
44+
$loop->run();
45+
});

demo/subscribeOn/subscribeOn.php

+8-5
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,35 @@
22

33
require_once __DIR__ . '/../bootstrap.php';
44

5-
use Interop\Async\Loop;
5+
use React\EventLoop\Factory;
66
use Rx\Disposable\CallbackDisposable;
77
use Rx\ObserverInterface;
88
use Rx\Scheduler\EventLoopScheduler;
99

10-
$observable = Rx\Observable::create(function (ObserverInterface $observer) {
10+
$loop = Factory::create();
11+
12+
$observable = Rx\Observable::create(function (ObserverInterface $observer) use ($loop) {
1113
$handler = function () use ($observer) {
1214
$observer->onNext(42);
1315
$observer->onCompleted();
1416
};
1517

1618
// Change scheduler for here
17-
$timer = Loop::delay(1, $handler);
19+
$timer = $loop->addTimer(0.001, $handler);
1820

1921
return new CallbackDisposable(function () use ($timer) {
2022
// And change scheduler for here
2123
if ($timer) {
22-
Loop::cancel($timer);
24+
$timer->cancel();
2325
}
2426
});
2527
});
2628

2729
$observable
28-
->subscribeOn(new EventLoopScheduler())
30+
->subscribeOn(new EventLoopScheduler($loop))
2931
->subscribe($stdoutObserver);
3032

33+
$loop->run();
3134

3235
//Next value: 42
3336
//Complete!

src/AsyncSchedulerInterface.php

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Rx;
4+
5+
interface AsyncSchedulerInterface extends SchedulerInterface
6+
{
7+
}

src/Observable.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -1070,14 +1070,14 @@ public function skipUntil(ObservableInterface $other): Observable
10701070
* Returns an observable sequence that produces a value after dueTime has elapsed.
10711071
*
10721072
* @param integer $dueTime - milliseconds
1073-
* @param SchedulerInterface $scheduler
1073+
* @param AsyncSchedulerInterface $scheduler
10741074
* @return TimerObservable
10751075
*
10761076
* @demo timer/timer.php
10771077
* @operator
10781078
* @reactivex timer
10791079
*/
1080-
public static function timer(int $dueTime, SchedulerInterface $scheduler = null): TimerObservable
1080+
public static function timer(int $dueTime, AsyncSchedulerInterface $scheduler = null): TimerObservable
10811081
{
10821082
return new TimerObservable($dueTime, $scheduler ?: Scheduler::getAsync());
10831083
}
@@ -1561,14 +1561,14 @@ public function delay(int $delay, SchedulerInterface $scheduler = null): Observa
15611561
*
15621562
* @param $timeout
15631563
* @param ObservableInterface $timeoutObservable
1564-
* @param SchedulerInterface $scheduler
1564+
* @param AsyncSchedulerInterface $scheduler
15651565
* @return Observable
15661566
*
15671567
* @demo timeout/timeout.php
15681568
* @operator
15691569
* @reactivex timeout
15701570
*/
1571-
public function timeout(int $timeout, ObservableInterface $timeoutObservable = null, SchedulerInterface $scheduler = null): Observable
1571+
public function timeout(int $timeout, ObservableInterface $timeoutObservable = null, AsyncSchedulerInterface $scheduler = null): Observable
15721572
{
15731573
return $this->lift(function () use ($timeout, $timeoutObservable, $scheduler) {
15741574
return new TimeoutOperator($timeout, $timeoutObservable, $scheduler ?: Scheduler::getAsync());

src/Scheduler.php

+24-12
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,39 @@ class Scheduler
1515

1616
public static function getDefault(): SchedulerInterface
1717
{
18-
if (!static::$default) {
19-
static::$default = new EventLoopScheduler();
18+
if (static::$default) {
19+
return static::$default;
2020
}
2121

22-
return static::$default;
22+
throw new \Exception(
23+
"Please set a default scheduler (for react: Scheduler::setDefault(new EventLoopScheduler(\$loop));"
24+
);
2325
}
2426

2527
public static function setDefault(SchedulerInterface $scheduler)
2628
{
2729
if (static::$default !== null) {
28-
throw new \Exception("Scheduler can only be set once. (Are you calling set after get?)");
30+
throw new \Exception("Scheduler can only be set once.");
2931
}
32+
3033
static::$default = $scheduler;
3134
}
3235

33-
public static function getAsync(): SchedulerInterface
36+
public static function getAsync(): AsyncSchedulerInterface
3437
{
35-
if (!static::$async) {
36-
static::$async = new EventLoopScheduler();
38+
if (static::$async) {
39+
return static::$async;
3740
}
38-
return self::$async;
41+
42+
if (static::$default instanceof AsyncSchedulerInterface) {
43+
static::$async = static::$default;
44+
45+
return static::$async;
46+
}
47+
48+
throw new \Exception(
49+
"Please set an async scheduler (for react: Scheduler::setAsync(new EventLoopScheduler(\$loop));"
50+
);
3951
}
4052

4153
public static function getImmediate(): ImmediateScheduler
@@ -46,18 +58,18 @@ public static function getImmediate(): ImmediateScheduler
4658
return self::$immediate;
4759
}
4860

49-
public static function setAsync($async)
61+
public static function setAsync(AsyncSchedulerInterface $async)
5062
{
5163
if (static::$async !== null) {
52-
throw new \Exception("Scheduler can only be set once. (Are you calling set after get?)");
64+
throw new \Exception("Scheduler can only be set once.");
5365
}
5466
self::$async = $async;
5567
}
5668

57-
public static function setImmediate($immediate)
69+
public static function setImmediate(SchedulerInterface $immediate)
5870
{
5971
if (static::$immediate !== null) {
60-
throw new \Exception("Scheduler can only be set once. (Are you calling set after get?)");
72+
throw new \Exception("Scheduler can only be set once.");
6173
}
6274
self::$immediate = $immediate;
6375
}

src/Scheduler/EventLoopScheduler.php

+16-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Rx\Scheduler;
66

7-
use Interop\Async\Loop;
7+
use React\EventLoop\LoopInterface;
88
use Rx\DisposableInterface;
99

1010
final class EventLoopScheduler extends VirtualTimeScheduler
@@ -13,8 +13,20 @@ final class EventLoopScheduler extends VirtualTimeScheduler
1313

1414
private $insideInvoke = false;
1515

16-
public function __construct()
16+
private $delayCallback;
17+
18+
/**
19+
* EventLoopScheduler constructor.
20+
* @param callable|LoopInterface $timerCallableOrLoop
21+
*/
22+
public function __construct($timerCallableOrLoop)
1723
{
24+
$this->delayCallback = $timerCallableOrLoop instanceof LoopInterface ?
25+
function ($ms, $callable) use ($timerCallableOrLoop) {
26+
$timerCallableOrLoop->addTimer($ms / 1000, $callable);
27+
} :
28+
$timerCallableOrLoop;
29+
1830
parent::__construct($this->now(), function ($a, $b) {
1931
return $a - $b;
2032
});
@@ -25,7 +37,7 @@ public function scheduleAbsoluteWithState($state, int $dueTime, callable $action
2537
$disp = parent::scheduleAbsoluteWithState($state, $dueTime, $action);
2638

2739
if (!$this->insideInvoke) {
28-
Loop::delay(0, [$this, 'start']);
40+
call_user_func($this->delayCallback, 0, [$this, 'start']);
2941
}
3042

3143
return $disp;
@@ -41,7 +53,7 @@ public function start()
4153
if ($next !== null) {
4254
if ($next->getDueTime() > $this->clock) {
4355
$this->nextTimer = $next->getDueTime();
44-
Loop::delay($this->nextTimer - $this->clock, [$this, "start"]);
56+
call_user_func($this->delayCallback, $this->nextTimer - $this->clock, [$this, "start"]);
4557
break;
4658
}
4759

src/Scheduler/VirtualTimeScheduler.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
namespace Rx\Scheduler;
66

7+
use Rx\AsyncSchedulerInterface;
78
use Rx\Disposable\EmptyDisposable;
89
use Rx\Disposable\SerialDisposable;
910
use Rx\DisposableInterface;
10-
use Rx\SchedulerInterface;
1111

12-
class VirtualTimeScheduler implements SchedulerInterface
12+
class VirtualTimeScheduler implements AsyncSchedulerInterface
1313
{
1414
protected $clock;
1515
protected $comparer;

test/Rx/Functional/Operator/AsObservableTest.php

-18
Original file line numberDiff line numberDiff line change
@@ -134,22 +134,4 @@ public function testAsObservableIsNotEager()
134134

135135
$this->assertTrue($subscribed);
136136
}
137-
138-
public function testAsObservablePassThroughScheduler()
139-
{
140-
$gotValue = false;
141-
Observable::interval(10)
142-
->asObservable()
143-
->take(1)
144-
->subscribe(new CallbackObserver(
145-
function ($x) use (&$gotValue) {
146-
$this->assertEquals(0, $x);
147-
$gotValue = true;
148-
}
149-
));
150-
151-
Loop::get()->run();
152-
153-
$this->assertTrue($gotValue);
154-
}
155137
}

test/Rx/Functional/Operator/CombineLatestTest.php

+10-23
Original file line numberDiff line numberDiff line change
@@ -901,33 +901,20 @@ public function combineLatest_selector_throws()
901901
*/
902902
public function combineLatest_delay()
903903
{
904-
$loop = Loop::get();
905-
$scheduler = new EventLoopScheduler($loop);
906-
907-
$source1 = Observable::timer(100);
908-
$source2 = Observable::timer(120);
909-
$source3 = Observable::timer(140);
904+
$source1 = Observable::timer(100, $this->scheduler);
905+
$source2 = Observable::timer(120, $this->scheduler);
906+
$source3 = Observable::timer(140, $this->scheduler);
910907

911908
$source = $source1->combineLatest([$source2, $source3]);
912909

913-
$result = null;
914-
$completed = false;
915-
916-
$source->subscribe(new CallbackObserver(
917-
function ($x) use (&$result) {
918-
$result = $x;
919-
},
920-
null,
921-
function () use (&$completed) {
922-
$completed = true;
923-
}
924-
925-
));
926-
927-
$loop->run();
910+
$result = $this->scheduler->startWithCreate(function () use ($source) {
911+
return $source;
912+
});
928913

929-
$this->assertEquals([0, 0, 0], $result);
930-
$this->assertTrue($completed);
914+
$this->assertMessages([
915+
onNext(340, [0, 0, 0]),
916+
onCompleted(340)
917+
], $result->getMessages());
931918
}
932919

933920
/**

test/Rx/Functional/Scheduler/EventLoopSchedulerTest.php

+6-2
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
namespace Rx\Functional\Scheduler;
66

77
use Interop\Async\Loop;
8+
use React\EventLoop\Factory;
89
use Rx\Functional\FunctionalTestCase;
910
use Rx\Observable;
1011
use Rx\Observer\CallbackObserver;
12+
use Rx\Scheduler\EventLoopScheduler;
1113

1214
class EventLoopSchedulerTest extends FunctionalTestCase
1315
{
@@ -16,7 +18,9 @@ public function testDisposeInsideFirstSchedulePeriodicAction()
1618
$completed = false;
1719
$nextCount = 0;
1820

19-
Observable::interval(50)
21+
$loop = Factory::create();
22+
23+
Observable::interval(50, new EventLoopScheduler($loop))
2024
->take(1)
2125
->subscribe(new CallbackObserver(
2226
function ($x) use (&$nextCount) {
@@ -30,7 +34,7 @@ function () use (&$completed) {
3034
}
3135
));
3236

33-
Loop::get()->run();
37+
$loop->run();
3438

3539
$this->assertTrue($completed);
3640
$this->assertEquals(1, $nextCount);

0 commit comments

Comments
 (0)