diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index f8792393c60..3817aaaa392 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -47,7 +47,8 @@ export namespace ACP { private connection: AgentSideConnection private config: ACPConfig private sdk: OpencodeClient - private sessionManager + private sessionManager: ACPSessionManager + private sessionAbortControllers = new Map() constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection @@ -65,8 +66,20 @@ export namespace ACP { { optionId: "always", kind: "allow_always", name: "Always allow" }, { optionId: "reject", kind: "reject_once", name: "Reject" }, ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { - for await (const event of events.stream) { + + // Create and store abort controller for this session's event stream + const abortController = new AbortController() + this.sessionAbortControllers.set(sessionId, abortController) + + this.config.sdk.event + .subscribe( + { directory }, + { + signal: abortController.signal, + }, + ) + .then(async (events) => { + for await (const event of events.stream) { switch (event.type) { case "permission.asked": try { @@ -345,6 +358,18 @@ export namespace ACP { } } }) + .catch((error) => { + // Ignore abort errors - they're expected during cleanup + if (error?.name === "AbortError" || error?.message?.includes("abort")) { + log.info("event stream aborted", { sessionId }) + return + } + log.error("event stream error", { error, sessionId }) + }) + .finally(() => { + // Clean up abort controller when stream ends + this.sessionAbortControllers.delete(sessionId) + }) } async initialize(params: InitializeRequest): Promise { @@ -962,6 +987,42 @@ export namespace ACP { { throwOnError: true }, ) } + + /** + * Dispose of all session resources including event streams and session map entries. + * This should be called when the agent is being destroyed to prevent memory leaks. + */ + async dispose() { + log.info("disposing agent", { sessionCount: this.sessionAbortControllers.size }) + + // Abort all event streams + for (const [sessionId, controller] of this.sessionAbortControllers.entries()) { + log.info("aborting event stream", { sessionId }) + controller.abort() + } + this.sessionAbortControllers.clear() + + // Clear all sessions from the manager + this.sessionManager.clear() + } + + /** + * Close a specific session and clean up its resources. + * This should be called when a session is explicitly closed/ended. + */ + closeSession(sessionId: string) { + log.info("closing session", { sessionId }) + + // Abort the event stream for this session + const controller = this.sessionAbortControllers.get(sessionId) + if (controller) { + controller.abort() + this.sessionAbortControllers.delete(sessionId) + } + + // Remove from session manager + this.sessionManager.delete(sessionId) + } } function toToolKind(toolName: string): ToolKind { diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b65834705..0b15d4a1431 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -13,6 +13,14 @@ export class ACPSessionManager { this.sdk = sdk } + delete(sessionId: string): boolean { + return this.sessions.delete(sessionId) + } + + clear(): void { + this.sessions.clear() + } + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { const session = await this.sdk.session .create( diff --git a/packages/opencode/src/acp/types.ts b/packages/opencode/src/acp/types.ts index 42b23091237..9126e2374a2 100644 --- a/packages/opencode/src/acp/types.ts +++ b/packages/opencode/src/acp/types.ts @@ -11,6 +11,7 @@ export interface ACPSessionState { modelID: string } modeId?: string + abortController?: AbortController } export interface ACPConfig {