Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions api/server/experimental.js
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,22 @@ if (cluster.isMaster) {
}:${port}`,
);

/** Initialize MCP servers and OAuth reconnection for this worker */
await initializeMCPs();
await initializeOAuthReconnectManager();
await checkMigrations();
/**
* The listen callback is async, so any rejection from these awaits
* would otherwise be detached from `startServer().catch(...)`. Without
* explicit handling, the global `unhandledRejection` handler would
* swallow init failures and leave the worker listening but only
* partially initialized.
*/
try {
/** Initialize MCP servers and OAuth reconnection for this worker */
await initializeMCPs();
await initializeOAuthReconnectManager();
await checkMigrations();
} catch (initErr) {
logger.error(`Worker ${process.pid} post-listen initialization failed:`, initErr);
process.exit(1);
}
});

/** Handle inter-process messages from master */
Expand Down Expand Up @@ -441,3 +453,29 @@ process.on('uncaughtException', (err) => {

process.exit(1);
});

/**
* Unhandled promise rejection handler.
*
* Node 15+ terminates the process by default when a promise rejection is
* unhandled. MCP OAuth reconnect storms and streamable-HTTP transport resets
* can produce transient fire-and-forget rejections (ECONNRESET, token refresh
* races) that are recoverable — the server should log and keep serving other
* requests rather than silently crash under load.
*
* Non-Error reasons are forwarded as-is so structured payloads (e.g.
* `{ code: "ECONNRESET", errno: -104 }`) survive instead of being collapsed to
* "[object Object]" by `String()`.
*/
process.on('unhandledRejection', (reason) => {
if (reason instanceof Error) {
logger.error('Unhandled promise rejection. The app will continue running.', {
name: reason.name,
message: reason.message,
stack: reason.stack,
cause: reason.cause,
});
return;
}
logger.error('Unhandled promise rejection. The app will continue running.', { reason });
});
80 changes: 65 additions & 15 deletions api/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,25 +222,49 @@ const startServer = async () => {
logger.info(`Server listening at http://${host == '0.0.0.0' ? 'localhost' : host}:${port}`);
}

await runAsSystem(async () => {
await initializeMCPs();
await initializeOAuthReconnectManager();
});
await checkMigrations();

// Configure stream services (auto-detects Redis from USE_REDIS env var)
const streamServices = createStreamServices();
GenerationJobManager.configure(streamServices);
GenerationJobManager.initialize();

const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect'));
if (inspectFlags || isEnabled(process.env.MEM_DIAG)) {
memoryDiagnostics.start();
/**
* The listen callback is async, so any rejection from these awaits would
* otherwise be detached from `startServer().catch(...)` (which only
* catches errors that happen before `app.listen`). Without explicit
* handling, the global `unhandledRejection` handler would swallow init
* failures and leave the server listening but only partially
* initialized — passing liveness checks while serving broken requests.
*/
try {
await runAsSystem(async () => {
await initializeMCPs();
await initializeOAuthReconnectManager();
});
await checkMigrations();

// Configure stream services (auto-detects Redis from USE_REDIS env var)
const streamServices = createStreamServices();
GenerationJobManager.configure(streamServices);
GenerationJobManager.initialize();

const inspectFlags = process.execArgv.some((arg) => arg.startsWith('--inspect'));
if (inspectFlags || isEnabled(process.env.MEM_DIAG)) {
memoryDiagnostics.start();
}
} catch (initErr) {
logger.error('Post-listen initialization failed:', initErr);
process.exit(1);
}
});
};

startServer();
/**
* Boot rejections (e.g. `connectDb`, `getAppConfig`, `performStartupChecks`)
* must remain fail-fast: a half-initialized process with no listening HTTP
* server should die immediately so the orchestrator restarts it, instead of
* being kept alive by the `unhandledRejection` handler below until the
* liveness probe eventually times out. Mirrors the pattern in
* `experimental.js`.
*/
startServer().catch((err) => {
logger.error('Failed to start server:', err);
process.exit(1);
});

