Skip to content

Commit d1973a8

Browse files
committed
Reusable SseService for both transports and session manager
1 parent e67320f commit d1973a8

File tree

6 files changed

+225
-202
lines changed

6 files changed

+225
-202
lines changed

src/Http/Controllers/McpController.php

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212

1313
class McpController extends Controller
1414
{
15-
public function __construct(protected McpHandler $mcpHandler, protected SseService $sseService) {}
15+
public function __construct(
16+
protected McpHandler $mcpHandler,
17+
protected SseService $sse
18+
) {}
1619

1720
/**
1821
* Handle MCP requests.
@@ -57,14 +60,27 @@ public function __invoke(Request $request): JsonResponse|StreamedResponse|Respon
5760
*
5861
* @param array<array-key, mixed> $messages JSON-RPC messages from the client
5962
*/
60-
protected function handlePostSseResponse(array $messages): StreamedResponse
61-
{
62-
$response = $this->sseService->createPostSseResponse(
63-
$messages,
64-
fn ($message) => $this->mcpHandler->handle($message),
65-
);
63+
protected function handlePostSseResponse(
64+
array $messages
65+
): StreamedResponse {
66+
return $this->sse->createSseResponse(function () use ($messages) {
67+
$this->sse->sendComment('stream opened');
68+
69+
if (! isset($messages[0])) {
70+
$messages = [$messages];
71+
}
72+
73+
foreach ($messages as $message) {
74+
if (! isset($message['id'])) {
75+
logger('Missing ID in message');
6676

67-
return $response;
77+
continue;
78+
}
79+
80+
$response = $this->mcpHandler->handle($message);
81+
$this->sse->sendEvent($response);
82+
}
83+
});
6884
}
6985

7086
/**

src/Http/Controllers/McpSSEController.php

Lines changed: 20 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,82 +7,55 @@
77
use Illuminate\Routing\Controller;
88
use Illuminate\Support\Facades\Log;
99
use Kirschbaum\Loop\McpHandler;
10-
use Kirschbaum\Loop\Services\McpSseService;
10+
use Kirschbaum\Loop\Services\SseService;
11+
use Kirschbaum\Loop\Services\SseSessionManager;
1112
use Symfony\Component\HttpFoundation\StreamedResponse;
1213

1314
class McpSSEController extends Controller
1415
{
1516
public function __construct(
16-
protected McpHandler $mcpHandler
17+
protected McpHandler $mcpHandler,
18+
protected SseService $sse,
19+
protected SseSessionManager $sessionManager
1720
) {}
1821

1922
/**
2023
* Handle SSE connection setup (GET request), creates server-client event stream.
2124
*/
2225
public function connect(Request $request): StreamedResponse
2326
{
24-
$headers = [
25-
'Content-Type' => 'text/event-stream',
26-
'Cache-Control' => 'no-cache, no-store, max-age=0, must-revalidate',
27-
'Pragma' => 'no-cache',
28-
'X-Accel-Buffering' => 'no',
29-
'Connection' => 'keep-alive',
30-
];
31-
3227
$sessionId = uniqid();
33-
3428
$ssePath = config('loop.sse.path', '/mcp/sse');
3529
$postEndpoint = $ssePath.'/message/?sessionId='.$sessionId;
3630

37-
set_time_limit(0);
38-
ignore_user_abort(false); // Detects client disconnects
39-
40-
return response()->stream(function () use ($sessionId, $postEndpoint) {
41-
while (ob_get_level() > 0) {
42-
ob_end_clean();
43-
}
44-
ob_implicit_flush(true);
45-
46-
McpSseService::registerConnection($sessionId);
47-
48-
echo "event: endpoint\n";
49-
echo "data: {$postEndpoint}\n\n";
50-
flush();
31+
return $this->sse->createSseResponse(function () use ($sessionId, $postEndpoint) {
32+
$this->sessionManager->registerConnection($sessionId);
5133

52-
$debugTime = time();
53-
echo "event: debug\n";
54-
echo "data: {\"message\":\"Connection established\",\"time\":{$debugTime}}\n\n";
55-
flush();
34+
$this->sse->sendEvent($postEndpoint, 'endpoint');
35+
$this->sse->sendDebug('Connection established');
5636

57-
$heartbeatInterval = 30; // seconds
37+
$heartbeatInterval = 30;
5838
$lastHeartbeat = time();
5939
$lastMessageId = -1;
6040

6141
register_shutdown_function(function () use ($sessionId) {
62-
McpSseService::removeConnection($sessionId);
42+
$this->sessionManager->removeConnection($sessionId);
6343
});
6444

6545
$connectionStatus = connection_status();
6646

6747
while ($connectionStatus === CONNECTION_NORMAL) {
68-
$messages = McpSseService::getMessages($sessionId, $lastMessageId);
48+
$messages = $this->sessionManager->getMessages($sessionId, $lastMessageId);
6949

7050
foreach ($messages as $message) {
7151
$lastMessageId = $message['id'];
7252

73-
logger(json_encode($message));
74-
75-
echo "event: message\n";
76-
echo 'data: '.json_encode($message['data'])."\n\n";
77-
flush();
53+
$this->sse->sendEvent($message['data']);
7854
}
7955

8056
if ((time() - $lastHeartbeat) >= $heartbeatInterval) {
81-
$currentTime = time();
82-
echo "event: ping\n";
83-
echo "data: {$currentTime}\n\n";
84-
flush();
85-
$lastHeartbeat = $currentTime;
57+
$this->sse->sendPing();
58+
$lastHeartbeat = time();
8659
}
8760

8861
$oldStatus = $connectionStatus;
@@ -91,20 +64,19 @@ public function connect(Request $request): StreamedResponse
9164
usleep(100000);
9265
}
9366

94-
McpSseService::removeConnection($sessionId);
95-
}, 200, $headers);
67+
$this->sessionManager->removeConnection($sessionId);
68+
});
9669
}
9770

9871
/**
9972
* Handle POST messages from client
10073
*/
10174
public function message(Request $request): JsonResponse
10275
{
103-
10476
$requestData = $request->all();
10577
$sessionId = $request->query('sessionId');
10678

107-
if (! McpSseService::sessionExists($sessionId)) {
79+
if (! $this->sessionManager->sessionExists($sessionId)) {
10880
return response()->json([
10981
'status' => 'error',
11082
'message' => 'Session not found or expired',
@@ -114,7 +86,7 @@ public function message(Request $request): JsonResponse
11486
try {
11587
$response = $this->mcpHandler->handle($requestData);
11688

117-
$success = McpSseService::sendToClient($sessionId, $response);
89+
$success = $this->sessionManager->sendToClient($sessionId, $response);
11890

11991
if (! $success) {
12092
Log::warning("Failed to store message for client: {$sessionId}");
@@ -139,7 +111,7 @@ public function message(Request $request): JsonResponse
139111
'message' => $e->getMessage(),
140112
],
141113
];
142-
McpSseService::sendToClient($sessionId, $errorResponse);
114+
$this->sessionManager->sendToClient($sessionId, $errorResponse);
143115

144116
return response()->json([
145117
'status' => 'error',

src/LoopServiceProvider.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
namespace Kirschbaum\Loop;
44

55
use Kirschbaum\Loop\Commands\LoopMcpServerStartCommand;
6-
use Kirschbaum\Loop\Services\McpSseService;
76
use Kirschbaum\Loop\Services\SseDriverManager;
87
use Kirschbaum\Loop\Services\SseService;
8+
use Kirschbaum\Loop\Services\SseSessionManager;
99
use Spatie\LaravelPackageTools\Package;
1010
use Spatie\LaravelPackageTools\PackageServiceProvider;
1111

@@ -42,8 +42,8 @@ public function packageBooted(): void
4242
return new SseService;
4343
});
4444

45-
$this->app->singleton(McpSseService::class, function ($app) {
46-
return new McpSseService;
45+
$this->app->singleton(SseSessionManager::class, function ($app) {
46+
return new SseSessionManager($app->make(SseDriverManager::class));
4747
});
4848

4949
$this->app->singleton(SseDriverManager::class, function ($app) {

src/Services/McpSseService.php

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)