From 8155fae89a338d218913ad4f6136862fa1cc1e93 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 19 Oct 2024 20:26:04 -0500 Subject: [PATCH] Perform listen/notify of Nostr events to the pipeline --- src/config.ts | 7 ++++++ src/db/DittoDatabase.ts | 2 ++ src/db/adapters/DittoPglite.ts | 8 +++++++ src/db/adapters/DittoPostgres.ts | 8 +++++++ src/db/migrations/039_pg_notify.ts | 37 ++++++++++++++++++++++++++++++ src/notify.ts | 20 ++++++++++++++++ src/startup.ts | 5 ++++ 7 files changed, 87 insertions(+) create mode 100644 src/db/migrations/039_pg_notify.ts create mode 100644 src/notify.ts diff --git a/src/config.ts b/src/config.ts index cd8e2a4c..fba65159 100644 --- a/src/config.ts +++ b/src/config.ts @@ -249,6 +249,13 @@ class Conf { .split(/[, ]+/g) .map(Number); } + /** + * Whether Ditto should subscribe to Nostr events from the Postgres database itself. + * This would make Nostr events inserted directly into Postgres available to the streaming API and relay. + */ + static get notifyEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('NOTIFY_ENABLED')) ?? false; + } /** Whether to enable Ditto cron jobs. */ static get cronEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index ec9a103d..7df322cc 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -1,3 +1,4 @@ +import { NostrEvent } from '@nostrify/nostrify'; import { Kysely } from 'kysely'; import { DittoTables } from '@/db/DittoTables.ts'; @@ -6,6 +7,7 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; + readonly listenNostr: (onEvent: (event: NostrEvent) => void) => void; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 6e4ae488..3bbafe3c 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -1,5 +1,6 @@ import { PGlite } from '@electric-sql/pglite'; import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm'; +import { NostrEvent } from '@nostrify/nostrify'; import { PgliteDialect } from '@soapbox/kysely-pglite'; import { Kysely } from 'kysely'; @@ -26,10 +27,17 @@ export class DittoPglite { log: KyselyLogger, }); + const listenNostr = (onEvent: (event: NostrEvent) => void): void => { + pglite.listen('nostr_event', (payload) => { + onEvent(JSON.parse(payload)); + }); + }; + return { kysely, poolSize: 1, availableConnections: 1, + listenNostr, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index f1a5bcc9..36ea8bc8 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,3 +1,4 @@ +import { NostrEvent } from '@nostrify/nostrify'; import { BinaryOperationNode, FunctionNode, @@ -40,6 +41,12 @@ export class DittoPostgres { log: KyselyLogger, }); + const listenNostr = (onEvent: (event: NostrEvent) => void): void => { + pg.listen('nostr_event', (payload) => { + onEvent(JSON.parse(payload)); + }); + }; + return { kysely, get poolSize() { @@ -48,6 +55,7 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, + listenNostr, }; } } diff --git a/src/db/migrations/039_pg_notify.ts b/src/db/migrations/039_pg_notify.ts new file mode 100644 index 00000000..6d75844d --- /dev/null +++ b/src/db/migrations/039_pg_notify.ts @@ -0,0 +1,37 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql` + CREATE OR REPLACE FUNCTION notify_nostr_event() + RETURNS TRIGGER AS $$ + DECLARE + payload JSON; + BEGIN + payload := json_build_object( + 'id', NEW.id, + 'kind', NEW.kind, + 'pubkey', NEW.pubkey, + 'content', NEW.content, + 'tags', NEW.tags, + 'created_at', NEW.created_at, + 'sig', NEW.sig + ); + + PERFORM pg_notify('nostr_event', payload::text); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `.execute(db); + + await sql` + CREATE TRIGGER nostr_event_trigger + AFTER INSERT OR UPDATE ON nostr_events + FOR EACH ROW EXECUTE FUNCTION notify_nostr_event() + `.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`DROP TRIGGER nostr_event_trigger ON nostr_events`.execute(db); + await sql`DROP FUNCTION notify_nostr_event()`.execute(db); +} diff --git a/src/notify.ts b/src/notify.ts new file mode 100644 index 00000000..a39d574b --- /dev/null +++ b/src/notify.ts @@ -0,0 +1,20 @@ +import { Semaphore } from '@lambdalisue/async'; + +import * as pipeline from '@/pipeline.ts'; +import { Storages } from '@/storages.ts'; + +const sem = new Semaphore(1); + +export async function startNotify(): Promise { + const { listenNostr } = await Storages.database(); + + listenNostr((event) => { + sem.lock(async () => { + try { + await pipeline.handleEvent(event, AbortSignal.timeout(5000)); + } catch (e) { + console.warn(e); + } + }); + }); +} diff --git a/src/startup.ts b/src/startup.ts index f3282d73..16439c0b 100644 --- a/src/startup.ts +++ b/src/startup.ts @@ -3,11 +3,16 @@ import { Conf } from '@/config.ts'; import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; +import { startNotify } from '@/notify.ts'; if (Conf.firehoseEnabled) { startFirehose(); } +if (Conf.notifyEnabled) { + startNotify(); +} + if (Conf.cronEnabled) { cron(); }