Remove old startNotify code

This commit is contained in:
Alex Gleason 2025-02-19 16:19:16 -06:00
parent d9a466c0ee
commit bc0830785a
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
4 changed files with 22 additions and 54 deletions

View file

@ -1,38 +0,0 @@
import { Semaphore } from '@core/asyncutil';
import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
import { Conf } from '@/config.ts';
import * as pipeline from '@/pipeline.ts';
import { Storages } from '@/storages.ts';
import { logi } from '@soapbox/logi';
const sem = new Semaphore(1);
export async function startNotify(): Promise<void> {
const { listen } = await Storages.database();
const store = await Storages.db();
listen('nostr_event', (id) => {
if (pipelineEncounters.has(id)) {
logi({ level: 'debug', ns: 'ditto.notify', id, skipped: true });
return;
}
logi({ level: 'debug', ns: 'ditto.notify', id, skipped: false });
sem.lock(async () => {
try {
const signal = AbortSignal.timeout(Conf.db.timeouts.default);
const [event] = await store.query([{ ids: [id], limit: 1 }], { signal });
if (event) {
logi({ level: 'debug', ns: 'ditto.event', source: 'notify', id: event.id, kind: event.kind });
await pipeline.handleEvent(event, { source: 'notify', signal });
}
} catch {
// Ignore
}
});
});
}

View file

@ -2,16 +2,11 @@
import { Conf } from '@/config.ts';
import { cron } from '@/cron.ts';
import { startFirehose } from '@/firehose.ts';
import { startNotify } from '@/notify.ts';
if (Conf.firehoseEnabled) {
startFirehose();
}
if (Conf.notifyEnabled) {
startNotify();
}
if (Conf.cronEnabled) {
cron();
}

View file

@ -40,7 +40,12 @@ export class Storages {
if (!this._db) {
this._db = (async () => {
const db = await this.database();
const store = new DittoPgStore({ db, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default });
const store = new DittoPgStore({
db,
pubkey: Conf.pubkey,
timeout: Conf.db.timeouts.default,
notify: Conf.notifyEnabled,
});
await seedZapSplits(store);
return store;
})();

View file

@ -63,6 +63,8 @@ interface DittoPgStoreOpts {
batchSize?: number;
/** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */
maxAge?: number;
/** Whether to listen for events from the database with NOTIFY. */
notify?: boolean;
}
/** SQL database storage adapter for Nostr events. */
@ -100,25 +102,29 @@ export class DittoPgStore extends NPostgres {
chunkSize: opts.chunkSize,
});
opts.db.listen('nostr_event', async (id) => {
if (this.encounters.has(id)) return;
this.encounters.set(id, true);
if (opts.notify) {
opts.db.listen('nostr_event', async (id) => {
if (this.encounters.has(id)) return;
this.encounters.set(id, true);
const [event] = await this.query([{ ids: [id] }]);
const [event] = await this.query([{ ids: [id] }]);
if (event) {
await this.fulfill(event);
}
});
if (event) {
await this.fulfill(event);
}
});
}
}
/** Insert an event (and its tags) into the database. */
override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
event = purifyEvent(event);
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
if (this.opts.notify) {
this.encounters.set(event.id, true);
}
this.encounters.set(event.id, true);
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
dbEventsCounter.inc({ kind: event.kind });
if (NKinds.ephemeral(event.kind)) {