@@ -18,11 +18,11 @@ import { Poll } from './poll';
18
18
*/
19
19
export class SocketStream < T , U > extends Stream < T , U > implements IDisposable {
20
20
/**
21
- * Construct a new socket stream.
21
+ * Construct a new web socket stream.
22
22
*
23
23
* @param sender - The sender which owns the stream.
24
24
*
25
- * @param options = The socket stream instantiation options .
25
+ * @param options - Web socket `url` and optional `WebSocket` constructor .
26
26
*/
27
27
constructor ( sender : T , options : SocketStream . IOptions ) {
28
28
super ( sender ) ;
@@ -41,9 +41,8 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
41
41
* Dispose the stream.
42
42
*/
43
43
dispose ( ) {
44
- super . stop ( ) ;
45
- this . subscription . dispose ( ) ;
46
- const { socket } = this ;
44
+ const { socket, subscription } = this ;
45
+ subscription . dispose ( ) ;
47
46
if ( socket ) {
48
47
this . socket = null ;
49
48
socket . onclose = ( ) => undefined ;
@@ -53,6 +52,7 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
53
52
socket . close ( ) ;
54
53
}
55
54
Signal . clearData ( this ) ;
55
+ super . stop ( ) ;
56
56
}
57
57
58
58
/**
@@ -75,9 +75,9 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
75
75
protected socket : WebSocket | null = null ;
76
76
77
77
/**
78
- * The poll instance that mediates the web socket lifecycle .
78
+ * A handle to the socket subscription to dispose when necessary .
79
79
*/
80
- protected readonly subscription : Poll ;
80
+ protected readonly subscription : IDisposable ;
81
81
82
82
/**
83
83
* Open a web socket and subscribe to its updates.
@@ -88,7 +88,7 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
88
88
if ( this . isDisposed ) {
89
89
return ;
90
90
}
91
- return new Promise < void > ( ( _ , reject ) => {
91
+ return new Promise ( ( _ , reject ) => {
92
92
this . socket = this . factory ( ) ;
93
93
this . socket . onclose = ( ) => reject ( new Error ( 'socket stream has closed' ) ) ;
94
94
this . socket . onmessage = ( { data } ) => data && this . emit ( JSON . parse ( data ) ) ;
0 commit comments