77use Illuminate \Routing \Controller ;
88use Illuminate \Support \Facades \Log ;
99use Kirschbaum \Loop \McpHandler ;
10- use Kirschbaum \Loop \Services \McpSseService ;
10+ use Kirschbaum \Loop \Services \SseService ;
11+ use Kirschbaum \Loop \Services \SseSessionManager ;
1112use Symfony \Component \HttpFoundation \StreamedResponse ;
1213
1314class McpSSEController extends Controller
1415{
1516 public function __construct (
16- protected McpHandler $ mcpHandler
17+ protected McpHandler $ mcpHandler ,
18+ protected SseService $ sse ,
19+ protected SseSessionManager $ sessionManager
1720 ) {}
1821
1922 /**
2023 * Handle SSE connection setup (GET request), creates server-client event stream.
2124 */
2225 public function connect (Request $ request ): StreamedResponse
2326 {
24- $ headers = [
25- 'Content-Type ' => 'text/event-stream ' ,
26- 'Cache-Control ' => 'no-cache, no-store, max-age=0, must-revalidate ' ,
27- 'Pragma ' => 'no-cache ' ,
28- 'X-Accel-Buffering ' => 'no ' ,
29- 'Connection ' => 'keep-alive ' ,
30- ];
31-
3227 $ sessionId = uniqid ();
33-
3428 $ ssePath = config ('loop.sse.path ' , '/mcp/sse ' );
3529 $ postEndpoint = $ ssePath .'/message/?sessionId= ' .$ sessionId ;
3630
37- set_time_limit (0 );
38- ignore_user_abort (false ); // Detects client disconnects
39-
40- return response ()->stream (function () use ($ sessionId , $ postEndpoint ) {
41- while (ob_get_level () > 0 ) {
42- ob_end_clean ();
43- }
44- ob_implicit_flush (true );
45-
46- McpSseService::registerConnection ($ sessionId );
47-
48- echo "event: endpoint \n" ;
49- echo "data: {$ postEndpoint }\n\n" ;
50- flush ();
31+ return $ this ->sse ->createSseResponse (function () use ($ sessionId , $ postEndpoint ) {
32+ $ this ->sessionManager ->registerConnection ($ sessionId );
5133
52- $ debugTime = time ();
53- echo "event: debug \n" ;
54- echo "data: { \"message \": \"Connection established \", \"time \": {$ debugTime }} \n\n" ;
55- flush ();
34+ $ this ->sse ->sendEvent ($ postEndpoint , 'endpoint ' );
35+ $ this ->sse ->sendDebug ('Connection established ' );
5636
57- $ heartbeatInterval = 30 ; // seconds
37+ $ heartbeatInterval = 30 ;
5838 $ lastHeartbeat = time ();
5939 $ lastMessageId = -1 ;
6040
6141 register_shutdown_function (function () use ($ sessionId ) {
62- McpSseService:: removeConnection ($ sessionId );
42+ $ this -> sessionManager -> removeConnection ($ sessionId );
6343 });
6444
6545 $ connectionStatus = connection_status ();
6646
6747 while ($ connectionStatus === CONNECTION_NORMAL ) {
68- $ messages = McpSseService:: getMessages ($ sessionId , $ lastMessageId );
48+ $ messages = $ this -> sessionManager -> getMessages ($ sessionId , $ lastMessageId );
6949
7050 foreach ($ messages as $ message ) {
7151 $ lastMessageId = $ message ['id ' ];
7252
73- logger (json_encode ($ message ));
74-
75- echo "event: message \n" ;
76- echo 'data: ' .json_encode ($ message ['data ' ])."\n\n" ;
77- flush ();
53+ $ this ->sse ->sendEvent ($ message ['data ' ]);
7854 }
7955
8056 if ((time () - $ lastHeartbeat ) >= $ heartbeatInterval ) {
81- $ currentTime = time ();
82- echo "event: ping \n" ;
83- echo "data: {$ currentTime }\n\n" ;
84- flush ();
85- $ lastHeartbeat = $ currentTime ;
57+ $ this ->sse ->sendPing ();
58+ $ lastHeartbeat = time ();
8659 }
8760
8861 $ oldStatus = $ connectionStatus ;
@@ -91,20 +64,19 @@ public function connect(Request $request): StreamedResponse
9164 usleep (100000 );
9265 }
9366
94- McpSseService:: removeConnection ($ sessionId );
95- }, 200 , $ headers );
67+ $ this -> sessionManager -> removeConnection ($ sessionId );
68+ });
9669 }
9770
9871 /**
9972 * Handle POST messages from client
10073 */
10174 public function message (Request $ request ): JsonResponse
10275 {
103-
10476 $ requestData = $ request ->all ();
10577 $ sessionId = $ request ->query ('sessionId ' );
10678
107- if (! McpSseService:: sessionExists ($ sessionId )) {
79+ if (! $ this -> sessionManager -> sessionExists ($ sessionId )) {
10880 return response ()->json ([
10981 'status ' => 'error ' ,
11082 'message ' => 'Session not found or expired ' ,
@@ -114,7 +86,7 @@ public function message(Request $request): JsonResponse
11486 try {
11587 $ response = $ this ->mcpHandler ->handle ($ requestData );
11688
117- $ success = McpSseService:: sendToClient ($ sessionId , $ response );
89+ $ success = $ this -> sessionManager -> sendToClient ($ sessionId , $ response );
11890
11991 if (! $ success ) {
12092 Log::warning ("Failed to store message for client: {$ sessionId }" );
@@ -139,7 +111,7 @@ public function message(Request $request): JsonResponse
139111 'message ' => $ e ->getMessage (),
140112 ],
141113 ];
142- McpSseService:: sendToClient ($ sessionId , $ errorResponse );
114+ $ this -> sessionManager -> sendToClient ($ sessionId , $ errorResponse );
143115
144116 return response ()->json ([
145117 'status ' => 'error ' ,
0 commit comments