Skip to content

Commit

Permalink
Clean up all the states when adding dynamic agent failed (#555)
Browse files Browse the repository at this point in the history
Make sure all the state that got modified are cleaned up, including the
channel we created.
Make sure the browser agent process doesn't crash when an unexpected
disconnect web agent message comes in.
  • Loading branch information
curtisman authored Jan 14, 2025
1 parent f80790d commit c44d484
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 70 deletions.
16 changes: 11 additions & 5 deletions ts/packages/agentRpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,17 @@ export async function createAgentRpcClient(
manifest: AppAgentManifest;
}) => {
const context = contextMap.get(param.contextId);
return context.addDynamicAgent(
param.name,
param.manifest,
await createAgentRpcClient(param.name, channelProvider),
);
try {
await context.addDynamicAgent(
param.name,
param.manifest,
await createAgentRpcClient(param.name, channelProvider),
);
} catch (e: any) {
// Clean up the channel if adding the agent fails
channelProvider.deleteChannel(param.name);
throw e;
}
},
removeDynamicAgent: async (param: {
contextId: number;
Expand Down
22 changes: 13 additions & 9 deletions ts/packages/agentRpc/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import registerDebug from "debug";

const debug = registerDebug("typeagent:rpc");
const debugIn = registerDebug("typeagent:rpc:in");
const debugOut = registerDebug("typeagent:rpc:out");
const debugError = registerDebug("typeagent:rpc:error");

import { RpcChannel } from "./common.js";
Expand Down Expand Up @@ -48,8 +49,12 @@ export function createRpc<
}
>();

const out = (message: any, cbErr?: (err: Error | null) => void) => {
debugOut(message);
channel.send(message, cbErr);
};
const cb = (message: any) => {
debug("message", message);
debugIn(message);
if (isCallMessage(message)) {
const f = callHandlers?.[message.name];

Expand All @@ -63,22 +68,22 @@ export function createRpc<
if (isInvokeMessage(message)) {
const f = invokeHandlers?.[message.name];
if (f === undefined) {
channel.send({
out({
type: "invokeError",
callId: message.callId,
error: "No invoke handler",
});
} else {
f(message.param).then(
(result) => {
channel.send({
out({
type: "invokeResult",
callId: message.callId,
result,
});
},
(error) => {
channel.send({
out({
type: "invokeError",
callId: message.callId,
error: error.message,
Expand Down Expand Up @@ -132,7 +137,7 @@ export function createRpc<
};

return new Promise<T>((resolve, reject) => {
channel.send(message, (err) => {
out(message, (err) => {
if (err !== null) {
reject(err);
}
Expand All @@ -141,13 +146,12 @@ export function createRpc<
});
},
send: <P>(name: keyof CallTargetFunctions, param?: P) => {
const message = {
out({
type: "call",
callId: nextCallId++,
name,
param,
};
channel.send(message);
});
},
} as RpcReturn<InvokeTargetFunctions, CallTargetFunctions>;
return rpc;
Expand Down
2 changes: 2 additions & 0 deletions ts/packages/agents/browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"agent-rpc": "workspace:*",
"aiclient": "workspace:*",
"common-utils": "workspace:*",
"debug": "^4.3.4",
"dompurify": "^3.1.6",
"html-to-text": "^9.0.5",
"jsonpath": "^1.1.1",
Expand All @@ -49,6 +50,7 @@
},
"devDependencies": {
"@types/chrome": "^0.0.256",
"@types/debug": "^4.1.10",
"@types/dompurify": "^3.0.5",
"@types/html-to-text": "^9.0.4",
"@types/jquery": "^3.5.14",
Expand Down
49 changes: 32 additions & 17 deletions ts/packages/agents/browser/src/agent/webTypeAgent.mts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import { createRpc } from "agent-rpc/rpc";
import { BrowserActionContext } from "./actionHandler.mjs";
import { WebAgentMessage } from "../common/webAgentMessageTypes.mjs";

import registerDebug from "debug";

const debugError = registerDebug("typeagent:webAgent:error");

export type WebAgentChannels = {
channelProvider: GenericChannelProvider;
registerChannel: GenericChannel;
Expand Down Expand Up @@ -55,12 +59,19 @@ function ensureWebAgentChannels(context: SessionContext<BrowserActionContext>) {
addTypeAgent: async (param: {
name: string;
manifest: AppAgentManifest;
}): Promise<void> =>
context.addDynamicAgent(
param.name,
param.manifest,
await createAgentRpcClient(param.name, channelProvider),
),
}): Promise<void> => {
try {
await context.addDynamicAgent(
param.name,
param.manifest,
await createAgentRpcClient(param.name, channelProvider),
);
} catch (e: any) {
// Clean up the channel if adding the agent fails
channelProvider.deleteChannel(param.name);
throw e;
}
},
});

const webAgentChannels = {
Expand All @@ -79,16 +90,20 @@ export async function processWebAgentMessage(
if (webAgentChannels === undefined) {
return;
}
switch (message.messageType) {
case "register":
webAgentChannels.registerChannel.message(message.body);
break;
case "message":
webAgentChannels.channelProvider.message(message.body);
break;
case "disconnect":
await context.removeDynamicAgent(message.body);
webAgentChannels.channelProvider.deleteChannel(message.body);
break;
try {
switch (message.messageType) {
case "register":
webAgentChannels.registerChannel.message(message.body);
break;
case "message":
webAgentChannels.channelProvider.message(message.body);
break;
case "disconnect":
await context.removeDynamicAgent(message.body);
webAgentChannels.channelProvider.deleteChannel(message.body);
break;
}
} catch (e: any) {
debugError("Error processing web agent message", e);
}
}
100 changes: 64 additions & 36 deletions ts/packages/dispatcher/src/context/appAgentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,48 +265,58 @@ export class AppAgentManager implements ActionConfigProvider {
}

private addAgentManifest(
name: string,
appAgentName: string,
manifest: AppAgentManifest,
semanticMapP: Promise<void>[],
provider?: AppAgentProvider,
actionEmbeddingCache?: EmbeddingCache,
) {
this.emojis[name] = manifest.emojiChar;

// TODO: detect duplicate names
const actionConfigs = convertToActionConfig(name, manifest);
const actionConfigs = convertToActionConfig(appAgentName, manifest);

const entries = Object.entries(actionConfigs);
for (const [name, config] of entries) {
debug(`Adding action config: ${name}`);
this.actionConfigs.set(name, config);
this.emojis[name] = config.emojiChar;

const actionSchemaFile =
this.actionSchemaFileCache.getActionSchemaFile(config);

if (this.actionSemanticMap) {
semanticMapP.push(
this.actionSemanticMap.addActionSchemaFile(
config,
actionSchemaFile,
actionEmbeddingCache,
),
);
}

if (config.transient) {
this.transientAgents[name] = false;
}
if (config.injected) {
for (const actionName of actionSchemaFile.actionSchemas.keys()) {
this.injectedSchemaForActionName.set(actionName, name);
try {
for (const [schemaName, config] of entries) {
debug(`Adding action config: ${schemaName}`);
this.actionConfigs.set(schemaName, config);
this.emojis[schemaName] = config.emojiChar;

const actionSchemaFile =
this.actionSchemaFileCache.getActionSchemaFile(config);

if (this.actionSemanticMap) {
semanticMapP.push(
this.actionSemanticMap.addActionSchemaFile(
config,
actionSchemaFile,
actionEmbeddingCache,
),
);
}

if (config.transient) {
this.transientAgents[schemaName] = false;
}
if (config.injected) {
for (const actionName of actionSchemaFile.actionSchemas.keys()) {
this.injectedSchemaForActionName.set(
actionName,
schemaName,
);
}
}
}

this.emojis[appAgentName] = manifest.emojiChar;
} catch (e: any) {
// Clean up what we did.
this.cleanupDynamicAgent(appAgentName);
throw e;
}

const record: AppAgentRecord = {
name,
name: appAgentName,
provider,
actions: new Set(),
schemas: new Set(),
Expand All @@ -315,7 +325,7 @@ export class AppAgentManager implements ActionConfigProvider {
manifest,
};

this.agents.set(name, record);
this.agents.set(appAgentName, record);
return record;
}

Expand All @@ -342,17 +352,35 @@ export class AppAgentManager implements ActionConfigProvider {
debug("Finish action embeddings");
}

private cleanupDynamicAgent(appAgentName: string) {
delete this.emojis[appAgentName];
for (const [schemaName, config] of this.actionConfigs) {
if (getAppAgentName(schemaName) !== appAgentName) {
continue;
}
delete this.emojis[schemaName];
this.actionConfigs.delete(schemaName);
this.actionSchemaFileCache.unloadActionSchemaFile(schemaName);
this.actionSemanticMap?.removeActionSchemaFile(schemaName);
if (config.transient) {
delete this.transientAgents[schemaName];
}
if (config.injected) {
const injectedMap = this.injectedSchemaForActionName;
for (const [actionName, name] of injectedMap) {
if (name === schemaName) {
injectedMap.delete(actionName);
}
}
}
}
}

public async removeDynamicAgent(appAgentName: string) {
const record = this.getRecord(appAgentName);
this.agents.delete(appAgentName);
this.cleanupDynamicAgent(appAgentName);

for (const name of this.actionConfigs.keys()) {
if (getAppAgentName(name) === appAgentName) {
this.actionConfigs.delete(name);
this.actionSchemaFileCache.unloadActionSchemaFile(name);
this.actionSemanticMap?.removeActionSchemaFile(name);
}
}
await this.closeSessionContext(record);
if (record.appAgent !== undefined) {
record.provider?.unloadAppAgent(record.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ export class ActionSchemaSemanticMap {
}

public removeActionSchemaFile(schemaName: string) {
if (!this.actionSemanticMaps.has(schemaName)) {
throw new Error(`Internal Error: Invalid schemaName ${schemaName}`);
}
this.actionSemanticMaps.delete(schemaName);
}

Expand Down
6 changes: 6 additions & 0 deletions ts/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c44d484

Please sign in to comment.