From d4d34d22774c0e222abe8e6c694a90e6629ad835 Mon Sep 17 00:00:00 2001 From: bingmokaka <103943264+bingmokaka@users.noreply.github.com> Date: Sun, 28 Jun 2026 18:52:40 +0800 Subject: [PATCH] Wire risk alert email for analyze jobs --- backend/src/queue/document.processor.ts | 41 ++++++++++++++++++++++++- backend/src/queue/queue.module.ts | 4 +++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/backend/src/queue/document.processor.ts b/backend/src/queue/document.processor.ts index 50269e3..cdd21d4 100644 --- a/backend/src/queue/document.processor.ts +++ b/backend/src/queue/document.processor.ts @@ -7,6 +7,8 @@ import { VerificationService } from '../verification/verification.service'; import { VerificationStatus } from '../verification/entities/verification-record.entity'; import { RiskAssessmentService } from '../risk-assessment/risk-assessment.service'; import { StellarService } from '../stellar/stellar.service'; +import { MailService } from '../mail/mail.service'; +import { UsersService } from '../users/users.service'; import { QueueService } from './queue.service'; @Injectable() @@ -20,13 +22,15 @@ export class DocumentProcessor implements OnModuleDestroy { private readonly documentsService: DocumentsService, private readonly stellarService: StellarService, private readonly verificationService: VerificationService, + private readonly mailService: MailService, + private readonly usersService: UsersService, ) { const connection = this.queueService.getConnectionOptions(); this.worker = new Worker( this.queueService.queueName, async (job) => { if (job.name === 'analyze') { - await this.riskService.assessDocument(job.data.documentId); + await this.handleAnalyze(job.data.documentId); return; } if (job.name === 'anchor') { @@ -41,6 +45,41 @@ export class DocumentProcessor implements OnModuleDestroy { }); } + private async handleAnalyze(documentId: string) { + const riskResult = await this.riskService.assessDocument(documentId); + if (riskResult.flags.length === 0) { + return; + } + + const document = await this.documentsService.findById(documentId); + if (!document) { + this.logger.warn(`Document ${documentId} not found for risk alert`); + return; + } + + const user = await this.usersService.findById(document.ownerId); + if (!user?.email) { + this.logger.warn(`Owner ${document.ownerId} not found for risk alert`); + return; + } + + try { + await this.mailService.sendRiskAlert( + user.email, + document.title, + riskResult.flags.map(String), + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const stack = error instanceof Error ? error.stack : undefined; + this.logger.error( + `Failed to send risk alert for document ${documentId}`, + message, + stack, + ); + } + } + private async handleAnchor(documentId: string) { const document = await this.documentsService.findById(documentId); if (!document) { diff --git a/backend/src/queue/queue.module.ts b/backend/src/queue/queue.module.ts index a69b4bb..1b06116 100644 --- a/backend/src/queue/queue.module.ts +++ b/backend/src/queue/queue.module.ts @@ -5,6 +5,8 @@ import { DocumentsModule } from '../documents/documents.module'; import { RiskAssessmentModule } from '../risk-assessment/risk-assessment.module'; import { StellarModule } from '../stellar/stellar.module'; import { VerificationModule } from '../verification/verification.module'; +import { MailModule } from '../mail/mail.module'; +import { UsersModule } from '../users/users.module'; import { DocumentProcessor } from './document.processor'; import { QueueService } from './queue.service'; @@ -15,6 +17,8 @@ import { QueueService } from './queue.service'; RiskAssessmentModule, StellarModule, VerificationModule, + MailModule, + UsersModule, ], providers: [QueueService, DocumentProcessor], exports: [QueueService],