diff --git a/src/pipeline.ts b/src/pipeline.ts index 9d1a8038..b208e9e2 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -5,6 +5,7 @@ import { LRUCache } from 'lru-cache'; import { z } from 'zod'; import { Conf } from '@/config.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { pipelineEventsCounter, policyEventsCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; @@ -14,11 +15,11 @@ import { Storages } from '@/storages.ts'; import { eventAge, parseNip05, Time } from '@/utils.ts'; import { policyWorker } from '@/workers/policy.ts'; import { verifyEventWorker } from '@/workers/verify.ts'; +import { getAmount } from '@/utils/bolt11.ts'; import { nip05Cache } from '@/utils/nip05.ts'; +import { purifyEvent } from '@/utils/purify.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; -import { getAmount } from '@/utils/bolt11.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; const debug = Debug('ditto:pipeline'); @@ -55,7 +56,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); - const kysely = await Storages.kysely(); - await updateStats({ event, store, kysely }).catch(debug); - await store.event(event, { signal }); + await store.transaction(async (store, kysely) => { + await updateStats({ event, store, kysely }); + await store.event(event, { signal }); + }); } /** Parse kind 0 metadata and track indexes in the database. */ diff --git a/src/storages.ts b/src/storages.ts index 60a0cebc..cbafd5aa 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -12,7 +12,7 @@ import { seedZapSplits } from '@/utils/zap-split.ts'; export class Storages { private static _db: Promise | undefined; - private static _database: DittoDatabase | undefined; + private static _database: Promise | undefined; private static _admin: Promise | undefined; private static _client: Promise | undefined; private static _pubsub: Promise | undefined; @@ -20,8 +20,11 @@ export class Storages { public static async database(): Promise { if (!this._database) { - this._database = DittoDB.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); - await DittoDB.migrate(this._database.kysely); + this._database = (async () => { + const db = DittoDB.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); + await DittoDB.migrate(db.kysely); + return db; + })(); } return this._database; } @@ -35,7 +38,7 @@ export class Storages { public static async db(): Promise { if (!this._db) { this._db = (async () => { - const { kysely } = await this.database(); + const kysely = await this.kysely(); const store = new EventsDB({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); await seedZapSplits(store); return store; diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index 72cd9bb3..bedc1cac 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -1,17 +1,7 @@ // deno-lint-ignore-file require-await -import { NPostgres } from '@nostrify/db'; -import { - NIP50, - NKinds, - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NSchema as n, - NStore, -} from '@nostrify/nostrify'; +import { NPostgres, NPostgresSchema } from '@nostrify/db'; +import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely } from 'kysely'; import { nip27 } from 'nostr-tools'; @@ -41,8 +31,7 @@ interface EventsDBOpts { } /** SQL database storage adapter for Nostr events. */ -class EventsDB implements NStore { - private store: NPostgres; +class EventsDB extends NPostgres { private console = new Stickynotes('ditto:db:events'); /** Conditions for when to index certain tags. */ @@ -63,7 +52,7 @@ class EventsDB implements NStore { }; constructor(private opts: EventsDBOpts) { - this.store = new NPostgres(opts.kysely, { + super(opts.kysely, { indexTags: EventsDB.indexTags, indexSearch: EventsDB.searchText, }); @@ -82,7 +71,7 @@ class EventsDB implements NStore { await this.deleteEventsAdmin(event); try { - await this.store.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); } catch (e) { if (e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -155,12 +144,9 @@ class EventsDB implements NStore { } } - /** Stream events from the database. */ - req( - filters: NostrFilter[], - opts: { signal?: AbortSignal } = {}, - ): AsyncIterable { - return this.store.req(filters, opts); + protected getFilterQuery(trx: Kysely, filter: NostrFilter) { + const query = super.getFilterQuery(trx, filter); + return query; } /** Get events for filters from the database. */ @@ -185,32 +171,28 @@ class EventsDB implements NStore { } if (opts.signal?.aborted) return Promise.resolve([]); - if (!filters.length) return Promise.resolve([]); this.console.debug('REQ', JSON.stringify(filters)); - return this.store.query(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + return super.query(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); } /** Delete events based on filters from the database. */ async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { - if (!filters.length) return Promise.resolve(); this.console.debug('DELETE', JSON.stringify(filters)); - - return this.store.remove(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + return super.remove(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); } /** Get number of events that would be returned by filters. */ async count( filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}, - ): Promise<{ count: number; approximate: boolean }> { + ): Promise<{ count: number; approximate: any }> { if (opts.signal?.aborted) return Promise.reject(abortError()); - if (!filters.length) return Promise.resolve({ count: 0, approximate: false }); this.console.debug('COUNT', JSON.stringify(filters)); - return this.store.count(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + return super.count(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); } /** Return only the tags that should be indexed. */ @@ -316,6 +298,10 @@ class EventsDB implements NStore { return filters; } + + async transaction(callback: (store: NPostgres, kysely: Kysely) => Promise): Promise { + return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely)); + } } export { EventsDB };