-
Notifications
You must be signed in to change notification settings - Fork 31
Open
Description
nodejs version:22
platform: Ubuntu 24.04.3 LTS
import { Http3Server} from "@fails-components/webtransport";
import { WebTransportSessionImpl } from "@fails-components/webtransport/dist/lib/types";
import { WebTransportSocket } from './webtransportSocket';
import ... others
export interface WebtransportConnectorOptions extends HybridSwitcherOptions {
useDict?: boolean;
useProtobuf?: boolean;
cert: string
privateKey: string
}
let curId = 1;
export class WebTransportConnector extends EventEmitter implements IConnector {
opts: WebtransportConnectorOptions;
port: number;
host: string;
useDict: boolean;
useProtobuf: boolean;
handshake: HandshakeCommand;
heartbeat: HeartbeatCommand;
cert: string;
privateKey: string;
connector: IConnector;
dictionary: DictionaryComponent;
protobuf: ProtobufComponent;
decodeIO_protobuf: IComponent;
listeningServer: Http3Server;
// 持久流管理
constructor(port: number, host: string, opts: WebtransportConnectorOptions) {
super();
this.opts = opts;
this.port = port;
this.host = host;
this.useDict = opts.useDict;
this.useProtobuf = opts.useProtobuf;
this.handshake = new HandshakeCommand(opts);
this.heartbeat = new HeartbeatCommand(opts);
this.cert = opts.cert;
this.privateKey = opts.privateKey;
}
/**
* Start connector to listen the specified port
*/
start(cb: () => void) {
let app = pinus.app
let self = this;
let gensocket = function (socketId: number, session: WebTransportSessionImpl, clientInfo: { ip: string, port: string }) {
let wTsocket = new WebTransportSocket(socketId, session, clientInfo);
wTsocket.on('handshake', self.handshake.handle.bind(self.handshake, wTsocket));
wTsocket.on('heartbeat', self.heartbeat.handle.bind(self.heartbeat, wTsocket));
wTsocket.on('disconnect', self.heartbeat.clear.bind(self.heartbeat, wTsocket.id));
wTsocket.on('closing', Kick.handle.bind(null, wTsocket));
self.emit('connection', wTsocket);
return wTsocket
};
this.connector = app.components.__connector__.connector;
this.dictionary = app.components.__dictionary__;
this.protobuf = app.components.__protobuf__;
this.decodeIO_protobuf = app.components.__decodeIO__protobuf__;
let server = new Http3Server({
port: this.port,
host: '0.0.0.0',
secret: 'mysecret',
cert: this.cert,
privKey: this.privateKey,
reliability: 'reliableOnly',
defaultDatagramsReadableMode: 'bytes',
});
this.listeningServer = server;
server.ready.then(async () => {
server.setRequestCallback(async (args: any) => {
const url = args.header[':path']
const [path] = url.split('?')
if (server.sessionController[path] == null) {
return {
...args,
path,
status: 404
}
}
const protocols = args.header['wt-available-protocols']
? args.header['wt-available-protocols']
.split(',')
.map((el: string) => el.trim())
: undefined
let selectedProtocol = protocols && protocols[protocols.length - 1]
if (selectedProtocol === 'noprot') selectedProtocol = undefined
return {
...args,
path,
userData: {
search: url.substring(path.length)
},
header: {
...args.header,
':path': path
},
status: 200,
selectedProtocol
}
})
for await (const session of getReaderStream(server.sessionStream('/bidirectional'))) {
try {
await session.ready
let socketId = curId++
//@ts-ignore
const clientInfo = parseClientAddress(session.peerAddress)
let wTsocket = gensocket(socketId, session, clientInfo)
logger.info('WebTransportConnector session opened:', socketId)
wTsocket.establishPersistentStream()
session.closed.then((reason) => {
logger.info('WebTransportConnector session closed:', reason)
wTsocket.emit('disconnect', JSON.stringify(reason))
// 清理持久流
wTsocket.closePersistentStream()
// 释放所有事件监听,避免引用链存活
wTsocket.removeAllListeners()
wTsocket = null;
})
} catch (err) {
logger.error('WebTransportConnector session error:', err);
session.close({
closeCode: 1002,
reason: 'session error'
})
}
}
}).catch((err) => {
logger.error('WebTransportConnector server error:', err);
})
server.startServer()
process.nextTick(cb);
}
stop(force: boolean, cb: () => void) {
this.listeningServer.stopServer()
process.nextTick(cb);
}
decode = coder.decode;
encode = coder.encode;
}
function parseClientAddress(address: string | undefined): { ip: string, port: string } {
if (!address) {
return { ip: '0', port: '0' }
}
// 检查是否包含端口号 (格式: IP:PORT)
const colonIndex = address.lastIndexOf(':')
if (colonIndex !== -1) {
const ip = address.substring(0, colonIndex)
const port = address.substring(colonIndex + 1)
return { ip, port }
}
// 只有IP地址
return { ip: address, port: '0' }
}
webtransportSocket.ts
import * as path from 'path';
import { EventEmitter } from "stream";
import { ISocket } from "../interfaces/ISocket";
import { WebTransportBidirectionalStream, WebTransportSession } from "@fails-components/webtransport";
import { getLogger } from 'pinus-logger';
import { Message, Package } from 'pinus-protocol';
import { WebTransportSessionImpl } from '@fails-components/webtransport/dist/lib/types';
import handler from './common/handler';
let logger = getLogger('pinus', path.basename(__filename));
let ST_INITED = 0;
let ST_WAIT_ACK = 1;
let ST_WORKING = 2;
let ST_CLOSED = 3;
export class WebTransportSocket extends EventEmitter implements ISocket {
id: number;
remoteAddress: { ip: string, port: number };
state: number;
session: WebTransportSessionImpl;
private streamWriter: WritableStreamDefaultWriter<Uint8Array> | null = null
private streamReader: ReadableStreamDefaultReader<Uint8Array> | null = null
private _datagramReader: ReadableStreamDefaultReader<Uint8Array> | null = null;
private _datagramWriter: WritableStreamDefaultWriter<Uint8Array> | null = null;
private messageQueue: Buffer[] = [];
private isSending: boolean = false;
constructor(id: number, session: WebTransportSessionImpl, clientInfo: { ip: string, port: string }) {
super();
this.id = id;
this.session = session;
this.state = ST_INITED
this.remoteAddress = { ip: clientInfo.ip, port: parseInt(clientInfo.port) }
}
async establishPersistentStream() {
try {
// this.initDatagramStream();
this.initBidirectionalStream();
} catch (error) {
logger.error('Failed to establish persistent stream:', error);
}
}
async initBidirectionalStream() {
const streamIterator = this.session.incomingBidirectionalStreams[Symbol.asyncIterator]();
const firstStream = await streamIterator.next();
if (!firstStream.done) {
const persistentStream: WebTransportBidirectionalStream = firstStream.value;
this.streamWriter = persistentStream.writable.getWriter();
this.streamWriter.closed.then(() => {
logger.info('Persistent writer stream closed gracefully:', this.id);
}).catch((error) => {
logger.error('Persistent writer stream closed abruptly:', error);
});
logger.info('Persistent stream set for socket:', this.id);
this.streamReader = persistentStream.readable.getReader();
this.streamReader.closed.then(() => {
logger.info('Persistent reader stream closed gracefully:', this.id);
}).catch((error) => {
logger.error('Persistent reader stream closed abruptly:', error);
});
this.handlePersistentStreamReading(this.streamReader);
}
}
async initDatagramStream() {
this._datagramReader = this.session.datagrams.readable.getReader();
this._datagramReader.closed.catch((error) => {
logger.error('Datagram stream closed:', error);
});
this._datagramWriter = this.session.datagrams.createWritable().getWriter();
this._datagramWriter.closed.catch((error) => {
logger.error('Datagram stream closed:', error);
});
this.handlePersistentStreamReading(this._datagramReader);
}
async handlePersistentStreamReading(reader: ReadableStreamDefaultReader<Uint8Array>) {
let valueArr: Uint8Array[] = []
try {
while (this.session.state !== 'closed' && this.session.state !== 'failed') {
const { done, value } = await reader.read();
if (done) {
logger.info('Persistent stream ended for socket:', this.id);
break;
}
if (value[value.length - 1] != 0) {
valueArr.push(value)
continue
} else {
valueArr.push(value.subarray(0, value.length - 1))
}
let dataUint8Array = concatUint8Array(valueArr)
valueArr = []
if (value) {
try {
const msg = Package.decode(Buffer.from(dataUint8Array));
this.onPersistentMessage(msg);
} catch (decodeError) {
logger.error('Failed to decode message:', decodeError);
}
}
}
logger.info('Persistent stream reading ended for socket:', this.id, " state:", this.session.state);
} catch (error) {
logger.error('Persistent stream reading error:', error);
} finally {
try {
reader.releaseLock();
} catch (releaseError) {
logger.warn('Error releasing reader lock:', releaseError);
}
}
}
onPersistentMessage(pkg: ReturnType<typeof Package.decode>) {
if (pkg instanceof Array) {
handler(this, pkg, true);
return;
}
handler(this, pkg, true);
}
private enqueueMessage(msg: Buffer) {
this.messageQueue.push(msg);
logger.debug(`Message enqueued, queue length: ${this.messageQueue.length}`);
this.processQueue();
}
private dequeueMessage(): Buffer | undefined {
return this.messageQueue.shift();
}
private async processQueue() {
if (this.session.state === 'closed') {
return;
}
if (this.isSending) {
return;
}
if (this.messageQueue.length === 0) {
return;
}
this.isSending = true;
try {
//@ts-ignore
while (this.messageQueue.length > 0 && this.session.state !== 'closed') {
const msg = this.dequeueMessage();
if (!msg) {
break;
}
await this.sendMessageToStream(msg);
logger.debug('Message sent from queue');
await new Promise(resolve => setTimeout(resolve, 100));
}
} catch (error) {
logger.error('Error sending message from queue:', error);
this.disconnect();
} finally {
this.isSending = false;
//@ts-ignore
if (this.messageQueue.length > 0 && this.session.state !== 'closed') {
Promise.resolve().then(() => this.processQueue());
}
}
}
private async sendMessageToStream(msg: Buffer): Promise<void> {
let writer = this.streamWriter || this._datagramWriter;
if (!writer) {
throw new Error('No persistent stream writer available');
}
// logger.info('writer.desiredSize:', writer.desiredSize);
await writer.ready;
await writer.write(msg);
}
async sendRaw(msg: any) {
if (this.session.state == "closed") {
return;
}
let writer = this.streamWriter || this._datagramWriter;
if (!writer) {
logger.error('No persistent stream writer available');
return;
}
const encodedMsg = encode(msg);
this.enqueueMessage(encodedMsg);
logger.debug('Message queued for sending');
}
async closePersistentStream() {
this.messageQueue = [];
this.isSending = false;
if (this.session.state == "closed") {
logger.info("WebTransportSocket session already closed for closePersistentStream:", this.id);
return;
}
if (this.streamReader) {
try {
await this.streamReader.cancel();
} catch (error) {
logger.error('Error canceling persistent stream reader:', error);
}
this.streamReader = null;
}
if (this._datagramReader) {
try {
await this._datagramReader.cancel();
} catch (error) {
logger.error('Error canceling datagram reader:', error);
}
this._datagramReader = null;
}
if (this.streamWriter) {
try {
logger.info('Persistent stream closed for socket:', this.id);
await this.streamWriter.close();
} catch (error) {
logger.error('Error closing persistent stream:', error);
}
this.streamWriter = null;
}
if (this._datagramWriter) {
try {
logger.info('Datagram stream closed for socket:', this.id);
await this._datagramWriter.close();
} catch (error) {
logger.error('Error closing datagram stream:', error);
}
this._datagramWriter = null;
}
}
send(msg: any) {
if (msg instanceof String) {
msg = Buffer.from(msg as string);
} else if (!(msg instanceof Buffer)) {
msg = Buffer.from(JSON.stringify(msg));
}
this.sendRaw(Package.encode(Package.TYPE_DATA, msg));
}
sendBatch(msgs: any[]) {
let rs: Buffer[] = [];
for (let i = 0; i < msgs.length; i++) {
let src = Package.encode(Package.TYPE_DATA, msgs[i]);
rs.push(src);
}
this.sendRaw(rs);
}
sendForce(msg: any) {
this.sendRaw(msg);
}
/**
* Response handshake request
*
* @api private
*/
handshakeResponse(resp: any) {
this.sendRaw(resp);
this.state = ST_WAIT_ACK;
}
disconnect() {
logger.info("WebTransportSocket disconnect:", this.id);
this.state = ST_CLOSED;
this.closePersistentStream();
if (this.session.state == "closed") {
logger.info("WebTransportSocket session already closed:", this.id);
return;
}
this.session.close({
closeCode: 1002,
reason: 'disconnect'
})
}
}
function encode(msg: Buffer) {
//add asscii 0 to last one
return Buffer.concat([msg, Buffer.from([0])])
}
function concatUint8Array(arr: Uint8Array[]) {
if (arr.length === 0) {
return new Uint8Array(0);
}
if (arr.length === 1) {
return arr[0];
}
let totalLength = 0;
for (const item of arr) {
totalLength += item.length;
}
const result = new Uint8Array(totalLength);
let offset = 0;
for (const item of arr) {
result.set(item, offset);
offset += item.length;
}
return result;
}
once stress testing is 20 minute,after tesing,heap mem can down initial level(about 45M),but rss can not decrease,
after Several 5-hour stress tests, the rss up to 3300M, one of webtransport process be killed in k8s pod.
The remaining process is shown in the following figure.
Each process handles approximately 4.1 million connections, with each connection ending and being disconnected by the client in about one minute. Each process has approximately 2,500 concurrent connections, processing 500 requests per second (QPS) simultaneously, with CPU utilization at 80%.
I change the mem alloc to libjemalloc.so.2,but no use.
Metadata
Metadata
Assignees
Labels
No labels