Skip to content
This repository was archived by the owner on Mar 15, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,15 @@ services:
arguments:
- "%env(resolve:DATABASE_PRODUCT)%://%env(resolve:DATABASE_USER)%:%env(resolve:DATABASE_PASSWORD)%@%env(resolve:DATABASE_HOST)%:%env(resolve:DATABASE_PORT)%/%env(resolve:DATABASE_DB)%?serverVersion=%env(resolve:DATABASE_SERVERVERSION)%"

# LlmIntegration: bind interface to implementation per consumer (two preparators exist)
App\LlmIntegration\Infrastructure\Service\Cursor\CursorCliBinaryManager:
arguments:
$configWriter: '@App\LlmIntegration\Infrastructure\Service\Cursor\CursorCliConfigWriter'
$runtimePreparator: '@App\LlmIntegration\Infrastructure\Service\Cursor\CursorWorkspaceRuntimePreparator'

App\LlmIntegration\Infrastructure\Service\ClaudeCode\ClaudeCodeBinaryManager:
arguments:
$runtimePreparator: '@App\LlmIntegration\Infrastructure\Service\ClaudeCode\ClaudeCodeWorkspaceRuntimePreparator'

# add more service definitions when explicit configuration is needed
# please note that last definitions always *replace* previous ones
Original file line number Diff line number Diff line change
Expand Up @@ -96,126 +96,117 @@ public function execute(ImplementIssueMessage $message): void

