Skip to content

Commit 52200e7

Browse files
committed
Allow substituting the request creation function
1 parent 0120ac9 commit 52200e7

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

index.js

+8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ module.exports.AGServer = require('./server');
2020

2121
module.exports.AGServerSocket = require('./serversocket');
2222

23+
/**
24+
* Expose AGRequest constructor.
25+
*
26+
* @api public
27+
*/
28+
29+
module.exports.AGRequest = require('ag-request');
30+
2331
/**
2432
* Creates an http.Server exclusively used for WS upgrades.
2533
*

serversocket.js

+13-8
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ function AGServerSocket(id, server, socket, protocolVersion) {
4141
this.outboundPreparedMessageCount = 0;
4242
this.outboundSentMessageCount = 0;
4343

44+
this.createRequest = this.server.options.requestCreator || this.defaultRequestCreator;
4445
this.cloneData = this.server.options.cloneData;
4546

4647
this.inboundMessageStream = new WritableConsumableStream();
@@ -197,6 +198,10 @@ AGServerSocket.prototype._startBatchOnHandshake = function () {
197198
}, this.batchOnHandshakeDuration);
198199
};
199200

201+
AGServerSocket.prototype.defaultRequestCreator = function (socket, id, procedureName, data) {
202+
return new AGRequest(socket, id, procedureName, data);
203+
};
204+
200205
// ---- Receiver logic ----
201206

202207
AGServerSocket.prototype.receiver = function (receiverName) {
@@ -616,7 +621,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
616621
this.socket.close(HANDSHAKE_REJECTION_STATUS_CODE);
617622
return;
618623
}
619-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
624+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
620625
await this._processHandshakeRequest(request);
621626
this._procedureDemux.write(eventName, request);
622627
return;
@@ -635,7 +640,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
635640
return;
636641
}
637642
// Let AGServer handle these events.
638-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
643+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
639644
await this._processAuthenticateRequest(request);
640645
this._procedureDemux.write(eventName, request);
641646
return;
@@ -664,7 +669,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
664669
this.emitError(error);
665670

666671
if (isRPC) {
667-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
672+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
668673
request.error(error);
669674
}
670675
return;
@@ -674,7 +679,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
674679
this.emitError(error);
675680

676681
if (isRPC) {
677-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
682+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
678683
request.error(error);
679684
}
680685
return;
@@ -688,7 +693,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
688693
this.emitError(error);
689694

690695
if (isRPC) {
691-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
696+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
692697
request.error(error);
693698
}
694699
return;
@@ -702,13 +707,13 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
702707
this.emitError(error);
703708

704709
if (isRPC) {
705-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
710+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
706711
request.error(error);
707712
}
708713
return;
709714
}
710715
if (isRPC) {
711-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
716+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
712717
await this._processUnsubscribeRequest(request);
713718
this._procedureDemux.write(eventName, request);
714719
return;
@@ -735,7 +740,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
735740
let newData;
736741

737742
if (isRPC) {
738-
let request = new AGRequest(this, packet.cid, eventName, packet.data);
743+
let request = this.createRequest(this, packet.cid, eventName, packet.data);
739744
try {
740745
let {data} = await this.server._processMiddlewareAction(this.middlewareInboundStream, action, this);
741746
newData = data;

0 commit comments

Comments
 (0)