From 55ee8eaf74ac0034f498ea88a54a734bbaa543f1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 11:09:54 -0500 Subject: [PATCH 1/8] Fix stat inflation --- src/pipeline.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 88e5f29b..4153cd41 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -119,8 +119,10 @@ async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise { + await updateStats({ event, store, kysely }); + await store.event(event, { signal }); + }); } /** Parse kind 0 metadata and track indexes in the database. */ From d67f2a27ead4bac4b24990a2a7336fb26138bd30 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 11:30:57 -0500 Subject: [PATCH 2/8] stats: use the NPostgres transaction method to avoid transactions within transactions --- src/pipeline.ts | 3 +-- src/storages/EventsDB.ts | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 4153cd41..afc7acdf 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -117,9 +117,8 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); - const kysely = await Storages.kysely(); - await kysely.transaction().execute(async (kysely) => { + await store.transaction(async (store, kysely) => { await updateStats({ event, store, kysely }); await store.event(event, { signal }); }); diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index 72cd9bb3..148a30aa 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -316,6 +316,10 @@ class EventsDB implements NStore { return filters; } + + async transaction(callback: (store: NPostgres, kysely: Kysely) => Promise): Promise { + return this.store.transaction((store, kysely) => callback(store, kysely as unknown as Kysely)); + } } export { EventsDB }; From fc912f185e9da4a943320f3ca00cbeb6c4dcb9fa Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 13:03:23 -0500 Subject: [PATCH 3/8] Gracefully start and exit the database --- src/DittoExit.ts | 37 ++++++++++++++++++++++++++++++++ src/db/DittoDatabase.ts | 1 + src/db/adapters/DittoPglite.ts | 5 ++++- src/db/adapters/DittoPostgres.ts | 1 + src/server.ts | 13 ++++++++++- src/storages.ts | 23 ++++++++++++++++---- 6 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 src/DittoExit.ts diff --git a/src/DittoExit.ts b/src/DittoExit.ts new file mode 100644 index 00000000..36201fc7 --- /dev/null +++ b/src/DittoExit.ts @@ -0,0 +1,37 @@ +import { Stickynotes } from '@soapbox/stickynotes'; + +/** + * Add cleanup tasks to this module, + * then they will automatically be called (and the program exited) after SIGINT. + */ +export class DittoExit { + private static tasks: Array<() => Promise> = []; + private static console = new Stickynotes('ditto:exit'); + + static { + Deno.addSignalListener('SIGINT', () => this.finish('SIGINT')); + Deno.addSignalListener('SIGTERM', () => this.finish('SIGTERM')); + Deno.addSignalListener('SIGHUP', () => this.finish('SIGHUP')); + Deno.addSignalListener('SIGQUIT', () => this.finish('SIGQUIT')); + Deno.addSignalListener('SIGABRT', () => this.finish('SIGABRT')); + } + + static add(task: () => Promise): void { + this.tasks.push(task); + this.console.debug(`Added cleanup task #${this.tasks.length}`); + } + + private static async cleanup(): Promise { + this.console.debug(`Running ${this.tasks.length} cleanup tasks...`); + await Promise.allSettled( + this.tasks.map((task) => task()), + ); + } + + private static async finish(signal: Deno.Signal): Promise { + this.console.debug(signal); + await this.cleanup(); + this.console.debug('Exiting gracefully.'); + Deno.exit(0); + } +} diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index 530d9391..93c71a90 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -6,6 +6,7 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; + readonly waitReady: Promise; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 4ec7d8a5..9b425c4b 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -8,9 +8,11 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPglite { static create(databaseUrl: string): DittoDatabase { + const pglite = new PGlite(databaseUrl); + const kysely = new Kysely({ dialect: new PgliteDialect({ - database: new PGlite(databaseUrl), + database: pglite, }), log: KyselyLogger, }); @@ -19,6 +21,7 @@ export class DittoPglite { kysely, poolSize: 1, availableConnections: 1, + waitReady: pglite.waitReady, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index f1a5bcc9..0300c3e0 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -48,6 +48,7 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, + waitReady: Promise.resolve(), }; } } diff --git a/src/server.ts b/src/server.ts index f7a33dc0..bfa240c9 100644 --- a/src/server.ts +++ b/src/server.ts @@ -5,5 +5,16 @@ import '@/sentry.ts'; import '@/nostr-wasm.ts'; import app from '@/app.ts'; import { Conf } from '@/config.ts'; +import { DittoExit } from '@/DittoExit.ts'; -Deno.serve({ port: Conf.port }, app.fetch); +const ac = new AbortController(); +// deno-lint-ignore require-await +DittoExit.add(async () => ac.abort()); + +Deno.serve( + { + port: Conf.port, + signal: ac.signal, + }, + app.fetch, +); diff --git a/src/storages.ts b/src/storages.ts index 60a0cebc..fb66c55e 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,6 +2,7 @@ import { Conf } from '@/config.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDB } from '@/db/DittoDB.ts'; +import { DittoExit } from '@/DittoExit.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { SearchStore } from '@/storages/search-store.ts'; @@ -10,9 +11,11 @@ import { NPool, NRelay1 } from '@nostrify/nostrify'; import { getRelays } from '@/utils/outbox.ts'; import { seedZapSplits } from '@/utils/zap-split.ts'; +DittoExit.add(() => Storages.close()); + export class Storages { private static _db: Promise | undefined; - private static _database: DittoDatabase | undefined; + private static _database: Promise | undefined; private static _admin: Promise | undefined; private static _client: Promise | undefined; private static _pubsub: Promise | undefined; @@ -20,8 +23,12 @@ export class Storages { public static async database(): Promise { if (!this._database) { - this._database = DittoDB.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); - await DittoDB.migrate(this._database.kysely); + this._database = (async () => { + const db = DittoDB.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); + await db.waitReady; + await DittoDB.migrate(db.kysely); + return db; + })(); } return this._database; } @@ -35,7 +42,7 @@ export class Storages { public static async db(): Promise { if (!this._db) { this._db = (async () => { - const { kysely } = await this.database(); + const kysely = await this.kysely(); const store = new EventsDB({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); await seedZapSplits(store); return store; @@ -118,4 +125,12 @@ export class Storages { } return this._search; } + + /** Close the database connection, if one has been opened. */ + public static async close(): Promise { + if (this._database) { + const { kysely } = await this._database; + await kysely.destroy(); + } + } } From c50c63f954c0fb6cb72ad5e4f53dd0d79d580b86 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 13:10:36 -0500 Subject: [PATCH 4/8] pipeline: purifyEvent before passing it to storage --- src/pipeline.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index afc7acdf..85d27964 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -5,6 +5,7 @@ import { LRUCache } from 'lru-cache'; import { z } from 'zod'; import { Conf } from '@/config.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { pipelineEventsCounter, policyEventsCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; @@ -14,11 +15,11 @@ import { Storages } from '@/storages.ts'; import { eventAge, parseNip05, Time } from '@/utils.ts'; import { policyWorker } from '@/workers/policy.ts'; import { verifyEventWorker } from '@/workers/verify.ts'; +import { getAmount } from '@/utils/bolt11.ts'; import { nip05Cache } from '@/utils/nip05.ts'; +import { purifyEvent } from '@/utils/purify.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; -import { getAmount } from '@/utils/bolt11.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; const debug = Debug('ditto:pipeline'); @@ -55,7 +56,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise Date: Thu, 12 Sep 2024 13:17:21 -0500 Subject: [PATCH 5/8] waitReady is not actually needed --- src/db/DittoDatabase.ts | 1 - src/db/adapters/DittoPglite.ts | 1 - src/db/adapters/DittoPostgres.ts | 1 - src/storages.ts | 1 - 4 files changed, 4 deletions(-) diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts index 93c71a90..530d9391 100644 --- a/src/db/DittoDatabase.ts +++ b/src/db/DittoDatabase.ts @@ -6,7 +6,6 @@ export interface DittoDatabase { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; - readonly waitReady: Promise; } export interface DittoDatabaseOpts { diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 9b425c4b..3423cb31 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -21,7 +21,6 @@ export class DittoPglite { kysely, poolSize: 1, availableConnections: 1, - waitReady: pglite.waitReady, }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index 0300c3e0..f1a5bcc9 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -48,7 +48,6 @@ export class DittoPostgres { get availableConnections() { return pg.connections.idle; }, - waitReady: Promise.resolve(), }; } } diff --git a/src/storages.ts b/src/storages.ts index fb66c55e..15920444 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -25,7 +25,6 @@ export class Storages { if (!this._database) { this._database = (async () => { const db = DittoDB.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); - await db.waitReady; await DittoDB.migrate(db.kysely); return db; })(); From 83167623705848acd46c83b910a7f606714a6c53 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 13:37:21 -0500 Subject: [PATCH 6/8] Remove this DittoExit stuff (since I'm not convinced it's needed) --- src/DittoExit.ts | 37 ------------------------------------- src/server.ts | 13 +------------ src/storages.ts | 3 --- 3 files changed, 1 insertion(+), 52 deletions(-) delete mode 100644 src/DittoExit.ts diff --git a/src/DittoExit.ts b/src/DittoExit.ts deleted file mode 100644 index 36201fc7..00000000 --- a/src/DittoExit.ts +++ /dev/null @@ -1,37 +0,0 @@ -import { Stickynotes } from '@soapbox/stickynotes'; - -/** - * Add cleanup tasks to this module, - * then they will automatically be called (and the program exited) after SIGINT. - */ -export class DittoExit { - private static tasks: Array<() => Promise> = []; - private static console = new Stickynotes('ditto:exit'); - - static { - Deno.addSignalListener('SIGINT', () => this.finish('SIGINT')); - Deno.addSignalListener('SIGTERM', () => this.finish('SIGTERM')); - Deno.addSignalListener('SIGHUP', () => this.finish('SIGHUP')); - Deno.addSignalListener('SIGQUIT', () => this.finish('SIGQUIT')); - Deno.addSignalListener('SIGABRT', () => this.finish('SIGABRT')); - } - - static add(task: () => Promise): void { - this.tasks.push(task); - this.console.debug(`Added cleanup task #${this.tasks.length}`); - } - - private static async cleanup(): Promise { - this.console.debug(`Running ${this.tasks.length} cleanup tasks...`); - await Promise.allSettled( - this.tasks.map((task) => task()), - ); - } - - private static async finish(signal: Deno.Signal): Promise { - this.console.debug(signal); - await this.cleanup(); - this.console.debug('Exiting gracefully.'); - Deno.exit(0); - } -} diff --git a/src/server.ts b/src/server.ts index bfa240c9..f7a33dc0 100644 --- a/src/server.ts +++ b/src/server.ts @@ -5,16 +5,5 @@ import '@/sentry.ts'; import '@/nostr-wasm.ts'; import app from '@/app.ts'; import { Conf } from '@/config.ts'; -import { DittoExit } from '@/DittoExit.ts'; -const ac = new AbortController(); -// deno-lint-ignore require-await -DittoExit.add(async () => ac.abort()); - -Deno.serve( - { - port: Conf.port, - signal: ac.signal, - }, - app.fetch, -); +Deno.serve({ port: Conf.port }, app.fetch); diff --git a/src/storages.ts b/src/storages.ts index 15920444..05e2c383 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,7 +2,6 @@ import { Conf } from '@/config.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { DittoExit } from '@/DittoExit.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { SearchStore } from '@/storages/search-store.ts'; @@ -11,8 +10,6 @@ import { NPool, NRelay1 } from '@nostrify/nostrify'; import { getRelays } from '@/utils/outbox.ts'; import { seedZapSplits } from '@/utils/zap-split.ts'; -DittoExit.add(() => Storages.close()); - export class Storages { private static _db: Promise | undefined; private static _database: Promise | undefined; From a0fd702e09cc725c29d37133c098112f8bf921f4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 13:38:00 -0500 Subject: [PATCH 7/8] Revert DittoPglite --- src/db/adapters/DittoPglite.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 3423cb31..4ec7d8a5 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -8,11 +8,9 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPglite { static create(databaseUrl: string): DittoDatabase { - const pglite = new PGlite(databaseUrl); - const kysely = new Kysely({ dialect: new PgliteDialect({ - database: pglite, + database: new PGlite(databaseUrl), }), log: KyselyLogger, }); From 1732b690220995bc237bbcf4ed84d6e053a4c5a1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 12 Sep 2024 13:39:02 -0500 Subject: [PATCH 8/8] Remove unused Storages.close method --- src/storages.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/storages.ts b/src/storages.ts index 05e2c383..cbafd5aa 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -121,12 +121,4 @@ export class Storages { } return this._search; } - - /** Close the database connection, if one has been opened. */ - public static async close(): Promise { - if (this._database) { - const { kysely } = await this._database; - await kysely.destroy(); - } - } }