diff --git a/packages/notifications/src/core/pubsub.ts b/packages/notifications/src/core/pubsub.ts index 959b45c..2f52e1b 100644 --- a/packages/notifications/src/core/pubsub.ts +++ b/packages/notifications/src/core/pubsub.ts @@ -7,6 +7,11 @@ import { } from './types'; import { getQueue } from '../queue-adapters/adapter'; +/* getHandler takes a consumer as an argument and returns a function or null depending on the type of the consumer. +There are two possible cases: + 1. If consumer is an object (i.e. a queue config), getQueue is called to get an adapter and it returns the adapter’s sendMessage method. + 2. If consumer is a function, it is returned directly. + */ const getHandler = async (consumer: Consumer) => { return await match(consumer) .with( @@ -23,6 +28,16 @@ const getHandler = async (consumer: Consumer) => { .otherwise(() => null); }; +/* +pubSub sets up an EventTarget for managing events. +It takes subscribers as an argument, where: +1. subscribers: Record - A map of channel names to one or more subscribers. + +Returns: +1. EventTarget - The event target for managing events. + +pubsub uses getHandler to get the handler for each subscriber. +*/ export const pubSub = async ( subscribers: Record[]>, ): Promise => { diff --git a/packages/notifications/src/core/types.ts b/packages/notifications/src/core/types.ts index ab52ee2..1d8732a 100644 --- a/packages/notifications/src/core/types.ts +++ b/packages/notifications/src/core/types.ts @@ -1,26 +1,60 @@ import type { PostgresType } from 'postgres'; +// Constant EventName export const EventName = 'notification'; + +/* +OptionType is a type alias for a record of string keys to PostgresType values. +QueueType is a type alias for a string literal union of 'bullmq', 'sqs', or 'asb'. +*/ export type OptionType = Record; export type QueueType = 'bullmq' | 'sqs' | 'asb'; + +/* +QueueConfig is a generic interface that takes a type parameter T and has two properties: +1. type: QueueType - A string literal union of 'bullmq', 'sqs', or 'asb'. +2. options: Record - A record of string keys to T values. +*/ export interface QueueConfig { type: QueueType; options: Record; } + +/* +SubscriptionMessage is a generic type that represents a notification message and has four properties: +1. tableName: string - The name of the table that triggered the event (e.g., INSERT, UPDATE, DELETE). +2. action: string - The action that triggered the event. +3. new?(optional): The new values of the row that triggered the event. +4. old?(optional): The old values of the row that triggered the event. +*/ export type SubscriptionMessage = { tableName: string; action: string; new?: Record; old?: Record; }; + +/* +SendMessage is a type for a function that takes a single payload of type T and returns a Promise. +SendMessages is a type for a function that takes an array of payloads of type T and returns a Promise. +*/ export type SendMessage = (data: T) => Promise; export type SendMessages = (data: T[]) => Promise; + +/* +QueueAdapter is a type for a record of SendMessage and SendMessages functions. +*/ export type QueueAdapter = { sendMessage: SendMessage; sendMessages: SendMessages; }; +// ConsumerFn is a function type that takes a SubscriptionMessage or a string and returns void. export type ConsumerFn = ( notification: SubscriptionMessage | string, ) => void; + +/* +Consumer is a union type that can either be a QueueConfig or a ConsumerFn. +*/ export type Consumer = QueueConfig | ConsumerFn; diff --git a/packages/notifications/src/db/connection.ts b/packages/notifications/src/db/connection.ts index f788b3a..c55e01f 100644 --- a/packages/notifications/src/db/connection.ts +++ b/packages/notifications/src/db/connection.ts @@ -1,10 +1,19 @@ import postgres, { type Sql, type Options } from 'postgres'; import type { OptionType } from '../core/types'; +// DbOptions is an interface that extends Options and has a channels property which is an optional array of string values. + export interface DbOptions extends Options { channels?: string[]; } +/* createSql is a function that returns a function that creates a new instance of the Sql class. +It takes db_url and options as arguments, where: +1. db_url is the connection string for the database. +2. options includes: + a. channels: An optional array of string values. + b. pgListenMode: 'listen' | 'subscribe' - Determines whether to use LISTEN or SUBSCRIBE mode. +*/ export const createSql = (db_url: string, options: DbOptions) => { let client: Sql | null = null; return () => { diff --git a/packages/notifications/src/db/listen.ts b/packages/notifications/src/db/listen.ts index a603683..908b70d 100644 --- a/packages/notifications/src/db/listen.ts +++ b/packages/notifications/src/db/listen.ts @@ -1,6 +1,12 @@ import type { Sql } from 'postgres'; import { createSql, type DbOptions } from './connection'; +/* +listen is an async function that sets up the eventTarget and starts listening to the provided channels. +It takes db_url, eventTarget, and options as arguments and uses the createSql function to create a new instance of the Sql class. +If no channels are provided, it throws an error. +If channels are provided, it sets up a listener for each channel using the listen method of the Sql class. +*/ export const listen = async ( db_url: string, eventTarget: EventTarget, @@ -8,9 +14,12 @@ export const listen = async ( ) => { const { channels } = options; const sql: Sql = createSql(db_url, {})() as never; + + // If no channels are provided, throw an error if (!channels || channels.length === 0) { throw new Error('No channels provided'); } + // Listen on each channel and dispatch notifications for (const channelName of channels) sql.listen(channelName!, (value) => { eventTarget.dispatchEvent( diff --git a/packages/notifications/src/db/subscribe.ts b/packages/notifications/src/db/subscribe.ts index cf87ac1..83eca38 100644 --- a/packages/notifications/src/db/subscribe.ts +++ b/packages/notifications/src/db/subscribe.ts @@ -3,13 +3,29 @@ import { match, P } from 'ts-pattern'; import { EventName } from '../core/types'; import { createSql, type DbOptions } from './connection'; -export const subscirbe = async ( +/* +subscribe is an async function that sets up the eventTarget and starts listening to the provided channels. +It takes db_url, eventTarget, and options as arguments and uses the createSql function to create a new instance of the Sql class. +If there are no channels to listen to, it throws an error. + +If channels are provided, it sets up a listener for each channel using the subscribe method of the Sql class. +I want to describe about the subscribe method here. +*/ +export const subscribe = async ( db_url: string, eventTarget: EventTarget, options: DbOptions, ) => { const sql: Sql = createSql(db_url, options)() as never; sql.subscribe('*:inbound', (row, data) => { + /* + Use ts-pattern to match the shape and contents of the `data` object. + + Three scenarios are handled: + 1. INSERT or DELETE: These events don't require `old` values. + 2. UPDATE: This event includes both `new` (row) and `old` values. + 3. Default: If the command doesn't match any of the above, return null. + */ const value = match(data) .with( { command: P.when((v) => v === 'insert' || v === 'delete') }, @@ -35,6 +51,10 @@ export const subscirbe = async ( ) .otherwise(() => null); + /* + * If the pattern matching produced a valid value, + * dispatch it as a CustomEvent on the eventTarget. + */ if (value) { eventTarget.dispatchEvent( new CustomEvent(EventName, { diff --git a/packages/notifications/src/index.ts b/packages/notifications/src/index.ts index ad63b66..0efcf2d 100644 --- a/packages/notifications/src/index.ts +++ b/packages/notifications/src/index.ts @@ -1,16 +1,40 @@ +/* +Import: +1. Pattern matching utility from ts-pattern +2. pubSub function which sets up an EventTarget for managing events +3. The subscribe and listen handlers for different modes +*/ + import { match } from 'ts-pattern'; import { pubSub } from './core/pubsub'; import type { Consumer } from './core/types'; -import { subscirbe } from './db/subscribe'; +import { subscribe } from './db/subscribe'; import { listen } from './db/listen'; +/* +Options interface for the notify function includes: +1. pgListenMode: 'listen' | 'subscribe' - Determines whether to use LISTEN or SUBSCRIBE mode. +2. subscribers: Record - A map of channel names to one or more subscribers. +*/ export interface Options { pgListenMode: 'listen' | 'subscribe'; subscribers: Record[]>; } -const listenHandler = async (db_url: string, opitons: Options) => { - const { subscribers } = opitons; +/* +listenHandler and subscribeHandler are the handlers for different notification modes. +*/ + +/* +listenHandler sets up the eventTarget and starts listening to the provided channels. +It takes db_url and options as arguments, where: +1. db_url is the connection string for the database. +2. options includes: + a. subscribers: Record - A map of one or more subscribers. + b. pgListenMode: 'listen' +*/ +const listenHandler = async (db_url: string, options: Options) => { + const { subscribers } = options; const channelNames = Object.keys(subscribers); if (!channelNames || channelNames.length < 1) { throw 'At least one subscriber is mandatory when pgListenMode: listen'; @@ -19,18 +43,31 @@ const listenHandler = async (db_url: string, opitons: Options) => { listen(db_url, eventTarget, { channels: channelNames }); }; -const subscribeHandler = async (db_url: string, opitons: Options) => { - const { subscribers } = opitons; +/* +subscribeHandler is the handler for the 'subscribe' mode. +It takes db_url and options as arguments, where: +1. db_url is the connection string for the database. +2. options includes: + a. subscribers: Record - A map of one or more subscribers. + b. pgListenMode: 'subscribe' +*/ +const subscribeHandler = async (db_url: string, options: Options) => { + const { subscribers } = options; const eventTarget = await pubSub(subscribers); - subscirbe(db_url, eventTarget, {}); + subscribe(db_url, eventTarget, {}); }; +/* +notify is the main function that calls the appropriate handler based on the pgListenMode provided in the options parameter. +We use ts-pattern for clean and exhaustive pattern matching. +Once the handler is selected, it is invoked with db_url and options as arguments. +*/ export const notify = async (db_url: string, options: Options) => { - const hanlder = match(options.pgListenMode) + const handler = match(options.pgListenMode) .with('listen', () => listenHandler) .with('subscribe', () => subscribeHandler) .otherwise(() => null); - if (hanlder) { - hanlder(db_url, options); + if (handler) { + handler(db_url, options); } }; diff --git a/packages/notifications/src/queue-adapters/adapter.ts b/packages/notifications/src/queue-adapters/adapter.ts index 0493389..d71b9ba 100644 --- a/packages/notifications/src/queue-adapters/adapter.ts +++ b/packages/notifications/src/queue-adapters/adapter.ts @@ -1,6 +1,14 @@ import { match } from 'ts-pattern'; import type { QueueAdapter, QueueConfig } from '../core/types'; +/* + getQueue is an async function that returns the appropriate QueueAdapter + based on the 'type' specified in the QueueConfig object. + + It uses ts-pattern to match on the `type` field and dynamically imports the relevant adapter module. + Each adapter module is expected to return a function that takes `options` and returns a QueueAdapter. + Based on the value of the queue type, the appropriate adapter is dynamically imported and its initialized instance is returned. + */ export const getQueue = async ({ type, options, diff --git a/packages/notifications/src/queue-adapters/azure-service-bus.ts b/packages/notifications/src/queue-adapters/azure-service-bus.ts index 883d041..ea212d2 100644 --- a/packages/notifications/src/queue-adapters/azure-service-bus.ts +++ b/packages/notifications/src/queue-adapters/azure-service-bus.ts @@ -5,6 +5,18 @@ import { } from '@azure/service-bus'; import type { QueueAdapter } from '../core/types'; +/* + + azureServiceBusAdaptor is a function that creates and returns a QueueAdapter + for sending messages to an Azure Service Bus queue. + + It initializes a ServiceBusClient using the provided connection string and creates a + sender for the specified queue. The returned adapter exposes two methods: + 1. sendMessage - sends a single message to the Azure queue. + 2. sendMessages - sends multiple messages as a batch to the Azure queue. + +*/ + export const azureServiceBusAdaptor = ( options: Record, ): QueueAdapter => { diff --git a/packages/notifications/src/queue-adapters/sqs.ts b/packages/notifications/src/queue-adapters/sqs.ts index 885a021..4dd03c8 100644 --- a/packages/notifications/src/queue-adapters/sqs.ts +++ b/packages/notifications/src/queue-adapters/sqs.ts @@ -1,6 +1,18 @@ import type { QueueAdapter } from '../core/types'; import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs'; +/* + sqlAdapter is a function that creates and returns a QueueAdapter + for sending messages to an Amazon SQS (Simple Queue Service) queue. + + It initializes an SQSClient and uses the provided queueUrl to configure + message sending. The adapter provides two methods: + + 1. sendMessage - serializes and sends a single message to the specified SQS queue. + 2. sendMessages - serializes and sends an array of messages as a single SQS message + +*/ + export const sqlAdapter = (options: Record): QueueAdapter => { const client = new SQSClient({}); const { queueUrl } = options;