diff --git a/apps/worker/package.json b/apps/worker/package.json index e0ba31b7683..09797a8633a 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -43,6 +43,7 @@ "@sentry/tracing": "^7.40.0", "@types/newrelic": "^9.14.6", "svix": "^1.64.1", + "lru-cache": "^11.2.4", "axios": "^1.9.0", "body-parser": "^2.2.0", "class-transformer": "0.5.1", diff --git a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts index fe2781cf9fa..a51dc5f5b23 100644 --- a/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts @@ -34,6 +34,7 @@ import { import { setUser } from '@sentry/node'; import { differenceInMilliseconds } from 'date-fns'; import { formatInTimeZone } from 'date-fns-tz'; +import { LRUCache } from 'lru-cache'; import { EXCEPTION_MESSAGE_ON_WEBHOOK_FILTER, PlatformException, shouldHaltOnStepFailure } from '../../../shared/utils'; import { AddJob } from '../add-job'; import { PartialNotificationEntity } from '../add-job/add-job.command'; @@ -48,6 +49,13 @@ import { calculateNextAvailableTime, isWithinSchedule } from './schedule-validat const nr = require('newrelic'); +const workflowCache = new LRUCache({ + max: 1000, + ttl: 1000 * 30, +}); + +const workflowInflightRequests = new Map>(); + export type SelectedWorkflowFields = Pick; /** @@ -152,8 +160,12 @@ export class RunJob { throw new PlatformException(`Notification with id ${job._notificationId} not found`); } - const workflow = - (await this.notificationTemplateRepository.findById(job._templateId, job._environmentId)) ?? undefined; + const workflow = await this.getWorkflow( + job._templateId, + job._environmentId, + job._organizationId, + job.payload?.__source + ); if (isSubscribersScheduleEnabled) { const schedule = await this.getSubscriberSchedule.execute( @@ -375,6 +387,59 @@ export class RunJob { } } + @Instrument() + private async getWorkflow( + templateId: string, + environmentId: string, + organizationId: string, + source?: string + ): Promise { + const cacheKey = `${environmentId}:${templateId}`; + + const isFeatureFlagEnabled = await this.featureFlagsService.getFlag({ + key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED, + defaultValue: false, + environment: { _id: environmentId }, + organization: { _id: organizationId }, + component: 'worker-workflow', + }); + + const isCacheEnabled = isFeatureFlagEnabled && !source; + + if (isCacheEnabled) { + const cached = workflowCache.get(cacheKey); + if (cached) { + return cached; + } + + const inflightRequest = workflowInflightRequests.get(cacheKey); + if (inflightRequest) { + return inflightRequest; + } + } + + const fetchPromise = this.notificationTemplateRepository + .findById(templateId, environmentId) + .then((workflow) => { + if (workflow && isCacheEnabled) { + workflowCache.set(cacheKey, workflow); + } + + return workflow ?? undefined; + }) + .finally(() => { + if (isCacheEnabled) { + workflowInflightRequests.delete(cacheKey); + } + }); + + if (isCacheEnabled) { + workflowInflightRequests.set(cacheKey, fetchPromise); + } + + return fetchPromise; + } + private isUnsnoozeJob(job: JobEntity) { return job.type === StepTypeEnum.IN_APP && job.delay && job.payload?.unsnooze; } diff --git a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts index 199bbaa43e4..6be0fe80a89 100644 --- a/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/subscriber-job-bound/subscriber-job-bound.usecase.ts @@ -6,6 +6,7 @@ import { CreateNotificationJobsCommand, CreateOrUpdateSubscriberCommand, CreateOrUpdateSubscriberUseCase, + FeatureFlagsService, GetPreferences, GetPreferencesCommand, Instrument, @@ -19,6 +20,7 @@ import { IntegrationRepository, NotificationTemplateEntity, NotificationTemplate import { buildWorkflowPreferences, ChannelTypeEnum, + FeatureFlagsKeysEnum, InAppProviderIdEnum, ISubscribersDefine, ProvidersIdEnum, @@ -26,11 +28,19 @@ import { SeverityLevelEnum, STEP_TYPE_TO_CHANNEL_TYPE, } from '@novu/shared'; +import { LRUCache } from 'lru-cache'; import { StoreSubscriberJobs, StoreSubscriberJobsCommand } from '../store-subscriber-jobs'; import { SubscriberJobBoundCommand } from './subscriber-job-bound.command'; const LOG_CONTEXT = 'SubscriberJobBoundUseCase'; +const workflowCache = new LRUCache({ + max: 1000, + ttl: 1000 * 30, +}); + +const workflowInflightRequests = new Map>(); + @Injectable() export class SubscriberJobBound { constructor( @@ -42,7 +52,8 @@ export class SubscriberJobBound { private logger: PinoLogger, private analyticsService: AnalyticsService, private traceLogRepository: TraceLogRepository, - private getPreferences: GetPreferences + private getPreferences: GetPreferences, + private featureFlagsService: FeatureFlagsService ) {} @InstrumentUsecase() @@ -74,6 +85,8 @@ export class SubscriberJobBound { : await this.getWorkflow({ _id: templateId, environmentId, + organizationId, + source: command.payload?.__source, }); if (!template) { @@ -268,8 +281,62 @@ export class SubscriberJobBound { return true; } - private async getWorkflow({ _id, environmentId }: { _id: string; environmentId: string }) { - return await this.notificationTemplateRepository.findById(_id, environmentId); + @Instrument() + private async getWorkflow({ + _id, + environmentId, + organizationId, + source, + }: { + _id: string; + environmentId: string; + organizationId: string; + source?: string; + }): Promise { + const cacheKey = `${environmentId}:${_id}`; + + const isFeatureFlagEnabled = await this.featureFlagsService.getFlag({ + key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED, + defaultValue: false, + environment: { _id: environmentId }, + organization: { _id: organizationId }, + component: 'worker-workflow', + }); + + const isCacheEnabled = isFeatureFlagEnabled && !source; + + if (isCacheEnabled) { + const cached = workflowCache.get(cacheKey); + if (cached) { + return cached; + } + + const inflightRequest = workflowInflightRequests.get(cacheKey); + if (inflightRequest) { + return inflightRequest; + } + } + + const fetchPromise = this.notificationTemplateRepository + .findById(_id, environmentId) + .then((workflow) => { + if (workflow && isCacheEnabled) { + workflowCache.set(cacheKey, workflow); + } + + return workflow; + }) + .finally(() => { + if (isCacheEnabled) { + workflowInflightRequests.delete(cacheKey); + } + }); + + if (isCacheEnabled) { + workflowInflightRequests.set(cacheKey, fetchPromise); + } + + return fetchPromise; } @InstrumentUsecase() diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7d390b82221..58919457ec9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1796,6 +1796,9 @@ importers: lodash: specifier: ^4.17.15 version: 4.17.21 + lru-cache: + specifier: ^11.2.4 + version: 11.2.4 nest-raven: specifier: 10.1.0 version: 10.1.0(@nestjs/common@10.4.18(class-transformer@0.5.1)(class-validator@0.14.1)(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/core@10.4.18)(@sentry/node@8.33.1)(class-transformer@0.5.1)(class-validator@0.14.1)(graphql@16.9.0)(reflect-metadata@0.2.2)(rxjs@7.8.1) @@ -26761,7 +26764,6 @@ packages: engines: {node: '>=0.6.0', teleport: '>=0.2.0'} deprecated: |- You or someone you depend on is using Q, the JavaScript Promise library that gave JavaScript developers strong feelings about promises. They can almost certainly migrate to the native JavaScript promise now. Thank you literally everyone for joining me in this bet against the odds. Be excellent to each other. - (For a CapTP with native promises, see @endo/eventual-send and @endo/captp) qs@6.10.4: