Skip to content

Commit

Permalink
Add channels to the shared web socket service (#567)
Browse files Browse the repository at this point in the history
- Agents that use the shared websocket service now connect to an
agent-specific channel. The channel will typically have the agent and
its extensions/clients.
  • Loading branch information
hillary-mutisya authored Jan 17, 2025
1 parent 25745b7 commit 2b0023f
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 25 deletions.
2 changes: 1 addition & 1 deletion ts/packages/agents/browser/src/agent/actionHandler.mts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async function updateBrowserContext(
return;
}

const webSocket = await createWebSocket();
const webSocket = await createWebSocket("browser", "dispatcher");
if (webSocket) {
context.agentContext.webSocket = webSocket;
context.agentContext.browserConnector = new BrowserConnector(context);
Expand Down
8 changes: 5 additions & 3 deletions ts/packages/agents/browser/src/extension/serviceWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export async function createWebSocket() {
let socketEndpoint =
configValues["WEBSOCKET_HOST"] ?? "ws://localhost:8080/";

socketEndpoint += "?clientId=" + chrome.runtime.id;
socketEndpoint += `?channel=browser&role=client&clientId=${chrome.runtime.id}`;
return new Promise<WebSocket | undefined>((resolve, reject) => {
const webSocket = new WebSocket(socketEndpoint);
console.log("Connected to: " + socketEndpoint);
Expand Down Expand Up @@ -133,11 +133,13 @@ async function ensureWebsocketConnected() {
}
};

webSocket.onclose = (event: object) => {
webSocket.onclose = (event: any) => {
console.log("websocket connection closed");
webSocket = undefined;
showBadgeError();
reconnectWebSocket();
if (event.reason !== "duplicate") {
reconnectWebSocket();
}
};

resolve(webSocket);
Expand Down
2 changes: 1 addition & 1 deletion ts/packages/agents/code/src/codeActionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async function updateCodeContext(
return;
}

const webSocket = await createWebSocket();
const webSocket = await createWebSocket("code", "dispatcher");
if (webSocket) {
agentContext.webSocket = webSocket;
agentContext.pendingCall = new Map();
Expand Down
14 changes: 12 additions & 2 deletions ts/packages/coda/src/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ export type WebSocketMessage = {
body: any;
};

export async function createWebSocket() {
export async function createWebSocket(
channel: string,
role: string,
clientId?: string,
) {
return new Promise<WebSocket | undefined>((resolve, reject) => {
const webSocket = new WebSocket("ws://localhost:8080/");
let endpoint = "ws://localhost:8080";
endpoint += `?channel=${channel}&role=${role}`;
if (clientId) {
endpoint += `clientId=${clientId}`;
}

const webSocket = new WebSocket(endpoint);

webSocket.onopen = (event: object) => {
console.log("websocket open");
Expand Down
2 changes: 1 addition & 1 deletion ts/packages/coda/src/wsConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async function ensureWebsocketConnected() {
return;
}

webSocket = await createWebSocket();
webSocket = await createWebSocket("code", "client");
if (!webSocket) {
return;
}
Expand Down
11 changes: 10 additions & 1 deletion ts/packages/commonUtils/src/webSockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ export type WebSocketMessage = {
body: any;
};

export async function createWebSocket() {
export async function createWebSocket(
channel: string,
role: string,
clientId?: string,
) {
return new Promise<WebSocket | undefined>((resolve, reject) => {
let endpoint = "ws://localhost:8080";
if (process.env["WEBSOCKET_HOST"]) {
Expand All @@ -32,6 +36,11 @@ export async function createWebSocket() {
}
}

endpoint += `?channel=${channel}&role=${role}`;
if (clientId) {
endpoint += `clientId=${clientId}`;
}

const webSocket = new WebSocket(endpoint);

webSocket.onopen = (event: object) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,26 @@ import { WebSocketMessage } from "common-utils";
import registerDebug from "debug";
import { IncomingMessage } from "node:http";

interface Client {
id: string | null;
role: string;
socket: WebSocket;
channelName: string;
}

interface Channel {
name: string;
clients: Set<Client>;
}

const debug = registerDebug("typeagent:serviceHost");

const hostEndpoint = process.env["WEBSOCKET_HOST"] ?? "ws://localhost:8080";
const url = new URL(hostEndpoint);

// Channels organized by agentType
const channels: Map<string, Channel> = new Map();

try {
const wss = new WebSocketServer({
port: parseInt(url.port),
Expand All @@ -32,37 +47,88 @@ try {
wss.on("connection", (ws: WebSocket, req: IncomingMessage) => {
debug("New client connected");

if (req.url) {
const params = new URLSearchParams(req.url.split("?")[1]);
const clientId = params.get("clientId");
if (clientId) {
for (var client of wss.clients) {
if ((client as any).clientId) {
wss.clients.delete(client);
}
}
const params = new URLSearchParams(req.url?.split("?")[1]);
const clientId = params.get("clientId");
const channelName = params.get("channel");
const role = params.get("role");

(ws as any).clientId = clientId;
if (!channelName || !role) {
ws.send(JSON.stringify({ error: "Missing agentName or role" }));
ws.close();
return;
}

// Ensure the channel exists
if (!channels.has(channelName)) {
channels.set(channelName, {
name: channelName,
clients: new Set(),
});
}

const channel = channels.get(channelName)!;
const client: Client = {
id: clientId,
role: role,
socket: ws,
channelName: channelName,
};

if (clientId) {
for (var socket of wss.clients) {
if ((socket as any).clientId == clientId && socket !== ws) {
debug(
"Closing duplicate socket instance for id " + clientId,
);
socket.close(1013, "duplicate");
}
}

(ws as any).clientId = clientId;
}

debug(`Connection count: ${wss.clients.size}`);
channel.clients.add(client);
debug(`Client ${clientId} joined channel ${channelName}.`);

ws.on("message", (message: string) => {
try {
const data = JSON.parse(message) as WebSocketMessage;
if (data.messageType != "keepAlive") {
// broadcast to all connected clients
// TO DO: add routing to send messages to specific clients
wss.clients.forEach((client) => client.send(message));
let foundAtLeastOneTarget = false;

// Broadcast message to all clients in the same channel that have a different role
channel.clients.forEach((client) => {
if (
client.role !== role &&
client.socket.readyState === WebSocket.OPEN
) {
client.socket.send(message);
foundAtLeastOneTarget = true;
}
});

if (!foundAtLeastOneTarget) {
const errorMessage =
data.source === channelName
? `The ${channelName} agent is not connected. The message cannot be processed.`
: `No ${channelName} clients are listening for messaages on this channel`;
ws.send(JSON.stringify({ error: errorMessage }));
}
}
} catch {
debug("WebSocket message not parsed.");
}
});

ws.on("close", () => {
debug("Client disconnected");
debug(`Client ${clientId} disconnected.`);
channel.clients.delete(client);

// Cleanup empty channels
if (channel.clients.size === 0) {
channels.delete(channelName);
debug(`Channel ${channelName} deleted.`);
}
});
});

Expand Down
6 changes: 5 additions & 1 deletion ts/packages/shell/src/main/browserIpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ export class BrowserAgentIpc {
} catch {}
}

this.webSocket = await createWebSocket();
this.webSocket = await createWebSocket(
"browser",
"client",
"inlineBrowser",
);
if (!this.webSocket) {
resolve(undefined);
return;
Expand Down

0 comments on commit 2b0023f

Please sign in to comment.