diff --git a/src/commands/entity_store_perf/README.md b/src/commands/entity_store_perf/README.md index b94adb4..bd4bc5b 100644 --- a/src/commands/entity_store_perf/README.md +++ b/src/commands/entity_store_perf/README.md @@ -53,6 +53,7 @@ yarn start upload-perf-data [file] [--index ] [--delete] [options] - `--samplingInterval `: Metrics sampling interval when `--metrics` is enabled (default: `5`) - `--transformTimeout `: Generic transform wait timeout in metrics mode for V1 flow (default: `30`) - `--noTransforms`: Run Entity Store V2 / ESQL flow (enable V2, install V2, no transforms, v2 indices) +- `--bulk-concurrency `: Parallel `_bulk` requests per upload (default: `8`). Values above ~8 often do not increase throughput once the cluster is saturated. When `--metrics` is enabled, log files can be used with `create-baseline`/`compare-metrics` by passing the emitted prefix. In V2 mode (`--noTransforms`), transform stats are skipped. @@ -76,8 +77,11 @@ yarn start upload-perf-data-interval [file] [options] ### Options -- `--interval `: Upload interval (default: `30`) -- `--count `: Number of uploads (default: `10`) +- `--interval `: Pause between completed uploads (default: `30`) +- `--duration `: Wall-clock run limit (e.g. `3h`, `30m`, `45s`); keeps uploading until the deadline, pausing `--interval` seconds between uploads when time allows. **Takes precedence over `--count`** when both are set. +- `--count `: Number of uploads (default: `10`); used only when `--duration` is not set. +- `--ingest-rate `: **Maximum** documents per second per upload (default: unlimited). Throttle pauses between 5k-doc batches; it does not speed up Elasticsearch. Each upload logs achieved docs/sec when it finishes. +- `--bulk-concurrency `: Parallel `_bulk` requests per upload (default: `8`). Tune for experiments; ~13k docs/sec was observed at 8 on a typical test cluster. - `--deleteData`: Delete entities and data stream/index first - `--deleteEngines`: Delete entity engines first - `--transformTimeout `: Generic transform wait timeout (default: `30`) @@ -92,8 +96,14 @@ yarn start upload-perf-data-interval large --deleteData yarn start upload-perf-data-interval large --deleteData --interval 60 --count 100 yarn start upload-perf-data-interval large --deleteData --interval 60 --count 100 --samplingInterval 10 yarn start upload-perf-data-interval large --deleteData --noTransforms +yarn start upload-perf-data-interval large --deleteData --noTransforms \ + --interval 60 --duration 3h --ingest-rate 500 ``` +Note: `--duration 3h --interval 60` does not mean exactly 180 uploads. It means the command keeps ingesting for up to 3 hours with ~60s pauses between uploads (each upload may take longer when `--ingest-rate` is set). + +After uploads complete, the command polls the entity index until the expected entity count is reached (or until `--transformTimeout` minutes elapse, default 30). Entity Store V2 can lag behind log ingest; if polling times out, the command continues with a warning instead of hanging indefinitely. With `--deleteData`, entity counting uses `match_all` on the entity index (accurate for multi-upload interval runs). + ### Output logs Interval uploads write metrics to `./logs`: diff --git a/src/commands/entity_store_perf/entity_store_perf.ts b/src/commands/entity_store_perf/entity_store_perf.ts index 5fa4cd7..2599592 100644 --- a/src/commands/entity_store_perf/entity_store_perf.ts +++ b/src/commands/entity_store_perf/entity_store_perf.ts @@ -2,7 +2,8 @@ import { log } from '../../utils/logger.ts'; import { faker } from '@faker-js/faker'; import fs from 'fs'; import { getEsClient, getFileLineCount } from '../utils/indices.ts'; -import { streamingBulkIngest } from '../shared/elasticsearch.ts'; +import { bulkUpsert } from '../shared/elasticsearch.ts'; +import pMap from 'p-map'; import { createProgressBar } from '../utils/cli_utils.ts'; import { ensureSecurityDefaultDataView } from '../../utils/security_default_data_view.ts'; import readline from 'readline'; @@ -25,10 +26,13 @@ import { type UserFields, } from '../../types/entities.ts'; import { getEntityStorePerfDataDir } from '../../utils/data_paths.ts'; +import { sleep } from '../../utils/sleep.ts'; // Checkpoint stability configuration for transform completion detection // Consider checkpoint stable if it hasn't changed in this duration (10 seconds) const CHECKPOINT_STABLE_TIME_MS = 10000; +const UPLOAD_BATCH_SIZE = 5000; +export const DEFAULT_UPLOAD_BULK_CONCURRENCY = 8; // Consider stable after this many consecutive checks with the same checkpoint const STABLE_CHECKPOINT_THRESHOLD = 3; @@ -477,29 +481,74 @@ const countEntities = async (baseDomainName: string, entityIndex: string = ENTIT return res.count; }; +const countAllEntities = async (entityIndex: string) => { + const esClient = getEsClient(); + const res = await esClient.count({ + index: entityIndex, + query: { match_all: {} }, + }); + return res.count; +}; + +interface CountEntitiesUntilOptions { + /** When true, count every entity doc in the index (use after --deleteData). */ + matchAll?: boolean; + /** Stop polling after this many ms and continue (default: 30 minutes). */ + timeoutMs?: number; +} + const countEntitiesUntil = async ( name: string, count: number, entityIndex: string = ENTITY_INDEX_V1, + options: CountEntitiesUntilOptions = {}, ) => { + const { matchAll = false, timeoutMs = 30 * 60 * 1000 } = options; + const countFn = matchAll + ? () => countAllEntities(entityIndex) + : () => countEntities(path.parse(name).name, entityIndex); + let total = 0; - log.info('Polling for entities...'); + const pollStart = Date.now(); + log.info( + `Polling for entities (expecting ${count}, timeout ${timeoutMs / 1000 / 60} min${matchAll ? ', match_all' : ''})...`, + ); const progress = createProgressBar('entities', { format: 'Progress | {value}/{total} Entities', }); progress.start(count, 0); + let lastLoggedTotal = -1; + let pollIterations = 0; + const entityPollLogEvery = 15; // ~30s at 2s poll interval + while (total < count && !stop) { - total = await countEntities(path.parse(name).name, entityIndex); + if (Date.now() - pollStart >= timeoutMs) { + log.warn( + `Entity count ${total}/${count} not reached within ${timeoutMs / 1000 / 60} minutes. Entity Store may still be processing — continuing.`, + ); + break; + } + + total = await countFn(); progress.update(total); - await new Promise((resolve) => setTimeout(resolve, 2000)); + if (total < count) { + pollIterations++; + if (total !== lastLoggedTotal || pollIterations % entityPollLogEvery === 0) { + log.info(`Entity count ${total}/${count}, polling again in 2s...`); + lastLoggedTotal = total; + } + await new Promise((resolve) => setTimeout(resolve, 2000)); + } } progress.stop(); if (stop) { log.info('Process stopped before reaching the count.'); + } else if (total >= count) { + log.info(`Entity count reached: ${total}/${count}`); } return total; @@ -1022,6 +1071,8 @@ export const uploadFile = async ({ modifyDoc, onComplete, timestampSpreadMs, + ingestRateDocsPerSecond, + bulkConcurrency = DEFAULT_UPLOAD_BULK_CONCURRENCY, }: { filePath: string; index: string; @@ -1029,55 +1080,96 @@ export const uploadFile = async ({ modifyDoc?: (doc: Record) => Record; // eslint-disable-line @typescript-eslint/no-explicit-any onComplete?: () => void; timestampSpreadMs?: number; + ingestRateDocsPerSecond?: number; + bulkConcurrency?: number; }) => { - const stream = fs.createReadStream(filePath); const progress = createProgressBar('upload', { format: '{bar} | {percentage}% | {value}/{total} Documents Uploaded', }); progress.start(lineCount, 0); const rl = readline.createInterface({ - input: stream, + input: fs.createReadStream(filePath), crlfDelay: Infinity, }); - const lineGenerator = async function* () { - for await (const line of rl) { - yield JSON.parse(line); - } - }; - // End of spread window is fixed at the start of the upload so all timestamps // fall in [endTime - spread, endTime] regardless of upload duration. const spreadEndTime = Date.now(); + const uploadStart = Date.now(); + const batchDelayMs = + ingestRateDocsPerSecond !== undefined + ? (UPLOAD_BATCH_SIZE / ingestRateDocsPerSecond) * 1000 + : undefined; + + // Parallel fixed-size batches via pMap (not streamingBulkIngest): supports batch-level + // ingest-rate throttling and ~13k docs/sec vs ~800 with helpers.bulk on our test clusters. + const prepareDoc = (parsed: Record) => { + parsed['@timestamp'] = + timestampSpreadMs !== undefined + ? new Date(spreadEndTime - Math.floor(Math.random() * timestampSpreadMs)).toISOString() + : new Date().toISOString(); + return modifyDoc + ? modifyDoc(parsed as Record) // eslint-disable-line @typescript-eslint/no-explicit-any + : parsed; + }; - await streamingBulkIngest({ - index, - datasource: lineGenerator(), - flushBytes: 1024 * 1024 * 1, - flushInterval: 3000, - onDocument: (doc) => { + const batchGenerator = async function* (): AsyncGenerator { + let batch: unknown[] = []; + for await (const line of rl) { if (stop) { throw new Error('Stopped'); } - const record = doc as Record; - record['@timestamp'] = - timestampSpreadMs !== undefined - ? new Date(spreadEndTime - Math.floor(Math.random() * timestampSpreadMs)).toISOString() - : new Date().toISOString(); - const payload = modifyDoc ? modifyDoc(doc as Record) : doc; // eslint-disable-line @typescript-eslint/no-explicit-any - return [{ create: { _index: index } }, { ...payload }]; - }, - onSuccess: () => { - progress.increment(); - }, - onDrop: (doc) => { - log.error('Failed to index document:', doc); - process.exit(1); + const payload = prepareDoc(JSON.parse(line) as Record); + batch.push({ create: { _index: index } }, payload); + if (batch.length / 2 >= UPLOAD_BATCH_SIZE) { + yield batch; + batch = []; + if (batchDelayMs !== undefined && batchDelayMs > 0) { + await sleep(batchDelayMs); + } + } + } + if (batch.length > 0) { + yield batch; + } + }; + + await pMap( + batchGenerator(), + async (operations) => { + const result = await bulkUpsert({ + documents: operations, + refresh: false, + pipeline: '_none', + }); + if (result.errors) { + log.error('Failed to index batch:', result); + process.exit(1); + } + progress.increment(operations.length / 2); }, - }); + { concurrency: bulkConcurrency }, + ); progress.stop(); + + const uploadMs = Math.max(Date.now() - uploadStart, 1); + const achievedDocsPerSecond = Math.round((lineCount / uploadMs) * 1000); + log.info( + `Upload finished: ${lineCount} docs in ${uploadMs}ms (~${achievedDocsPerSecond} docs/sec achieved)`, + ); + if (ingestRateDocsPerSecond !== undefined) { + log.info( + `Configured --ingest-rate cap: ${ingestRateDocsPerSecond} docs/sec (maximum, not a target)`, + ); + if (achievedDocsPerSecond < ingestRateDocsPerSecond) { + log.info( + 'Achieved rate is below the cap — throughput is limited by bulk indexing / cluster capacity, not the ingest-rate throttle.', + ); + } + } + if (onComplete) { onComplete(); } @@ -1102,6 +1194,7 @@ export const uploadPerfDataFile = async ( transformTimeoutMs: number; }, timestampSpreadMs?: number, + bulkConcurrency: number = DEFAULT_UPLOAD_BULK_CONCURRENCY, ) => { const index = indexOverride || `logs-perftest.${name}-default`; const entityIndex = noTransforms ? ENTITY_INDEX_V2 : ENTITY_INDEX_V1; @@ -1167,7 +1260,7 @@ export const uploadPerfDataFile = async ( } try { - await uploadFile({ filePath, index, lineCount, timestampSpreadMs }); + await uploadFile({ filePath, index, lineCount, timestampSpreadMs, bulkConcurrency }); const ingestTook = Date.now() - startTime; log.info(`Data file ${name} uploaded to index ${index} in ${ingestTook}ms`); @@ -1202,38 +1295,163 @@ export const uploadPerfDataFile = async ( } }; +type IntervalUploadLoopParams = { + intervalMs: number; + uploadCount?: number; + durationMs?: number; + ingestRateDocsPerSecond?: number; + bulkConcurrency: number; + filePath: string; + index: string; + lineCount: number; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + addIdPrefix?: (prefix: string) => (doc: Record) => Record; +}; + +const runIntervalUploadLoop = async ({ + intervalMs, + uploadCount, + durationMs, + ingestRateDocsPerSecond, + bulkConcurrency, + filePath, + index, + lineCount, + addIdPrefix, +}: IntervalUploadLoopParams): Promise => { + const uploadFileParams = { + filePath, + index, + lineCount, + ingestRateDocsPerSecond, + bulkConcurrency, + }; + + if (durationMs !== undefined) { + const deadline = Date.now() + durationMs; + let uploadIndex = 0; + + while (Date.now() < deadline && !stop) { + log.info( + `Uploading ${uploadIndex + 1} (duration mode, deadline ${new Date(deadline).toISOString()})...`, + ); + await uploadFile({ + ...uploadFileParams, + modifyDoc: addIdPrefix ? addIdPrefix(uploadIndex.toString()) : undefined, + }); + uploadIndex++; + + if (Date.now() >= deadline) { + break; + } + + const remaining = deadline - Date.now(); + if (remaining <= 0) { + break; + } + + const waitMs = Math.min(intervalMs, remaining); + log.info(`Waiting ${waitMs / 1000}s until next upload...`); + await sleep(waitMs); + } + + return uploadIndex; + } + + const count = uploadCount ?? 10; + let previousUpload = Promise.resolve(); + let uploadsScheduled = 0; + + for (let i = 0; i < count; i++) { + if (stop) { + break; + } + let uploadCompleted = false; + const onComplete = () => { + uploadCompleted = true; + }; + const intervalS = intervalMs / 1000; + log.info(`Uploading ${i + 1} of ${count}, next upload in ${intervalS}s...`); + previousUpload = previousUpload.then(() => + uploadFile({ + ...uploadFileParams, + onComplete, + modifyDoc: addIdPrefix ? addIdPrefix(i.toString()) : undefined, + }), + ); + uploadsScheduled++; + let progress: ReturnType | null = null; + for (let j = 0; j < intervalS; j++) { + if (stop) { + break; + } + if (uploadCompleted) { + if (!progress) { + progress = createProgressBar('interval', { + format: '{bar} | {value}s | waiting {total}s until next upload', + }); + progress.start(intervalS, j + 1); + } else { + progress.update(j + 1); + } + } + await sleep(1000); + } + progress?.update(intervalS); + progress?.stop(); + } + + await previousUpload; + return uploadsScheduled; +}; + /** * V2 flow (Entity Store V2 / ESQL): enable + install V2, use .entities.v2.latest*, no transforms. */ const runUploadPerfDataIntervalV2 = async ( name: string, intervalMs: number, - uploadCount: number, + uploadCount: number | undefined, deleteEntities?: boolean, _doDeleteEngines?: boolean, - _transformTimeoutMs?: number, + transformTimeoutMs?: number, samplingIntervalMs?: number, indexOverride?: string, + ingestRateDocsPerSecond?: number, + durationMs?: number, + bulkConcurrency: number = DEFAULT_UPLOAD_BULK_CONCURRENCY, + noIdIncrement?: boolean, ) => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const addIdPrefix = (prefix: string) => (doc: Record) => { - if (doc.host) { - return changeHostName(doc, prefix); - } else if (doc.user) { - return changeUserName(doc, prefix); - } else if (doc.service) { - return changeServiceName(doc, prefix); - } else if (doc.entity && doc.cloud) { - return changeGenericEntityName(doc, prefix); - } - return doc; - }; + const addIdPrefix = noIdIncrement + ? undefined + : (prefix: string) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (doc: Record) => { + if (doc.host) { + return changeHostName(doc, prefix); + } else if (doc.user) { + return changeUserName(doc, prefix); + } else if (doc.service) { + return changeServiceName(doc, prefix); + } else if (doc.entity && doc.cloud) { + return changeGenericEntityName(doc, prefix); + } + return doc; + }; + const entityWaitTimeoutMs = transformTimeoutMs ?? 30 * 60 * 1000; const index = indexOverride ?? `logs-perftest.${name}-default`; const filePath = getFilePath(name); + const modeLabel = + durationMs !== undefined + ? `for up to ${durationMs / 1000}s (deadline ${new Date(Date.now() + durationMs).toISOString()})` + : `${uploadCount ?? 10} times`; + const ingestRateLabel = + ingestRateDocsPerSecond !== undefined ? `${ingestRateDocsPerSecond} docs/sec` : 'unlimited'; + log.info( - `Uploading performance data file ${name} every ${intervalMs}ms ${uploadCount} times to index ${index} (Entity Store V2)`, + `Uploading performance data file ${name} every ${intervalMs / 1000}s ${modeLabel} to index ${index} (Entity Store V2, ingest rate: ${ingestRateLabel}, bulk concurrency: ${bulkConcurrency})`, ); if (deleteEntities) { @@ -1267,55 +1485,33 @@ const runUploadPerfDataIntervalV2 = async ( ); const startTime = Date.now(); - let previousUpload = Promise.resolve(); const samplingInterval = samplingIntervalMs ?? 5000; const stopHealthLogging = logClusterHealthEvery(name, samplingInterval); const stopNodeStatsLogging = logNodeStatsEvery(name, samplingInterval); const stopKibanaStatsLogging = logKibanaStatsEvery(name, samplingInterval); - for (let i = 0; i < uploadCount; i++) { - if (stop) break; - let uploadCompleted = false; - const onComplete = () => { - uploadCompleted = true; - }; - const intervalS = intervalMs / 1000; - log.info(`Uploading ${i + 1} of ${uploadCount}, next upload in ${intervalS}s...`); - previousUpload = previousUpload.then(() => - uploadFile({ - onComplete, - filePath, - index, - lineCount, - modifyDoc: addIdPrefix(i.toString()), - }), - ); - let progress: ReturnType | null = null; - for (let j = 0; j < intervalS; j++) { - if (stop) break; - if (uploadCompleted) { - if (!progress) { - progress = createProgressBar('interval', { - format: '{bar} | {value}s | waiting {total}s until next upload', - }); - progress.start(intervalS, j + 1); - } else { - progress.update(j + 1); - } - } - await new Promise((resolve) => setTimeout(resolve, 1000)); - } - progress?.update(intervalS); - progress?.stop(); - } - - await previousUpload; + const uploadsCompleted = await runIntervalUploadLoop({ + intervalMs, + uploadCount, + durationMs, + ingestRateDocsPerSecond, + bulkConcurrency, + filePath, + index, + lineCount, + addIdPrefix, + }); const ingestTook = Date.now() - startTime; - log.info(`Data file ${name} uploaded to index ${index} in ${ingestTook}ms`); + log.info( + `Data file ${name} uploaded to index ${index} in ${ingestTook}ms (${uploadsCompleted} upload(s) completed)`, + ); - await countEntitiesUntil(name, entityCount * uploadCount, ENTITY_INDEX_V2); + await countEntitiesUntil(name, entityCount * uploadsCompleted, ENTITY_INDEX_V2, { + matchAll: deleteEntities, + timeoutMs: entityWaitTimeoutMs, + }); log.info('Skipping transform completion wait (Entity Store V2 / ESQL mode)'); @@ -1332,32 +1528,46 @@ const runUploadPerfDataIntervalV2 = async ( const runUploadPerfDataIntervalV1 = async ( name: string, intervalMs: number, - uploadCount: number, + uploadCount: number | undefined, deleteEntities?: boolean, doDeleteEngines?: boolean, transformTimeoutMs?: number, samplingIntervalMs?: number, indexOverride?: string, + ingestRateDocsPerSecond?: number, + durationMs?: number, + bulkConcurrency: number = DEFAULT_UPLOAD_BULK_CONCURRENCY, + noIdIncrement?: boolean, ) => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const addIdPrefix = (prefix: string) => (doc: Record) => { - if (doc.host) { - return changeHostName(doc, prefix); - } else if (doc.user) { - return changeUserName(doc, prefix); - } else if (doc.service) { - return changeServiceName(doc, prefix); - } else if (doc.entity && doc.cloud) { - return changeGenericEntityName(doc, prefix); - } - return doc; - }; + const addIdPrefix = noIdIncrement + ? undefined + : (prefix: string) => + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (doc: Record) => { + if (doc.host) { + return changeHostName(doc, prefix); + } else if (doc.user) { + return changeUserName(doc, prefix); + } else if (doc.service) { + return changeServiceName(doc, prefix); + } else if (doc.entity && doc.cloud) { + return changeGenericEntityName(doc, prefix); + } + return doc; + }; const index = indexOverride ?? `logs-perftest.${name}-default`; const filePath = getFilePath(name); + const modeLabel = + durationMs !== undefined + ? `for up to ${durationMs / 1000}s (deadline ${new Date(Date.now() + durationMs).toISOString()})` + : `${uploadCount ?? 10} times`; + const ingestRateLabel = + ingestRateDocsPerSecond !== undefined ? `${ingestRateDocsPerSecond} docs/sec` : 'unlimited'; + log.info( - `Uploading performance data file ${name} every ${intervalMs}ms ${uploadCount} times to index ${index} (Entity Store V1)`, + `Uploading performance data file ${name} every ${intervalMs / 1000}s ${modeLabel} to index ${index} (Entity Store V1, ingest rate: ${ingestRateLabel}, bulk concurrency: ${bulkConcurrency})`, ); if (doDeleteEngines) { @@ -1396,7 +1606,6 @@ const runUploadPerfDataIntervalV1 = async ( ); const startTime = Date.now(); - let previousUpload = Promise.resolve(); const samplingInterval = samplingIntervalMs ?? 5000; const stopHealthLogging = logClusterHealthEvery(name, samplingInterval); @@ -1404,50 +1613,29 @@ const runUploadPerfDataIntervalV1 = async ( const stopNodeStatsLogging = logNodeStatsEvery(name, samplingInterval); const stopKibanaStatsLogging = logKibanaStatsEvery(name, samplingInterval); - for (let i = 0; i < uploadCount; i++) { - if (stop) break; - let uploadCompleted = false; - const onComplete = () => { - uploadCompleted = true; - }; - const intervalS = intervalMs / 1000; - log.info(`Uploading ${i + 1} of ${uploadCount}, next upload in ${intervalS}s...`); - previousUpload = previousUpload.then(() => - uploadFile({ - onComplete, - filePath, - index, - lineCount, - modifyDoc: addIdPrefix(i.toString()), - }), - ); - let progress: ReturnType | null = null; - for (let j = 0; j < intervalS; j++) { - if (stop) break; - if (uploadCompleted) { - if (!progress) { - progress = createProgressBar('interval', { - format: '{bar} | {value}s | waiting {total}s until next upload', - }); - progress.start(intervalS, j + 1); - } else { - progress.update(j + 1); - } - } - await new Promise((resolve) => setTimeout(resolve, 1000)); - } - progress?.update(intervalS); - progress?.stop(); - } - - await previousUpload; + const uploadsCompleted = await runIntervalUploadLoop({ + intervalMs, + uploadCount, + durationMs, + ingestRateDocsPerSecond, + bulkConcurrency, + filePath, + index, + lineCount, + addIdPrefix, + }); const ingestTook = Date.now() - startTime; - log.info(`Data file ${name} uploaded to index ${index} in ${ingestTook}ms`); + log.info( + `Data file ${name} uploaded to index ${index} in ${ingestTook}ms (${uploadsCompleted} upload(s) completed)`, + ); - await countEntitiesUntil(name, entityCount * uploadCount, ENTITY_INDEX_V1); + await countEntitiesUntil(name, entityCount * uploadsCompleted, ENTITY_INDEX_V1, { + matchAll: deleteEntities, + timeoutMs: transformTimeoutMs ?? 30 * 60 * 1000, + }); - const totalDocumentsIngested = lineCount * uploadCount; + const totalDocumentsIngested = lineCount * uploadsCompleted; const timeout = transformTimeoutMs ?? 1800000; log.info( `Waiting for generic transform to process ${totalDocumentsIngested} documents (timeout: ${timeout / 1000 / 60} minutes)...`, @@ -1475,13 +1663,17 @@ const runUploadPerfDataIntervalV1 = async ( export const uploadPerfDataFileInterval = async ( name: string, intervalMs: number, - uploadCount: number, + uploadCount: number | undefined, deleteEntities?: boolean, doDeleteEngines?: boolean, transformTimeoutMs?: number, samplingIntervalMs?: number, noTransforms?: boolean, indexOverride?: string, + ingestRateDocsPerSecond?: number, + durationMs?: number, + bulkConcurrency: number = DEFAULT_UPLOAD_BULK_CONCURRENCY, + noIdIncrement?: boolean, ) => { if (noTransforms) { return runUploadPerfDataIntervalV2( @@ -1493,6 +1685,10 @@ export const uploadPerfDataFileInterval = async ( transformTimeoutMs, samplingIntervalMs, indexOverride, + ingestRateDocsPerSecond, + durationMs, + bulkConcurrency, + noIdIncrement, ); } return runUploadPerfDataIntervalV1( @@ -1504,5 +1700,9 @@ export const uploadPerfDataFileInterval = async ( transformTimeoutMs, samplingIntervalMs, indexOverride, + ingestRateDocsPerSecond, + durationMs, + bulkConcurrency, + noIdIncrement, ); }; diff --git a/src/commands/entity_store_perf/index.ts b/src/commands/entity_store_perf/index.ts index fa4561f..c2d4d8e 100644 --- a/src/commands/entity_store_perf/index.ts +++ b/src/commands/entity_store_perf/index.ts @@ -3,6 +3,7 @@ import { type CommandModule } from '../types.ts'; import { log } from '../../utils/logger.ts'; import { parseDuration, + assertPositiveInt, parseIntBase10, promptForFileSelection, wrapAction, @@ -12,6 +13,7 @@ import { listPerfDataFiles, uploadPerfDataFile, uploadPerfDataFileInterval, + DEFAULT_UPLOAD_BULK_CONCURRENCY, isValidDistributionType, type DistributionType, ENTITY_DISTRIBUTIONS, @@ -136,9 +138,16 @@ export const entityStorePerfCommands: CommandModule = { '--timestamp-spread ', 'Spread document @timestamp values randomly over the given duration ending at now (e.g., 2h, 30m, 1d, 500ms)', ) + .option( + '--bulk-concurrency ', + 'Parallel _bulk requests per upload (default: 8)', + parseIntBase10, + DEFAULT_UPLOAD_BULK_CONCURRENCY, + ) .description('Upload performance data file') .action( wrapAction(async (file, options) => { + assertPositiveInt(options.bulkConcurrency, '--bulk-concurrency'); const timestampSpreadMs = options.timestampSpread !== undefined ? parseDuration(options.timestampSpread as string) @@ -154,6 +163,7 @@ export const entityStorePerfCommands: CommandModule = { transformTimeoutMs: options.transformTimeout * 60 * 1000, }, timestampSpreadMs, + options.bulkConcurrency, ); }), ); @@ -162,9 +172,28 @@ export const entityStorePerfCommands: CommandModule = { .command('upload-perf-data-interval') .argument('[file]', 'File to upload') .option('--interval ', 'interval in s', parseIntBase10, 30) - .option('--count ', 'number of times to upload', parseIntBase10, 10) + .option( + '--count ', + 'number of times to upload (default: 10; ignored when --duration is set)', + parseIntBase10, + ) + .option( + '--duration ', + 'wall-clock run limit (e.g. 3h, 30m, 45s); takes precedence over --count', + ) + .option( + '--ingest-rate ', + 'max documents per second per upload (default: unlimited)', + parseIntBase10, + ) + .option( + '--bulk-concurrency ', + 'Parallel _bulk requests per upload (default: 8)', + parseIntBase10, + DEFAULT_UPLOAD_BULK_CONCURRENCY, + ) .option('--deleteData', 'Delete all entities before uploading') - .option('--deleteEngines', 'Delete all entities before uploading') + .option('--deleteEngines', 'Delete entity engines first (V1 transform flow only)') .option( '--transformTimeout ', 'Timeout in minutes for waiting for generic transform to complete (default: 30)', @@ -182,19 +211,57 @@ export const entityStorePerfCommands: CommandModule = { 'Run Entity Store V2 / ESQL flow (enable V2, install V2, no transforms, v2 indices)', ) .option('--index ', 'Destination index') - .description('Upload performance data file') + .option( + '--no-id-increment', + 'Do not append a per-upload suffix to entity IDs — every upload reuses the same entity IDs, keeping the entity count constant (useful for sustained load tests)', + ) + .description('Upload performance data file repeatedly at intervals') .action( wrapAction(async (file, options) => { + if (options.ingestRate !== undefined) { + assertPositiveInt(options.ingestRate, '--ingest-rate'); + } + + assertPositiveInt(options.bulkConcurrency, '--bulk-concurrency'); + + const durationMs = + options.duration !== undefined ? parseDuration(options.duration as string) : undefined; + + if (durationMs !== undefined && options.count !== undefined) { + log.warn('--count is ignored when --duration is set'); + } + + let uploadCount: number | undefined; + if (durationMs === undefined) { + const count = options.count ?? 10; + assertPositiveInt(count, '--count'); + uploadCount = count; + } + + if (durationMs !== undefined) { + log.info( + `Interval upload mode: duration (${options.duration}), interval ${options.interval}s, ingest rate ${options.ingestRate ?? 'unlimited'}, bulk concurrency ${options.bulkConcurrency}`, + ); + } else { + log.info( + `Interval upload mode: count ${uploadCount}, interval ${options.interval}s, ingest rate ${options.ingestRate ?? 'unlimited'}, bulk concurrency ${options.bulkConcurrency}`, + ); + } + await uploadPerfDataFileInterval( file ?? (await promptForFileSelection(listPerfDataFiles())), options.interval * 1000, - options.count, + uploadCount, options.deleteData, options.deleteEngines, options.transformTimeout * 60 * 1000, options.samplingInterval * 1000, options.noTransforms, options.index, + options.ingestRate, + durationMs, + options.bulkConcurrency, + options.idIncrement === false, ); }), ); diff --git a/src/commands/shared/elasticsearch.ts b/src/commands/shared/elasticsearch.ts index 0b3e422..2b62fb2 100644 --- a/src/commands/shared/elasticsearch.ts +++ b/src/commands/shared/elasticsearch.ts @@ -56,10 +56,15 @@ export const logBulkErrors = (result: BulkResponse, context: string): void => { export async function bulkUpsert(params: { documents: unknown[]; refresh?: boolean; + pipeline?: string; }): Promise { - const { documents, refresh = true } = params; + const { documents, refresh = true, pipeline } = params; const client = getEsClient(); - const result = await client.bulk({ body: documents, refresh }); + const result = await client.bulk({ + body: documents, + refresh, + ...(pipeline !== undefined && { pipeline }), + }); logBulkErrors(result, 'Bulk request reported errors. Some documents may have failed.'); return result; } @@ -118,11 +123,19 @@ export async function bulkIngest(params: BulkIngestParams): Promise { } } +/** Matches @elastic/elasticsearch helpers.bulk defaults (5 MB, 30 s). */ +export const DEFAULT_BULK_FLUSH_BYTES = 5 * 1024 * 1024; +export const DEFAULT_BULK_FLUSH_INTERVAL_MS = 30_000; +export const DEFAULT_BULK_CONCURRENCY = 8; + export interface StreamingBulkIngestParams { index: string; datasource: AsyncIterable; flushBytes?: number; flushInterval?: number; + concurrency?: number; + /** Passed to the Bulk API (e.g. `_none` to skip ingest pipelines). */ + pipeline?: string; onDrop?: (doc: unknown) => void; onDocument?: (doc: object) => BulkOperationTuple; onSuccess?: () => void; @@ -136,8 +149,10 @@ export async function streamingBulkIngest(params: StreamingBulkIngestParams): Pr const { index, datasource, - flushBytes = 1024 * 1024, - flushInterval = 3000, + flushBytes = DEFAULT_BULK_FLUSH_BYTES, + flushInterval = DEFAULT_BULK_FLUSH_INTERVAL_MS, + concurrency, + pipeline, onDrop, onDocument, onSuccess, @@ -160,6 +175,8 @@ export async function streamingBulkIngest(params: StreamingBulkIngestParams): Pr onDocument: (doc: object) => docTransform(doc) as any, flushBytes, flushInterval, + ...(concurrency !== undefined && { concurrency }), + ...(pipeline !== undefined && { pipeline }), onDrop: onDrop ? (d) => onDrop(d.document) : undefined, onSuccess, }); diff --git a/src/commands/utils/cli_utils.ts b/src/commands/utils/cli_utils.ts index fe1b54f..02107a8 100644 --- a/src/commands/utils/cli_utils.ts +++ b/src/commands/utils/cli_utils.ts @@ -6,6 +6,14 @@ export const parseIntBase10 = (input: string) => parseInt(input, 10); export const parseOptionInt = (input: string | undefined, fallback: number): number => input ? parseIntBase10(input) : fallback; +/** Exit if value is not a positive integer (guards against NaN from parseIntBase10). */ +export const assertPositiveInt = (value: number, flagName: string): void => { + if (!Number.isFinite(value) || !Number.isInteger(value) || value < 1) { + log.error(`❌ ${flagName} must be a positive integer`); + process.exit(1); + } +}; + const DURATION_UNIT_MS: Record = { ms: 1, s: 1000,