let messageCount = 0;
process.on('uncaughtException', (err) => {
Expand Down Expand Up @@ -299,5 +323,31 @@ process.on('uncaughtException', (err) => {
process.exit(1);
});

/**
* Unhandled promise rejection handler.
*
* Node 15+ terminates the process by default when a promise rejection is
* unhandled. MCP OAuth reconnect storms and streamable-HTTP transport resets
* can produce transient fire-and-forget rejections (ECONNRESET, token refresh
* races) that are recoverable — the server should log and keep serving other
* requests rather than silently crash under load.
*
* Non-Error reasons are forwarded as-is so structured payloads (e.g.
* `{ code: "ECONNRESET", errno: -104 }`) survive instead of being collapsed to
* "[object Object]" by `String()`.
*/
process.on('unhandledRejection', (reason) => {
if (reason instanceof Error) {
logger.error('Unhandled promise rejection. The app will continue running.', {
name: reason.name,
message: reason.message,
stack: reason.stack,
cause: reason.cause,
});
return;
}
logger.error('Unhandled promise rejection. The app will continue running.', { reason });
});

/** Export app for easier testing purposes */
module.exports = app;
85 changes: 85 additions & 0 deletions api/server/routes/__tests__/mcp.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ jest.mock('@librechat/api', () => {

jest.mock('@librechat/data-schemas', () => ({
getTenantId: jest.fn(),
tenantStorage: {
run: jest.fn((store, fn) => fn()),
},
logger: {
debug: jest.fn(),
info: jest.fn(),
Expand Down Expand Up @@ -1862,6 +1865,88 @@ describe('MCP Routes', () => {
});
});

describe('GET /:serverName/oauth/callback - Tenant Context', () => {
beforeEach(() => {
const { getTenantId, tenantStorage } = require('@librechat/data-schemas');
const { MCPOAuthHandler, MCPTokenStorage } = require('@librechat/api');
getTenantId.mockReset();
tenantStorage.run.mockReset();
tenantStorage.run.mockImplementation((store, fn) => fn());
MCPOAuthHandler.resolveStateToFlowId.mockReset();
MCPOAuthHandler.getFlowState.mockReset();
MCPOAuthHandler.completeOAuthFlow.mockReset();
MCPTokenStorage.storeTokens.mockReset();
});

it('should wrap callback body in tenantStorage.run when flowState has tenantId and no current context', async () => {
const { getTenantId, tenantStorage } = require('@librechat/data-schemas');
const { MCPOAuthHandler, MCPTokenStorage } = require('@librechat/api');
const flowId = 'user123:test-server';
const csrfToken = generateTestCsrfToken(flowId);

getTenantId.mockReturnValue(undefined);

MCPOAuthHandler.resolveStateToFlowId.mockResolvedValue(flowId);
MCPOAuthHandler.getFlowState.mockResolvedValue({
serverName: 'test-server',
userId: 'user123',
tenantId: 'tenant-abc',
metadata: {},
clientInfo: {},
codeVerifier: 'test-verifier',
});
MCPOAuthHandler.completeOAuthFlow.mockResolvedValue({
access_token: 'token',
token_type: 'bearer',
});
MCPTokenStorage.storeTokens.mockResolvedValue();

const response = await request(app)
.get(`/api/mcp/test-server/oauth/callback?code=test-code&state=${flowId}`)
.set('Cookie', [`oauth_csrf=${csrfToken}`])
.expect(302);

expect(tenantStorage.run).toHaveBeenCalledWith(
{ tenantId: 'tenant-abc' },
expect.any(Function),
);
expect(MCPTokenStorage.storeTokens).toHaveBeenCalled();

const basePath = getBasePath();
expect(response.headers.location).toContain(`${basePath}/oauth/success`);
});

it('should not call tenantStorage.run when flowState has no tenantId', async () => {
const { getTenantId, tenantStorage } = require('@librechat/data-schemas');
const { MCPOAuthHandler, MCPTokenStorage } = require('@librechat/api');
const flowId = 'user123:test-server';
const csrfToken = generateTestCsrfToken(flowId);

getTenantId.mockReturnValue(undefined);

MCPOAuthHandler.resolveStateToFlowId.mockResolvedValue(flowId);
MCPOAuthHandler.getFlowState.mockResolvedValue({
serverName: 'test-server',
userId: 'user123',
metadata: {},
clientInfo: {},
codeVerifier: 'test-verifier',
});
MCPOAuthHandler.completeOAuthFlow.mockResolvedValue({
access_token: 'token',
token_type: 'bearer',
});
MCPTokenStorage.storeTokens.mockResolvedValue();

await request(app)
.get(`/api/mcp/test-server/oauth/callback?code=test-code&state=${flowId}`)
.set('Cookie', [`oauth_csrf=${csrfToken}`])
.expect(302);

expect(tenantStorage.run).not.toHaveBeenCalled();
});
});

describe('GET /servers', () => {
// mockRegistryInstance is defined at the top of the file

Expand Down
Loading
Loading