private function doExecute(ImplementIssueMessage $message, ProductConfigDto $config): void
{
$runStarted = $this->workflowConcurrencyFacade->tryStartRun(
$didRun = $this->workflowConcurrencyFacade->executeWithRunClaim(
$message->productConfigId,
$message->issueNumber,
WorkflowRunPhase::Implementation,
$message->runId,
);

if (!$runStarted) {
$this->logger->info('[ImplementationAgent] Skipping stale or duplicate implementation message', [
'productConfigId' => $message->productConfigId,
'issueNumber' => $message->issueNumber,
'runId' => $message->runId,
]);

return;
}
function () use ($message, $config): void {
$issueDto = $this->findIssue($config->githubUrl, $config->githubToken, $message->issueNumber);
if ($issueDto === null) {
$this->logger->error('[ImplementationAgent] Could not fetch issue data', [
'issueNumber' => $message->issueNumber,
]);

return;
}

try {
$issueDto = $this->findIssue($config->githubUrl, $config->githubToken, $message->issueNumber);
if ($issueDto === null) {
$this->logger->error('[ImplementationAgent] Could not fetch issue data', [
'issueNumber' => $message->issueNumber,
]);
$issueComments = $this->githubIntegrationFacade->getIssueComments(
$config->githubUrl,
$config->githubToken,
$message->issueNumber,
);

return;
}
$workspaceInfo = null;

try {
if ($message->isRevision && $message->prBranchName !== null) {
$workspaceInfo = $this->workspaceManagementFacade->acquireWorkspace(
$config->id,
$message->issueNumber,
$config->githubUrl,
$config->githubToken,
$message->prBranchName,
);

$prComments = $message->prNumber !== null
? $this->githubIntegrationFacade->getPullRequestComments($config->githubUrl, $config->githubToken, $message->prNumber)
: [];

$prompt = $this->buildRevisionPrompt($issueDto, $issueComments, $prComments, $message->prNumber);
} else {
$workspaceInfo = $this->workspaceManagementFacade->acquireWorkspace(
$config->id,
$message->issueNumber,
$config->githubUrl,
$config->githubToken,
);

$prompt = $this->buildPrompt($issueDto, $issueComments);
}

$agentApiKeyOverride = $message->agentApiKeyOverride;
$credentials = $agentApiKeyOverride !== null
? new AgentCredentialsDto($agentApiKeyOverride, $agentApiKeyOverride)
: new AgentCredentialsDto($config->cursorAgentApiKey, $config->anthropicApiKey);

$agentResult = $this->llmIntegrationFacade->runAgent(
AgentRole::Implementation,
$prompt,
$workspaceInfo->workspacePath,
$credentials,
$config->githubToken,
$workspaceInfo->containerName,
$message->agentProviderOverride,
);

$issueComments = $this->githubIntegrationFacade->getIssueComments(
$config->githubUrl,
$config->githubToken,
$message->issueNumber,
);
$outcome = ImplementationOutcome::fromAgentOutput($agentResult->success, $agentResult->resultText);

$workspaceInfo = null;
$this->logger->info('[ImplementationAgent] Agent completed', [
'issueNumber' => $message->issueNumber,
'outcome' => $outcome->value,
'isRevision' => $message->isRevision,
'durationMs' => $agentResult->durationMs,
]);

try {
if ($message->isRevision && $message->prBranchName !== null) {
$workspaceInfo = $this->workspaceManagementFacade->acquireWorkspace(
$config->id,
$message->issueNumber,
$this->handleOutcome(
$outcome,
$agentResult->resultText,
$config->githubUrl,
$config->githubToken,
$message->prBranchName,
$message->issueNumber,
$issueDto,
$message->prNumber,
);
} catch (Throwable $e) {
$this->logger->error('[ImplementationAgent] Unhandled exception', [
'issueNumber' => $message->issueNumber,
'isRevision' => $message->isRevision,
'error' => $e->getMessage(),
]);

$prComments = $message->prNumber !== null
? $this->githubIntegrationFacade->getPullRequestComments($config->githubUrl, $config->githubToken, $message->prNumber)
: [];
$this->applyErrorLabels($config->githubUrl, $config->githubToken, $message->issueNumber);

$prompt = $this->buildRevisionPrompt($issueDto, $issueComments, $prComments, $message->prNumber);
} else {
$workspaceInfo = $this->workspaceManagementFacade->acquireWorkspace(
$config->id,
$message->issueNumber,
$this->githubIntegrationFacade->postIssueComment(
$config->githubUrl,
$config->githubToken,
$message->issueNumber,
"**ProductBuilder Implementation Error**\n\nAn unexpected error occurred during implementation:\n\n> " . $e->getMessage(),
);

$prompt = $this->buildPrompt($issueDto, $issueComments);
} finally {
if ($workspaceInfo !== null) {
$this->workspaceManagementFacade->releaseWorkspace($workspaceInfo);
}
}
},
);

$agentApiKeyOverride = $message->agentApiKeyOverride;
$credentials = $agentApiKeyOverride !== null
? new AgentCredentialsDto($agentApiKeyOverride, $agentApiKeyOverride)
: new AgentCredentialsDto($config->cursorAgentApiKey, $config->anthropicApiKey);

$agentResult = $this->llmIntegrationFacade->runAgent(
AgentRole::Implementation,
$prompt,
$workspaceInfo->workspacePath,
$credentials,
$config->githubToken,
$workspaceInfo->containerName,
$message->agentProviderOverride,
);

$outcome = ImplementationOutcome::fromAgentOutput($agentResult->success, $agentResult->resultText);

$this->logger->info('[ImplementationAgent] Agent completed', [
'issueNumber' => $message->issueNumber,
'outcome' => $outcome->value,
'isRevision' => $message->isRevision,
'durationMs' => $agentResult->durationMs,
]);

$this->handleOutcome(
$outcome,
$agentResult->resultText,
$config->githubUrl,
$config->githubToken,
$message->issueNumber,
$issueDto,
$message->prNumber,
);
} catch (Throwable $e) {
$this->logger->error('[ImplementationAgent] Unhandled exception', [
'issueNumber' => $message->issueNumber,
'isRevision' => $message->isRevision,
'error' => $e->getMessage(),
]);

$this->applyErrorLabels($config->githubUrl, $config->githubToken, $message->issueNumber);

$this->githubIntegrationFacade->postIssueComment(
$config->githubUrl,
$config->githubToken,
$message->issueNumber,
"**ProductBuilder Implementation Error**\n\nAn unexpected error occurred during implementation:\n\n> " . $e->getMessage(),
);
} finally {
if ($workspaceInfo !== null) {
$this->workspaceManagementFacade->releaseWorkspace($workspaceInfo);
}
}
} finally {
$this->workflowConcurrencyFacade->releaseRunClaim(
$message->productConfigId,
$message->issueNumber,
$message->runId,
);
if (!$didRun) {
$this->logger->info('[ImplementationAgent] Skipping stale or duplicate implementation message', [
'productConfigId' => $message->productConfigId,
'issueNumber' => $message->issueNumber,
'runId' => $message->runId,
]);
}
}

Expand Down
46 changes: 34 additions & 12 deletions src/LlmIntegration/Infrastructure/Service/AgentStreamParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
use Symfony\Component\Process\Process;

/**
* Parses NDJSON stream output shared by Cursor CLI and Claude Code CLI agents.
* Orchestrates process execution and NDJSON stream handling: runs the process,
* parses lines via NdjsonStreamLineParser, and logs events / assembles CliAgentRunResult.
*
* Both CLIs emit identical event types: system (init), assistant, tool_call
* (started/completed), and result. This service handles all stream parsing
* and logging for any CLI agent provider.
* Both Cursor CLI and Claude Code CLI emit identical event types: system (init),
* assistant, tool_call (started/completed), and result.
*/
class AgentStreamParser
{
private const float ASSISTANT_MAX_FLUSH_DELAY_SECONDS = 0.6;

public function __construct(
private readonly LoggerInterface $logger,
private readonly NdjsonStreamLineParser $lineParser,
private readonly LoggerInterface $logger,
) {
}

Expand Down Expand Up @@ -144,19 +145,28 @@ public function executeAndParseStream(Process $process, string $providerLabel):
}

/**
* Parses a single NDJSON line, logs a human-readable summary, and returns the decoded data.
* Parses a single NDJSON line (via line parser), logs a human-readable summary, and returns the decoded data.
*
* @return array<string, mixed>|null
*/
private function logStreamEvent(string $jsonLine, string $providerLabel): ?array
{
$parsed = $this->lineParser->parseLine($jsonLine);
if ($parsed === null) {
return null;
}

try {
/** @var array<string, mixed> $event */
$event = json_decode($jsonLine, true, 512, JSON_THROW_ON_ERROR);
/** @var array<string, mixed>|null $event */
$event = json_decode($parsed->jsonLine, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException) {
return null;
}

if (!is_array($event)) {
return null;
}

$type = is_string($event['type'] ?? null) ? $event['type'] : '';
$subtype = is_string($event['subtype'] ?? null) ? $event['subtype'] : '';

Expand Down Expand Up @@ -481,18 +491,30 @@ private function shouldFlushAssistantBufferByBoundary(string $assistantLogBuffer

private function parseResultLine(string $jsonLine, string $providerLabel): CliAgentRunResult
{
$parsed = $this->lineParser->parseLine($jsonLine);
if ($parsed === null) {
$this->logger->warning('[' . $providerLabel . '] Could not parse result JSON line', [
'preview' => mb_substr($jsonLine, 0, 500),
]);

return new CliAgentRunResult(true, $jsonLine, null, null);
}

try {
/** @var array<string, mixed> $data */
$data = json_decode($jsonLine, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
/** @var array<string, mixed>|null $data */
$data = json_decode($parsed->jsonLine, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException) {
$this->logger->warning('[' . $providerLabel . '] Could not parse result JSON line', [
'error' => $e->getMessage(),
'preview' => mb_substr($jsonLine, 0, 500),
]);

return new CliAgentRunResult(true, $jsonLine, null, null);
}

if (!is_array($data)) {
return new CliAgentRunResult(true, $jsonLine, null, null);
}

$isError = (bool) ($data['is_error'] ?? false);
$resultText = is_string($data['result'] ?? null) ? $data['result'] : $jsonLine;
$sessionId = is_string($data['session_id'] ?? null) ? $data['session_id'] : null;
Expand Down
Loading