Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/database-layer/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ export interface Database extends BaseDatabase {
) => Promise<Result>;
transaction: TransactionFn;
readTransaction: TransactionFn;
on?: (
name: 'notification',
fn: (...args: any[]) => Promise<void>,
options?: {
channel?: string;
},
) => void;
}

interface EngineParams {
Expand Down Expand Up @@ -706,9 +713,61 @@ if (maybePg != null) {
`);
}
}

// Connect and listen for notifications
async function listen(
channel: string,
fn: (...args: any[]) => Promise<void>,
) {
let listenerClient: Pg.PoolClient | null = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to live outside the listen function so we share the client across notification listeners rather than using a separate client from the pool for each listener


// Clean up and reconnect listener
const reconnect = () => {
try {
listenerClient?.release();
} catch (err) {
// Ignore listener client release errors
}
setTimeout(() => {
void listen(channel, fn);
}, 1000);
};

try {
listenerClient = await pool.connect();
listenerClient.on('end', reconnect);
listenerClient.on('notification', (msg) => {
if (msg.channel === channel) {
void fn(msg).catch((err) => {
console.error(`Error handling message for '${channel}':`, err);
});
}
});
await listenerClient.query(`LISTEN "${channel}"`);
} catch (err) {
console.error(
`Error setting up listener client for '${channel}':`,
err,
);
reconnect();
}
}

return {
engine: Engines.postgres,
executeSql: atomicExecuteSql,
on: async (name, fn, options) => {
if (name !== 'notification') {
throw new Error(`Unsupported listener type: ${name}`);
} else if (options?.channel == null) {
throw new Error('Missing channel option for notification listener');
} else if (options.channel.includes('"')) {
throw new Error(
`Invalid channel name for task LISTEN: ${options.channel}`,
);
}
await listen(options.channel, fn);
},
transaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await pool.connect();
const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);
Expand Down