Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions packages/cubejs-mssql-driver/src/MSSqlDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
TableColumnQueryResult,
GenericDataBaseType,
} from '@cubejs-backend/base-driver';
import { QueryStream } from './QueryStream';
import { QueryStream, transformRow } from './QueryStream';

// ********* Value converters ***************** //
const numericTypes = [
Expand Down Expand Up @@ -63,7 +63,7 @@ const MSSqlToGenericType: Record<string, string> = {
* MS SQL driver class.
*/
export class MSSqlDriver extends BaseDriver implements DriverInterface {
private readonly connectionPool: ConnectionPool;
private readonly pool: ConnectionPool;

private readonly initialConnectPromise: Promise<ConnectionPool>;

Expand All @@ -90,6 +90,11 @@ export class MSSqlDriver extends BaseDriver implements DriverInterface {
*/
maxPoolSize?: number,

/**
* Min pool size value for the [cube]<-->[db] pool.
*/
minPoolSize?: number,

/**
* Time to wait for a response from a connection after validation
* request before determining it as not valid. Default - 10000 ms.
Expand All @@ -105,9 +110,6 @@ export class MSSqlDriver extends BaseDriver implements DriverInterface {
config.dataSource ||
assertDataSource('default');

/**
* @type {import('mssql').config}
*/
this.config = {
readOnly: true,
server: getEnv('dbHost', { dataSource }),
Expand All @@ -119,22 +121,26 @@ export class MSSqlDriver extends BaseDriver implements DriverInterface {
requestTimeout: getEnv('dbQueryTimeout') * 1000,
options: {
encrypt: getEnv('dbSsl', { dataSource }),
useUTC: false
useUTC: true
},
pool: {
max:
config.maxPoolSize ||
getEnv('dbMaxPoolSize', { dataSource }) ||
8,
min: 0,
min: config.minPoolSize ||
getEnv('dbMinPoolSize', { dataSource }) ||
0,
idleTimeoutMillis: 30 * 1000,
acquireTimeoutMillis: 20 * 1000
},
...config
};
const { readOnly, ...poolConfig } = this.config;
this.connectionPool = new ConnectionPool(poolConfig as MsSQLConfig);
this.initialConnectPromise = this.connectionPool.connect();

const { readOnly: _, ...poolConfig } = this.config;

this.pool = new ConnectionPool(poolConfig as MsSQLConfig);
this.initialConnectPromise = this.pool.connect();
}

/**
Expand Down Expand Up @@ -292,7 +298,10 @@ export class MSSqlDriver extends BaseDriver implements DriverInterface {
// TODO time zone UTC set in driver ?

cancelFn = () => request.cancel();
return request.query(query).then(res => res.recordset);
return request.query(query).then(res => {
res.recordset?.forEach(transformRow);
return res.recordset;
});
});
promise.cancel = () => cancelFn && cancelFn();
return promise;
Expand Down Expand Up @@ -384,6 +393,10 @@ export class MSSqlDriver extends BaseDriver implements DriverInterface {
query.query = `SELECT TOP ${query.limit} * FROM (${query.query}) AS t`;
}

public override async release(): Promise<void> {
await this.pool.close();
}

public capabilities(): DriverCapabilities {
return {
incrementalSchemaLoading: true,
Expand Down
20 changes: 11 additions & 9 deletions packages/cubejs-mssql-driver/src/QueryStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ import {
getEnv,
} from '@cubejs-backend/shared';

export function transformRow(row: Record<string, any>) {
for (const [key, value] of Object.entries(row)) {
if (value instanceof Date) {
// toJSON() returns an ISO-8601 UTC string (e.g. "2017-01-03T00:00:00.000Z")
// because useUTC: true makes tedious construct Date objects via Date.UTC(),
row[key] = value.toJSON();
}
}
}

/**
* MS-SQL query stream class.
*/
Expand All @@ -23,7 +33,7 @@ export class QueryStream extends Readable {
});
this.request = request;
this.request.on('row', row => {
this.transformRow(row);
transformRow(row);
const canAdd = this.push(row);
if (this.toRead-- <= 0 || !canAdd) {
this.request?.pause();
Expand All @@ -45,14 +55,6 @@ export class QueryStream extends Readable {
this.request?.resume();
}

private transformRow(row: Record<string, any>) {
for (const [key, value] of Object.entries(row)) {
if (value instanceof Date) {
row[key] = value.toJSON();
}
}
}

/**
* @override
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"@clickhouse/client": "^1.12.0",
"@cubejs-backend/linter": "1.6.14",
"@cubejs-backend/query-orchestrator": "1.6.14",
"@cubejs-backend/mssql-driver": "1.6.14",
"@types/babel__code-frame": "^7.0.6",
"@types/babel__generator": "^7.6.8",
"@types/babel__traverse": "^7.20.5",
Expand All @@ -76,7 +77,6 @@
"@types/uuid": "^8.3.0",
"jest": "^29",
"moment": "^2.30.1",
"mssql": "^10.0.2",
"mysql": "^2.18.1",
"node-fetch": "2",
"pg-promise": "^11.5.5",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,60 +1,58 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import { GenericContainer, Wait } from 'testcontainers';
import sql from 'mssql';
import { MSSqlDriver } from '@cubejs-backend/mssql-driver';

import { BaseDbRunner } from '../utils/BaseDbRunner';
import { MssqlQuery } from '../../../src';

export class MSSqlDbRunner extends BaseDbRunner {
async connectionLazyInit(port) {
return {
testQueries: async (queries, fixture) => {
const pool = new sql.ConnectionPool({
server: 'localhost',
port,
user: 'sa',
password: this.password(),
options: {
// local dev / self-signed certs
trustServerCertificate: true,
}
});
const driver = new MSSqlDriver({
server: 'localhost',
port,
user: 'sa',
password: this.password(),
readOnly: false,
options: {
trustServerCertificate: true,
encrypt: false,
},
pool: {
max: 4,
min: 0,
idleTimeoutMillis: 30000,
acquireTimeoutMillis: 20000,
},
});

await pool.connect();
await driver.testConnection();

try {
const tx = new sql.Transaction(pool);
await tx.begin();
try {
await this.prepareFixture(tx, fixture);
const result = await queries.map(query => async () => {
const request = new sql.Request(tx);
(query[1] || []).forEach((v, i) => request.input(`_${i + 1}`, v));
return (await request.query(query[0])).recordset;
}).reduce((a, b) => a.then(b), Promise.resolve());
await tx.commit();
return result;
} catch (e) {
// console.log(e.stack);
await tx.rollback();
throw e;
}
} finally {
await pool.close();
return {
testQueries: async (queries, _fixture) => {
await this.prepareFixture(driver);
let result;
for (const query of queries) {
result = await driver.query(query[0], query[1] || []);
}
}
return result;
},
close: async () => {
await driver.release();
},
};
}

async prepareFixture(tx) {
const query = async (q) => {
const request = new sql.Request(tx);
await request.query(q);
};
await query('CREATE TABLE ##visitors (id INT, amount INT, created_at datetime, updated_at datetime, status INT, source VARCHAR(MAX), latitude DECIMAL, longitude DECIMAL)');
await query('CREATE TABLE ##visitor_checkins (id INT, visitor_id INT, created_at datetime, source VARCHAR(MAX))');
await query('CREATE TABLE ##cards (id INT, visitor_id INT, visitor_checkin_id INT)');
await query(`
/**
* @param {MSSqlDriver} driver
*/
async prepareFixture(driver) {
await driver.query('DROP TABLE IF EXISTS ##visitors', []);
await driver.query('DROP TABLE IF EXISTS ##visitor_checkins', []);
await driver.query('DROP TABLE IF EXISTS ##cards', []);
await driver.query('DROP TABLE IF EXISTS ##numbers', []);
await driver.query('CREATE TABLE ##visitors (id INT, amount INT, created_at datetime, updated_at datetime, status INT, source VARCHAR(MAX), latitude DECIMAL, longitude DECIMAL)', []);
await driver.query('CREATE TABLE ##visitor_checkins (id INT, visitor_id INT, created_at datetime, source VARCHAR(MAX))', []);
await driver.query('CREATE TABLE ##cards (id INT, visitor_id INT, visitor_checkin_id INT)', []);
await driver.query(`
INSERT INTO
##visitors
(id, amount, created_at, updated_at, status, source, latitude, longitude) VALUES
Expand All @@ -64,8 +62,8 @@ export class MSSqlDbRunner extends BaseDbRunner {
(4, 400, '2017-01-07', '2017-01-25', 2, NULL, 120.120, 10.60),
(5, 500, '2017-01-07', '2017-01-25', 2, NULL, 120.120, 58.10),
(6, 500, '2016-09-07', '2016-09-07', 2, NULL, 120.120, 58.10)
`);
await query(`
`, []);
await driver.query(`
INSERT INTO
##visitor_checkins
(id, visitor_id, created_at, source) VALUES
Expand All @@ -75,24 +73,24 @@ export class MSSqlDbRunner extends BaseDbRunner {
(4, 2, '2017-01-05', NULL),
(5, 2, '2017-01-05', NULL),
(6, 3, '2017-01-06', NULL)
`);
await query(`
`, []);
await driver.query(`
INSERT INTO
##cards
(id, visitor_id, visitor_checkin_id) VALUES
(1, 1, 1),
(2, 1, 2),
(3, 3, 6)
`);
await query('CREATE TABLE ##numbers (num INT);');
await query(`
`, []);
await driver.query('CREATE TABLE ##numbers (num INT);', []);
await driver.query(`
INSERT INTO ##numbers (num) VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9),
(10), (11), (12), (13), (14), (15), (16), (17), (18), (19),
(20), (21), (22), (23), (24), (25), (26), (27), (28), (29),
(30), (31), (32), (33), (34), (35), (36), (37), (38), (39),
(40), (41), (42), (43), (44), (45), (46), (47), (48), (49),
(50), (51), (52), (53), (54), (55), (56), (57), (58), (59);
`);
`, []);
}

password() {
Expand Down
Loading
Loading