diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a64b111d..48c5b253 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -35,10 +35,10 @@ test: postgres: stage: test - script: deno task db:migrate && deno task test + script: sleep 1 && deno task test services: - postgres:16 variables: DITTO_NSEC: nsec1zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zygs4rm7hz - DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres + TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres POSTGRES_HOST_AUTH_METHOD: trust diff --git a/scripts/admin-event.ts b/scripts/admin-event.ts index 71c957f4..313aa051 100644 --- a/scripts/admin-event.ts +++ b/scripts/admin-event.ts @@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts'; const signer = new AdminSigner(); -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); const readable = Deno.stdin.readable .pipeThrough(new TextDecoderStream()) diff --git a/scripts/admin-role.ts b/scripts/admin-role.ts index 3f3c53f2..99986817 100644 --- a/scripts/admin-role.ts +++ b/scripts/admin-role.ts @@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { nostrNow } from '@/utils.ts'; -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); const [pubkeyOrNpub, role] = Deno.args; const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub; diff --git a/scripts/db-migrate.ts b/scripts/db-migrate.ts index ab0b9747..0e1d694d 100644 --- a/scripts/db-migrate.ts +++ b/scripts/db-migrate.ts @@ -1,11 +1,4 @@ -import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { sleep } from '@/test.ts'; - -if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') { - console.info('Waiting 1 second for postgres to start...'); - await sleep(1_000); -} // This migrates kysely internally. const { kysely } = await DittoDB.getInstance(); diff --git a/scripts/nostr-pull.ts b/scripts/nostr-pull.ts index 0556b64a..4b9d51db 100644 --- a/scripts/nostr-pull.ts +++ b/scripts/nostr-pull.ts @@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools'; import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); interface ImportEventsOpts { profilesOnly: boolean; diff --git a/scripts/stats-recompute.ts b/scripts/stats-recompute.ts index 107a3167..7d6f721f 100644 --- a/scripts/stats-recompute.ts +++ b/scripts/stats-recompute.ts @@ -18,6 +18,6 @@ try { } const store = await Storages.db(); -const kysely = await DittoDB.getInstance(); +const { kysely } = await DittoDB.getInstance(); await refreshAuthorStats({ pubkey, kysely, store }); diff --git a/src/controllers/api/oauth.ts b/src/controllers/api/oauth.ts index 01f80bf1..a736f5ca 100644 --- a/src/controllers/api/oauth.ts +++ b/src/controllers/api/oauth.ts @@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => { async function getToken( { pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] }, ): Promise<`token1${string}`> { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const token = generateToken(); const serverSeckey = generateSecretKey(); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 4236db52..05b5022c 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -578,7 +578,7 @@ const zappedByController: AppController = async (c) => { const id = c.req.param('id'); const params = c.get('listPagination'); const store = await Storages.db(); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const zaps = await kysely.selectFrom('event_zaps') .selectAll() diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f70a6b06..047aa573 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -222,7 +222,7 @@ async function topicToFilter( async function getTokenPubkey(token: string): Promise { if (token.startsWith('token1')) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey } = await kysely .selectFrom('nip46_tokens') diff --git a/src/controllers/metrics.ts b/src/controllers/metrics.ts index e85294c0..e25522ff 100644 --- a/src/controllers/metrics.ts +++ b/src/controllers/metrics.ts @@ -6,9 +6,11 @@ import { dbAvailableConnectionsGauge, dbPoolSizeGauge } from '@/metrics.ts'; /** Prometheus/OpenMetrics controller. */ export const metricsController: AppController = async (c) => { + const db = await DittoDB.getInstance(); + // Update some metrics at request time. - dbPoolSizeGauge.set(DittoDB.poolSize); - dbAvailableConnectionsGauge.set(DittoDB.availableConnections); + dbPoolSizeGauge.set(db.poolSize); + dbAvailableConnectionsGauge.set(db.availableConnections); const metrics = await register.metrics(); diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index 03638a98..63d2bfb1 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -6,57 +6,35 @@ import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoPglite } from '@/db/adapters/DittoPglite.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; +import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; export class DittoDB { - private static kysely: Promise> | undefined; + private static db: DittoDatabase | undefined; - static getInstance(): Promise> { - if (!this.kysely) { - this.kysely = this._getInstance(); + /** Create (and migrate) the database if it isn't been already, or return the existing connection. */ + static async getInstance(): Promise { + if (!this.db) { + this.db = this.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); + await this.migrate(this.db.kysely); } - return this.kysely; + return this.db; } - static async _getInstance(): Promise> { - const { protocol } = new URL(Conf.databaseUrl); - - let kysely: Kysely; + /** Open a new database connection. */ + static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase { + const { protocol } = new URL(databaseUrl); switch (protocol) { case 'file:': case 'memory:': - kysely = await DittoPglite.getInstance(); - break; + return DittoPglite.create(databaseUrl); case 'postgres:': case 'postgresql:': - kysely = await DittoPostgres.getInstance(); - break; + return DittoPostgres.create(databaseUrl, opts); default: throw new Error('Unsupported database URL.'); } - - await this.migrate(kysely); - - return kysely; - } - - static get poolSize(): number { - const { protocol } = new URL(Conf.databaseUrl); - - if (['postgres:', 'postgresql:'].includes(protocol)) { - return DittoPostgres.poolSize; - } - return 1; - } - - static get availableConnections(): number { - const { protocol } = new URL(Conf.databaseUrl); - - if (['postgres:', 'postgresql:'].includes(protocol)) { - return DittoPostgres.availableConnections; - } - return 1; } /** Migrate the database to the latest version. */ diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts new file mode 100644 index 00000000..530d9391 --- /dev/null +++ b/src/db/DittoDatabase.ts @@ -0,0 +1,13 @@ +import { Kysely } from 'kysely'; + +import { DittoTables } from '@/db/DittoTables.ts'; + +export interface DittoDatabase { + readonly kysely: Kysely; + readonly poolSize: number; + readonly availableConnections: number; +} + +export interface DittoDatabaseOpts { + poolSize?: number; +} diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 80ea6087..4ec7d8a5 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -2,32 +2,23 @@ import { PGlite } from '@electric-sql/pglite'; import { PgliteDialect } from '@soapbox/kysely-pglite'; import { Kysely } from 'kysely'; -import { Conf } from '@/config.ts'; +import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPglite { - static db: Kysely | undefined; + static create(databaseUrl: string): DittoDatabase { + const kysely = new Kysely({ + dialect: new PgliteDialect({ + database: new PGlite(databaseUrl), + }), + log: KyselyLogger, + }); - // deno-lint-ignore require-await - static async getInstance(): Promise> { - if (!this.db) { - this.db = new Kysely({ - dialect: new PgliteDialect({ - database: new PGlite(Conf.databaseUrl), - }), - log: KyselyLogger, - }) as Kysely; - } - - return this.db; - } - - static get poolSize() { - return 1; - } - - static get availableConnections(): number { - return 1; + return { + kysely, + poolSize: 1, + availableConnections: 1, + }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index b0a6c93a..f1a5bcc9 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -12,51 +12,43 @@ import { import { PostgresJSDialectConfig, PostgresJSDriver } from 'kysely-postgres-js'; import postgres from 'postgres'; -import { Conf } from '@/config.ts'; +import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPostgres { - static kysely: Kysely | undefined; - static postgres?: postgres.Sql; + static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase { + const pg = postgres(databaseUrl, { max: opts?.poolSize }); - // deno-lint-ignore require-await - static async getInstance(): Promise> { - if (!this.postgres) { - this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize }); - } - - if (!this.kysely) { - this.kysely = new Kysely({ - dialect: { - createAdapter() { - return new PostgresAdapter(); - }, - createDriver() { - return new PostgresJSDriver({ - postgres: DittoPostgres.postgres as unknown as PostgresJSDialectConfig['postgres'], - }); - }, - createIntrospector(db) { - return new PostgresIntrospector(db); - }, - createQueryCompiler() { - return new DittoPostgresQueryCompiler(); - }, + const kysely = new Kysely({ + dialect: { + createAdapter() { + return new PostgresAdapter(); }, - log: KyselyLogger, - }); - } + createDriver() { + return new PostgresJSDriver({ + postgres: pg as unknown as PostgresJSDialectConfig['postgres'], + }); + }, + createIntrospector(db) { + return new PostgresIntrospector(db); + }, + createQueryCompiler() { + return new DittoPostgresQueryCompiler(); + }, + }, + log: KyselyLogger, + }); - return this.kysely; - } - - static get poolSize() { - return this.postgres?.connections.open ?? 0; - } - - static get availableConnections(): number { - return this.postgres?.connections.idle ?? 0; + return { + kysely, + get poolSize() { + return pg.connections.open; + }, + get availableConnections() { + return pg.connections.idle; + }, + }; } } diff --git a/src/db/migrations/020_pgfts.ts b/src/db/migrations/020_pgfts.ts index 835de117..26e320ec 100644 --- a/src/db/migrations/020_pgfts.ts +++ b/src/db/migrations/020_pgfts.ts @@ -1,19 +1,13 @@ import { Kysely, sql } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.createTable('nostr_pgfts') - .ifNotExists() - .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade')) - .addColumn('search_vec', sql`tsvector`, (c) => c.notNull()) - .execute(); - } + await db.schema.createTable('nostr_pgfts') + .ifNotExists() + .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade')) + .addColumn('search_vec', sql`tsvector`, (c) => c.notNull()) + .execute(); } export async function down(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.dropTable('nostr_pgfts').ifExists().execute(); - } + await db.schema.dropTable('nostr_pgfts').ifExists().execute(); } diff --git a/src/db/migrations/021_pgfts_index.ts b/src/db/migrations/021_pgfts_index.ts index 4b834995..7ad24546 100644 --- a/src/db/migrations/021_pgfts_index.ts +++ b/src/db/migrations/021_pgfts_index.ts @@ -1,21 +1,15 @@ import { Kysely } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema - .createIndex('nostr_pgfts_gin_search_vec') - .ifNotExists() - .on('nostr_pgfts') - .using('gin') - .column('search_vec') - .execute(); - } + await db.schema + .createIndex('nostr_pgfts_gin_search_vec') + .ifNotExists() + .on('nostr_pgfts') + .using('gin') + .column('search_vec') + .execute(); } export async function down(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute(); - } + await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute(); } diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts index 7bfc6c17..dcd6ad85 100644 --- a/src/db/migrations/030_pg_events_jsonb.ts +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -1,10 +1,6 @@ import { Kysely, sql } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect !== 'postgres') return; - // Create new table and indexes. await db.schema .createTable('nostr_events_new') diff --git a/src/middleware/signerMiddleware.ts b/src/middleware/signerMiddleware.ts index 89a494c4..60826db9 100644 --- a/src/middleware/signerMiddleware.ts +++ b/src/middleware/signerMiddleware.ts @@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => { if (bech32.startsWith('token1')) { try { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey, server_seckey, relays } = await kysely .selectFrom('nip46_tokens') diff --git a/src/pipeline.test.ts b/src/pipeline.test.ts index 2af2b8c3..76b1fe51 100644 --- a/src/pipeline.test.ts +++ b/src/pipeline.test.ts @@ -1,7 +1,7 @@ import { assertEquals } from '@std/assert'; import { generateSecretKey } from 'nostr-tools'; -import { createTestDB, genEvent, getTestDB } from '@/test.ts'; +import { createTestDB, genEvent } from '@/test.ts'; import { handleZaps } from '@/pipeline.ts'; Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => { @@ -58,7 +58,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab // If no error happens = ok Deno.test('zap receipt does not have a "description" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -71,7 +71,7 @@ Deno.test('zap receipt does not have a "description" tag', async () => { }); Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -84,7 +84,7 @@ Deno.test('zap receipt does not have a zap request stringified value in the "des }); Deno.test('zap receipt does not have a "bolt11" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -103,7 +103,7 @@ Deno.test('zap receipt does not have a "bolt11" tag', async () => { }); Deno.test('zap request inside zap receipt does not have an "e" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); diff --git a/src/pipeline.ts b/src/pipeline.ts index ceb370ca..dd59cb8d 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -53,7 +53,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { await hydrateEvents({ events: [event], store: await Storages.db(), signal }); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const domain = await kysely .selectFrom('pubkey_domains') .select('domain') @@ -118,7 +118,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await updateStats({ event, store, kysely }).catch(debug); await store.event(event, { signal }); @@ -146,7 +146,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise { if (!this._db) { this._db = (async () => { - const db = await DittoDB.getInstance(); - const store = new EventsDB(db); + const { kysely } = await DittoDB.getInstance(); + const store = new EventsDB(kysely); await seedZapSplits(store); return store; })(); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 2294f880..19ba0db4 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -18,7 +18,7 @@ interface HydrateOpts { /** Hydrate events using the provided storage. */ async function hydrateEvents(opts: HydrateOpts): Promise { - const { events, store, signal, kysely = await DittoDB.getInstance() } = opts; + const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts; if (!events.length) { return events; diff --git a/src/test.ts b/src/test.ts index 3f144eb6..444a1b1c 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,16 +1,9 @@ -import { PGlite } from '@electric-sql/pglite'; import { NostrEvent } from '@nostrify/nostrify'; -import { PgliteDialect } from '@soapbox/kysely-pglite'; import { finalizeEvent, generateSecretKey } from 'nostr-tools'; -import { Kysely } from 'kysely'; -import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js'; -import postgres from 'postgres'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; -import { KyselyLogger } from '@/db/KyselyLogger.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; /** Import an event fixture by name in tests. */ @@ -41,31 +34,7 @@ export function genEvent(t: Partial = {}, sk: Uint8Array = generateS /** Create an database for testing. */ export const createTestDB = async (databaseUrl = Conf.testDatabaseUrl) => { const { protocol } = new URL(databaseUrl); - - const kysely: Kysely = (() => { - switch (protocol) { - case 'postgres:': - case 'postgresql:': - return new Kysely({ - // @ts-ignore Kysely version mismatch. - dialect: new PostgresJSDialect({ - postgres: postgres(databaseUrl, { - max: Conf.pg.poolSize, - }) as unknown as PostgresJSDialectConfig['postgres'], - }), - log: KyselyLogger, - }); - case 'file:': - case 'memory:': - return new Kysely({ - dialect: new PgliteDialect({ - database: new PGlite(databaseUrl), - }), - }); - default: - throw new Error(`Unsupported database URL protocol: ${protocol}`); - } - })(); + const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 }); await DittoDB.migrate(kysely); const store = new EventsDB(kysely); diff --git a/src/trends.ts b/src/trends.ts index 6199e9ed..337ee5c7 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -70,7 +70,7 @@ export async function updateTrendingTags( aliases?: string[], ) { console.info(`Updating trending ${l}...`); - const db = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const signal = AbortSignal.timeout(1000); const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000); @@ -79,7 +79,7 @@ export async function updateTrendingTags( const tagNames = aliases ? [tagName, ...aliases] : [tagName]; try { - const trends = await getTrendingTagValues(db, tagNames, { + const trends = await getTrendingTagValues(kysely, tagNames, { kinds, since: yesterday, until: now, diff --git a/src/workers/fetch.worker.ts b/src/workers/fetch.worker.ts index 469c8ff0..e6f98455 100644 --- a/src/workers/fetch.worker.ts +++ b/src/workers/fetch.worker.ts @@ -1,3 +1,5 @@ +/// + import Debug from '@soapbox/stickynotes/debug'; import * as Comlink from 'comlink';