From cdb729f12421be2714372d2e998bab686b9bdded Mon Sep 17 00:00:00 2001 From: Raju Ahmed Date: Thu, 15 May 2025 02:53:02 +0600 Subject: [PATCH 1/2] [FSSDK-11513] limit number of events in the eventStore --- .../batch_event_processor.spec.ts | 159 +++++++++++++++++- lib/event_processor/batch_event_processor.ts | 79 ++++++--- lib/message/log_message.ts | 1 + 3 files changed, 218 insertions(+), 21 deletions(-) diff --git a/lib/event_processor/batch_event_processor.spec.ts b/lib/event_processor/batch_event_processor.spec.ts index aa25d39e7..f89f9b5ef 100644 --- a/lib/event_processor/batch_event_processor.spec.ts +++ b/lib/event_processor/batch_event_processor.spec.ts @@ -16,17 +16,18 @@ import { expect, describe, it, vi, beforeEach, afterEach, MockInstance } from 'vitest'; import { EventWithId, BatchEventProcessor, LOGGER_NAME } from './batch_event_processor'; -import { getMockSyncCache } from '../tests/mock/mock_cache'; +import { getMockAsyncCache, getMockSyncCache } from '../tests/mock/mock_cache'; import { createImpressionEvent } from '../tests/mock/create_event'; import { ProcessableEvent } from './event_processor'; import { buildLogEvent } from './event_builder/log_event'; -import { resolvablePromise } from '../utils/promise/resolvablePromise'; +import { ResolvablePromise, resolvablePromise } from '../utils/promise/resolvablePromise'; import { advanceTimersByTime } from '../tests/testUtils'; import { getMockLogger } from '../tests/mock/mock_logger'; import { getMockRepeater } from '../tests/mock/mock_repeater'; import * as retry from '../utils/executor/backoff_retry_runner'; import { ServiceState, StartupLog } from '../service'; import { LogLevel } from '../logging/logger'; +import { IdGenerator } from '../utils/id_generator'; const getMockDispatcher = () => { return { @@ -366,6 +367,160 @@ describe('BatchEventProcessor', async () => { expect(events).toEqual(eventsInStore); }); + + it('should not store the event in the eventStore but still dispatch if the \ + number of pending events is greater than the limit', async () => { + const eventDispatcher = getMockDispatcher(); + const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; + mockDispatch.mockResolvedValue(resolvablePromise().promise); + + const eventStore = getMockSyncCache(); + + const idGenerator = new IdGenerator(); + + for (let i = 0; i < 505; i++) { + const event = createImpressionEvent(`id-${i}`); + const cacheId = idGenerator.getId(); + await eventStore.set(cacheId, { id: cacheId, event }); + } + + expect(eventStore.size()).toEqual(505); + + const processor = new BatchEventProcessor({ + eventDispatcher, + dispatchRepeater: getMockRepeater(), + batchSize: 1, + eventStore, + }); + + processor.start(); + await processor.onRunning(); + + const events: ProcessableEvent[] = []; + for(let i = 0; i < 2; i++) { + const event = createImpressionEvent(`id-${i}`); + events.push(event); + await processor.process(event) + } + + expect(eventStore.size()).toEqual(505); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(507); + expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[0]])); + expect(eventDispatcher.dispatchEvent.mock.calls[506][0]).toEqual(buildLogEvent([events[1]])); + }); + + it('should store events in the eventStore when the number of events in the store\ + becomes lower than the limit', async () => { + const eventDispatcher = getMockDispatcher(); + + const dispatchResponses: ResolvablePromise[] = []; + + const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; + mockDispatch.mockImplementation((arg) => { + const dispatchResponse = resolvablePromise(); + dispatchResponses.push(dispatchResponse); + return dispatchResponse.promise; + }); + + const eventStore = getMockSyncCache(); + + const idGenerator = new IdGenerator(); + + for (let i = 0; i < 502; i++) { + const event = createImpressionEvent(`id-${i}`); + const cacheId = String(i); + await eventStore.set(cacheId, { id: cacheId, event }); + } + + expect(eventStore.size()).toEqual(502); + + const processor = new BatchEventProcessor({ + eventDispatcher, + dispatchRepeater: getMockRepeater(), + batchSize: 1, + eventStore, + }); + + processor.start(); + await processor.onRunning(); + + let events: ProcessableEvent[] = []; + for(let i = 0; i < 2; i++) { + const event = createImpressionEvent(`id-${i + 502}`); + events.push(event); + await processor.process(event) + } + + expect(eventStore.size()).toEqual(502); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(504); + + expect(eventDispatcher.dispatchEvent.mock.calls[502][0]).toEqual(buildLogEvent([events[0]])); + expect(eventDispatcher.dispatchEvent.mock.calls[503][0]).toEqual(buildLogEvent([events[1]])); + + // resolve the dispatch for events not saved in the store + dispatchResponses[502].resolve({ statusCode: 200 }); + dispatchResponses[503].resolve({ statusCode: 200 }); + + await exhaustMicrotasks(); + expect(eventStore.size()).toEqual(502); + + // resolve the dispatch for 3 events in store, making the store size 499 which is lower than the limit + dispatchResponses[0].resolve({ statusCode: 200 }); + dispatchResponses[1].resolve({ statusCode: 200 }); + dispatchResponses[2].resolve({ statusCode: 200 }); + + await exhaustMicrotasks(); + expect(eventStore.size()).toEqual(499); + + // process 2 more events + events = []; + for(let i = 0; i < 2; i++) { + const event = createImpressionEvent(`id-${i + 504}`); + events.push(event); + await processor.process(event) + } + + expect(eventStore.size()).toEqual(500); + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(506); + expect(eventDispatcher.dispatchEvent.mock.calls[504][0]).toEqual(buildLogEvent([events[0]])); + expect(eventDispatcher.dispatchEvent.mock.calls[505][0]).toEqual(buildLogEvent([events[1]])); + }); + + it('should still dispatch events even if the store save fails', async () => { + const eventDispatcher = getMockDispatcher(); + const mockDispatch: MockInstance = eventDispatcher.dispatchEvent; + mockDispatch.mockResolvedValue({}); + + const eventStore = getMockAsyncCache(); + // Simulate failure in saving to store + eventStore.set = vi.fn().mockRejectedValue(new Error('Failed to save')); + + const dispatchRepeater = getMockRepeater(); + + const processor = new BatchEventProcessor({ + eventDispatcher, + dispatchRepeater, + batchSize: 100, + eventStore, + }); + + processor.start(); + await processor.onRunning(); + + const events: ProcessableEvent[] = []; + for(let i = 0; i < 10; i++) { + const event = createImpressionEvent(`id-${i}`); + events.push(event); + await processor.process(event) + } + + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0); + + await dispatchRepeater.execute(0); + + expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1); + expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events)); + }); }); it('should dispatch events when dispatchRepeater is triggered', async () => { diff --git a/lib/event_processor/batch_event_processor.ts b/lib/event_processor/batch_event_processor.ts index 5fa7c3f2f..0f8f8e25c 100644 --- a/lib/event_processor/batch_event_processor.ts +++ b/lib/event_processor/batch_event_processor.ts @@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor"; import { getBatchedAsync, getBatchedSync, Store } from "../utils/cache/store"; import { EventDispatcher, EventDispatcherResponse, LogEvent } from "./event_dispatcher/event_dispatcher"; import { buildLogEvent } from "./event_builder/log_event"; -import { BackoffController, ExponentialBackoff, IntervalRepeater, Repeater } from "../utils/repeater/repeater"; +import { BackoffController, ExponentialBackoff, Repeater } from "../utils/repeater/repeater"; import { LoggerFacade } from '../logging/logger'; import { BaseService, ServiceState, StartupLog } from "../service"; -import { Consumer, Fn, Producer } from "../utils/type"; +import { Consumer, Fn, Maybe, Producer } from "../utils/type"; import { RunResult, runWithRetry } from "../utils/executor/backoff_retry_runner"; import { isSuccessStatusCode } from "../utils/http_request_handler/http_util"; import { EventEmitter } from "../utils/event_emitter/event_emitter"; @@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message"; import { OptimizelyError } from "../error/optimizly_error"; import { sprintf } from "../utils/fns"; import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service"; +import { EVENT_STORE_FULL } from "../message/log_message"; export const DEFAULT_MIN_BACKOFF = 1000; export const DEFAULT_MAX_BACKOFF = 32000; +export const MAX_EVENTS_IN_STORE = 500; export type EventWithId = { id: string; event: ProcessableEvent; + notStored?: boolean; }; export type RetryConfig = { @@ -59,7 +62,8 @@ export type BatchEventProcessorConfig = { type EventBatch = { request: LogEvent, - ids: string[], + // ids: string[], + events: EventWithId[], } export const LOGGER_NAME = 'BatchEventProcessor'; @@ -70,11 +74,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { private eventQueue: EventWithId[] = []; private batchSize: number; private eventStore?: Store; + private eventCountInStore: Maybe = undefined; + private maxEventsInStore: number = MAX_EVENTS_IN_STORE; private dispatchRepeater: Repeater; private failedEventRepeater?: Repeater; private idGenerator: IdGenerator = new IdGenerator(); private runningTask: Map> = new Map(); - private dispatchingEventIds: Set = new Set(); + private dispatchingEvents: Map = new Map(); private eventEmitter: EventEmitter<{ dispatch: LogEvent }> = new EventEmitter(); private retryConfig?: RetryConfig; @@ -84,11 +90,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { this.closingEventDispatcher = config.closingEventDispatcher; this.batchSize = config.batchSize; this.eventStore = config.eventStore; + this.retryConfig = config.retryConfig; this.dispatchRepeater = config.dispatchRepeater; this.dispatchRepeater.setTask(() => this.flush()); + this.maxEventsInStore = Math.max(2 * config.batchSize, MAX_EVENTS_IN_STORE); this.failedEventRepeater = config.failedEventRepeater; this.failedEventRepeater?.setTask(() => this.retryFailedEvents()); if (config.logger) { @@ -111,7 +119,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } const keys = (await this.eventStore.getKeys()).filter( - (k) => !this.dispatchingEventIds.has(k) && !this.eventQueue.find((e) => e.id === k) + (k) => !this.dispatchingEvents.has(k) && !this.eventQueue.find((e) => e.id === k) ); const events = await (this.eventStore.operation === 'sync' ? @@ -138,7 +146,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { (currentBatch.length > 0 && !areEventContextsEqual(currentBatch[0].event, event.event))) { batches.push({ request: buildLogEvent(currentBatch.map((e) => e.event)), - ids: currentBatch.map((e) => e.id), + // ids: currentBatch.map((e) => e.id), + events: currentBatch, }); currentBatch = []; } @@ -148,7 +157,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { if (currentBatch.length > 0) { batches.push({ request: buildLogEvent(currentBatch.map((e) => e.event)), - ids: currentBatch.map((e) => e.id), + // ids: currentBatch.map((e) => e.id), + events: currentBatch, }); } @@ -163,15 +173,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } const events: ProcessableEvent[] = []; - const ids: string[] = []; + const eventWithIds: EventWithId[] = []; this.eventQueue.forEach((event) => { events.push(event.event); - ids.push(event.id); + eventWithIds.push(event); }); this.eventQueue = []; - return { request: buildLogEvent(events), ids }; + return { request: buildLogEvent(events), events: eventWithIds }; } private async executeDispatch(request: LogEvent, closing = false): Promise { @@ -185,10 +195,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } private dispatchBatch(batch: EventBatch, closing: boolean): void { - const { request, ids } = batch; + const { request, events } = batch; - ids.forEach((id) => { - this.dispatchingEventIds.add(id); + events.forEach((event) => { + // this.dispatchingEventIds.add(id); + this.dispatchingEvents.set(event.id, event); }); const runResult: RunResult = this.retryConfig @@ -205,9 +216,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { this.runningTask.set(taskId, runResult); runResult.result.then((res) => { - ids.forEach((id) => { - this.dispatchingEventIds.delete(id); - this.eventStore?.remove(id); + events.forEach((event) => { + this.eventStore?.remove(event.id); + if (!event.notStored && this.eventCountInStore) { + this.eventCountInStore--; + } }); return Promise.resolve(); }).catch((err) => { @@ -216,7 +229,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { this.logger?.error(err); }).finally(() => { this.runningTask.delete(taskId); - ids.forEach((id) => this.dispatchingEventIds.delete(id)); + events.forEach((event) => this.dispatchingEvents.delete(event.id)); }); } @@ -235,12 +248,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { return Promise.reject(new OptimizelyError(SERVICE_NOT_RUNNING, 'BatchEventProcessor')); } - const eventWithId = { + const eventWithId: EventWithId = { id: this.idGenerator.getId(), event: event, }; - await this.eventStore?.set(eventWithId.id, eventWithId); + await this.storeEvent(eventWithId); if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) { this.flush(); @@ -253,7 +266,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { } else if (!this.dispatchRepeater.isRunning()) { this.dispatchRepeater.start(); } + } + + private async findEventCountInStore(): Promise { + if (this.eventStore && this.eventCountInStore === undefined) { + try { + const keys = await this.eventStore.getKeys(); + this.eventCountInStore = keys.length; + } catch (e) { + this.logger?.error(e); + } + } + } + private async storeEvent(eventWithId: EventWithId): Promise { + await this.findEventCountInStore(); + if (this.eventCountInStore !== undefined && this.eventCountInStore >= this.maxEventsInStore) { + this.logger?.info(EVENT_STORE_FULL, eventWithId.event.uuid); + eventWithId.notStored = true; + return; + } + + await Promise.resolve(this.eventStore?.set(eventWithId.id, eventWithId)).then(() => { + if (this.eventCountInStore !== undefined) { + this.eventCountInStore++; + } + }).catch((e) => { + eventWithId.notStored = true; + this.logger?.error(e); + }); } start(): void { diff --git a/lib/message/log_message.ts b/lib/message/log_message.ts index b4dc35650..c27f5076f 100644 --- a/lib/message/log_message.ts +++ b/lib/message/log_message.ts @@ -60,5 +60,6 @@ export const USER_HAS_NO_FORCED_VARIATION_FOR_EXPERIMENT = 'No experiment %s mapped to user %s in the forced variation map.'; export const INVALID_EXPERIMENT_KEY_INFO = 'Experiment key %s is not in datafile. It is either invalid, paused, or archived.'; +export const EVENT_STORE_FULL = 'Event store is full. Not saving event with id %d.'; export const messages: string[] = []; From 67d0c185c3dcd65274b5658f1585175fd49a6692 Mon Sep 17 00:00:00 2001 From: Raju Ahmed Date: Thu, 15 May 2025 21:39:01 +0600 Subject: [PATCH 2/2] comments --- lib/event_processor/batch_event_processor.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/event_processor/batch_event_processor.ts b/lib/event_processor/batch_event_processor.ts index 0f8f8e25c..bf0ed3f39 100644 --- a/lib/event_processor/batch_event_processor.ts +++ b/lib/event_processor/batch_event_processor.ts @@ -62,7 +62,6 @@ export type BatchEventProcessorConfig = { type EventBatch = { request: LogEvent, - // ids: string[], events: EventWithId[], } @@ -146,7 +145,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { (currentBatch.length > 0 && !areEventContextsEqual(currentBatch[0].event, event.event))) { batches.push({ request: buildLogEvent(currentBatch.map((e) => e.event)), - // ids: currentBatch.map((e) => e.id), events: currentBatch, }); currentBatch = []; @@ -157,7 +155,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { if (currentBatch.length > 0) { batches.push({ request: buildLogEvent(currentBatch.map((e) => e.event)), - // ids: currentBatch.map((e) => e.id), events: currentBatch, }); } @@ -198,7 +195,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor { const { request, events } = batch; events.forEach((event) => { - // this.dispatchingEventIds.add(id); this.dispatchingEvents.set(event.id, event); });