Skip to content

Commit 0b0f875

Browse files
committed
Add on notification listener support
Change-type: minor
1 parent 199e5d1 commit 0b0f875

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

src/database-layer/db.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ export interface Database extends BaseDatabase {
106106
) => Promise<Result>;
107107
transaction: TransactionFn;
108108
readTransaction: TransactionFn;
109+
on?: (
110+
name: 'notification',
111+
fn: (...args: any[]) => Promise<void>,
112+
options?: {
113+
channel?: string;
114+
},
115+
) => void;
109116
}
110117

111118
interface EngineParams {
@@ -707,9 +714,61 @@ if (maybePg != null) {
707714
`);
708715
}
709716
}
717+
718+
// Connect and listen for notifications
719+
async function listen(
720+
channel: string,
721+
fn: (...args: any[]) => Promise<void>,
722+
) {
723+
let listenerClient: Pg.PoolClient | null = null;
724+
725+
// Clean up and reconnect listener
726+
const reconnect = () => {
727+
try {
728+
listenerClient?.release();
729+
} catch (err) {
730+
// Ignore listener client release errors
731+
}
732+
setTimeout(() => {
733+
void listen(channel, fn);
734+
}, 1000);
735+
};
736+
737+
try {
738+
listenerClient = await pool.connect();
739+
listenerClient.on('end', reconnect);
740+
listenerClient.on('notification', (msg) => {
741+
if (msg.channel === channel) {
742+
void fn(msg).catch((err) => {
743+
console.error(`Error handling message for '${channel}':`, err);
744+
});
745+
}
746+
});
747+
await listenerClient.query(`LISTEN "${channel}"`);
748+
} catch (err) {
749+
console.error(
750+
`Error setting up listener client for '${channel}':`,
751+
err,
752+
);
753+
reconnect();
754+
}
755+
}
756+
710757
return {
711758
engine: Engines.postgres,
712759
executeSql: atomicExecuteSql,
760+
on: async (name, fn, options) => {
761+
if (name !== 'notification') {
762+
throw new Error(`Unsupported listener type: ${name}`);
763+
} else if (options?.channel == null) {
764+
throw new Error('Missing channel option for notification listener');
765+
} else if (options.channel.includes('"')) {
766+
throw new Error(
767+
`Invalid channel name for task LISTEN: ${options.channel}`,
768+
);
769+
}
770+
await listen(options.channel, fn);
771+
},
713772
transaction: createTransaction(async (stackTraceErr, timeoutMS) => {
714773
const client = await pool.connect();
715774
const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);

0 commit comments

Comments
 (0)