diff --git a/lib/client.js b/lib/client.js index dfff6fc8c..682237938 100644 --- a/lib/client.js +++ b/lib/client.js @@ -2,7 +2,7 @@ const events = require("events"); const util = require("util"); -const { throwNotSupported } = require("./new-utils.js"); +const { throwNotSupported, isNamedParameters } = require("./new-utils.js"); const utils = require("./utils.js"); const errors = require("./errors.js"); @@ -20,7 +20,7 @@ const { const promiseUtils = require("./promise-utils"); const rust = require("../index"); const ResultSet = require("./types/result-set.js"); -const { encodeParams, convertComplexType } = require("./types/cql-utils.js"); +const { encodeParams } = require("./types/cql-utils.js"); const { PreparedCache } = require("./cache.js"); const Encoder = require("./encoder.js"); const { HostMap } = require("./host.js"); @@ -28,6 +28,7 @@ const { HostMap } = require("./host.js"); // Imports for the purpose of type hints in JS docs. // eslint-disable-next-line no-unused-vars const { QueryOptions } = require("./query-options.js"); +const { PreparedInfo } = require("./prepared.js"); /** * Represents a database client that maintains multiple connections to the cluster nodes, providing methods to @@ -164,19 +165,6 @@ class Client extends events.EventEmitter { return fullOptions; } - /** - * Manually prepare query into prepared statement - * @param {string} query - * @returns {Promise>} - * Returns a tuple of type object (the format expected by the encoder) and prepared statement wrapper - * @package - */ - async prepareQuery(query) { - let expectedTypes = await this.rustClient.prepareStatement(query); - let res = [expectedTypes.map((t) => convertComplexType(t)), query]; - return res; - } - /** * Attempts to connect to one of the [contactPoints]{@link ClientOptions} and discovers the rest the nodes of the * cluster. @@ -293,17 +281,10 @@ class Client extends events.EventEmitter { try { const execOptions = this.createOptions(options); - if (execOptions.isPaged()) { - return promiseUtils.optionalCallback( - this.#rustyPaged(query, params, execOptions).then( - (e) => e[1], - ), - callback, - ); - } - + // The rusty execute will take the page state from options, but we need to indicate whether it's a paged query. + let pageState = execOptions.isPaged() ? null : undefined; return promiseUtils.optionalCallback( - this.rustyExecute(query, params, execOptions), + this.rustyExecute(query, params, execOptions, pageState), callback, ); } catch (err) { @@ -317,26 +298,24 @@ class Client extends events.EventEmitter { } /** - * Wrapper for executing queries by rust driver - * @param {string | list} query + * Wrapper for executing queries by rust driver. + * When called with a pageState argument (including null), executes a single-page query. + * When called without pageState, executes an unpaged query. + * @param {string | PreparedInfo} query * @param {Array} params * @param {ExecutionOptions} execOptions + * @param {rust.PagingStateWrapper|Buffer|null} [pageState] When provided (including null), enables paged execution. + * When unprovided (undefined), executes an unpaged query. * @returns {Promise} * @package */ - async rustyExecute(query, params, execOptions) { - if ( - // !execOptions.isPrepared() && - params && - !Array.isArray(params) - // && !types.protocolVersion.supportsNamedParameters(version) - ) { - throw new Error(`TODO: Implement any support for named parameters`); - // // Only Cassandra 2.1 and above supports named parameters - // throw new errors.ArgumentError( - // "Named parameters for simple statements are not supported, use prepare flag", - // ); - } + async rustyExecute(query, params, execOptions, pageState) { + // Why not just take execOptions.isPaged()? + // When executing through eachRow, this may not be set properly, so we need to account for that case. + // This should probably be re-worked when stabilizing the API. For now I do not want to restrict the API to avoid + // accidentally breaking compatibility... + const paged = pageState !== undefined; + let withNamedParameters = isNamedParameters(params, execOptions); if (!this.connected) { // TODO: Check this logic and decide if it's needed. Probably do it while implementing (better) connection @@ -344,33 +323,70 @@ class Client extends events.EventEmitter { await this.#connect(); } - let rustOptions = execOptions.getRustOptions(); + if (paged) { + if (pageState instanceof Buffer) { + pageState = rust.PagingStateWrapper.fromBuffer(pageState); + } else if (pageState == null) { + // Take the page state option into account only when we don't pass it explicitly (it's done only in eachRow) + if (execOptions.getPageState() instanceof Buffer) { + pageState = rust.PagingStateWrapper.fromBuffer( + execOptions.getPageState(), + ); + } else if (typeof execOptions.pageState === "string") { + pageState = rust.PagingStateWrapper.fromBuffer( + Buffer.from(execOptions.getPageState(), "hex"), + ); + } + } + } + + const rustOptions = execOptions.getRustOptions(); + + /** + * @type {rust.QueryResultWrapper} + */ let result; + /** + * @type {rust.PagingStateWrapper?} + */ + let resultPageState; + /** + * @type {rust.QueryExecutor?} + */ + let executor; if (execOptions.isPrepared()) { + /** + * @type {PreparedInfo} + */ + let prepared = query; // If the statement is already prepared, skip the preparation process // Otherwise call Rust part to prepare a statement if (typeof query === "string") { - query = await this.prepareQuery(query); + prepared = await PreparedInfo.create(query, this.rustClient); + } + if (withNamedParameters) { + params = utils.adaptNamedParamsPrepared(params, prepared); + } + let encoded = encodeParams(prepared.types, params, this.#encoder); + + if (paged) { + let temp = await this.rustClient.executeSinglePageEncoded( + prepared.query, + encoded, + rustOptions, + pageState, + ); + result = temp[1]; + resultPageState = temp[0]; + executor = temp[2]; + } else { + result = await this.rustClient.executePreparedUnpagedEncoded( + prepared.query, + encoded, + rustOptions, + ); } - - /** - * @type {string} - */ - let statement = query[1]; - /** - * @type {Object} - */ - let expectedTypes = query[0]; - - let encoded = encodeParams(expectedTypes, params, this.#encoder); - - // Execute query - result = await this.rustClient.executePreparedUnpagedEncoded( - statement, - encoded, - rustOptions, - ); } else { // We do not accept already prepared statements for unprepared queries if (typeof query !== "string") { @@ -384,14 +400,35 @@ class Client extends events.EventEmitter { this.#encoder, ); - // Execute query - result = await this.rustClient.queryUnpagedEncoded( - query, - encoded, - rustOptions, - ); + if (paged) { + let temp = await this.rustClient.querySinglePageEncoded( + query, + encoded, + rustOptions, + pageState, + ); + result = temp[1]; + resultPageState = temp[0]; + executor = temp[2]; + } else { + result = await this.rustClient.queryUnpagedEncoded( + query, + encoded, + rustOptions, + ); + } } - return new ResultSet(result, this.#encoder); + + let resultSet = new ResultSet(result, this.#encoder, resultPageState); + if (resultPageState) { + resultSet.rawNextPageAsync = async (pageState) => { + return await executor.fetchNextPage( + this.rustClient, + rust.PagingStateWrapper.fromBuffer(pageState), + ); + }; + } + return resultSet; } /** @@ -457,167 +494,49 @@ class Client extends events.EventEmitter { const nextPage = () => { promiseUtils.toCallback( - this.#rustyPaged(query, params, execOptions, pagingState), + this.rustyExecute(query, params, execOptions, pagingState), pageCallback, ); }; /** * @param {Error} err - * @param {Array} result - * Should be [rust.PagingStateResponseWrapper, ResultSet] + * @param {ResultSet} result */ function pageCallback(err, result) { if (err) { return callback(err); } - /** - * Next requests in case paging (auto or explicit) is used - */ - let lastPagingState = result[0]; - let queryResult = result[1]; - - rowLength += queryResult.rowLength; - if (queryResult.rows) { - queryResult.rows.forEach((value, index) => { + rowLength += result.rowLength; + if (result.rows) { + result.rows.forEach((value, index) => { rowCallback(index, value); }); } - if (lastPagingState) { + if (result.innerPageState) { // Use new page state as next request page state - pagingState = lastPagingState; - + pagingState = result.innerPageState; if (execOptions.isAutoPage()) { // Issue next request for the next page return nextPage(); } // Allows for explicit (manual) paging, in case the caller needs it - queryResult.nextPage = nextPage; + result.nextPage = nextPage; } // Finished auto-paging - queryResult.rowLength = rowLength; - callback(null, queryResult); + result.rowLength = rowLength; + callback(null, result); } promiseUtils.toCallback( - this.#rustyPaged(query, params, execOptions, pagingState), + this.rustyExecute(query, params, execOptions, pagingState), pageCallback, ); } - /** - * Execute a single page of query - * @param {string} query - * @param {Array} params - * @param {ExecutionOptions} execOptions - * @param {rust.PagingStateWrapper|Buffer} [pageState] - * @returns {Promise>} should be Promise<[rust.PagingStateResponseWrapper, ResultSet]> - * @private - */ - async #rustyPaged(query, params, execOptions, pageState) { - if ( - !execOptions.isPrepared() && - params && - !Array.isArray(params) - // && !types.protocolVersion.supportsNamedParameters(version) - ) { - throw new Error(`TODO: Implement any support for named parameters`); - // // Only Cassandra 2.1 and above supports named parameters - // throw new errors.ArgumentError( - // "Named parameters for simple statements are not supported, use prepare flag", - // ); - } - - if (!this.connected) { - // TODO: Check this logic and decide if it's needed. Probably do it while implementing (better) connection - // // Micro optimization to avoid an async execution for a simple check - await this.#connect(); - } - - if (pageState instanceof Buffer) { - pageState = rust.PagingStateWrapper.fromBuffer(pageState); - } else if (pageState == undefined) { - // Take the page state option into account only when we don't pass it explicitly (it's done only in eachRow) - if (execOptions.getPageState() instanceof Buffer) { - pageState = rust.PagingStateWrapper.fromBuffer( - execOptions.getPageState(), - ); - } else if (typeof execOptions.pageState === "string") { - pageState = rust.PagingStateWrapper.fromBuffer( - Buffer.from(execOptions.getPageState(), "hex"), - ); - } - } - const rustOptions = execOptions.getRustOptions(); - let result; - if (execOptions.isPrepared()) { - // If the statement is already prepared, skip the preparation process - // Otherwise call Rust part to prepare a statement - if (typeof query === "string") { - query = await this.prepareQuery(query); - } - - /** - * @type {string} - */ - let statement = query[1]; - /** - * @type {Object} - */ - let expectedTypes = query[0]; - - let encoded = encodeParams(expectedTypes, params, this.#encoder); - - // Execute query - result = await this.rustClient.executeSinglePageEncoded( - statement, - encoded, - rustOptions, - pageState, - ); - } else { - // We do not accept already prepared statements for unprepared queries - if (typeof query !== "string") { - throw new Error("Expected to obtain a string query"); - } - // Parse parameters according to provided hints, with type guessing - let encoded = encodeParams( - execOptions.getHints() || [], - params, - this.#encoder, - ); - - // Execute query - result = await this.rustClient.querySinglePageEncoded( - query, - encoded, - rustOptions, - pageState, - ); - } - /** - * @type {rust.QueryExecutor} - */ - let executor = result[2]; - // result[0] - information about page state - // result[1] - object representing result itself - let resultSet = new ResultSet(result[1], this.#encoder, result[0]); - if (result[0]) { - resultSet.rawNextPageAsync = async (pageState) => { - return await executor.fetchNextPage( - this.rustClient, - rust.PagingStateWrapper.fromBuffer(pageState), - ); - }; - } - result[1] = resultSet; - - return result; - } - /** * Executes the query and pushes the rows to the result stream as soon as they received. * @@ -749,11 +668,18 @@ class Client extends events.EventEmitter { if (shouldBePrepared) { let prepared = preparedCache.getElement(statement); if (!prepared) { - prepared = await this.prepareQuery(statement); + prepared = await PreparedInfo.create( + statement, + this.rustClient, + ); preparedCache.storeElement(statement, prepared); } - types = prepared[0]; - statement = prepared[1]; + types = prepared.types; + statement = prepared.query; + + if (params && !Array.isArray(params)) { + params = utils.adaptNamedParamsPrepared(params, prepared); + } } else { types = hints[i] || []; } @@ -850,6 +776,7 @@ class Client extends events.EventEmitter { * @returns {void} */ } + /** * Callback used by execution methods. * @callback ResultCallback diff --git a/lib/concurrent/index.js b/lib/concurrent/index.js index 72ee093fb..0590854f5 100644 --- a/lib/concurrent/index.js +++ b/lib/concurrent/index.js @@ -4,6 +4,7 @@ const _Client = require("../client"); const utils = require("../utils"); const { Stream } = require("stream"); const { PreparedCache } = require("../cache"); +const { PreparedInfo } = require("../prepared"); /** * Utilities for concurrent query execution with the DataStax Node.js Driver. @@ -155,7 +156,7 @@ class ArrayBasedExecutor { try { let prepared = this._cache.getElement(query); if (!prepared) { - prepared = await (this._client.prepareQuery(query)); + prepared = await PreparedInfo.create(query, this._client.rustClient); this._cache.storeElement(query, prepared); } await this._client diff --git a/lib/new-utils.js b/lib/new-utils.js index dc282d0a5..763d12b39 100644 --- a/lib/new-utils.js +++ b/lib/new-utils.js @@ -137,10 +137,23 @@ function ensure64SignedInteger(number, name) { } } +function isNamedParameters(params, execOptions) { + if (params && !Array.isArray(params)) { + if (!execOptions.isPrepared()) { + throw new customErrors.ArgumentError( + "Named parameters for simple statements are not supported, use prepare flag", + ); + } + return true; + } + return false; +} + exports.throwNotSupported = throwNotSupported; exports.napiErrorHandler = napiErrorHandler; exports.throwNotSupported = throwNotSupported; exports.bigintToLong = bigintToLong; exports.arbitraryValueToBigInt = arbitraryValueToBigInt; +exports.isNamedParameters = isNamedParameters; exports.ensure32SignedInteger = ensure32SignedInteger; exports.ensure64SignedInteger = ensure64SignedInteger; diff --git a/lib/prepared.js b/lib/prepared.js new file mode 100644 index 000000000..542bcaae8 --- /dev/null +++ b/lib/prepared.js @@ -0,0 +1,29 @@ +const _rust = require("../index"); +const { convertComplexType } = require("./types/cql-utils"); + +class PreparedInfo { + /** + * @param {Array} types + * @param {string} query + * @param {Array} colNames + */ + constructor(types, query, colNames) { + this.types = types; + this.query = query; + this.colNames = colNames; + } + + /** + * @param {string} query + * @param {_rust.SessionWrapper} client + * @returns {Promise} + */ + static async create(query, client) { + let expectedTypes = await client.prepareStatement(query); + let types = expectedTypes.map((t) => convertComplexType(t[0])); + let colNames = expectedTypes.map((t) => t[1].toLowerCase()); + return new PreparedInfo(types, query, colNames); + } +} + +module.exports.PreparedInfo = PreparedInfo; diff --git a/lib/types/result-set.js b/lib/types/result-set.js index 2085084e5..2258d8512 100644 --- a/lib/types/result-set.js +++ b/lib/types/result-set.js @@ -41,10 +41,17 @@ class ResultSet { */ #encoder; + /** + * Internal representation of the page state, used for fetching the next page of results. + * @type {rust.PagingStateWrapper?} + * @package + */ + innerPageState; + /** * @param {rust.QueryResultWrapper} result * @param {_Encoder} encoder - * @param {rust.PagingStateResponseWrapper} [pagingState] + * @param {rust.PagingStateWrapper} [pagingState] */ constructor(result, encoder, pagingState) { // Old constructor logic only for purpose of unit tests. @@ -103,6 +110,7 @@ class ResultSet { this.pageState = null; if (pagingState) { + this.innerPageState = pagingState; let rawPageState = pagingState.getRawPageState(); this.pageState = rawPageState.toString("hex"); Object.defineProperty(this, "rawPageState", { diff --git a/lib/utils.js b/lib/utils.js index 123a0e67e..231855f3c 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -288,26 +288,20 @@ function validateFn(fn, name) { * If the params are passed as an associative array (Object), * it adapts the object into an array with the same order as columns * @param {Array|Object} params - * @param {Array} columns + * @param {PreparedInfo} columns * @returns {Array} Returns an array of parameters. * @throws {Error} In case a parameter with a specific name is not defined */ function adaptNamedParamsPrepared(params, columns) { - if (!params || Array.isArray(params) || !columns || columns.length === 0) { - // params is an array or there aren't parameters - return params; - } - const paramsArray = new Array(columns.length); + const paramsArray = new Array(columns.types.length); params = toLowerCaseProperties(params); - const keys = {}; - for (let i = 0; i < columns.length; i++) { - const name = columns[i].name; + for (let i = 0; i < columns.types.length; i++) { + const name = columns.colNames[i]; if (!Object.prototype.hasOwnProperty.call(params, name)) { throw new errors.ArgumentError(`Parameter "${name}" not defined`); } paramsArray[i] = params[name]; - keys[name] = i; } return paramsArray; } diff --git a/src/requests/request.rs b/src/requests/request.rs index 5f3bec706..ea049b4c3 100644 --- a/src/requests/request.rs +++ b/src/requests/request.rs @@ -52,11 +52,16 @@ impl QueryOptionsWrapper { impl PreparedStatementWrapper { /// Get array of expected types for this prepared statement. - pub fn get_expected_types(&self) -> Vec> { + pub fn get_expected_types(&self) -> Vec<(ComplexType<'static>, String)> { self.prepared .get_variable_col_specs() .iter() - .map(|e| ComplexType::new_owned(e.typ().clone())) + .map(|e| { + ( + ComplexType::new_owned(e.typ().clone()), + e.name().to_string(), + ) + }) .collect() } } diff --git a/src/session.rs b/src/session.rs index 190dca995..09ec65178 100644 --- a/src/session.rs +++ b/src/session.rs @@ -164,11 +164,11 @@ impl SessionWrapper { /// Prepares a statement through rust driver for a given session /// Return expected types for the prepared statement - #[napi(ts_return_type = "Promise>")] + #[napi(ts_return_type = "Promise>")] pub async fn prepare_statement( &self, statement: String, - ) -> JsResult>> { + ) -> JsResult, String)>> { with_custom_error_async(async || { let statement: Statement = statement.into(); let w = PreparedStatementWrapper { @@ -177,7 +177,8 @@ impl SessionWrapper { .add_prepared_statement(&statement) // TODO: change for add_prepared_statement_to_owned after it is made public .await?, }; - ConvertedResult::Ok(w.get_expected_types()) + let types = w.get_expected_types(); + ConvertedResult::Ok(types) }) .await } diff --git a/test/integration/supported/client-batch-tests.js b/test/integration/supported/client-batch-tests.js index b43821d62..2807c275c 100644 --- a/test/integration/supported/client-batch-tests.js +++ b/test/integration/supported/client-batch-tests.js @@ -914,9 +914,7 @@ describe("Client @SERVER_API", function () { ); }, ); - // No support for named parameters - // TODO: Fix this test - /* vit("2.0", "should allow named parameters", function (done) { + vit("2.0", "should allow named parameters", function (done) { const client = newInstance(); const id1 = types.Uuid.random(); const id2 = types.Uuid.random(); @@ -928,6 +926,7 @@ describe("Client @SERVER_API", function () { table1, ), params: { + // eslint-disable-next-line camelcase text_SAMPLE: "named params", paramID: id1, time: types.TimeUuid.now(), @@ -990,7 +989,7 @@ describe("Client @SERVER_API", function () { ); }, ); - }); */ + }); vit( "2.0", diff --git a/test/integration/supported/client-execute-prepared-tests.js b/test/integration/supported/client-execute-prepared-tests.js index 165633741..2868fd4b0 100644 --- a/test/integration/supported/client-execute-prepared-tests.js +++ b/test/integration/supported/client-execute-prepared-tests.js @@ -540,10 +540,7 @@ describe("Client @SERVER_API", function () { done, ); }); - - // No support for named parameters - // TODO: fix this test - /* describe("with named parameters", function () { + describe("with named parameters", function () { vit("2.0", "should allow an array of parameters", function (done) { const query = util.format( "SELECT * FROM %s WHERE id1 = :id1", @@ -626,7 +623,7 @@ describe("Client @SERVER_API", function () { ); }, ); - }); */ + }); it("should encode and decode maps using Map polyfills", function (done) { const client = newInstance({