From 552de01a1735cf7031f0edf92049704f6deb0c29 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 11 Aug 2024 19:02:27 -0500 Subject: [PATCH] Rework DittoDB to return an object --- src/controllers/api/oauth.ts | 2 +- src/controllers/api/statuses.ts | 6 ++--- src/controllers/api/streaming.ts | 2 +- src/db/DittoDB.ts | 40 +++++++++++++++++++++--------- src/db/DittoTables.ts | 27 -------------------- src/db/adapters/DittoPostgres.ts | 7 +++--- src/db/adapters/DittoSQLite.ts | 9 ++++--- src/db/unattached-media.ts | 6 ++--- src/middleware/signerMiddleware.ts | 2 +- src/pipeline.ts | 10 ++++---- src/storages.ts | 3 +-- src/storages/hydrate.ts | 6 ++--- src/trends.ts | 40 +++++++++++++++++++++++------- 13 files changed, 86 insertions(+), 74 deletions(-) diff --git a/src/controllers/api/oauth.ts b/src/controllers/api/oauth.ts index 01f80bf1..a736f5ca 100644 --- a/src/controllers/api/oauth.ts +++ b/src/controllers/api/oauth.ts @@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => { async function getToken( { pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] }, ): Promise<`token1${string}`> { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const token = generateToken(); const serverSeckey = generateSecretKey(); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 00aa9a31..23ad2c76 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -63,7 +63,7 @@ const statusController: AppController = async (c) => { const createStatusController: AppController = async (c) => { const body = await parseBody(c.req.raw); const result = createStatusSchema.safeParse(body); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const store = c.get('store'); if (!result.success) { @@ -565,9 +565,9 @@ const zappedByController: AppController = async (c) => { const id = c.req.param('id'); const params = c.get('listPagination'); const store = await Storages.db(); - const db = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); - const zaps = await db.selectFrom('event_zaps') + const zaps = await kysely.selectFrom('event_zaps') .selectAll() .where('target_event_id', '=', id) .orderBy('amount_millisats', 'desc') diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 4bcec4cb..9a5b5deb 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -189,7 +189,7 @@ async function topicToFilter( async function getTokenPubkey(token: string): Promise { if (token.startsWith('token1')) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey } = await kysely .selectFrom('nip46_tokens') diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index c9756cf7..fadd4962 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -1,40 +1,56 @@ import fs from 'node:fs/promises'; import path from 'node:path'; +import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db'; import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoTables } from '@/db/DittoTables.ts'; +import { EventsDB } from '@/storages/EventsDB.ts'; + +export type DittoDatabase = { + dialect: 'sqlite'; + store: EventsDB; + kysely: Kysely & Kysely; +} | { + dialect: 'postgres'; + store: EventsDB; + kysely: Kysely & Kysely; +}; export class DittoDB { - private static kysely: Promise> | undefined; + private static db: Promise | undefined; - static getInstance(): Promise> { - if (!this.kysely) { - this.kysely = this._getInstance(); + static getInstance(): Promise { + if (!this.db) { + this.db = this._getInstance(); } - return this.kysely; + return this.db; } - static async _getInstance(): Promise> { - let kysely: Kysely; + static async _getInstance(): Promise { + const result = {} as DittoDatabase; switch (Conf.db.dialect) { case 'sqlite': - kysely = await DittoSQLite.getInstance(); + result.dialect = 'sqlite'; + result.kysely = await DittoSQLite.getInstance(); + result.store = new EventsDB(result.kysely as any); break; case 'postgres': - kysely = await DittoPostgres.getInstance(); + result.dialect = 'postgres'; + result.kysely = await DittoPostgres.getInstance(); + result.store = new EventsDB(result.kysely as any); break; default: throw new Error('Unsupported database URL.'); } - await this.migrate(kysely); + await this.migrate(result.kysely); - return kysely; + return result; } static get poolSize(): number { @@ -52,7 +68,7 @@ export class DittoDB { } /** Migrate the database to the latest version. */ - static async migrate(kysely: Kysely) { + static async migrate(kysely: DittoDatabase['kysely']) { const migrator = new Migrator({ db: kysely, provider: new FileMigrationProvider({ diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index 73f0cdad..69356649 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -1,7 +1,4 @@ export interface DittoTables { - nostr_events: EventRow; - nostr_tags: TagRow; - nostr_fts5: EventFTSRow; nip46_tokens: NIP46TokenRow; unattached_media: UnattachedMediaRow; author_stats: AuthorStatsRow; @@ -27,30 +24,6 @@ interface EventStatsRow { zaps_amount: number; } -interface EventRow { - id: string; - kind: number; - pubkey: string; - content: string; - created_at: number; - tags: string; - sig: string; -} - -interface EventFTSRow { - event_id: string; - content: string; -} - -interface TagRow { - event_id: string; - name: string; - value: string; - kind: number; - pubkey: string; - created_at: number; -} - interface NIP46TokenRow { api_token: string; user_pubkey: string; diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index e6793a86..d1127117 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,3 +1,4 @@ +import { NPostgresSchema } from '@nostrify/db'; import { BinaryOperationNode, FunctionNode, @@ -17,11 +18,11 @@ import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPostgres { - static db: Kysely | undefined; + static db: Kysely & Kysely | undefined; static postgres?: postgres.Sql; // deno-lint-ignore require-await - static async getInstance(): Promise> { + static async getInstance(): Promise & Kysely> { if (!this.postgres) { this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize }); } @@ -45,7 +46,7 @@ export class DittoPostgres { }, }, log: KyselyLogger, - }); + }) as Kysely & Kysely; } return this.db; diff --git a/src/db/adapters/DittoSQLite.ts b/src/db/adapters/DittoSQLite.ts index d412ca31..e54292dd 100644 --- a/src/db/adapters/DittoSQLite.ts +++ b/src/db/adapters/DittoSQLite.ts @@ -1,3 +1,4 @@ +import { NDatabaseSchema } from '@nostrify/db'; import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite'; import { Kysely, sql } from 'kysely'; @@ -7,19 +8,19 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts'; import SqliteWorker from '@/workers/sqlite.ts'; export class DittoSQLite { - static db: Kysely | undefined; + static db: Kysely & Kysely | undefined; - static async getInstance(): Promise> { + static async getInstance(): Promise & Kysely> { if (!this.db) { const sqliteWorker = new SqliteWorker(); await sqliteWorker.open(this.path); - this.db = new Kysely({ + this.db = new Kysely({ dialect: new PolySqliteDialect({ database: sqliteWorker, }), log: KyselyLogger, - }); + }) as Kysely & Kysely; // Set PRAGMA values. await Promise.all([ diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index 0e0aeea6..21949570 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -14,7 +14,7 @@ interface UnattachedMedia { /** Add unattached media into the database. */ async function insertUnattachedMedia(media: UnattachedMedia) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await kysely.insertInto('unattached_media') .values({ ...media, data: JSON.stringify(media.data) }) .execute(); @@ -44,7 +44,7 @@ function getUnattachedMedia(kysely: Kysely, until: Date) { /** Delete unattached media by URL. */ async function deleteUnattachedMediaByUrl(url: string) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); return kysely.deleteFrom('unattached_media') .where('url', '=', url) .execute(); @@ -67,7 +67,7 @@ async function getUnattachedMediaByIds(kysely: Kysely, ids: string[ /** Delete rows as an event with media is being created. */ async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise { if (!urls.length) return; - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await kysely.deleteFrom('unattached_media') .where('pubkey', '=', pubkey) .where('url', 'in', urls) diff --git a/src/middleware/signerMiddleware.ts b/src/middleware/signerMiddleware.ts index 89a494c4..60826db9 100644 --- a/src/middleware/signerMiddleware.ts +++ b/src/middleware/signerMiddleware.ts @@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => { if (bech32.startsWith('token1')) { try { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey, server_seckey, relays } = await kysely .selectFrom('nip46_tokens') diff --git a/src/pipeline.ts b/src/pipeline.ts index 612f93ba..2fb18649 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -19,8 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts'; import { nip05Cache } from '@/utils/nip05.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { getAmount } from '@/utils/bolt11.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; const debug = Debug('ditto:pipeline'); @@ -54,7 +54,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { await hydrateEvents({ events: [event], store: await Storages.db(), signal }); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const domain = await kysely .selectFrom('pubkey_domains') .select('domain') @@ -120,7 +120,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await updateStats({ event, store, kysely }).catch(debug); await store.event(event, { signal }); @@ -148,7 +148,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise { if (!this._db) { this._db = (async () => { - const kysely = await DittoDB.getInstance(); - const store = new EventsDB(kysely); + const { store } = await DittoDB.getInstance(); await seedZapSplits(store); return store; })(); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 6fe8df6f..19ba0db4 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -18,7 +18,7 @@ interface HydrateOpts { /** Hydrate events using the provided storage. */ async function hydrateEvents(opts: HydrateOpts): Promise { - const { events, store, signal, kysely = await DittoDB.getInstance() } = opts; + const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts; if (!events.length) { return events; @@ -59,8 +59,8 @@ async function hydrateEvents(opts: HydrateOpts): Promise { } const stats = { - authors: await gatherAuthorStats(cache, kysely), - events: await gatherEventStats(cache, kysely), + authors: await gatherAuthorStats(cache, kysely as Kysely), + events: await gatherEventStats(cache, kysely as Kysely), }; // Dedupe events. diff --git a/src/trends.ts b/src/trends.ts index c5165f33..3e17a9e3 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -1,10 +1,10 @@ +import { NPostgresSchema } from '@nostrify/db'; import { NostrFilter } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { handleEvent } from '@/pipeline.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Time } from '@/utils/time.ts'; @@ -14,31 +14,53 @@ const console = new Stickynotes('ditto:trends'); /** Get trending tag values for a given tag in the given time frame. */ export async function getTrendingTagValues( /** Kysely instance to execute queries on. */ - kysely: Kysely, + kysely: Kysely, /** Tag name to filter by, eg `t` or `r`. */ tagNames: string[], /** Filter of eligible events. */ filter: NostrFilter, ): Promise<{ value: string; authors: number; uses: number }[]> { + /* + SELECT + LOWER(element.value) AS value, + COUNT(DISTINCT nostr_events.pubkey) AS authors, + COUNT(*) as "uses" + FROM + nostr_events, + jsonb_each_text(nostr_events.tags_index) kv, + jsonb_array_elements_text(kv.value::jsonb) element + WHERE + kv.key = 't' + AND nostr_events.kind = 1 + AND nostr_events.created_at >= 1723325796 + AND nostr_events.created_at <= 1723412196 + GROUP BY + LOWER(element.value) + ORDER BY + COUNT(DISTINCT nostr_events.pubkey) DESC + LIMIT 20; + */ let query = kysely - .selectFrom('nostr_tags') + .selectFrom((eb) => [ + 'nostr_events', + eb.from('jsonb_each_text', ['nostr_events.tags_index'], 'kv'), + eb.from('jsonb_array_elements_text', ['kv.value::jsonb'], 'element'), + ]) .select(({ fn }) => [ - 'nostr_tags.value', - fn.agg('count', ['nostr_tags.pubkey']).distinct().as('authors'), - fn.countAll().as('uses'), + fn('lower', ['element.value']).as('value'), ]) .where('nostr_tags.name', 'in', tagNames) .groupBy('nostr_tags.value') .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); if (filter.kinds) { - query = query.where('nostr_tags.kind', 'in', filter.kinds); + query = query.where('kind', 'in', filter.kinds); } if (typeof filter.since === 'number') { - query = query.where('nostr_tags.created_at', '>=', filter.since); + query = query.where('created_at', '>=', filter.since); } if (typeof filter.until === 'number') { - query = query.where('nostr_tags.created_at', '<=', filter.until); + query = query.where('created_at', '<=', filter.until); } if (typeof filter.limit === 'number') { query = query.limit(filter.limit);