@@ -18,10 +18,10 @@ import { EventProcessor, ProcessableEvent } from "./event_processor";
1818import { getBatchedAsync , getBatchedSync , Store } from "../utils/cache/store" ;
1919import { EventDispatcher , EventDispatcherResponse , LogEvent } from "./event_dispatcher/event_dispatcher" ;
2020import { buildLogEvent } from "./event_builder/log_event" ;
21- import { BackoffController , ExponentialBackoff , IntervalRepeater , Repeater } from "../utils/repeater/repeater" ;
21+ import { BackoffController , ExponentialBackoff , Repeater } from "../utils/repeater/repeater" ;
2222import { LoggerFacade } from '../logging/logger' ;
2323import { BaseService , ServiceState , StartupLog } from "../service" ;
24- import { Consumer , Fn , Producer } from "../utils/type" ;
24+ import { Consumer , Fn , Maybe , Producer } from "../utils/type" ;
2525import { RunResult , runWithRetry } from "../utils/executor/backoff_retry_runner" ;
2626import { isSuccessStatusCode } from "../utils/http_request_handler/http_util" ;
2727import { EventEmitter } from "../utils/event_emitter/event_emitter" ;
@@ -31,13 +31,16 @@ import { FAILED_TO_DISPATCH_EVENTS, SERVICE_NOT_RUNNING } from "error_message";
3131import { OptimizelyError } from "../error/optimizly_error" ;
3232import { sprintf } from "../utils/fns" ;
3333import { SERVICE_STOPPED_BEFORE_RUNNING } from "../service" ;
34+ import { EVENT_STORE_FULL } from "../message/log_message" ;
3435
3536export const DEFAULT_MIN_BACKOFF = 1000 ;
3637export const DEFAULT_MAX_BACKOFF = 32000 ;
38+ export const MAX_EVENTS_IN_STORE = 500 ;
3739
3840export type EventWithId = {
3941 id : string ;
4042 event : ProcessableEvent ;
43+ notStored ?: boolean ;
4144} ;
4245
4346export type RetryConfig = {
@@ -59,7 +62,8 @@ export type BatchEventProcessorConfig = {
5962
6063type EventBatch = {
6164 request : LogEvent ,
62- ids : string [ ] ,
65+ // ids: string[],
66+ events : EventWithId [ ] ,
6367}
6468
6569export const LOGGER_NAME = 'BatchEventProcessor' ;
@@ -70,11 +74,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
7074 private eventQueue : EventWithId [ ] = [ ] ;
7175 private batchSize : number ;
7276 private eventStore ?: Store < EventWithId > ;
77+ private eventCountInStore : Maybe < number > = undefined ;
78+ private maxEventsInStore : number = MAX_EVENTS_IN_STORE ;
7379 private dispatchRepeater : Repeater ;
7480 private failedEventRepeater ?: Repeater ;
7581 private idGenerator : IdGenerator = new IdGenerator ( ) ;
7682 private runningTask : Map < string , RunResult < EventDispatcherResponse > > = new Map ( ) ;
77- private dispatchingEventIds : Set < string > = new Set ( ) ;
83+ private dispatchingEvents : Map < string , EventWithId > = new Map ( ) ;
7884 private eventEmitter : EventEmitter < { dispatch : LogEvent } > = new EventEmitter ( ) ;
7985 private retryConfig ?: RetryConfig ;
8086
@@ -84,11 +90,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
8490 this . closingEventDispatcher = config . closingEventDispatcher ;
8591 this . batchSize = config . batchSize ;
8692 this . eventStore = config . eventStore ;
93+
8794 this . retryConfig = config . retryConfig ;
8895
8996 this . dispatchRepeater = config . dispatchRepeater ;
9097 this . dispatchRepeater . setTask ( ( ) => this . flush ( ) ) ;
9198
99+ this . maxEventsInStore = Math . max ( 2 * config . batchSize , MAX_EVENTS_IN_STORE ) ;
92100 this . failedEventRepeater = config . failedEventRepeater ;
93101 this . failedEventRepeater ?. setTask ( ( ) => this . retryFailedEvents ( ) ) ;
94102 if ( config . logger ) {
@@ -111,7 +119,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
111119 }
112120
113121 const keys = ( await this . eventStore . getKeys ( ) ) . filter (
114- ( k ) => ! this . dispatchingEventIds . has ( k ) && ! this . eventQueue . find ( ( e ) => e . id === k )
122+ ( k ) => ! this . dispatchingEvents . has ( k ) && ! this . eventQueue . find ( ( e ) => e . id === k )
115123 ) ;
116124
117125 const events = await ( this . eventStore . operation === 'sync' ?
@@ -138,7 +146,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
138146 ( currentBatch . length > 0 && ! areEventContextsEqual ( currentBatch [ 0 ] . event , event . event ) ) ) {
139147 batches . push ( {
140148 request : buildLogEvent ( currentBatch . map ( ( e ) => e . event ) ) ,
141- ids : currentBatch . map ( ( e ) => e . id ) ,
149+ // ids: currentBatch.map((e) => e.id),
150+ events : currentBatch ,
142151 } ) ;
143152 currentBatch = [ ] ;
144153 }
@@ -148,7 +157,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
148157 if ( currentBatch . length > 0 ) {
149158 batches . push ( {
150159 request : buildLogEvent ( currentBatch . map ( ( e ) => e . event ) ) ,
151- ids : currentBatch . map ( ( e ) => e . id ) ,
160+ // ids: currentBatch.map((e) => e.id),
161+ events : currentBatch ,
152162 } ) ;
153163 }
154164
@@ -163,15 +173,15 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
163173 }
164174
165175 const events : ProcessableEvent [ ] = [ ] ;
166- const ids : string [ ] = [ ] ;
176+ const eventWithIds : EventWithId [ ] = [ ] ;
167177
168178 this . eventQueue . forEach ( ( event ) => {
169179 events . push ( event . event ) ;
170- ids . push ( event . id ) ;
180+ eventWithIds . push ( event ) ;
171181 } ) ;
172182
173183 this . eventQueue = [ ] ;
174- return { request : buildLogEvent ( events ) , ids } ;
184+ return { request : buildLogEvent ( events ) , events : eventWithIds } ;
175185 }
176186
177187 private async executeDispatch ( request : LogEvent , closing = false ) : Promise < EventDispatcherResponse > {
@@ -185,10 +195,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
185195 }
186196
187197 private dispatchBatch ( batch : EventBatch , closing : boolean ) : void {
188- const { request, ids } = batch ;
198+ const { request, events } = batch ;
189199
190- ids . forEach ( ( id ) => {
191- this . dispatchingEventIds . add ( id ) ;
200+ events . forEach ( ( event ) => {
201+ // this.dispatchingEventIds.add(id);
202+ this . dispatchingEvents . set ( event . id , event ) ;
192203 } ) ;
193204
194205 const runResult : RunResult < EventDispatcherResponse > = this . retryConfig
@@ -205,9 +216,11 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
205216 this . runningTask . set ( taskId , runResult ) ;
206217
207218 runResult . result . then ( ( res ) => {
208- ids . forEach ( ( id ) => {
209- this . dispatchingEventIds . delete ( id ) ;
210- this . eventStore ?. remove ( id ) ;
219+ events . forEach ( ( event ) => {
220+ this . eventStore ?. remove ( event . id ) ;
221+ if ( ! event . notStored && this . eventCountInStore ) {
222+ this . eventCountInStore -- ;
223+ }
211224 } ) ;
212225 return Promise . resolve ( ) ;
213226 } ) . catch ( ( err ) => {
@@ -216,7 +229,7 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
216229 this . logger ?. error ( err ) ;
217230 } ) . finally ( ( ) => {
218231 this . runningTask . delete ( taskId ) ;
219- ids . forEach ( ( id ) => this . dispatchingEventIds . delete ( id ) ) ;
232+ events . forEach ( ( event ) => this . dispatchingEvents . delete ( event . id ) ) ;
220233 } ) ;
221234 }
222235
@@ -235,12 +248,12 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
235248 return Promise . reject ( new OptimizelyError ( SERVICE_NOT_RUNNING , 'BatchEventProcessor' ) ) ;
236249 }
237250
238- const eventWithId = {
251+ const eventWithId : EventWithId = {
239252 id : this . idGenerator . getId ( ) ,
240253 event : event ,
241254 } ;
242255
243- await this . eventStore ?. set ( eventWithId . id , eventWithId ) ;
256+ await this . storeEvent ( eventWithId ) ;
244257
245258 if ( this . eventQueue . length > 0 && ! areEventContextsEqual ( this . eventQueue [ 0 ] . event , event ) ) {
246259 this . flush ( ) ;
@@ -253,7 +266,35 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
253266 } else if ( ! this . dispatchRepeater . isRunning ( ) ) {
254267 this . dispatchRepeater . start ( ) ;
255268 }
269+ }
270+
271+ private async findEventCountInStore ( ) : Promise < void > {
272+ if ( this . eventStore && this . eventCountInStore === undefined ) {
273+ try {
274+ const keys = await this . eventStore . getKeys ( ) ;
275+ this . eventCountInStore = keys . length ;
276+ } catch ( e ) {
277+ this . logger ?. error ( e ) ;
278+ }
279+ }
280+ }
256281
282+ private async storeEvent ( eventWithId : EventWithId ) : Promise < void > {
283+ await this . findEventCountInStore ( ) ;
284+ if ( this . eventCountInStore !== undefined && this . eventCountInStore >= this . maxEventsInStore ) {
285+ this . logger ?. info ( EVENT_STORE_FULL , eventWithId . event . uuid ) ;
286+ eventWithId . notStored = true ;
287+ return ;
288+ }
289+
290+ await Promise . resolve ( this . eventStore ?. set ( eventWithId . id , eventWithId ) ) . then ( ( ) => {
291+ if ( this . eventCountInStore !== undefined ) {
292+ this . eventCountInStore ++ ;
293+ }
294+ } ) . catch ( ( e ) => {
295+ eventWithId . notStored = true ;
296+ this . logger ?. error ( e ) ;
297+ } ) ;
257298 }
258299
259300 start ( ) : void {
0 commit comments