Expose a generic listen method on the database adapters

This commit is contained in:
Alex Gleason 2024-10-19 21:01:42 -05:00
parent 4df61c0c59
commit 013917d612
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
4 changed files with 10 additions and 16 deletions

View file

@ -1,4 +1,3 @@
import { NostrEvent } from '@nostrify/nostrify';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import { DittoTables } from '@/db/DittoTables.ts'; import { DittoTables } from '@/db/DittoTables.ts';
@ -7,7 +6,7 @@ export interface DittoDatabase {
readonly kysely: Kysely<DittoTables>; readonly kysely: Kysely<DittoTables>;
readonly poolSize: number; readonly poolSize: number;
readonly availableConnections: number; readonly availableConnections: number;
readonly listenNostr: (onEvent: (event: NostrEvent) => void) => void; listen(channel: string, callback: (payload: string) => void): void;
} }
export interface DittoDatabaseOpts { export interface DittoDatabaseOpts {

View file

@ -1,6 +1,5 @@
import { PGlite } from '@electric-sql/pglite'; import { PGlite } from '@electric-sql/pglite';
import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm'; import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm';
import { NostrEvent } from '@nostrify/nostrify';
import { PgliteDialect } from '@soapbox/kysely-pglite'; import { PgliteDialect } from '@soapbox/kysely-pglite';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
@ -27,17 +26,15 @@ export class DittoPglite {
log: KyselyLogger, log: KyselyLogger,
}); });
const listenNostr = (onEvent: (event: NostrEvent) => void): void => { const listen = (channel: string, callback: (payload: string) => void): void => {
pglite.listen('nostr_event', (payload) => { pglite.listen(channel, callback);
onEvent(JSON.parse(payload));
});
}; };
return { return {
kysely, kysely,
poolSize: 1, poolSize: 1,
availableConnections: 1, availableConnections: 1,
listenNostr, listen,
}; };
} }
} }

View file

@ -1,4 +1,3 @@
import { NostrEvent } from '@nostrify/nostrify';
import { import {
BinaryOperationNode, BinaryOperationNode,
FunctionNode, FunctionNode,
@ -41,10 +40,8 @@ export class DittoPostgres {
log: KyselyLogger, log: KyselyLogger,
}); });
const listenNostr = (onEvent: (event: NostrEvent) => void): void => { const listen = (channel: string, callback: (payload: string) => void): void => {
pg.listen('nostr_event', (payload) => { pg.listen(channel, callback);
onEvent(JSON.parse(payload));
});
}; };
return { return {
@ -55,7 +52,7 @@ export class DittoPostgres {
get availableConnections() { get availableConnections() {
return pg.connections.idle; return pg.connections.idle;
}, },
listenNostr, listen,
}; };
} }
} }

View file

@ -6,11 +6,12 @@ import { Storages } from '@/storages.ts';
const sem = new Semaphore(1); const sem = new Semaphore(1);
export async function startNotify(): Promise<void> { export async function startNotify(): Promise<void> {
const { listenNostr } = await Storages.database(); const { listen } = await Storages.database();
listenNostr((event) => { listen('nostr_event', (payload) => {
sem.lock(async () => { sem.lock(async () => {
try { try {
const event = JSON.parse(payload);
await pipeline.handleEvent(event, AbortSignal.timeout(5000)); await pipeline.handleEvent(event, AbortSignal.timeout(5000));
} catch (e) { } catch (e) {
console.warn(e); console.warn(e);