Skip to content

Commit e924b1d

Browse files
Kevin SmithKevin Smith
authored andcommitted
BREAKING: Align with current TC39 proposal
1 parent 8d2a788 commit e924b1d

23 files changed

Lines changed: 1317 additions & 352 deletions

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
.DS_Store
2-
_*
32
node_modules
3+
zen-observable.js

.npmignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.DS_Store
2+
node_modules

LICENSE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Copyright (c) 2015 zenparsing (Kevin Smith)
1+
Copyright (c) 2018 zenparsing (Kevin Smith)
22

33
Permission is hereby granted, free of charge, to any person obtaining a copy
44
of this software and associated documentation files (the "Software"), to deal

README.md

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,3 @@ Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => {
180180
```
181181

182182
Returns a new Observable that applies a function against an accumulator and each value of the stream to reduce it to a single value.
183-
184-
185-
### observable.flatMap ( callback )
186-
187-
Returns a new Observable that emits the values from each Observable that is returned from the `callback` argument.
188-
189-
```js
190-
Observable.of("Hello", "Goodbye").flatMap(value => {
191-
return Observable.of(value + " Earth", value + " Mars");
192-
}).subscribe(value => {
193-
console.log(value);
194-
});
195-
// "Hello Earth"
196-
// "Hello Mars"
197-
// "Goodbye Earth"
198-
// "Goodbye Mars"
199-
```

package-lock.json

Lines changed: 9 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
{
22
"name": "zen-observable",
3-
"version": "0.6.1",
3+
"version": "0.7.0",
44
"repository": "zenparsing/zen-observable",
55
"description": "An Implementation of ES Observables",
66
"homepage": "https://github.com/zenparsing/zen-observable",
77
"license": "MIT",
88
"devDependencies": {
9-
"es-observable-tests": "^0.3.0",
10-
"moon-unit": "^0.2.1",
11-
"v8-promise": "*"
9+
"esdown": "^1.2.8",
10+
"moon-unit": "^0.2.1"
1211
},
13-
"dependencies": {}
12+
"dependencies": {},
13+
"scripts": {
14+
"test": "esdown test",
15+
"build": "esdown - src/Observable.js zen-observable.js -g '*'",
16+
"prepublishOnly": "npm run build"
17+
}
1418
}

src/Observable.js

Lines changed: 46 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ if (typeof Symbol === "function" && !Symbol.observable) {
1515

1616
// === Abstract Operations ===
1717

18+
function hostReportError(e) {
19+
setTimeout(() => { throw e });
20+
}
21+
1822
function getMethod(obj, key) {
1923
let value = obj[key];
2024

@@ -59,7 +63,8 @@ function cleanupSubscription(subscription) {
5963
subscription._cleanup = undefined;
6064

6165
// Call the cleanup function
62-
cleanup();
66+
try { cleanup() }
67+
catch (e) { hostReportError(e) }
6368
}
6469

6570
function subscriptionClosed(subscription) {
@@ -88,10 +93,12 @@ function Subscription(observer, subscriber) {
8893
this._cleanup = undefined;
8994
this._observer = observer;
9095

91-
let start = getMethod(observer, "start");
92-
93-
if (start)
94-
start.call(observer, this);
96+
try {
97+
let start = getMethod(observer, "start");
98+
if (start) start.call(observer, this);
99+
} catch (e) {
100+
hostReportError(e);
101+
}
95102

96103
if (subscriptionClosed(this))
97104
return;
@@ -141,73 +148,68 @@ addMethods(SubscriptionObserver.prototype = {}, {
141148

142149
// If the stream is closed, then return undefined
143150
if (subscriptionClosed(subscription))
144-
return undefined;
151+
return;
145152

146153
let observer = subscription._observer;
147-
let m = getMethod(observer, "next");
148154

149-
// If the observer doesn't support "next", then return undefined
150-
if (!m)
151-
return undefined;
152-
153-
// Send the next value to the sink
154-
return m.call(observer, value);
155+
try {
156+
// If the observer has a "next" method, send the next value
157+
let m = getMethod(observer, "next");
158+
if (m) m.call(observer, value);
159+
} catch (e) {
160+
hostReportError(e);
161+
}
155162
},
156163

157164
error(value) {
158165
let subscription = this._subscription;
159166

160167
// If the stream is closed, throw the error to the caller
161-
if (subscriptionClosed(subscription))
162-
throw value;
168+
if (subscriptionClosed(subscription)) {
169+
hostReportError(value);
170+
return;
171+
}
163172

164173
let observer = subscription._observer;
165174
subscription._observer = undefined;
166175

167176
try {
168177
let m = getMethod(observer, "error");
169-
170-
// If the sink does not support "error", then throw the error to the caller
171-
if (!m)
172-
throw value;
173-
174-
value = m.call(observer, value);
178+
if (m) m.call(observer, value);
179+
else throw value;
175180
} catch (e) {
176-
try { cleanupSubscription(subscription) }
177-
finally { throw e }
181+
hostReportError(e);
178182
}
179183

180184
cleanupSubscription(subscription);
181-
return value;
182185
},
183186

184-
complete(value) {
187+
complete() {
185188
let subscription = this._subscription;
186189

187-
// If the stream is closed, then return undefined
188190
if (subscriptionClosed(subscription))
189-
return undefined;
191+
return;
190192

191193
let observer = subscription._observer;
192194
subscription._observer = undefined;
193195

194196
try {
195197
let m = getMethod(observer, "complete");
196-
197-
// If the sink does not support "complete", then return undefined
198-
value = m ? m.call(observer, value) : undefined;
198+
if (m) m.call(observer);
199199
} catch (e) {
200-
try { cleanupSubscription(subscription) }
201-
finally { throw e }
200+
hostReportError(e);
202201
}
203202

204203
cleanupSubscription(subscription);
205-
return value;
206204
},
207205

208206
});
209207

210208
export function Observable(subscriber) {
209+
// Constructor cannot be called as a function
210+
if (!(this instanceof Observable))
211+
throw new TypeError("Observable cannot be called as a function");
212+
211213
// The stream subscriber must be a function
212214
if (typeof subscriber !== "function")
213215
throw new TypeError("Observable initializer must be a function");
@@ -224,6 +226,8 @@ addMethods(Observable.prototype, {
224226
error: args[0],
225227
complete: args[1],
226228
};
229+
} else if (typeof observer !== 'object' || observer === null) {
230+
observer = {};
227231
}
228232

229233
return new Subscription(observer, this._subscriber);
@@ -251,7 +255,7 @@ addMethods(Observable.prototype, {
251255
return;
252256

253257
try {
254-
return fn(value);
258+
fn(value);
255259
} catch (err) {
256260
reject(err);
257261
subscription.unsubscribe();
@@ -278,11 +282,11 @@ addMethods(Observable.prototype, {
278282
try { value = fn(value) }
279283
catch (e) { return observer.error(e) }
280284

281-
return observer.next(value);
285+
observer.next(value);
282286
},
283287

284-
error(e) { return observer.error(e) },
285-
complete(x) { return observer.complete(x) },
288+
error(e) { observer.error(e) },
289+
complete() { observer.complete() },
286290
}));
287291
},
288292

@@ -297,14 +301,14 @@ addMethods(Observable.prototype, {
297301
if (observer.closed)
298302
return;
299303

300-
try { if (!fn(value)) return undefined }
304+
try { if (!fn(value)) return }
301305
catch (e) { return observer.error(e) }
302306

303-
return observer.next(value);
307+
observer.next(value);
304308
},
305309

306-
error(e) { return observer.error(e) },
307-
complete() { return observer.complete() },
310+
error(e) { observer.error(e) },
311+
complete() { observer.complete() },
308312
}));
309313
},
310314

@@ -339,8 +343,7 @@ addMethods(Observable.prototype, {
339343

340344
complete() {
341345
if (!hasValue && !hasSeed) {
342-
observer.error(new TypeError("Cannot reduce an empty sequence"));
343-
return;
346+
return observer.error(new TypeError("Cannot reduce an empty sequence"));
344347
}
345348

346349
observer.next(acc);
@@ -350,70 +353,6 @@ addMethods(Observable.prototype, {
350353
}));
351354
},
352355

353-
flatMap(fn) {
354-
if (typeof fn !== "function")
355-
throw new TypeError(fn + " is not a function");
356-
357-
let C = getSpecies(this);
358-
359-
return new C(observer => {
360-
let completed = false;
361-
let subscriptions = [];
362-
363-
// Subscribe to the outer Observable
364-
let outer = this.subscribe({
365-
366-
next(value) {
367-
if (fn) {
368-
try {
369-
value = fn(value);
370-
} catch (x) {
371-
observer.error(x);
372-
return;
373-
}
374-
}
375-
376-
// Subscribe to the inner Observable
377-
Observable.from(value).subscribe({
378-
_subscription: null,
379-
380-
start(s) { subscriptions.push(this._subscription = s) },
381-
next(value) { observer.next(value) },
382-
error(e) { observer.error(e) },
383-
384-
complete() {
385-
let i = subscriptions.indexOf(this._subscription);
386-
387-
if (i >= 0)
388-
subscriptions.splice(i, 1);
389-
390-
closeIfDone();
391-
}
392-
});
393-
},
394-
395-
error(e) {
396-
return observer.error(e);
397-
},
398-
399-
complete() {
400-
completed = true;
401-
closeIfDone();
402-
}
403-
});
404-
405-
function closeIfDone() {
406-
if (completed && subscriptions.length === 0)
407-
observer.complete();
408-
}
409-
410-
return () => {
411-
subscriptions.forEach(s => s.unsubscribe());
412-
outer.unsubscribe();
413-
};
414-
});
415-
},
416-
417356
});
418357

419358
Object.defineProperty(Observable.prototype, getSymbol("observable"), {

test/default.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
import { runTests } from "es-observable-tests";
1+
import { runTests } from "./tc39";
22
import { Observable } from "../src/Observable.js";
33
import { TestRunner } from "moon-unit";
44

5-
import flatMapTests from "./flatMap.js";
65
import reduceTests from "./reduce.js";
76
import mapTests from "./map.js";
87
import filterTests from "./filter.js";
98
import speciesTests from "./species.js";
109

10+
// Silence setTimeout so errors aren't thrown
11+
global.setTimeout = function() {};
12+
1113
runTests(Observable).then(() => {
1214
return new TestRunner().inject({ Observable }).run({
1315
"map": mapTests,
14-
"flatMap": flatMapTests,
1516
"reduce": reduceTests,
1617
"filter": filterTests,
1718
"species": speciesTests,

0 commit comments

Comments
 (0)