diff --git a/app/backend/src/notifications/notifications.module.ts b/app/backend/src/notifications/notifications.module.ts index 2d954546..acdbb4c2 100644 --- a/app/backend/src/notifications/notifications.module.ts +++ b/app/backend/src/notifications/notifications.module.ts @@ -2,6 +2,9 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { JwtModule } from '@nestjs/jwt'; import { ConfigModule, ConfigService } from '@nestjs/config'; +import { BullModule } from '@nestjs/bullmq'; +import { FanoutProcessor } from './queues/fanout.processor'; +import { NOTIFICATION_FANOUT_QUEUE } from './queues/fanout.types'; import { NotificationsService } from './notifications.service'; import { NotificationsGateway } from './gateway/notifications.gateway'; import { @@ -44,6 +47,21 @@ import { User } from '../users/entities/user.entity'; signOptions: { expiresIn: '7d' }, }), }), + BullModule.forRootAsync({ + imports: [ConfigModule], + inject: [ConfigService], + useFactory: (configService: ConfigService) => ({ + connection: { + host: configService.get('REDIS_HOST') ?? 'localhost', + port: configService.get('REDIS_PORT') ?? 6379, + password: configService.get('REDIS_PASSWORD'), + db: configService.get('REDIS_DB') ?? 0, + maxRetriesPerRequest: null, + enableReadyCheck: false, + }, + }), + }), + BullModule.registerQueue({ name: NOTIFICATION_FANOUT_QUEUE }), ], providers: [ NotificationsService, @@ -56,6 +74,7 @@ import { User } from '../users/entities/user.entity'; DeliveryService, AnalyticsService, PreferencesService, + FanoutProcessor, ], exports: [NotificationsService, NotificationsGateway], controllers: [NotificationsController], diff --git a/app/backend/src/notifications/notifications.service.ts b/app/backend/src/notifications/notifications.service.ts index 25986e95..a679aa45 100644 --- a/app/backend/src/notifications/notifications.service.ts +++ b/app/backend/src/notifications/notifications.service.ts @@ -3,6 +3,9 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { CACHE_MANAGER } from '@nestjs/cache-manager'; import { Cache } from 'cache-manager'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; +import { NOTIFICATION_FANOUT_QUEUE, FanoutJobData } from './queues/fanout.types'; import { Notification, NotificationType, NotificationCategory, NotificationStatus } from './entities/notification.entity'; import { NotificationPreferences } from './entities/notification-preferences.entity'; import { NotificationTemplate } from './entities/notification-template.entity'; @@ -57,6 +60,7 @@ export class NotificationsService { private deliveryService: DeliveryService, private analyticsService: AnalyticsService, private preferencesService: PreferencesService, + @InjectQueue(NOTIFICATION_FANOUT_QUEUE) private fanoutQueue: Queue, ) {} /** @@ -292,45 +296,41 @@ export class NotificationsService { } /** - * Send notifications to multiple users + * Send notifications to multiple users via topic-based fanout. + * + * Enqueues a single BullMQ job instead of N individual jobs. The + * FanoutProcessor fans out to recipients in batches of 100, using one + * bulk INSERT per batch to eliminate per-row DB lock contention. */ - async sendBulkNotifications(dto: CreateBulkNotificationDto): Promise { + async sendBulkNotifications(dto: CreateBulkNotificationDto): Promise<{ queued: number; topicKey: string }> { try { - const notifications: Notification[] = []; + const topicKey = `fanout:${dto.category}:${Date.now()}`; - for (const userId of dto.userIds) { - try { - // Check rate limit - await this.checkRateLimit(userId); - - const notification = await this.createNotification({ - userId, - type: dto.types[0] || NotificationType.IN_APP, - category: dto.category, - title: dto.title, - message: dto.message, - templateId: dto.templateId, - data: dto.data, - scheduledFor: dto.scheduledFor, - }); - - notifications.push(notification); - - // Send if not scheduled - if (!dto.scheduledFor) { - await this.sendNotification(notification); - } - } catch (error) { - this.logger.error(`Failed to send to user ${userId}: ${error.message}`); - } - } + const jobData: FanoutJobData = { + topicKey, + category: dto.category, + title: dto.title, + message: dto.message, + type: dto.types?.[0] ?? NotificationType.IN_APP, + templateId: dto.templateId, + data: dto.data as Record | undefined, + recipientIds: dto.userIds, + scheduledFor: dto.scheduledFor?.toISOString(), + }; + + await this.fanoutQueue.add('fanout', jobData, { + jobId: topicKey, + delay: dto.scheduledFor ? Math.max(0, dto.scheduledFor.getTime() - Date.now()) : undefined, + attempts: 3, + backoff: { type: 'exponential', delay: 2000 }, + }); - // Log bulk send - await this.analyticsService.logBulkNotificationSent(dto.category, notifications.length); + this.logger.log(`Fanout job enqueued: ${topicKey} for ${dto.userIds.length} recipients`); + await this.analyticsService.logBulkNotificationSent(dto.category, dto.userIds.length); - return notifications; + return { queued: dto.userIds.length, topicKey }; } catch (error) { - this.logger.error(`Failed to send bulk notifications: ${error.message}`); + this.logger.error(`Failed to enqueue bulk notifications: ${error.message}`); throw error; } } diff --git a/app/backend/src/notifications/queues/fanout.processor.ts b/app/backend/src/notifications/queues/fanout.processor.ts new file mode 100644 index 00000000..38433af8 --- /dev/null +++ b/app/backend/src/notifications/queues/fanout.processor.ts @@ -0,0 +1,138 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { In, Repository } from 'typeorm'; +import { Job } from 'bullmq'; +import { Notification, NotificationType, NotificationCategory, NotificationStatus } from '../entities/notification.entity'; +import { DeliveryChannel } from '../entities/notification-delivery.entity'; +import { NotificationPreferences } from '../entities/notification-preferences.entity'; +import { DeliveryService } from '../services/delivery.service'; +import { PreferencesService } from '../services/preferences.service'; +import { NotificationsGateway } from '../gateway/notifications.gateway'; +import { NOTIFICATION_FANOUT_QUEUE, FANOUT_BATCH_SIZE, FanoutJobData } from './fanout.types'; + +@Processor(NOTIFICATION_FANOUT_QUEUE) +export class FanoutProcessor extends WorkerHost { + private readonly logger = new Logger(FanoutProcessor.name); + + constructor( + @InjectRepository(Notification) + private readonly notificationRepo: Repository, + private readonly deliveryService: DeliveryService, + private readonly preferencesService: PreferencesService, + private readonly gateway: NotificationsGateway, + ) { + super(); + } + + async process(job: Job): Promise { + const { recipientIds, topicKey } = job.data; + this.logger.log(`Fanout job ${job.id} (${topicKey}): fanning out to ${recipientIds.length} recipients`); + + for (let i = 0; i < recipientIds.length; i += FANOUT_BATCH_SIZE) { + const batch = recipientIds.slice(i, i + FANOUT_BATCH_SIZE); + await this.processBatch(batch, job.data); + await job.updateProgress(Math.round(((i + batch.length) / recipientIds.length) * 100)); + } + + this.logger.log(`Fanout job ${job.id} complete`); + } + + private async processBatch(userIds: string[], payload: FanoutJobData): Promise { + // One INSERT for the whole batch — eliminates per-row lock contention + const values = userIds.map(userId => ({ + userId, + type: payload.type as NotificationType, + category: payload.category as NotificationCategory, + title: payload.title, + message: payload.message, + templateId: payload.templateId ?? null, + data: payload.data ?? null, + status: NotificationStatus.PENDING, + })); + + const insertResult = await this.notificationRepo + .createQueryBuilder() + .insert() + .into(Notification) + .values(values) + .execute(); + + const ids: string[] = insertResult.identifiers.map((r: { id: string }) => r.id); + const notifications = await this.notificationRepo.findBy({ id: In(ids) }); + + await Promise.allSettled(notifications.map(n => this.deliver(n))); + } + + private async deliver(notification: Notification): Promise { + try { + const preferences = await this.preferencesService.getUserPreferences(notification.userId); + + if (!preferences?.notificationsEnabled) { + await this.notificationRepo.update(notification.id, { + status: NotificationStatus.FAILED, + failureReason: 'Notifications disabled', + }); + return; + } + + const channels = this.resolveChannels(preferences, notification.category); + + for (const channel of channels) { + try { + await this.deliveryService.sendViaChannel(notification, channel, preferences); + } catch (err) { + this.logger.error(`Fanout delivery failed [user=${notification.userId}, channel=${channel}]: ${(err as Error).message}`); + } + } + + await this.notificationRepo.update(notification.id, { status: NotificationStatus.SENT }); + + if (channels.includes(DeliveryChannel.IN_APP)) { + this.gateway.notifyUser(notification.userId, { + id: notification.id, + title: notification.title, + message: notification.message, + category: notification.category, + type: notification.type, + createdAt: notification.createdAt, + data: notification.data, + metadata: notification.metadata, + }); + } + } catch (err) { + this.logger.error(`Fanout deliver error [user=${notification.userId}]: ${(err as Error).message}`); + await this.notificationRepo.update(notification.id, { + status: NotificationStatus.FAILED, + failureReason: (err as Error).message, + }); + } + } + + private resolveChannels(preferences: NotificationPreferences, category: NotificationCategory): DeliveryChannel[] { + const categoryKey = this.categoryToKey(category); + const prefs = preferences?.categoryPreferences?.[categoryKey]; + const channels: DeliveryChannel[] = []; + + if (prefs?.email) channels.push(DeliveryChannel.EMAIL); + if (prefs?.push) channels.push(DeliveryChannel.PUSH); + if (prefs?.inApp) channels.push(DeliveryChannel.IN_APP); + if (prefs?.sms) channels.push(DeliveryChannel.SMS); + + return channels.length > 0 ? channels : [DeliveryChannel.IN_APP]; + } + + private categoryToKey(category: NotificationCategory): string { + const map: Record = { + [NotificationCategory.EVENT_REMINDER]: 'eventReminder', + [NotificationCategory.TICKET_SALE]: 'ticketSale', + [NotificationCategory.REVIEW]: 'review', + [NotificationCategory.SYSTEM_ALERT]: 'systemAlert', + [NotificationCategory.MARKETING]: 'marketing', + [NotificationCategory.INVITATION]: 'invitation', + [NotificationCategory.COMMENT]: 'comment', + [NotificationCategory.FOLLOWER]: 'follower', + }; + return map[category] ?? 'eventReminder'; + } +} diff --git a/app/backend/src/notifications/queues/fanout.types.ts b/app/backend/src/notifications/queues/fanout.types.ts new file mode 100644 index 00000000..b61116ed --- /dev/null +++ b/app/backend/src/notifications/queues/fanout.types.ts @@ -0,0 +1,14 @@ +export const NOTIFICATION_FANOUT_QUEUE = 'notification-fanout'; +export const FANOUT_BATCH_SIZE = 100; + +export interface FanoutJobData { + topicKey: string; + category: string; + title: string; + message: string; + type: string; + templateId?: string; + data?: Record; + recipientIds: string[]; + scheduledFor?: string; +}