Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions app/backend/src/notifications/notifications.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { JwtModule } from '@nestjs/jwt';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bullmq';
import { FanoutProcessor } from './queues/fanout.processor';
import { NOTIFICATION_FANOUT_QUEUE } from './queues/fanout.types';
import { NotificationsService } from './notifications.service';
import { NotificationsGateway } from './gateway/notifications.gateway';
import {
Expand Down Expand Up @@ -44,6 +47,21 @@ import { User } from '../users/entities/user.entity';
signOptions: { expiresIn: '7d' },
}),
}),
BullModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (configService: ConfigService) => ({
connection: {
host: configService.get<string>('REDIS_HOST') ?? 'localhost',
port: configService.get<number>('REDIS_PORT') ?? 6379,
password: configService.get<string>('REDIS_PASSWORD'),
db: configService.get<number>('REDIS_DB') ?? 0,
maxRetriesPerRequest: null,
enableReadyCheck: false,
},
}),
}),
BullModule.registerQueue({ name: NOTIFICATION_FANOUT_QUEUE }),
],
providers: [
NotificationsService,
Expand All @@ -56,6 +74,7 @@ import { User } from '../users/entities/user.entity';
DeliveryService,
AnalyticsService,
PreferencesService,
FanoutProcessor,
],
exports: [NotificationsService, NotificationsGateway],
controllers: [NotificationsController],
Expand Down
66 changes: 33 additions & 33 deletions app/backend/src/notifications/notifications.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { NOTIFICATION_FANOUT_QUEUE, FanoutJobData } from './queues/fanout.types';
import { Notification, NotificationType, NotificationCategory, NotificationStatus } from './entities/notification.entity';
import { NotificationPreferences } from './entities/notification-preferences.entity';
import { NotificationTemplate } from './entities/notification-template.entity';
Expand Down Expand Up @@ -57,6 +60,7 @@ export class NotificationsService {
private deliveryService: DeliveryService,
private analyticsService: AnalyticsService,
private preferencesService: PreferencesService,
@InjectQueue(NOTIFICATION_FANOUT_QUEUE) private fanoutQueue: Queue,
) {}

