Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
Removing the messagesend and messagerecv classes in favor of using th…
Browse files Browse the repository at this point in the history
…e protobuf messages.
mcottontensor committed Jan 30, 2024
1 parent 801663e commit f8e503c
Showing 10 changed files with 159 additions and 331 deletions.
2 changes: 1 addition & 1 deletion Common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@epicgames-ps/lib-pixelstreamingcommon-ue5.5",
"version": "0.0.6",
"version": "0.0.7",
"description": "Common utilities library for Unreal Engine 5.5 Pixel Streaming",
"main": "build/commonjs/pixelstreamingcommon.js",
"module": "build/es2015/pixelstreamingcommon.js",
6 changes: 6 additions & 0 deletions Common/protobuf/signalling_messages.proto
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@ message offer {
string type = 1;
string sdp = 2;
optional string playerId = 3;
optional bool sfu = 4;
}

message peerDataChannelsReady {
@@ -132,3 +133,8 @@ message subscribe {
message unsubscribe {
string type = 1;
}

message streamerIdChanged {
string type = 1;
string newID = 2;
}
38 changes: 38 additions & 0 deletions Common/src/Messages/message_helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { IMessageType } from "@protobuf-ts/runtime";
import { BaseMessage } from './base_message';
import { Logger } from '../Logger/Logger';
import { MessageRegistry } from './message_registry';

export function createMessage(messageType: IMessageType<BaseMessage>, params?: any) {
const message = messageType.create();
@@ -9,3 +11,39 @@ export function createMessage(messageType: IMessageType<BaseMessage>, params?: a
}
return message;
}

export function validateMessage(msg: any): IMessageType<BaseMessage> | null {
let valid: boolean = true;

if (!msg.type) {
Logger.Error(Logger.GetStackTrace(), `Parsed message has no type. Rejected. ${JSON.stringify(msg)}`);
return null;
}

const messageType = MessageRegistry[msg.type];
if (!messageType) {
Logger.Error(Logger.GetStackTrace(), `Message is of an unknown type: "${messageType}". Rejected.`);
return null;
}

if (messageType.fields) {
for (let field of messageType.fields) {
if (!field.opt) {
if (!msg.hasOwnProperty(field.name)) {
Logger.Error(Logger.GetStackTrace(), `Message "${msg.type}"" is missing required field "${field.name}". Rejected.`);
valid = false;
}
}
}
}

for (const fieldName in msg) {
const found = messageType.fields.find(field => field.name === fieldName);
if (!found) {
Logger.Error(Logger.GetStackTrace(), `Message "${msg.type}" contains unknown field "${fieldName}". Rejected.`);
valid = false;
}
}

return valid ? messageType : null;
}
81 changes: 80 additions & 1 deletion Common/src/Messages/signalling_messages.ts
Original file line number Diff line number Diff line change
@@ -196,6 +196,10 @@ export interface offer {
* @generated from protobuf field: optional string playerId = 3;
*/
playerId?: string;
/**
* @generated from protobuf field: optional bool sfu = 4;
*/
sfu?: boolean;
}
/**
* @generated from protobuf message peerDataChannelsReady
@@ -365,6 +369,19 @@ export interface unsubscribe {
*/
type: string;
}
/**
* @generated from protobuf message streamerIdChanged
*/
export interface streamerIdChanged {
/**
* @generated from protobuf field: string type = 1;
*/
type: string;
/**
* @generated from protobuf field: string newID = 2;
*/
newID: string;
}
// @generated message type with reflection information, may provide speed optimized methods
class base_message$Type extends MessageType<base_message> {
constructor() {
@@ -1033,7 +1050,8 @@ class offer$Type extends MessageType<offer> {
super("offer", [
{ no: 1, name: "type", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "sdp", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 3, name: "playerId", kind: "scalar", opt: true, T: 9 /*ScalarType.STRING*/ }
{ no: 3, name: "playerId", kind: "scalar", opt: true, T: 9 /*ScalarType.STRING*/ },
{ no: 4, name: "sfu", kind: "scalar", opt: true, T: 8 /*ScalarType.BOOL*/ }
]);
}
create(value?: PartialMessage<offer>): offer {
@@ -1058,6 +1076,9 @@ class offer$Type extends MessageType<offer> {
case /* optional string playerId */ 3:
message.playerId = reader.string();
break;
case /* optional bool sfu */ 4:
message.sfu = reader.bool();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
@@ -1079,6 +1100,9 @@ class offer$Type extends MessageType<offer> {
/* optional string playerId = 3; */
if (message.playerId !== undefined)
writer.tag(3, WireType.LengthDelimited).string(message.playerId);
/* optional bool sfu = 4; */
if (message.sfu !== undefined)
writer.tag(4, WireType.Varint).bool(message.sfu);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
@@ -1772,3 +1796,58 @@ class unsubscribe$Type extends MessageType<unsubscribe> {
* @generated MessageType for protobuf message unsubscribe
*/
export const unsubscribe = new unsubscribe$Type();
// @generated message type with reflection information, may provide speed optimized methods
class streamerIdChanged$Type extends MessageType<streamerIdChanged> {
constructor() {
super("streamerIdChanged", [
{ no: 1, name: "type", kind: "scalar", T: 9 /*ScalarType.STRING*/ },
{ no: 2, name: "newID", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
create(value?: PartialMessage<streamerIdChanged>): streamerIdChanged {
const message = globalThis.Object.create((this.messagePrototype!));
message.type = "";
message.newID = "";
if (value !== undefined)
reflectionMergePartial<streamerIdChanged>(this, message, value);
return message;
}
internalBinaryRead(reader: IBinaryReader, length: number, options: BinaryReadOptions, target?: streamerIdChanged): streamerIdChanged {
let message = target ?? this.create(), end = reader.pos + length;
while (reader.pos < end) {
let [fieldNo, wireType] = reader.tag();
switch (fieldNo) {
case /* string type */ 1:
message.type = reader.string();
break;
case /* string newID */ 2:
message.newID = reader.string();
break;
default:
let u = options.readUnknownField;
if (u === "throw")
throw new globalThis.Error(`Unknown field ${fieldNo} (wire type ${wireType}) for ${this.typeName}`);
let d = reader.skip(wireType);
if (u !== false)
(u === true ? UnknownFieldHandler.onRead : u)(this.typeName, message, fieldNo, wireType, d);
}
}
return message;
}
internalBinaryWrite(message: streamerIdChanged, writer: IBinaryWriter, options: BinaryWriteOptions): IBinaryWriter {
/* string type = 1; */
if (message.type !== "")
writer.tag(1, WireType.LengthDelimited).string(message.type);
/* string newID = 2; */
if (message.newID !== "")
writer.tag(2, WireType.LengthDelimited).string(message.newID);
let u = options.writeUnknownFields;
if (u !== false)
(u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer);
return writer;
}
}
/**
* @generated MessageType for protobuf message streamerIdChanged
*/
export const streamerIdChanged = new streamerIdChanged$Type();
7 changes: 3 additions & 4 deletions Common/src/Transport/ITransport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { MessageSend } from '../WebSockets/MessageSend';
import { MessageRecv } from '../WebSockets/MessageReceive';
import { BaseMessage } from '../Messages/base_message';
import { EventEmitter } from 'events';

/**
@@ -16,12 +15,12 @@ export interface ITransport {
/**
* Called when the protocol wants to send a message over the transport.
*/
sendMessage(msg: MessageSend): void;
sendMessage(msg: BaseMessage): void;

/**
* Callback filled in by the SignallingProtocol and should be called by the transport when a new message arrives.
*/
onMessage: (msg: MessageRecv) => void;
onMessage: (msg: BaseMessage) => void;

/**
* Connect to a given URL.
23 changes: 14 additions & 9 deletions Common/src/Transport/WebSocketTransport.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright Epic Games, Inc. All Rights Reserved.

import { Logger } from '../Logger/Logger';
import { MessageRecv } from '../WebSockets/MessageReceive';
import { MessageSend } from '../WebSockets/MessageSend';
import { ITransport } from './ITransport';
import { EventEmitter } from 'events';
import { BaseMessage } from '../Messages/base_message';
import * as MessageHelpers from '../Messages/message_helpers';

// declare the new method for the websocket interface
declare global {
@@ -25,11 +25,11 @@ export class WebSocketTransport implements ITransport {
this.events = new EventEmitter();
}

sendMessage(msg: MessageSend): void {
this.webSocket.send(msg.payload());
sendMessage(msg: BaseMessage): void {
this.webSocket.send(JSON.stringify(msg));
}

onMessage: (msg: MessageRecv) => void;
onMessage: (msg: BaseMessage) => void;

/**
* Connect to the signaling server
@@ -104,17 +104,22 @@ export class WebSocketTransport implements ITransport {
return;
}

const message: MessageRecv = JSON.parse(event.data);
Logger.Log(
Logger.GetStackTrace(),
'received => \n' +
JSON.stringify(JSON.parse(event.data), undefined, 4),
6
);

this.onMessage(message);
// Send to our signalling protocol to handle the incoming message
//this.signallingProtocol.handleMessage(message.type, event.data);
let parsedMessage;
try {
parsedMessage = JSON.parse(event.data);
} catch (e) {
Logger.Error(Logger.GetStackTrace(), `Error parsing message string ${event.data}.\n${e}`);
return;
}

this.onMessage(parsedMessage);
}

/**
97 changes: 0 additions & 97 deletions Common/src/WebSockets/MessageReceive.ts

This file was deleted.

200 changes: 0 additions & 200 deletions Common/src/WebSockets/MessageSend.ts

This file was deleted.

33 changes: 17 additions & 16 deletions Common/src/WebSockets/SignallingProtocol.ts
Original file line number Diff line number Diff line change
@@ -2,9 +2,10 @@

import { Logger } from '../Logger/Logger';
import { ITransport } from '../Transport/ITransport';
import * as MessageReceive from './MessageReceive';
import * as MessageSend from './MessageSend';
import { EventEmitter } from 'events';
import { BaseMessage } from '../Messages/base_message';
import * as Messages from '../Messages/signalling_messages';
import * as MessageHelpers from '../Messages/message_helpers';

/**
* Signalling protocol for handling messages from the signalling server.
@@ -31,10 +32,10 @@ export class SignallingProtocol {
transport.events.addListener('error', () => this.transportEvents.emit('error'));
transport.events.addListener('close', (event: CloseEvent) => this.transportEvents.emit('close', event));

transport.onMessage = (msg: MessageReceive.MessageRecv) => {
transport.onMessage = (msg: BaseMessage) => {
// auto handle ping messages
if (msg.type == MessageReceive.MessageRecvTypes.PING) {
const pongMessage = new MessageSend.MessagePong(new Date().getTime());
if (msg.type == Messages.ping.typeName) {
const pongMessage = MessageHelpers.createMessage(Messages.pong, { time: new Date().getTime() });
transport.sendMessage(pongMessage);
}
// call the handlers
@@ -66,49 +67,49 @@ export class SignallingProtocol {
/**
* Passes a message to the transport to send to the other end.
*/
sendMessage(msg: MessageSend.MessageSend) {
sendMessage(msg: BaseMessage) {
this.transport.sendMessage(msg);
}

// the following are just wrappers for sendMessage and should be deprioritized.

requestStreamerList() {
const payload = new MessageSend.MessageListStreamers();
const payload = MessageHelpers.createMessage(Messages.listStreamers);
this.transport.sendMessage(payload);
}

sendSubscribe(streamerid: string) {
const payload = new MessageSend.MessageSubscribe(streamerid);
const payload = MessageHelpers.createMessage(Messages.subscribe, { streamerid: streamerid });
this.transport.sendMessage(payload);
}

sendUnsubscribe() {
const payload = new MessageSend.MessageUnsubscribe();
const payload = MessageHelpers.createMessage(Messages.unsubscribe);
this.transport.sendMessage(payload);
}

sendWebRtcOffer(offer: RTCSessionDescriptionInit, extraParams: MessageSend.ExtraOfferParameters) {
const payload = new MessageSend.MessageWebRTCOffer(offer, extraParams);
sendWebRtcOffer(extraParams: any) {
const payload = MessageHelpers.createMessage(Messages.offer, extraParams);
this.transport.sendMessage(payload);
}

sendWebRtcAnswer(answer: RTCSessionDescriptionInit, extraParams: MessageSend.ExtraAnswerParameters) {
const payload = new MessageSend.MessageWebRTCAnswer(answer, extraParams);
sendWebRtcAnswer(extraParams: any) {
const payload = MessageHelpers.createMessage(Messages.answer, extraParams);
this.transport.sendMessage(payload);
}

sendWebRtcDatachannelRequest() {
const payload = new MessageSend.MessageWebRTCDatachannelRequest();
const payload = MessageHelpers.createMessage(Messages.dataChannelRequest);
this.transport.sendMessage(payload);
}

sendSFURecvDataChannelReady() {
const payload = new MessageSend.MessageSFURecvDataChannelReady();
const payload = MessageHelpers.createMessage(Messages.peerDataChannelsReady);
this.transport.sendMessage(payload);
}

sendIceCandidate(candidate: RTCIceCandidate) {
const payload = new MessageSend.MessageIceCandidate(candidate);
const payload = MessageHelpers.createMessage(Messages.iceCandidate, { candidate: candidate });
this.transport.sendMessage(payload);
}
}
3 changes: 0 additions & 3 deletions Common/src/pixelstreamingcommon.ts
Original file line number Diff line number Diff line change
@@ -2,9 +2,6 @@ export { Logger } from './Logger/Logger';
export { ITransport } from './Transport/ITransport';
export { WebSocketTransport } from './Transport/WebSocketTransport';
export { SignallingProtocol } from './WebSockets/SignallingProtocol';
export * as MessageReceive from './WebSockets/MessageReceive';
export * as MessageSend from './WebSockets/MessageSend';
export { MessageStreamerList } from './WebSockets/MessageReceive';
export { IMessageType } from "@protobuf-ts/runtime";
export { BaseMessage } from './Messages/base_message';
export { MessageRegistry } from './Messages/message_registry';

0 comments on commit f8e503c

Please sign in to comment.