From 8155fae89a338d218913ad4f6136862fa1cc1e93 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 19 Oct 2024 20:26:04 -0500 Subject: [PATCH 1/3] Perform listen/notify of Nostr events to the pipeline --- src/config.ts | 7 ++++++ src/db/DittoDatabase.ts | 2 ++ src/db/adapters/DittoPglite.ts | 8 +++++++ src/db/adapters/DittoPostgres.ts | 8 +++++++ src/db/migrations/039_pg_notify.ts | 37 ++++++++++++++++++++++++++++++ src/notify.ts | 20 ++++++++++++++++ src/startup.ts | 5 ++++ 7 files changed, 87 insertions(+) create mode 100644 src/db/migrations/039_pg_notify.ts create mode 100644 src/notify.ts diff --git a/src/config.ts b/src/config.ts index cd8e2a4c..fba65159 100644 --- a/src/config.ts +++ b/src/config.ts @@ -249,6 +249,13 @@ class Conf { .split(/[, ]+/g) .map(Number); } + /** + * Whether Ditto should subscribe to Nostr events from the Postgres database itself. + * This would make Nostr events inserted directly into Postgres available to the streaming API and relay. + */ + static get notifyEnabled(): boolean { + return optionalBooleanSchema.parse(Deno.env.get('NOTIFY_ENABLED')) ?? false; + } /** Whether to enable Ditto cron jobs. */ static get cronEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index ec9a103d..7df322cc 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -1,3 +1,4 @@ +import { NostrEvent } from '@nostrify/nostrify'; import { Kysely } from 'kysely'; import { DittoTables } from '@/db/DittoTables.ts'; @@ -6,6 +7,7 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; + readonly listenNostr: (onEvent: (event: NostrEvent) => void) => void; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 6e4ae488..3bbafe3c 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -1,5 +1,6 @@ import { PGlite } from '@electric-sql/pglite'; import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm'; +import { NostrEvent } from '@nostrify/nostrify'; import { PgliteDialect } from '@soapbox/kysely-pglite'; import { Kysely } from 'kysely'; @@ -26,10 +27,17 @@ export class DittoPglite { log: KyselyLogger, }); + const listenNostr = (onEvent: (event: NostrEvent) => void): void => { + pglite.listen('nostr_event', (payload) => { + onEvent(JSON.parse(payload)); + }); + }; + return { kysely, poolSize: 1, availableConnections: 1, + listenNostr, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index f1a5bcc9..36ea8bc8 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,3 +1,4 @@ +import { NostrEvent } from '@nostrify/nostrify'; import { BinaryOperationNode, FunctionNode, @@ -40,6 +41,12 @@ export class DittoPostgres { log: KyselyLogger, }); + const listenNostr = (onEvent: (event: NostrEvent) => void): void => { + pg.listen('nostr_event', (payload) => { + onEvent(JSON.parse(payload)); + }); + }; + return { kysely, get poolSize() { @@ -48,6 +55,7 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, + listenNostr, }; } } diff --git a/src/db/migrations/039_pg_notify.ts b/src/db/migrations/039_pg_notify.ts new file mode 100644 index 00000000..6d75844d --- /dev/null +++ b/src/db/migrations/039_pg_notify.ts @@ -0,0 +1,37 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql` + CREATE OR REPLACE FUNCTION notify_nostr_event() + RETURNS TRIGGER AS $$ + DECLARE + payload JSON; + BEGIN + payload := json_build_object( + 'id', NEW.id, + 'kind', NEW.kind, + 'pubkey', NEW.pubkey, + 'content', NEW.content, + 'tags', NEW.tags, + 'created_at', NEW.created_at, + 'sig', NEW.sig + ); + + PERFORM pg_notify('nostr_event', payload::text); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `.execute(db); + + await sql` + CREATE TRIGGER nostr_event_trigger + AFTER INSERT OR UPDATE ON nostr_events + FOR EACH ROW EXECUTE FUNCTION notify_nostr_event() + `.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`DROP TRIGGER nostr_event_trigger ON nostr_events`.execute(db); + await sql`DROP FUNCTION notify_nostr_event()`.execute(db); +} diff --git a/src/notify.ts b/src/notify.ts new file mode 100644 index 00000000..a39d574b --- /dev/null +++ b/src/notify.ts @@ -0,0 +1,20 @@ +import { Semaphore } from '@lambdalisue/async'; + +import * as pipeline from '@/pipeline.ts'; +import { Storages } from '@/storages.ts'; + +const sem = new Semaphore(1); + +export async function startNotify(): Promise { + const { listenNostr } = await Storages.database(); + + listenNostr((event) => { + sem.lock(async () => { + try { + await pipeline.handleEvent(event, AbortSignal.timeout(5000)); + } catch (e) { + console.warn(e); + } + }); + }); +} diff --git a/src/startup.ts b/src/startup.ts index f3282d73..16439c0b 100644 --- a/src/startup.ts +++ b/src/startup.ts @@ -3,11 +3,16 @@ import { Conf } from '@/config.ts'; import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; +import { startNotify } from '@/notify.ts'; if (Conf.firehoseEnabled) { startFirehose(); } +if (Conf.notifyEnabled) { + startNotify(); +} + if (Conf.cronEnabled) { cron(); } From 4df61c0c59379f47d1bb38e566f966b8ef1678cf Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 19 Oct 2024 20:55:09 -0500 Subject: [PATCH 2/3] pipeline: remove "event already in database" check --- src/pipeline.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 57a5f76a..867f2c22 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -40,7 +40,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise ${event.id}`); pipelineEventsCounter.inc({ kind: event.kind }); @@ -111,13 +110,6 @@ function encounterEvent(event: NostrEvent): boolean { return encountered; } -/** Check if the event already exists in the database. */ -async function existsInDB(event: DittoEvent): Promise { - const store = await Storages.db(); - const events = await store.query([{ ids: [event.id], limit: 1 }]); - return events.length > 0; -} - /** Check whether the event has a NIP-70 `-` tag. */ function isProtectedEvent(event: NostrEvent): boolean { return event.tags.some(([name]) => name === '-'); From 013917d612a9fbf033875ed34d851b8f1bf6f6cd Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 19 Oct 2024 21:01:42 -0500 Subject: [PATCH 3/3] Expose a generic `listen` method on the database adapters --- src/db/DittoDatabase.ts | 3 +-- src/db/adapters/DittoPglite.ts | 9 +++------ src/db/adapters/DittoPostgres.ts | 9 +++------ src/notify.ts | 5 +++-- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index 7df322cc..3979cd12 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -1,4 +1,3 @@ -import { NostrEvent } from '@nostrify/nostrify'; import { Kysely } from 'kysely'; import { DittoTables } from '@/db/DittoTables.ts'; @@ -7,7 +6,7 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; - readonly listenNostr: (onEvent: (event: NostrEvent) => void) => void; + listen(channel: string, callback: (payload: string) => void): void; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 3bbafe3c..df616458 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -1,6 +1,5 @@ import { PGlite } from '@electric-sql/pglite'; import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm'; -import { NostrEvent } from '@nostrify/nostrify'; import { PgliteDialect } from '@soapbox/kysely-pglite'; import { Kysely } from 'kysely'; @@ -27,17 +26,15 @@ export class DittoPglite { log: KyselyLogger, }); - const listenNostr = (onEvent: (event: NostrEvent) => void): void => { - pglite.listen('nostr_event', (payload) => { - onEvent(JSON.parse(payload)); - }); + const listen = (channel: string, callback: (payload: string) => void): void => { + pglite.listen(channel, callback); }; return { kysely, poolSize: 1, availableConnections: 1, - listenNostr, + listen, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index 36ea8bc8..180e4a7a 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,4 +1,3 @@ -import { NostrEvent } from '@nostrify/nostrify'; import { BinaryOperationNode, FunctionNode, @@ -41,10 +40,8 @@ export class DittoPostgres { log: KyselyLogger, }); - const listenNostr = (onEvent: (event: NostrEvent) => void): void => { - pg.listen('nostr_event', (payload) => { - onEvent(JSON.parse(payload)); - }); + const listen = (channel: string, callback: (payload: string) => void): void => { + pg.listen(channel, callback); }; return { @@ -55,7 +52,7 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, - listenNostr, + listen, }; } } diff --git a/src/notify.ts b/src/notify.ts index a39d574b..4033d6c6 100644 --- a/src/notify.ts +++ b/src/notify.ts @@ -6,11 +6,12 @@ import { Storages } from '@/storages.ts'; const sem = new Semaphore(1); export async function startNotify(): Promise { - const { listenNostr } = await Storages.database(); + const { listen } = await Storages.database(); - listenNostr((event) => { + listen('nostr_event', (payload) => { sem.lock(async () => { try { + const event = JSON.parse(payload); await pipeline.handleEvent(event, AbortSignal.timeout(5000)); } catch (e) { console.warn(e);