diff --git a/src/config.ts b/src/config.ts index cd8e2a4c..fba65159 100644 --- a/src/config.ts +++ b/src/config.ts @@ -249,6 +249,13 @@ class Conf { .split(/[, ]+/g) .map(Number); } + /** + * Whether Ditto should subscribe to Nostr events from the Postgres database itself. + * This would make Nostr events inserted directly into Postgres available to the streaming API and relay. + */ + static get notifyEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('NOTIFY_ENABLED')) ?? false; + } /** Whether to enable Ditto cron jobs. */ static get cronEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index ec9a103d..3979cd12 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -6,6 +6,7 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; + listen(channel: string, callback: (payload: string) => void): void; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 6e4ae488..df616458 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -26,10 +26,15 @@ export class DittoPglite { log: KyselyLogger, }); + const listen = (channel: string, callback: (payload: string) => void): void => { + pglite.listen(channel, callback); + }; + return { kysely, poolSize: 1, availableConnections: 1, + listen, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index f1a5bcc9..180e4a7a 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -40,6 +40,10 @@ export class DittoPostgres { log: KyselyLogger, }); + const listen = (channel: string, callback: (payload: string) => void): void => { + pg.listen(channel, callback); + }; + return { kysely, get poolSize() { @@ -48,6 +52,7 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, + listen, }; } } diff --git a/src/db/migrations/039_pg_notify.ts b/src/db/migrations/039_pg_notify.ts new file mode 100644 index 00000000..6d75844d --- /dev/null +++ b/src/db/migrations/039_pg_notify.ts @@ -0,0 +1,37 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql` + CREATE OR REPLACE FUNCTION notify_nostr_event() + RETURNS TRIGGER AS $$ + DECLARE + payload JSON; + BEGIN + payload := json_build_object( + 'id', NEW.id, + 'kind', NEW.kind, + 'pubkey', NEW.pubkey, + 'content', NEW.content, + 'tags', NEW.tags, + 'created_at', NEW.created_at, + 'sig', NEW.sig + ); + + PERFORM pg_notify('nostr_event', payload::text); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `.execute(db); + + await sql` + CREATE TRIGGER nostr_event_trigger + AFTER INSERT OR UPDATE ON nostr_events + FOR EACH ROW EXECUTE FUNCTION notify_nostr_event() + `.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`DROP TRIGGER nostr_event_trigger ON nostr_events`.execute(db); + await sql`DROP FUNCTION notify_nostr_event()`.execute(db); +} diff --git a/src/notify.ts b/src/notify.ts new file mode 100644 index 00000000..4033d6c6 --- /dev/null +++ b/src/notify.ts @@ -0,0 +1,21 @@ +import { Semaphore } from '@lambdalisue/async'; + +import * as pipeline from '@/pipeline.ts'; +import { Storages } from '@/storages.ts'; + +const sem = new Semaphore(1); + +export async function startNotify(): Promise { + const { listen } = await Storages.database(); + + listen('nostr_event', (payload) => { + sem.lock(async () => { + try { + const event = JSON.parse(payload); + await pipeline.handleEvent(event, AbortSignal.timeout(5000)); + } catch (e) { + console.warn(e); + } + }); + }); +} diff --git a/src/pipeline.ts b/src/pipeline.ts index 57a5f76a..867f2c22 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -40,7 +40,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise ${event.id}`); pipelineEventsCounter.inc({ kind: event.kind }); @@ -111,13 +110,6 @@ function encounterEvent(event: NostrEvent): boolean { return encountered; } -/** Check if the event already exists in the database. */ -async function existsInDB(event: DittoEvent): Promise { - const store = await Storages.db(); - const events = await store.query([{ ids: [event.id], limit: 1 }]); - return events.length > 0; -} - /** Check whether the event has a NIP-70 `-` tag. */ function isProtectedEvent(event: NostrEvent): boolean { return event.tags.some(([name]) => name === '-'); diff --git a/src/startup.ts b/src/startup.ts index f3282d73..16439c0b 100644 --- a/src/startup.ts +++ b/src/startup.ts @@ -3,11 +3,16 @@ import { Conf } from '@/config.ts'; import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; +import { startNotify } from '@/notify.ts'; if (Conf.firehoseEnabled) { startFirehose(); } +if (Conf.notifyEnabled) { + startNotify(); +} + if (Conf.cronEnabled) { cron(); }