Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions packages/opencode/src/acp/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ export namespace ACP {
private connection: AgentSideConnection
private config: ACPConfig
private sdk: OpencodeClient
private sessionManager
private sessionManager: ACPSessionManager
private sessionAbortControllers = new Map<string, AbortController>()

constructor(connection: AgentSideConnection, config: ACPConfig) {
this.connection = connection
Expand All @@ -65,8 +66,20 @@ export namespace ACP {
{ optionId: "always", kind: "allow_always", name: "Always allow" },
{ optionId: "reject", kind: "reject_once", name: "Reject" },
]
this.config.sdk.event.subscribe({ directory }).then(async (events) => {
for await (const event of events.stream) {

// Create and store abort controller for this session's event stream
const abortController = new AbortController()
this.sessionAbortControllers.set(sessionId, abortController)

this.config.sdk.event
.subscribe(
{ directory },
{
signal: abortController.signal,
},
)
.then(async (events) => {
for await (const event of events.stream) {
switch (event.type) {
case "permission.asked":
try {
Expand Down Expand Up @@ -345,6 +358,18 @@ export namespace ACP {
}
}
})
.catch((error) => {
// Ignore abort errors - they're expected during cleanup
if (error?.name === "AbortError" || error?.message?.includes("abort")) {
log.info("event stream aborted", { sessionId })
return
}
log.error("event stream error", { error, sessionId })
})
.finally(() => {
// Clean up abort controller when stream ends
this.sessionAbortControllers.delete(sessionId)
})
}

async initialize(params: InitializeRequest): Promise<InitializeResponse> {
Expand Down Expand Up @@ -962,6 +987,42 @@ export namespace ACP {
{ throwOnError: true },
)
}

/**
* Dispose of all session resources including event streams and session map entries.
* This should be called when the agent is being destroyed to prevent memory leaks.
*/
async dispose() {
log.info("disposing agent", { sessionCount: this.sessionAbortControllers.size })

// Abort all event streams
for (const [sessionId, controller] of this.sessionAbortControllers.entries()) {
log.info("aborting event stream", { sessionId })
controller.abort()
}
this.sessionAbortControllers.clear()

// Clear all sessions from the manager
this.sessionManager.clear()
}

/**
* Close a specific session and clean up its resources.
* This should be called when a session is explicitly closed/ended.
*/
closeSession(sessionId: string) {
log.info("closing session", { sessionId })

// Abort the event stream for this session
const controller = this.sessionAbortControllers.get(sessionId)
if (controller) {
controller.abort()
this.sessionAbortControllers.delete(sessionId)
}

// Remove from session manager
this.sessionManager.delete(sessionId)
}
}

function toToolKind(toolName: string): ToolKind {
Expand Down
8 changes: 8 additions & 0 deletions packages/opencode/src/acp/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ export class ACPSessionManager {
this.sdk = sdk
}

delete(sessionId: string): boolean {
return this.sessions.delete(sessionId)
}

clear(): void {
this.sessions.clear()
}

async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise<ACPSessionState> {
const session = await this.sdk.session
.create(
Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/acp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface ACPSessionState {
modelID: string
}
modeId?: string
abortController?: AbortController
}

export interface ACPConfig {
Expand Down