diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 82576a6986f1d..9c4adf181bdbf 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -1393,8 +1393,8 @@ class ApiGateway { }); await res(queryType === QueryTypeEnum.REGULAR_QUERY ? - { sql: toQuery(sqlQueries[0]) } : - sqlQueries.map((sqlQuery) => ({ sql: toQuery(sqlQuery) }))); + { sql: toQuery(sqlQueries[0]), dataSource: sqlQueries[0].dataSource } : + sqlQueries.map((sqlQuery) => ({ sql: toQuery(sqlQuery), dataSource: sqlQuery.dataSource }))); } catch (e: any) { this.handleError({ e, context, query, res, requestStarted diff --git a/packages/cubejs-api-gateway/test/index.test.ts b/packages/cubejs-api-gateway/test/index.test.ts index bf4a2b4d7fe9e..d5dbf9d6e2601 100644 --- a/packages/cubejs-api-gateway/test/index.test.ts +++ b/packages/cubejs-api-gateway/test/index.test.ts @@ -851,6 +851,43 @@ describe('API Gateway', () => { }); }); + describe('/v1/sql endpoint dataSource', () => { + test('returns dataSource for single query', async () => { + const { app } = await createApiGateway(); + const query = JSON.stringify({ measures: ['Foo.bar'] }); + + const res = await request(app) + .get(`/cubejs-api/v1/sql?query=${encodeURIComponent(query)}`) + .set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M') + .expect(200); + + expect(res.body).toHaveProperty('sql'); + expect(res.body).toHaveProperty('dataSource'); + expect(res.body.dataSource).toBe('default'); + }); + + test('returns dataSource for blending query', async () => { + const { app } = await createApiGateway(); + const query = JSON.stringify([ + { measures: ['Foo.bar'], timeDimensions: [{ dimension: 'Foo.time', granularity: 'day' }] }, + { measures: ['Foo.bar'], timeDimensions: [{ dimension: 'Foo.time', granularity: 'day' }] } + ]); + + const res = await request(app) + .get(`/cubejs-api/v1/sql?query=${encodeURIComponent(query)}`) + .set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M') + .expect(200); + + expect(Array.isArray(res.body)).toBe(true); + expect(res.body.length).toBe(2); + res.body.forEach((item: any) => { + expect(item).toHaveProperty('sql'); + expect(item).toHaveProperty('dataSource'); + expect(item.dataSource).toBe('default'); + }); + }); + }); + describe('/cubejs-system/v1', () => { const scheduledRefreshContextsFactory = () => ([ { securityContext: { foo: 'bar' } }, diff --git a/packages/cubejs-api-gateway/test/mocks.ts b/packages/cubejs-api-gateway/test/mocks.ts index e9ffa95dbccd0..f2341b544c323 100644 --- a/packages/cubejs-api-gateway/test/mocks.ts +++ b/packages/cubejs-api-gateway/test/mocks.ts @@ -67,7 +67,8 @@ export const compilerApi = jest.fn().mockImplementation(async () => ({ foo__bar: 'Foo.bar', foo__time: 'Foo.time', }, - order: [{ id: 'id', desc: true, }] + order: [{ id: 'id', desc: true, }], + dataSource: 'default' }; }, diff --git a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts index e3f6dfe800060..200017321dc83 100644 --- a/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts +++ b/packages/cubejs-cubestore-driver/src/WebSocketConnection.ts @@ -14,16 +14,29 @@ import { HttpTable } from '../codegen'; +interface SentMessage { + resolve: (value: any) => void; + reject: (reason?: any) => void; + buffer: Uint8Array; +} + +interface CubeStoreWebSocket extends WebSocket { + readyPromise: Promise; + lastHeartBeat: Date; + sentMessages: Record; + sendAsync: (message: Uint8Array) => Promise; +} + export class WebSocketConnection { protected messageCounter: number; - protected maxConnectRetries: number; + protected readonly maxConnectRetries: number; - protected noHeartBeatTimeout: number; + protected readonly noHeartBeatTimeout: number; protected currentConnectionTry: number; - protected webSocket: any; + protected webSocket: CubeStoreWebSocket | null = null; private readonly url: string; @@ -38,10 +51,10 @@ export class WebSocketConnection { this.connectionId = uuidv4(); } - protected async initWebSocket() { + protected async initWebSocket(): Promise { if (!this.webSocket) { - const webSocket: any = new WebSocket(this.url); - webSocket.readyPromise = new Promise((resolve, reject) => { + const webSocket = new WebSocket(this.url) as CubeStoreWebSocket; + webSocket.readyPromise = new Promise((resolve, reject) => { webSocket.lastHeartBeat = new Date(); const pingInterval = setInterval(() => { if (webSocket.readyState === WebSocket.OPEN) { @@ -53,12 +66,15 @@ export class WebSocketConnection { } }, 5000); - webSocket.sendAsync = async (message) => new Promise((resolveSend, rejectSend) => { + webSocket.sendAsync = async (message: Uint8Array) => new Promise((resolveSend, rejectSend) => { // If socket is closing this message should be resent if (webSocket.readyState === WebSocket.OPEN) { webSocket.send(message, (err) => { if (err) { - rejectSend(err); + rejectSend(new ConnectionError( + `CubeStore connection error: ${err.message}`, + err + )); } else { resolveSend(); } @@ -81,7 +97,7 @@ export class WebSocketConnection { } if (webSocket === this.webSocket) { - this.webSocket = undefined; + this.webSocket = null; } }); webSocket.on('pong', () => { @@ -112,10 +128,10 @@ export class WebSocketConnection { } if (webSocket === this.webSocket) { - this.webSocket = undefined; + this.webSocket = null; } }); - webSocket.on('message', async (msg) => { + webSocket.on('message', async (msg: Buffer) => { const buf = new flatbuffers.ByteBuffer(msg); const httpMessage = HttpMessage.getRootAsHttpMessage(buf); const resolvers = webSocket.sentMessages[httpMessage.messageId()]; @@ -179,10 +195,12 @@ export class WebSocketConnection { } }); }); + webSocket.sentMessages = {}; this.webSocket = webSocket; } - return this.webSocket.readyPromise; + + return this.webSocket!.readyPromise; } private retryWaitTime() { @@ -196,10 +214,14 @@ export class WebSocketConnection { socket.send(buffer, (err) => { if (err) { delete socket.sentMessages[messageId]; - reject(err); + reject(new ConnectionError( + `CubeStore connection error: ${err.message}`, + err + )); } }); } + socket.sentMessages[messageId] = { resolve, reject, diff --git a/packages/cubejs-cubestore-driver/src/errors.ts b/packages/cubejs-cubestore-driver/src/errors.ts index cdef8bb88447d..df9f52529a375 100644 --- a/packages/cubejs-cubestore-driver/src/errors.ts +++ b/packages/cubejs-cubestore-driver/src/errors.ts @@ -7,6 +7,7 @@ export class ConnectionError extends CubeStoreError { public constructor(message: string, cause?: Error) { super(message); + this.name = 'ConnectionError'; this.cause = cause; } @@ -15,6 +16,7 @@ export class ConnectionError extends CubeStoreError { export class QueryError extends CubeStoreError { public constructor(message: string) { super(message); + this.name = 'QueryError'; } }