Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/notifications/src/core/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import {
} from './types';
import { getQueue } from '../queue-adapters/adapter';

/* getHandler takes a consumer as an argument and returns a function or null depending on the type of the consumer.
There are two possible cases:
1. If consumer is an object (i.e. a queue config), getQueue is called to get an adapter and it returns the adapter’s sendMessage method.
2. If consumer is a function, it is returned directly.
*/
const getHandler = async <T>(consumer: Consumer<T>) => {
return await match(consumer)
.with(
Expand All @@ -23,6 +28,16 @@ const getHandler = async <T>(consumer: Consumer<T>) => {
.otherwise(() => null);
};

/*
pubSub sets up an EventTarget for managing events.
It takes subscribers as an argument, where:
1. subscribers: Record<string, Consumer[]> - A map of channel names to one or more subscribers.

Returns:
1. EventTarget - The event target for managing events.

pubsub uses getHandler to get the handler for each subscriber.
*/
export const pubSub = async <T>(
subscribers: Record<string, Consumer<T>[]>,
): Promise<EventTarget> => {
Expand Down
34 changes: 34 additions & 0 deletions packages/notifications/src/core/types.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,60 @@
import type { PostgresType } from 'postgres';

// Constant EventName
export const EventName = 'notification';

/*
OptionType is a type alias for a record of string keys to PostgresType values.
QueueType is a type alias for a string literal union of 'bullmq', 'sqs', or 'asb'.
*/
export type OptionType = Record<string, PostgresType>;
export type QueueType = 'bullmq' | 'sqs' | 'asb';

/*
QueueConfig is a generic interface that takes a type parameter T and has two properties:
1. type: QueueType - A string literal union of 'bullmq', 'sqs', or 'asb'.
2. options: Record<string, T> - A record of string keys to T values.
*/
export interface QueueConfig<T> {
type: QueueType;
options: Record<string, T>;
}

/*
SubscriptionMessage is a generic type that represents a notification message and has four properties:
1. tableName: string - The name of the table that triggered the event (e.g., INSERT, UPDATE, DELETE).
2. action: string - The action that triggered the event.
3. new?(optional): The new values of the row that triggered the event.
4. old?(optional): The old values of the row that triggered the event.
*/
export type SubscriptionMessage<T> = {
tableName: string;
action: string;
new?: Record<string, T>;
old?: Record<string, T>;
};

/*
SendMessage is a type for a function that takes a single payload of type T and returns a Promise<void>.
SendMessages is a type for a function that takes an array of payloads of type T and returns a Promise<void>.
*/
export type SendMessage = <T>(data: T) => Promise<void>;
export type SendMessages = <T>(data: T[]) => Promise<void>;

/*
QueueAdapter is a type for a record of SendMessage and SendMessages functions.
*/
export type QueueAdapter = {
sendMessage: SendMessage;
sendMessages: SendMessages;
};

// ConsumerFn is a function type that takes a SubscriptionMessage<T> or a string and returns void.
export type ConsumerFn = <T>(
notification: SubscriptionMessage<T> | string,
) => void;

/*
Consumer is a union type that can either be a QueueConfig or a ConsumerFn.
*/
export type Consumer<T> = QueueConfig<T> | ConsumerFn;
9 changes: 9 additions & 0 deletions packages/notifications/src/db/connection.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import postgres, { type Sql, type Options } from 'postgres';
import type { OptionType } from '../core/types';

// DbOptions is an interface that extends Options<OptionType> and has a channels property which is an optional array of string values.

export interface DbOptions extends Options<OptionType> {
channels?: string[];
}

/* createSql is a function that returns a function that creates a new instance of the Sql class.
It takes db_url and options as arguments, where:
1. db_url is the connection string for the database.
2. options includes:
a. channels: An optional array of string values.
b. pgListenMode: 'listen' | 'subscribe' - Determines whether to use LISTEN or SUBSCRIBE mode.
*/
export const createSql = (db_url: string, options: DbOptions) => {
let client: Sql | null = null;
return () => {
Expand Down
9 changes: 9 additions & 0 deletions packages/notifications/src/db/listen.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
import type { Sql } from 'postgres';
import { createSql, type DbOptions } from './connection';

/*
listen is an async function that sets up the eventTarget and starts listening to the provided channels.
It takes db_url, eventTarget, and options as arguments and uses the createSql function to create a new instance of the Sql class.
If no channels are provided, it throws an error.
If channels are provided, it sets up a listener for each channel using the listen method of the Sql class.
*/
export const listen = async (
db_url: string,
eventTarget: EventTarget,
options: DbOptions,
) => {
const { channels } = options;
const sql: Sql = createSql(db_url, {})() as never;

// If no channels are provided, throw an error
if (!channels || channels.length === 0) {
throw new Error('No channels provided');
}
// Listen on each channel and dispatch notifications
for (const channelName of channels)
sql.listen(channelName!, (value) => {
eventTarget.dispatchEvent(
Expand Down
22 changes: 21 additions & 1 deletion packages/notifications/src/db/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,29 @@ import { match, P } from 'ts-pattern';
import { EventName } from '../core/types';
import { createSql, type DbOptions } from './connection';

export const subscirbe = async (
/*
subscribe is an async function that sets up the eventTarget and starts listening to the provided channels.
It takes db_url, eventTarget, and options as arguments and uses the createSql function to create a new instance of the Sql class.
If there are no channels to listen to, it throws an error.

If channels are provided, it sets up a listener for each channel using the subscribe method of the Sql class.
I want to describe about the subscribe method here.
*/
export const subscribe = async (
db_url: string,
eventTarget: EventTarget,
options: DbOptions,
) => {
const sql: Sql = createSql(db_url, options)() as never;
sql.subscribe('*:inbound', (row, data) => {
/*
Use ts-pattern to match the shape and contents of the `data` object.

Three scenarios are handled:
1. INSERT or DELETE: These events don't require `old` values.
2. UPDATE: This event includes both `new` (row) and `old` values.
3. Default: If the command doesn't match any of the above, return null.
*/
const value = match(data)
.with(
{ command: P.when((v) => v === 'insert' || v === 'delete') },
Expand All @@ -35,6 +51,10 @@ export const subscirbe = async (
)
.otherwise(() => null);

/*
* If the pattern matching produced a valid value,
* dispatch it as a CustomEvent on the eventTarget.
*/
if (value) {
eventTarget.dispatchEvent(
new CustomEvent(EventName, {
Expand Down
55 changes: 46 additions & 9 deletions packages/notifications/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
/*
Import:
1. Pattern matching utility from ts-pattern
2. pubSub function which sets up an EventTarget for managing events
3. The subscribe and listen handlers for different modes
*/

import { match } from 'ts-pattern';
import { pubSub } from './core/pubsub';
import type { Consumer } from './core/types';
import { subscirbe } from './db/subscribe';
import { subscribe } from './db/subscribe';
import { listen } from './db/listen';

/*
Options interface for the notify function includes:
1. pgListenMode: 'listen' | 'subscribe' - Determines whether to use LISTEN or SUBSCRIBE mode.
2. subscribers: Record<string, Consumer[]> - A map of channel names to one or more subscribers.
*/
export interface Options<T> {
pgListenMode: 'listen' | 'subscribe';
subscribers: Record<string, Consumer<T>[]>;
}

const listenHandler = async <T>(db_url: string, opitons: Options<T>) => {
const { subscribers } = opitons;
/*
listenHandler and subscribeHandler are the handlers for different notification modes.
*/

/*
listenHandler sets up the eventTarget and starts listening to the provided channels.
It takes db_url and options as arguments, where:
1. db_url is the connection string for the database.
2. options includes:
a. subscribers: Record<string, Consumer[]> - A map of one or more subscribers.
b. pgListenMode: 'listen'
*/
const listenHandler = async <T>(db_url: string, options: Options<T>) => {
const { subscribers } = options;
const channelNames = Object.keys(subscribers);
if (!channelNames || channelNames.length < 1) {
throw 'At least one subscriber is mandatory when pgListenMode: listen';
Expand All @@ -19,18 +43,31 @@ const listenHandler = async <T>(db_url: string, opitons: Options<T>) => {
listen(db_url, eventTarget, { channels: channelNames });
};

const subscribeHandler = async <T>(db_url: string, opitons: Options<T>) => {
const { subscribers } = opitons;
/*
subscribeHandler is the handler for the 'subscribe' mode.
It takes db_url and options as arguments, where:
1. db_url is the connection string for the database.
2. options includes:
a. subscribers: Record<string, Consumer[]> - A map of one or more subscribers.
b. pgListenMode: 'subscribe'
*/
const subscribeHandler = async <T>(db_url: string, options: Options<T>) => {
const { subscribers } = options;
const eventTarget = await pubSub(subscribers);
subscirbe(db_url, eventTarget, {});
subscribe(db_url, eventTarget, {});
};

/*
notify is the main function that calls the appropriate handler based on the pgListenMode provided in the options parameter.
We use ts-pattern for clean and exhaustive pattern matching.
Once the handler is selected, it is invoked with db_url and options as arguments.
*/
export const notify = async <T>(db_url: string, options: Options<T>) => {
const hanlder = match(options.pgListenMode)
const handler = match(options.pgListenMode)
.with('listen', () => listenHandler)
.with('subscribe', () => subscribeHandler)
.otherwise(() => null);
if (hanlder) {
hanlder(db_url, options);
if (handler) {
handler(db_url, options);
}
};
8 changes: 8 additions & 0 deletions packages/notifications/src/queue-adapters/adapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { match } from 'ts-pattern';
import type { QueueAdapter, QueueConfig } from '../core/types';

/*
getQueue is an async function that returns the appropriate QueueAdapter
based on the 'type' specified in the QueueConfig object.

It uses ts-pattern to match on the `type` field and dynamically imports the relevant adapter module.
Each adapter module is expected to return a function that takes `options` and returns a QueueAdapter.
Based on the value of the queue type, the appropriate adapter is dynamically imported and its initialized instance is returned.
*/
export const getQueue = async <T>({
type,
options,
Expand Down
12 changes: 12 additions & 0 deletions packages/notifications/src/queue-adapters/azure-service-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ import {
} from '@azure/service-bus';
import type { QueueAdapter } from '../core/types';

/*

azureServiceBusAdaptor is a function that creates and returns a QueueAdapter
for sending messages to an Azure Service Bus queue.

It initializes a ServiceBusClient using the provided connection string and creates a
sender for the specified queue. The returned adapter exposes two methods:
1. sendMessage - sends a single message to the Azure queue.
2. sendMessages - sends multiple messages as a batch to the Azure queue.

*/

export const azureServiceBusAdaptor = <T extends string>(
options: Record<string, T>,
): QueueAdapter => {
Expand Down
12 changes: 12 additions & 0 deletions packages/notifications/src/queue-adapters/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import type { QueueAdapter } from '../core/types';
import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs';

/*
sqlAdapter is a function that creates and returns a QueueAdapter
for sending messages to an Amazon SQS (Simple Queue Service) queue.

It initializes an SQSClient and uses the provided queueUrl to configure
message sending. The adapter provides two methods:

1. sendMessage - serializes and sends a single message to the specified SQS queue.
2. sendMessages - serializes and sends an array of messages as a single SQS message

*/

export const sqlAdapter = <T>(options: Record<string, T>): QueueAdapter => {
const client = new SQSClient({});
const { queueUrl } = options;
Expand Down