import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common"; import { Queue, Worker } from "bullmq"; import Redis from "ioredis"; import nodemailer from "nodemailer"; import axios from "axios"; import { PrismaService } from "../../prisma/prisma.service"; type QueueName = "communications" | "qrcode" | "workflows" | "webhooks" | "reports"; @Injectable() export class QueuesService implements OnModuleInit, OnModuleDestroy { private readonly connection: Redis; private readonly queues: Record; private readonly workers: Worker[] = []; private readonly transporters = new Map(); constructor(private readonly prisma: PrismaService) { const url = process.env.REDIS_URL ?? "redis://localhost:6379"; this.connection = new Redis(url, { maxRetriesPerRequest: null }); this.queues = { communications: new Queue("communications", { connection: this.connection }), qrcode: new Queue("qrcode", { connection: this.connection }), workflows: new Queue("workflows", { connection: this.connection }), webhooks: new Queue("webhooks", { connection: this.connection }), reports: new Queue("reports", { connection: this.connection }) }; } async onModuleInit() { if (process.env.PROCESS_QUEUES !== "1") return; this.workers.push( new Worker( "communications", async (job) => { if (job.name === "sendEmail") { const tenantId = String((job.data as any).tenantId ?? ""); const host = (await this.getIntegrationValue(tenantId, "smtp.host")) ?? process.env.SMTP_HOST; const portRaw = (await this.getIntegrationValue(tenantId, "smtp.port")) ?? process.env.SMTP_PORT ?? "587"; const user = (await this.getIntegrationValue(tenantId, "smtp.user")) ?? process.env.SMTP_USER; const pass = (await this.getIntegrationValue(tenantId, "smtp.pass")) ?? process.env.SMTP_PASS; const from = (await this.getIntegrationValue(tenantId, "smtp.from")) ?? process.env.SMTP_FROM ?? user; const port = Number(portRaw); if (!host || !port || !user || !pass || !from) throw new Error("SMTP is not configured"); const transporterKey = `${host}:${port}:${user}`; const transporter = this.transporters.get(transporterKey) ?? nodemailer.createTransport({ host, port, secure: port === 465, auth: { user, pass } }); this.transporters.set(transporterKey, transporter); const to = String((job.data as any).to ?? ""); const subject = String((job.data as any).subject ?? ""); const text = (job.data as any).text ? String((job.data as any).text) : undefined; const html = (job.data as any).html ? String((job.data as any).html) : undefined; const communicationLogId = (job.data as any).communicationLogId ? String((job.data as any).communicationLogId) : undefined; if (!to || !subject) throw new Error("Invalid email payload"); try { const sent = await transporter.sendMail({ from, to, subject, text, html }); const inviteeId = (job.data as any).inviteeId ? String((job.data as any).inviteeId) : undefined; if (inviteeId) { await this.prisma.invitee.updateMany({ where: { id: inviteeId }, data: { status: "delivered" } }); } await this.markCommunicationLog(communicationLogId, "sent", sent?.messageId ? String(sent.messageId) : undefined); return { ok: true }; } catch (e: any) { await this.markCommunicationLog(communicationLogId, "failed", undefined, e?.message ? String(e.message) : "Delivery failed"); throw e; } } if (job.name === "sendSms" || job.name === "sendWhatsapp") { const tenantId = String((job.data as any).tenantId ?? ""); const username = (await this.getIntegrationValue(tenantId, "africastalking.username")) ?? process.env.AFRICASTALKING_USERNAME ?? process.env.AT_USERNAME; const apiKey = (await this.getIntegrationValue(tenantId, "africastalking.apiKey")) ?? process.env.AFRICASTALKING_API_KEY ?? process.env.AT_API_KEY; if (!username || !apiKey) throw new Error("Africa's Talking is not configured"); const to = String((job.data as any).to ?? ""); const message = String((job.data as any).message ?? ""); const communicationLogId = (job.data as any).communicationLogId ? String((job.data as any).communicationLogId) : undefined; if (!to || !message) throw new Error("Invalid message payload"); try { let providerMessageId: string | undefined; if (job.name === "sendSms") { const from = (job.data as any).from ? String((job.data as any).from) : (await this.getIntegrationValue(tenantId, "africastalking.senderId")) ?? process.env.AFRICASTALKING_SENDER_ID ?? process.env.AT_SMS_FROM; const body = new URLSearchParams(); body.set("username", username); body.set("to", to); body.set("message", message); if (from) body.set("from", from); const res = await axios.post("https://api.africastalking.com/version1/messaging", body.toString(), { headers: { apikey: apiKey, "Content-Type": "application/x-www-form-urlencoded" } }); providerMessageId = String((res.data as any)?.SMSMessageData?.Recipients?.[0]?.messageId ?? ""); } else { const url = (await this.getIntegrationValue(tenantId, "africastalking.whatsappUrl")) ?? process.env.AFRICASTALKING_WHATSAPP_URL ?? process.env.AT_WHATSAPP_URL; if (!url) throw new Error("AT_WHATSAPP_URL is not configured"); const res = await axios.post( url, { username, to, message }, { headers: { apikey: apiKey, "Content-Type": "application/json" } } ); providerMessageId = String((res.data as any)?.id ?? ""); } const inviteeId = (job.data as any).inviteeId ? String((job.data as any).inviteeId) : undefined; if (inviteeId) { await this.prisma.invitee.updateMany({ where: { id: inviteeId }, data: { status: "delivered" } }); } await this.markCommunicationLog(communicationLogId, "sent", providerMessageId || undefined); return { ok: true }; } catch (e: any) { await this.markCommunicationLog(communicationLogId, "failed", undefined, e?.message ? String(e.message) : "Delivery failed"); throw e; } } return { ok: true }; }, { connection: this.connection } ) ); this.workers.push( new Worker( "qrcode", async () => { return { ok: true }; }, { connection: this.connection } ) ); this.workers.push( new Worker( "workflows", async () => { return { ok: true }; }, { connection: this.connection } ) ); this.workers.push( new Worker( "webhooks", async (job) => { if (job.name === "paystack") { const payload = (job.data as any)?.body; const event = String(payload?.event ?? ""); const reference = String(payload?.data?.reference ?? ""); const tenantSlug = (job.data as any)?.tenantSlug ? String((job.data as any).tenantSlug) : undefined; const tenant = tenantSlug ? await this.prisma.tenant.findUnique({ where: { slug: tenantSlug } }) : null; const webhook = await this.prisma.paystackWebhookEvent.create({ data: { tenantId: tenant?.id ?? null, event, reference: reference || null, raw: payload ?? {}, status: "received" } }); if (reference) { const nextStatus = event === "charge.success" ? "success" : event === "charge.failed" ? "failed" : undefined; if (nextStatus) { await this.prisma.paymentTransaction.updateMany({ where: { reference, ...(tenant?.id ? { tenantId: tenant.id } : {}) }, data: { status: nextStatus, raw: payload } }); } } await this.prisma.paystackWebhookEvent.update({ where: { id: webhook.id }, data: { status: "processed", processedAt: new Date() } }); return { ok: true }; } return { ok: true }; }, { connection: this.connection } ) ); this.workers.push( new Worker( "reports", async (job) => { if (job.name === "registrationsCsv") { const tenantId = String((job.data as any).tenantId ?? ""); const eventId = (job.data as any).eventId ? String((job.data as any).eventId) : undefined; if (!tenantId) throw new Error("Missing tenantId"); const rows = await this.prisma.registration.findMany({ where: { tenantId, ...(eventId ? { eventId } : {}) }, orderBy: { createdAt: "desc" }, include: { attendee: true, event: true } }); const header = ["code", "status", "createdAt", "attendeeFullName", "attendeeEmail", "eventName"].join(","); const lines = rows.map((r) => { const fields = [ r.code, r.status, r.createdAt.toISOString(), r.attendee.fullName, r.attendee.email, r.event.name ].map((v) => `"${String(v).replaceAll('"', '""')}"`); return fields.join(","); }); return { csv: [header, ...lines].join("\n") }; } return { ok: true }; }, { connection: this.connection } ) ); } async onModuleDestroy() { await Promise.allSettled(this.workers.map((w) => w.close())); await Promise.allSettled(Object.values(this.queues).map((q) => q.close())); await this.connection.quit(); } queue(name: QueueName) { return this.queues[name]; } private async getIntegrationValue(tenantId: string, key: string) { if (!tenantId) return null; const row = await this.prisma.integrationSetting.findUnique({ where: { tenantId_key: { tenantId, key } } }); return row?.value ?? null; } private async markCommunicationLog(id: string | undefined, status: "sent" | "failed", providerMessageId?: string, error?: string) { if (!id) return; await this.prisma.communicationLog.updateMany({ where: { id }, data: { status, providerMessageId, error } }); } }