From bc0830785a24e743d9c3cd1c3bff082c1e813c85 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 19 Feb 2025 16:19:16 -0600 Subject: [PATCH] Remove old startNotify code --- packages/ditto/notify.ts | 38 ------------------------- packages/ditto/startup.ts | 5 ---- packages/ditto/storages.ts | 7 ++++- packages/ditto/storages/DittoPgStore.ts | 26 ++++++++++------- 4 files changed, 22 insertions(+), 54 deletions(-) delete mode 100644 packages/ditto/notify.ts diff --git a/packages/ditto/notify.ts b/packages/ditto/notify.ts deleted file mode 100644 index 44ed5619..00000000 --- a/packages/ditto/notify.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { Semaphore } from '@core/asyncutil'; - -import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; -import { Conf } from '@/config.ts'; -import * as pipeline from '@/pipeline.ts'; -import { Storages } from '@/storages.ts'; -import { logi } from '@soapbox/logi'; - -const sem = new Semaphore(1); - -export async function startNotify(): Promise { - const { listen } = await Storages.database(); - const store = await Storages.db(); - - listen('nostr_event', (id) => { - if (pipelineEncounters.has(id)) { - logi({ level: 'debug', ns: 'ditto.notify', id, skipped: true }); - return; - } - - logi({ level: 'debug', ns: 'ditto.notify', id, skipped: false }); - - sem.lock(async () => { - try { - const signal = AbortSignal.timeout(Conf.db.timeouts.default); - - const [event] = await store.query([{ ids: [id], limit: 1 }], { signal }); - - if (event) { - logi({ level: 'debug', ns: 'ditto.event', source: 'notify', id: event.id, kind: event.kind }); - await pipeline.handleEvent(event, { source: 'notify', signal }); - } - } catch { - // Ignore - } - }); - }); -} diff --git a/packages/ditto/startup.ts b/packages/ditto/startup.ts index 0cc2f26a..0372a1d1 100644 --- a/packages/ditto/startup.ts +++ b/packages/ditto/startup.ts @@ -2,16 +2,11 @@ import { Conf } from '@/config.ts'; import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; -import { startNotify } from '@/notify.ts'; if (Conf.firehoseEnabled) { startFirehose(); } -if (Conf.notifyEnabled) { - startNotify(); -} - if (Conf.cronEnabled) { cron(); } diff --git a/packages/ditto/storages.ts b/packages/ditto/storages.ts index 1fe46e83..ff7b2954 100644 --- a/packages/ditto/storages.ts +++ b/packages/ditto/storages.ts @@ -40,7 +40,12 @@ export class Storages { if (!this._db) { this._db = (async () => { const db = await this.database(); - const store = new DittoPgStore({ db, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); + const store = new DittoPgStore({ + db, + pubkey: Conf.pubkey, + timeout: Conf.db.timeouts.default, + notify: Conf.notifyEnabled, + }); await seedZapSplits(store); return store; })(); diff --git a/packages/ditto/storages/DittoPgStore.ts b/packages/ditto/storages/DittoPgStore.ts index 22671185..7bd22d00 100644 --- a/packages/ditto/storages/DittoPgStore.ts +++ b/packages/ditto/storages/DittoPgStore.ts @@ -63,6 +63,8 @@ interface DittoPgStoreOpts { batchSize?: number; /** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */ maxAge?: number; + /** Whether to listen for events from the database with NOTIFY. */ + notify?: boolean; } /** SQL database storage adapter for Nostr events. */ @@ -100,25 +102,29 @@ export class DittoPgStore extends NPostgres { chunkSize: opts.chunkSize, }); - opts.db.listen('nostr_event', async (id) => { - if (this.encounters.has(id)) return; - this.encounters.set(id, true); + if (opts.notify) { + opts.db.listen('nostr_event', async (id) => { + if (this.encounters.has(id)) return; + this.encounters.set(id, true); - const [event] = await this.query([{ ids: [id] }]); + const [event] = await this.query([{ ids: [id] }]); - if (event) { - await this.fulfill(event); - } - }); + if (event) { + await this.fulfill(event); + } + }); + } } /** Insert an event (and its tags) into the database. */ override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { event = purifyEvent(event); - logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); + if (this.opts.notify) { + this.encounters.set(event.id, true); + } - this.encounters.set(event.id, true); + logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); dbEventsCounter.inc({ kind: event.kind }); if (NKinds.ephemeral(event.kind)) {