/**
Expand Down Expand Up @@ -292,45 +296,41 @@ export class NotificationsService {
}

/**
* Send notifications to multiple users
* Send notifications to multiple users via topic-based fanout.
*
* Enqueues a single BullMQ job instead of N individual jobs. The
* FanoutProcessor fans out to recipients in batches of 100, using one
* bulk INSERT per batch to eliminate per-row DB lock contention.
*/
async sendBulkNotifications(dto: CreateBulkNotificationDto): Promise<Notification[]> {
async sendBulkNotifications(dto: CreateBulkNotificationDto): Promise<{ queued: number; topicKey: string }> {
try {
const notifications: Notification[] = [];
const topicKey = `fanout:${dto.category}:${Date.now()}`;

for (const userId of dto.userIds) {
try {
// Check rate limit
await this.checkRateLimit(userId);

const notification = await this.createNotification({
userId,
type: dto.types[0] || NotificationType.IN_APP,
category: dto.category,
title: dto.title,
message: dto.message,
templateId: dto.templateId,
data: dto.data,
scheduledFor: dto.scheduledFor,
});

notifications.push(notification);

// Send if not scheduled
if (!dto.scheduledFor) {
await this.sendNotification(notification);
}
} catch (error) {
this.logger.error(`Failed to send to user ${userId}: ${error.message}`);
}
}
const jobData: FanoutJobData = {
topicKey,
category: dto.category,
title: dto.title,
message: dto.message,
type: dto.types?.[0] ?? NotificationType.IN_APP,
templateId: dto.templateId,
data: dto.data as Record<string, unknown> | undefined,
recipientIds: dto.userIds,
scheduledFor: dto.scheduledFor?.toISOString(),
};

await this.fanoutQueue.add('fanout', jobData, {
jobId: topicKey,
delay: dto.scheduledFor ? Math.max(0, dto.scheduledFor.getTime() - Date.now()) : undefined,
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
});

// Log bulk send
await this.analyticsService.logBulkNotificationSent(dto.category, notifications.length);
this.logger.log(`Fanout job enqueued: ${topicKey} for ${dto.userIds.length} recipients`);
await this.analyticsService.logBulkNotificationSent(dto.category, dto.userIds.length);

return notifications;
return { queued: dto.userIds.length, topicKey };
} catch (error) {
this.logger.error(`Failed to send bulk notifications: ${error.message}`);
this.logger.error(`Failed to enqueue bulk notifications: ${error.message}`);
throw error;
}
}
Expand Down
138 changes: 138 additions & 0 deletions app/backend/src/notifications/queues/fanout.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { In, Repository } from 'typeorm';
import { Job } from 'bullmq';
import { Notification, NotificationType, NotificationCategory, NotificationStatus } from '../entities/notification.entity';
import { DeliveryChannel } from '../entities/notification-delivery.entity';
import { NotificationPreferences } from '../entities/notification-preferences.entity';
import { DeliveryService } from '../services/delivery.service';
import { PreferencesService } from '../services/preferences.service';
import { NotificationsGateway } from '../gateway/notifications.gateway';
import { NOTIFICATION_FANOUT_QUEUE, FANOUT_BATCH_SIZE, FanoutJobData } from './fanout.types';

@Processor(NOTIFICATION_FANOUT_QUEUE)
export class FanoutProcessor extends WorkerHost {
private readonly logger = new Logger(FanoutProcessor.name);

constructor(
@InjectRepository(Notification)
private readonly notificationRepo: Repository<Notification>,
private readonly deliveryService: DeliveryService,
private readonly preferencesService: PreferencesService,
private readonly gateway: NotificationsGateway,
) {
super();
}

async process(job: Job<FanoutJobData>): Promise<void> {
const { recipientIds, topicKey } = job.data;
this.logger.log(`Fanout job ${job.id} (${topicKey}): fanning out to ${recipientIds.length} recipients`);

for (let i = 0; i < recipientIds.length; i += FANOUT_BATCH_SIZE) {
const batch = recipientIds.slice(i, i + FANOUT_BATCH_SIZE);
await this.processBatch(batch, job.data);
await job.updateProgress(Math.round(((i + batch.length) / recipientIds.length) * 100));
}

this.logger.log(`Fanout job ${job.id} complete`);
}

private async processBatch(userIds: string[], payload: FanoutJobData): Promise<void> {
// One INSERT for the whole batch — eliminates per-row lock contention
const values = userIds.map(userId => ({
userId,
type: payload.type as NotificationType,
category: payload.category as NotificationCategory,
title: payload.title,
message: payload.message,
templateId: payload.templateId ?? null,
data: payload.data ?? null,
status: NotificationStatus.PENDING,
}));

const insertResult = await this.notificationRepo
.createQueryBuilder()
.insert()
.into(Notification)
.values(values)
.execute();

const ids: string[] = insertResult.identifiers.map((r: { id: string }) => r.id);
const notifications = await this.notificationRepo.findBy({ id: In(ids) });

await Promise.allSettled(notifications.map(n => this.deliver(n)));
}

private async deliver(notification: Notification): Promise<void> {
try {
const preferences = await this.preferencesService.getUserPreferences(notification.userId);

if (!preferences?.notificationsEnabled) {
await this.notificationRepo.update(notification.id, {
status: NotificationStatus.FAILED,
failureReason: 'Notifications disabled',
});
return;
}

const channels = this.resolveChannels(preferences, notification.category);

for (const channel of channels) {
try {
await this.deliveryService.sendViaChannel(notification, channel, preferences);
} catch (err) {
this.logger.error(`Fanout delivery failed [user=${notification.userId}, channel=${channel}]: ${(err as Error).message}`);
}
}

await this.notificationRepo.update(notification.id, { status: NotificationStatus.SENT });

if (channels.includes(DeliveryChannel.IN_APP)) {
this.gateway.notifyUser(notification.userId, {
id: notification.id,
title: notification.title,
message: notification.message,
category: notification.category,
type: notification.type,
createdAt: notification.createdAt,
data: notification.data,
metadata: notification.metadata,
});
}
} catch (err) {
this.logger.error(`Fanout deliver error [user=${notification.userId}]: ${(err as Error).message}`);
await this.notificationRepo.update(notification.id, {
status: NotificationStatus.FAILED,
failureReason: (err as Error).message,
});
}
}

private resolveChannels(preferences: NotificationPreferences, category: NotificationCategory): DeliveryChannel[] {
const categoryKey = this.categoryToKey(category);
const prefs = preferences?.categoryPreferences?.[categoryKey];
const channels: DeliveryChannel[] = [];

if (prefs?.email) channels.push(DeliveryChannel.EMAIL);
if (prefs?.push) channels.push(DeliveryChannel.PUSH);
if (prefs?.inApp) channels.push(DeliveryChannel.IN_APP);
if (prefs?.sms) channels.push(DeliveryChannel.SMS);

return channels.length > 0 ? channels : [DeliveryChannel.IN_APP];
}

private categoryToKey(category: NotificationCategory): string {
const map: Record<NotificationCategory, string> = {
[NotificationCategory.EVENT_REMINDER]: 'eventReminder',
[NotificationCategory.TICKET_SALE]: 'ticketSale',
[NotificationCategory.REVIEW]: 'review',
[NotificationCategory.SYSTEM_ALERT]: 'systemAlert',
[NotificationCategory.MARKETING]: 'marketing',
[NotificationCategory.INVITATION]: 'invitation',
[NotificationCategory.COMMENT]: 'comment',
[NotificationCategory.FOLLOWER]: 'follower',
};
return map[category] ?? 'eventReminder';
}
}
14 changes: 14 additions & 0 deletions app/backend/src/notifications/queues/fanout.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export const NOTIFICATION_FANOUT_QUEUE = 'notification-fanout';
export const FANOUT_BATCH_SIZE = 100;

export interface FanoutJobData {
topicKey: string;
category: string;
title: string;
message: string;
type: string;
templateId?: string;
data?: Record<string, unknown>;
recipientIds: string[];
scheduledFor?: string;
}
Loading