Skip to content

Commit b5f4ebf

Browse files
[Fix] WebSocket Close Bugs (#11)
* fix websocket closing issue and add authentication errors * fix websocket bugs * service probes * probes ignore fix * afterWrite error handle * remove websocket package dependency * update logic: should be able to check socket status before write only. Closing a Duplex stream will close raw socket * test commit
1 parent 511f6d7 commit b5f4ebf

File tree

15 files changed

+598
-111
lines changed

15 files changed

+598
-111
lines changed

.changeset/happy-flies-tease.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': patch
3+
---
4+
5+
Fix missing authentication errors for websocket sync stream requests

.changeset/smooth-frogs-wait.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-rsocket-router': patch
3+
---
4+
5+
Fix issue where sending data during socket closing would throw an exception.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ npm-error.log
1616
.local-dev
1717
.probes
1818

19+
1920
packages/*/manifest.json
2021

2122
.clinic

packages/rsocket-router/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818
"test": "vitest"
1919
},
2020
"dependencies": {
21-
"rsocket-core": "1.0.0-alpha.3",
22-
"rsocket-websocket-server": "1.0.0-alpha.3",
2321
"@journeyapps-platform/micro": "^17.0.1",
22+
"rsocket-core": "1.0.0-alpha.3",
2423
"ts-codec": "^1.2.2",
2524
"uuid": "^9.0.1",
26-
"ws": "~8.2.3"
25+
"ws": "^8.17.0"
2726
},
2827
"devDependencies": {
2928
"@types/uuid": "^9.0.4",
3029
"@types/ws": "~8.2.0",
3130
"bson": "^6.6.0",
31+
"rsocket-websocket-client": "1.0.0-alpha.3",
3232
"typescript": "^5.2.2",
3333
"vitest": "^0.34.6"
3434
}

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import * as micro from '@journeyapps-platform/micro';
77
import * as http from 'http';
88
import { Payload, RSocketServer } from 'rsocket-core';
9-
import { WebsocketServerTransport } from 'rsocket-websocket-server';
109
import * as ws from 'ws';
1110
import { SocketRouterObserver } from './SocketRouterListener.js';
1211
import {
@@ -17,6 +16,7 @@ import {
1716
ReactiveSocketRouterOptions,
1817
SocketResponder
1918
} from './types.js';
19+
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
2020

2121
export class ReactiveSocketRouter<C> {
2222
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketClientTransport.ts
3+
* Copyright 2021-2022 the original author or authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import {
19+
Closeable,
20+
Deferred,
21+
Demultiplexer,
22+
DuplexConnection,
23+
Frame,
24+
FrameHandler,
25+
Multiplexer,
26+
Outbound,
27+
ServerTransport
28+
} from 'rsocket-core';
29+
import * as WebSocket from 'ws';
30+
import { WebsocketDuplexConnection } from './WebsocketDuplexConnection.js';
31+
import * as micro from '@journeyapps-platform/micro';
32+
33+
export type SocketFactory = (options: SocketOptions) => WebSocket.WebSocketServer;
34+
35+
export type SocketOptions = {
36+
host?: string;
37+
port?: number;
38+
};
39+
40+
export type ServerOptions = SocketOptions & {
41+
wsCreator?: SocketFactory;
42+
debug?: boolean;
43+
};
44+
45+
const defaultFactory: SocketFactory = (options: SocketOptions) => {
46+
return new WebSocket.WebSocketServer({
47+
host: options.host,
48+
port: options.port
49+
});
50+
};
51+
52+
export class WebsocketServerTransport implements ServerTransport {
53+
private readonly host: string | undefined;
54+
private readonly port: number | undefined;
55+
private readonly factory: SocketFactory;
56+
57+
constructor(options: ServerOptions) {
58+
this.host = options.host;
59+
this.port = options.port;
60+
this.factory = options.wsCreator ?? defaultFactory;
61+
}
62+
63+
async bind(
64+
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
65+
multiplexerDemultiplexerFactory: (
66+
frame: Frame,
67+
outbound: Outbound & Closeable
68+
) => Multiplexer & Demultiplexer & FrameHandler
69+
): Promise<Closeable> {
70+
const websocketServer: WebSocket.WebSocketServer = await this.connectServer();
71+
const serverCloseable = new ServerCloseable(websocketServer);
72+
73+
const connectionListener = (websocket: WebSocket.WebSocket) => {
74+
try {
75+
websocket.binaryType = 'nodebuffer';
76+
const duplex = WebSocket.createWebSocketStream(websocket);
77+
WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket);
78+
} catch (ex) {
79+
micro.logger.error(`Could not create duplex connection`, ex);
80+
if (websocket.readyState == websocket.OPEN) {
81+
websocket.close();
82+
}
83+
}
84+
};
85+
86+
const closeListener = (error?: Error) => {
87+
serverCloseable.close(error);
88+
};
89+
90+
websocketServer.addListener('connection', connectionListener);
91+
websocketServer.addListener('close', closeListener);
92+
websocketServer.addListener('error', closeListener);
93+
94+
return serverCloseable;
95+
}
96+
97+
private connectServer(): Promise<WebSocket.WebSocketServer> {
98+
return new Promise((resolve, reject) => {
99+
const websocketServer = this.factory({
100+
host: this.host,
101+
port: this.port
102+
});
103+
104+
const earlyCloseListener = (error?: Error) => {
105+
reject(error);
106+
};
107+
108+
websocketServer.addListener('close', earlyCloseListener);
109+
websocketServer.addListener('error', earlyCloseListener);
110+
websocketServer.addListener('listening', () => resolve(websocketServer));
111+
});
112+
}
113+
}
114+
115+
class ServerCloseable extends Deferred {
116+
constructor(private readonly server: WebSocket.WebSocketServer) {
117+
super();
118+
}
119+
120+
close(error?: Error) {
121+
if (this.done) {
122+
super.close(error);
123+
return;
124+
}
125+
126+
// For this package's use case the server is externally closed
127+
128+
super.close();
129+
}
130+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketDuplexConnection.ts
3+
* Copyright 2021-2022 the original author or authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import * as micro from '@journeyapps-platform/micro';
19+
import {
20+
Closeable,
21+
Deferred,
22+
Demultiplexer,
23+
deserializeFrame,
24+
DuplexConnection,
25+
Frame,
26+
FrameHandler,
27+
Multiplexer,
28+
Outbound,
29+
serializeFrame
30+
} from 'rsocket-core';
31+
import { Duplex } from 'stream';
32+
import WebSocket from 'ws';
33+
34+
export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
35+
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;
36+
37+
constructor(
38+
private websocketDuplex: Duplex,
39+
frame: Frame,
40+
multiplexerDemultiplexerFactory: (
41+
frame: Frame,
42+
outbound: Outbound & Closeable
43+
) => Multiplexer & Demultiplexer & FrameHandler,
44+
private rawSocket: WebSocket.WebSocket
45+
) {
46+
super();
47+
48+
websocketDuplex.on('close', this.handleClosed);
49+
websocketDuplex.on('error', this.handleError);
50+
websocketDuplex.on('data', this.handleMessage);
51+
52+
this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
53+
}
54+
55+
get availability(): number {
56+
return this.websocketDuplex.destroyed ? 0 : 1;
57+
}
58+
59+
close(error?: Error) {
60+
if (this.done) {
61+
super.close(error);
62+
return;
63+
}
64+
65+
this.websocketDuplex.removeAllListeners();
66+
this.websocketDuplex.end();
67+
68+
super.close(error);
69+
}
70+
71+
send(frame: Frame): void {
72+
if (this.done) {
73+
return;
74+
}
75+
76+
try {
77+
const buffer = serializeFrame(frame);
78+
// Work around for this issue
79+
// https://github.com/websockets/ws/issues/1515
80+
if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) {
81+
this.close(new Error('WebSocket is closing'));
82+
return;
83+
}
84+
85+
this.websocketDuplex.write(buffer);
86+
} catch (ex) {
87+
this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`));
88+
}
89+
}
90+
91+
private handleClosed = (e: WebSocket.CloseEvent): void => {
92+
this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
93+
};
94+
95+
private handleError = (e: WebSocket.ErrorEvent): void => {
96+
micro.logger.error(`Error in WebSocket duplex connection: ${e}`);
97+
this.close(e.error);
98+
};
99+
100+
private handleMessage = (buffer: Buffer): void => {
101+
try {
102+
const frame = deserializeFrame(buffer);
103+
this.multiplexerDemultiplexer.handle(frame);
104+
} catch (error) {
105+
this.close(error);
106+
}
107+
};
108+
109+
static create(
110+
socket: Duplex,
111+
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
112+
multiplexerDemultiplexerFactory: (
113+
frame: Frame,
114+
outbound: Outbound & Closeable
115+
) => Multiplexer & Demultiplexer & FrameHandler,
116+
rawSocket: WebSocket.WebSocket
117+
): void {
118+
socket.once('data', async (buffer) => {
119+
let frame: Frame | undefined = undefined;
120+
try {
121+
frame = deserializeFrame(buffer);
122+
if (!frame) {
123+
throw new Error(`Unable to deserialize frame`);
124+
}
125+
} catch (ex) {
126+
micro.logger.info(`Received error deserializing initial frame buffer. Skipping connection request.`, ex);
127+
// The initial frame should always be parsable
128+
return socket.end();
129+
}
130+
131+
const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket);
132+
if (connection.done) {
133+
return;
134+
}
135+
try {
136+
socket.pause();
137+
await connectionAcceptor(frame, connection);
138+
socket.resume();
139+
} catch (error) {
140+
micro.logger.info(`Error accepting connection:`, error);
141+
connection.close(error);
142+
}
143+
});
144+
}
145+
}

0 commit comments

Comments
 (0)