From 916b68a3e47b70dc880d25fcc677908ed2c8024d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 10 Aug 2024 13:30:24 -0500 Subject: [PATCH 01/18] Switch to NPostgres (steamroll SQLite for now) --- deno.json | 2 +- deno.lock | 8 ++++---- src/storages/EventsDB.ts | 10 ++++------ 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/deno.json b/deno.json index ed5eca51..94df4107 100644 --- a/deno.json +++ b/deno.json @@ -30,7 +30,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.30.0", + "@nostrify/db": "jsr:@nostrify/db@^0.31.0", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.0", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", diff --git a/deno.lock b/deno.lock index 880f8d69..1629dbfc 100644 --- a/deno.lock +++ b/deno.lock @@ -11,7 +11,7 @@ "jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1", "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3", "jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1", - "jsr:@nostrify/db@^0.30.0": "jsr:@nostrify/db@0.30.0", + "jsr:@nostrify/db@^0.31.0": "jsr:@nostrify/db@0.31.0", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", @@ -138,8 +138,8 @@ "@lambdalisue/async@2.1.1": { "integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4" }, - "@nostrify/db@0.30.0": { - "integrity": "a75ba78be89d57c54c3d47e9e94c7142817c5b50daec27bf7f9a4af4629be20b", + "@nostrify/db@0.31.0": { + "integrity": "e1621f553aae431ad1d592de40684767b48ebc0b77cd36498eb92101c40374f0", "dependencies": [ "jsr:@nostrify/nostrify@^0.30.0", "jsr:@nostrify/types@^0.30.0", @@ -1827,7 +1827,7 @@ "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", 
"jsr:@lambdalisue/async@^2.1.1", - "jsr:@nostrify/db@^0.30.0", + "jsr:@nostrify/db@^0.31.0", "jsr:@nostrify/nostrify@^0.30.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index 74fa8705..ae59a948 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -1,6 +1,6 @@ // deno-lint-ignore-file require-await -import { NDatabase } from '@nostrify/db'; +import { NPostgres } from '@nostrify/db'; import { NIP50, NKinds, @@ -33,7 +33,7 @@ type TagCondition = ({ event, count, value }: { /** SQLite database storage adapter for Nostr events. */ class EventsDB implements NStore { - private store: NDatabase; + private store: NPostgres; private console = new Stickynotes('ditto:db:events'); /** Conditions for when to index certain tags. */ @@ -54,11 +54,9 @@ class EventsDB implements NStore { }; constructor(private kysely: Kysely) { - this.store = new NDatabase(kysely, { - fts: Conf.db.dialect, - timeoutStrategy: Conf.db.dialect === 'postgres' ? 
'setStatementTimeout' : undefined, + this.store = new NPostgres(kysely, { indexTags: EventsDB.indexTags, - searchText: EventsDB.searchText, + indexSearch: EventsDB.searchText, }); } From 300c78ddb1af1a82902e72e6e2a98a82f072631c Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 10 Aug 2024 15:44:58 -0500 Subject: [PATCH 02/18] Upgrade @nostrify/db --- deno.json | 2 +- deno.lock | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/deno.json b/deno.json index 94df4107..275858ef 100644 --- a/deno.json +++ b/deno.json @@ -30,7 +30,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.31.0", + "@nostrify/db": "jsr:@nostrify/db@^0.31.1", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.0", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", diff --git a/deno.lock b/deno.lock index 1629dbfc..d7649ddc 100644 --- a/deno.lock +++ b/deno.lock @@ -9,9 +9,9 @@ "jsr:@gleasonator/policy@0.2.0": "jsr:@gleasonator/policy@0.2.0", "jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0", "jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1", - "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3", + "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.4", "jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1", - "jsr:@nostrify/db@^0.31.0": "jsr:@nostrify/db@0.31.0", + "jsr:@nostrify/db@^0.31.1": "jsr:@nostrify/db@0.31.1", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", @@ -135,11 +135,14 @@ "@hono/hono@4.5.3": { "integrity": "429923b2b3c6586a1450862328d61a1346fee5841e8ae86c494250475057213c" }, + "@hono/hono@4.5.4": { + "integrity": 
"3792780b8460d5df0959b07c059db9325e4fa1a49f8b5aff7ab9bc870bdec8e3" + }, "@lambdalisue/async@2.1.1": { "integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4" }, - "@nostrify/db@0.31.0": { - "integrity": "e1621f553aae431ad1d592de40684767b48ebc0b77cd36498eb92101c40374f0", + "@nostrify/db@0.31.1": { + "integrity": "b0e2542f2b02cb67e1f936d1def69c0facc0bdba3d8c1e396a0026929f3e2317", "dependencies": [ "jsr:@nostrify/nostrify@^0.30.0", "jsr:@nostrify/types@^0.30.0", @@ -1827,7 +1830,7 @@ "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", "jsr:@lambdalisue/async@^2.1.1", - "jsr:@nostrify/db@^0.31.0", + "jsr:@nostrify/db@^0.31.1", "jsr:@nostrify/nostrify@^0.30.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", From 552de01a1735cf7031f0edf92049704f6deb0c29 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 11 Aug 2024 19:02:27 -0500 Subject: [PATCH 03/18] Rework DittoDB to return an object --- src/controllers/api/oauth.ts | 2 +- src/controllers/api/statuses.ts | 6 ++--- src/controllers/api/streaming.ts | 2 +- src/db/DittoDB.ts | 40 +++++++++++++++++++++--------- src/db/DittoTables.ts | 27 -------------------- src/db/adapters/DittoPostgres.ts | 7 +++--- src/db/adapters/DittoSQLite.ts | 9 ++++--- src/db/unattached-media.ts | 6 ++--- src/middleware/signerMiddleware.ts | 2 +- src/pipeline.ts | 10 ++++---- src/storages.ts | 3 +-- src/storages/hydrate.ts | 6 ++--- src/trends.ts | 40 +++++++++++++++++++++++------- 13 files changed, 86 insertions(+), 74 deletions(-) diff --git a/src/controllers/api/oauth.ts b/src/controllers/api/oauth.ts index 01f80bf1..a736f5ca 100644 --- a/src/controllers/api/oauth.ts +++ b/src/controllers/api/oauth.ts @@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => { async function getToken( { pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] }, ): Promise<`token1${string}`> { - const kysely = await DittoDB.getInstance(); + const { kysely } = 
await DittoDB.getInstance(); const token = generateToken(); const serverSeckey = generateSecretKey(); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 00aa9a31..23ad2c76 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -63,7 +63,7 @@ const statusController: AppController = async (c) => { const createStatusController: AppController = async (c) => { const body = await parseBody(c.req.raw); const result = createStatusSchema.safeParse(body); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const store = c.get('store'); if (!result.success) { @@ -565,9 +565,9 @@ const zappedByController: AppController = async (c) => { const id = c.req.param('id'); const params = c.get('listPagination'); const store = await Storages.db(); - const db = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); - const zaps = await db.selectFrom('event_zaps') + const zaps = await kysely.selectFrom('event_zaps') .selectAll() .where('target_event_id', '=', id) .orderBy('amount_millisats', 'desc') diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 4bcec4cb..9a5b5deb 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -189,7 +189,7 @@ async function topicToFilter( async function getTokenPubkey(token: string): Promise { if (token.startsWith('token1')) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey } = await kysely .selectFrom('nip46_tokens') diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index c9756cf7..fadd4962 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -1,40 +1,56 @@ import fs from 'node:fs/promises'; import path from 'node:path'; +import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db'; import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; 
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoTables } from '@/db/DittoTables.ts'; +import { EventsDB } from '@/storages/EventsDB.ts'; + +export type DittoDatabase = { + dialect: 'sqlite'; + store: EventsDB; + kysely: Kysely & Kysely; +} | { + dialect: 'postgres'; + store: EventsDB; + kysely: Kysely & Kysely; +}; export class DittoDB { - private static kysely: Promise> | undefined; + private static db: Promise | undefined; - static getInstance(): Promise> { - if (!this.kysely) { - this.kysely = this._getInstance(); + static getInstance(): Promise { + if (!this.db) { + this.db = this._getInstance(); } - return this.kysely; + return this.db; } - static async _getInstance(): Promise> { - let kysely: Kysely; + static async _getInstance(): Promise { + const result = {} as DittoDatabase; switch (Conf.db.dialect) { case 'sqlite': - kysely = await DittoSQLite.getInstance(); + result.dialect = 'sqlite'; + result.kysely = await DittoSQLite.getInstance(); + result.store = new EventsDB(result.kysely as any); break; case 'postgres': - kysely = await DittoPostgres.getInstance(); + result.dialect = 'postgres'; + result.kysely = await DittoPostgres.getInstance(); + result.store = new EventsDB(result.kysely as any); break; default: throw new Error('Unsupported database URL.'); } - await this.migrate(kysely); + await this.migrate(result.kysely); - return kysely; + return result; } static get poolSize(): number { @@ -52,7 +68,7 @@ export class DittoDB { } /** Migrate the database to the latest version. 
*/ - static async migrate(kysely: Kysely) { + static async migrate(kysely: DittoDatabase['kysely']) { const migrator = new Migrator({ db: kysely, provider: new FileMigrationProvider({ diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index 73f0cdad..69356649 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -1,7 +1,4 @@ export interface DittoTables { - nostr_events: EventRow; - nostr_tags: TagRow; - nostr_fts5: EventFTSRow; nip46_tokens: NIP46TokenRow; unattached_media: UnattachedMediaRow; author_stats: AuthorStatsRow; @@ -27,30 +24,6 @@ interface EventStatsRow { zaps_amount: number; } -interface EventRow { - id: string; - kind: number; - pubkey: string; - content: string; - created_at: number; - tags: string; - sig: string; -} - -interface EventFTSRow { - event_id: string; - content: string; -} - -interface TagRow { - event_id: string; - name: string; - value: string; - kind: number; - pubkey: string; - created_at: number; -} - interface NIP46TokenRow { api_token: string; user_pubkey: string; diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index e6793a86..d1127117 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,3 +1,4 @@ +import { NPostgresSchema } from '@nostrify/db'; import { BinaryOperationNode, FunctionNode, @@ -17,11 +18,11 @@ import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPostgres { - static db: Kysely | undefined; + static db: Kysely & Kysely | undefined; static postgres?: postgres.Sql; // deno-lint-ignore require-await - static async getInstance(): Promise> { + static async getInstance(): Promise & Kysely> { if (!this.postgres) { this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize }); } @@ -45,7 +46,7 @@ export class DittoPostgres { }, }, log: KyselyLogger, - }); + }) as Kysely & Kysely; } return this.db; diff --git a/src/db/adapters/DittoSQLite.ts 
b/src/db/adapters/DittoSQLite.ts index d412ca31..e54292dd 100644 --- a/src/db/adapters/DittoSQLite.ts +++ b/src/db/adapters/DittoSQLite.ts @@ -1,3 +1,4 @@ +import { NDatabaseSchema } from '@nostrify/db'; import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite'; import { Kysely, sql } from 'kysely'; @@ -7,19 +8,19 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts'; import SqliteWorker from '@/workers/sqlite.ts'; export class DittoSQLite { - static db: Kysely | undefined; + static db: Kysely & Kysely | undefined; - static async getInstance(): Promise> { + static async getInstance(): Promise & Kysely> { if (!this.db) { const sqliteWorker = new SqliteWorker(); await sqliteWorker.open(this.path); - this.db = new Kysely({ + this.db = new Kysely({ dialect: new PolySqliteDialect({ database: sqliteWorker, }), log: KyselyLogger, - }); + }) as Kysely & Kysely; // Set PRAGMA values. await Promise.all([ diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index 0e0aeea6..21949570 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -14,7 +14,7 @@ interface UnattachedMedia { /** Add unattached media into the database. */ async function insertUnattachedMedia(media: UnattachedMedia) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await kysely.insertInto('unattached_media') .values({ ...media, data: JSON.stringify(media.data) }) .execute(); @@ -44,7 +44,7 @@ function getUnattachedMedia(kysely: Kysely, until: Date) { /** Delete unattached media by URL. */ async function deleteUnattachedMediaByUrl(url: string) { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); return kysely.deleteFrom('unattached_media') .where('url', '=', url) .execute(); @@ -67,7 +67,7 @@ async function getUnattachedMediaByIds(kysely: Kysely, ids: string[ /** Delete rows as an event with media is being created. 
*/ async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise { if (!urls.length) return; - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); await kysely.deleteFrom('unattached_media') .where('pubkey', '=', pubkey) .where('url', 'in', urls) diff --git a/src/middleware/signerMiddleware.ts b/src/middleware/signerMiddleware.ts index 89a494c4..60826db9 100644 --- a/src/middleware/signerMiddleware.ts +++ b/src/middleware/signerMiddleware.ts @@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => { if (bech32.startsWith('token1')) { try { - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const { user_pubkey, server_seckey, relays } = await kysely .selectFrom('nip46_tokens') diff --git a/src/pipeline.ts b/src/pipeline.ts index 612f93ba..2fb18649 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -19,8 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts'; import { nip05Cache } from '@/utils/nip05.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { getAmount } from '@/utils/bolt11.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; const debug = Debug('ditto:pipeline'); @@ -54,7 +54,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { await hydrateEvents({ events: [event], store: await Storages.db(), signal }); - const kysely = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const domain = await kysely .selectFrom('pubkey_domains') .select('domain') @@ -120,7 +120,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); - const kysely = await DittoDB.getInstance(); + const { 
kysely } = await DittoDB.getInstance(); await updateStats({ event, store, kysely }).catch(debug); await store.event(event, { signal }); @@ -148,7 +148,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise { if (!this._db) { this._db = (async () => { - const kysely = await DittoDB.getInstance(); - const store = new EventsDB(kysely); + const { store } = await DittoDB.getInstance(); await seedZapSplits(store); return store; })(); diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts index 6fe8df6f..19ba0db4 100644 --- a/src/storages/hydrate.ts +++ b/src/storages/hydrate.ts @@ -18,7 +18,7 @@ interface HydrateOpts { /** Hydrate events using the provided storage. */ async function hydrateEvents(opts: HydrateOpts): Promise { - const { events, store, signal, kysely = await DittoDB.getInstance() } = opts; + const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts; if (!events.length) { return events; @@ -59,8 +59,8 @@ async function hydrateEvents(opts: HydrateOpts): Promise { } const stats = { - authors: await gatherAuthorStats(cache, kysely), - events: await gatherEventStats(cache, kysely), + authors: await gatherAuthorStats(cache, kysely as Kysely), + events: await gatherEventStats(cache, kysely as Kysely), }; // Dedupe events. 
diff --git a/src/trends.ts b/src/trends.ts index c5165f33..3e17a9e3 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -1,10 +1,10 @@ +import { NPostgresSchema } from '@nostrify/db'; import { NostrFilter } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { handleEvent } from '@/pipeline.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Time } from '@/utils/time.ts'; @@ -14,31 +14,53 @@ const console = new Stickynotes('ditto:trends'); /** Get trending tag values for a given tag in the given time frame. */ export async function getTrendingTagValues( /** Kysely instance to execute queries on. */ - kysely: Kysely, + kysely: Kysely, /** Tag name to filter by, eg `t` or `r`. */ tagNames: string[], /** Filter of eligible events. */ filter: NostrFilter, ): Promise<{ value: string; authors: number; uses: number }[]> { + /* + SELECT + LOWER(element.value) AS value, + COUNT(DISTINCT nostr_events.pubkey) AS authors, + COUNT(*) as "uses" + FROM + nostr_events, + jsonb_each_text(nostr_events.tags_index) kv, + jsonb_array_elements_text(kv.value::jsonb) element + WHERE + kv.key = 't' + AND nostr_events.kind = 1 + AND nostr_events.created_at >= 1723325796 + AND nostr_events.created_at <= 1723412196 + GROUP BY + LOWER(element.value) + ORDER BY + COUNT(DISTINCT nostr_events.pubkey) DESC + LIMIT 20; + */ let query = kysely - .selectFrom('nostr_tags') + .selectFrom((eb) => [ + 'nostr_events', + eb.from('jsonb_each_text', ['nostr_events.tags_index'], 'kv'), + eb.from('jsonb_array_elements_text', ['kv.value::jsonb'], 'element'), + ]) .select(({ fn }) => [ - 'nostr_tags.value', - fn.agg('count', ['nostr_tags.pubkey']).distinct().as('authors'), - fn.countAll().as('uses'), + fn('lower', ['element.value']).as('value'), ]) .where('nostr_tags.name', 'in', tagNames) 
.groupBy('nostr_tags.value') .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); if (filter.kinds) { - query = query.where('nostr_tags.kind', 'in', filter.kinds); + query = query.where('kind', 'in', filter.kinds); } if (typeof filter.since === 'number') { - query = query.where('nostr_tags.created_at', '>=', filter.since); + query = query.where('created_at', '>=', filter.since); } if (typeof filter.until === 'number') { - query = query.where('nostr_tags.created_at', '<=', filter.until); + query = query.where('created_at', '<=', filter.until); } if (typeof filter.limit === 'number') { query = query.limit(filter.limit); From c8bec9b4b7c38f8ae0c830e311e7a7fe442babed Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 11 Aug 2024 19:07:27 -0500 Subject: [PATCH 04/18] Delete unused getUnattachedMedia function --- src/db/unattached-media.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/db/unattached-media.ts b/src/db/unattached-media.ts index 21949570..fac7a1d9 100644 --- a/src/db/unattached-media.ts +++ b/src/db/unattached-media.ts @@ -34,14 +34,6 @@ function selectUnattachedMediaQuery(kysely: Kysely) { ]); } -/** Find attachments that exist but aren't attached to any events. */ -function getUnattachedMedia(kysely: Kysely, until: Date) { - return selectUnattachedMediaQuery(kysely) - .leftJoin('nostr_tags', 'unattached_media.url', 'nostr_tags.value') - .where('uploaded_at', '<', until.getTime()) - .execute(); -} - /** Delete unattached media by URL. 
*/ async function deleteUnattachedMediaByUrl(url: string) { const { kysely } = await DittoDB.getInstance(); @@ -77,7 +69,6 @@ async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise Date: Sun, 11 Aug 2024 19:11:32 -0500 Subject: [PATCH 05/18] trends: disable for now --- src/trends.ts | 60 +++++++++++++++++++++------------------------------ 1 file changed, 24 insertions(+), 36 deletions(-) diff --git a/src/trends.ts b/src/trends.ts index 3e17a9e3..ff5f3d37 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -1,10 +1,8 @@ -import { NPostgresSchema } from '@nostrify/db'; import { NostrFilter } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; -import { Kysely } from 'kysely'; import { Conf } from '@/config.ts'; -import { DittoDB } from '@/db/DittoDB.ts'; +import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts'; import { handleEvent } from '@/pipeline.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Time } from '@/utils/time.ts'; @@ -14,7 +12,7 @@ const console = new Stickynotes('ditto:trends'); /** Get trending tag values for a given tag in the given time frame. */ export async function getTrendingTagValues( /** Kysely instance to execute queries on. */ - kysely: Kysely, + db: DittoDatabase, /** Tag name to filter by, eg `t` or `r`. */ tagNames: string[], /** Filter of eligible events. 
*/ @@ -40,39 +38,29 @@ export async function getTrendingTagValues( COUNT(DISTINCT nostr_events.pubkey) DESC LIMIT 20; */ - let query = kysely - .selectFrom((eb) => [ - 'nostr_events', - eb.from('jsonb_each_text', ['nostr_events.tags_index'], 'kv'), - eb.from('jsonb_array_elements_text', ['kv.value::jsonb'], 'element'), - ]) - .select(({ fn }) => [ - fn('lower', ['element.value']).as('value'), - ]) - .where('nostr_tags.name', 'in', tagNames) - .groupBy('nostr_tags.value') - .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); - if (filter.kinds) { - query = query.where('kind', 'in', filter.kinds); - } - if (typeof filter.since === 'number') { - query = query.where('created_at', '>=', filter.since); - } - if (typeof filter.until === 'number') { - query = query.where('created_at', '<=', filter.until); - } - if (typeof filter.limit === 'number') { - query = query.limit(filter.limit); - } + return []; - const rows = await query.execute(); + // if (filter.kinds) { + // query = query.where('kind', 'in', filter.kinds); + // } + // if (typeof filter.since === 'number') { + // query = query.where('created_at', '>=', filter.since); + // } + // if (typeof filter.until === 'number') { + // query = query.where('created_at', '<=', filter.until); + // } + // if (typeof filter.limit === 'number') { + // query = query.limit(filter.limit); + // } - return rows.map((row) => ({ - value: row.value, - authors: Number(row.authors), - uses: Number(row.uses), - })); + // const rows = await query.execute(); + + // return rows.map((row) => ({ + // value: row.value, + // authors: Number(row.authors), + // uses: Number(row.uses), + // })); } /** Get trending tags and publish an event with them. 
*/ @@ -85,7 +73,7 @@ export async function updateTrendingTags( aliases?: string[], ) { console.info(`Updating trending ${l}...`); - const kysely = await DittoDB.getInstance(); + const db = await DittoDB.getInstance(); const signal = AbortSignal.timeout(1000); const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000); @@ -94,7 +82,7 @@ export async function updateTrendingTags( const tagNames = aliases ? [tagName, ...aliases] : [tagName]; try { - const trends = await getTrendingTagValues(kysely, tagNames, { + const trends = await getTrendingTagValues(db, tagNames, { kinds, since: yesterday, until: now, From 8db54893506237121e8c7e629555ff276d33fde4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 11 Aug 2024 19:50:20 -0500 Subject: [PATCH 06/18] Implements trends for SQLite --- src/trends.ts | 57 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/src/trends.ts b/src/trends.ts index ff5f3d37..6b038d3f 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -12,7 +12,7 @@ const console = new Stickynotes('ditto:trends'); /** Get trending tag values for a given tag in the given time frame. */ export async function getTrendingTagValues( /** Kysely instance to execute queries on. */ - db: DittoDatabase, + { dialect, kysely }: DittoDatabase, /** Tag name to filter by, eg `t` or `r`. */ tagNames: string[], /** Filter of eligible events. 
*/ @@ -39,28 +39,41 @@ export async function getTrendingTagValues( LIMIT 20; */ + if (dialect === 'sqlite') { + let query = kysely + .selectFrom('nostr_tags') + .select(({ fn }) => [ + 'nostr_tags.value', + fn.agg('count', ['nostr_tags.pubkey']).distinct().as('authors'), + fn.countAll().as('uses'), + ]) + .where('nostr_tags.name', 'in', tagNames) + .groupBy('nostr_tags.value') + .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); + + if (filter.kinds) { + query = query.where('nostr_tags.kind', 'in', filter.kinds); + } + if (typeof filter.since === 'number') { + query = query.where('nostr_tags.created_at', '>=', filter.since); + } + if (typeof filter.until === 'number') { + query = query.where('nostr_tags.created_at', '<=', filter.until); + } + if (typeof filter.limit === 'number') { + query = query.limit(filter.limit); + } + + const rows = await query.execute(); + + return rows.map((row) => ({ + value: row.value, + authors: Number(row.authors), + uses: Number(row.uses), + })); + } + return []; - - // if (filter.kinds) { - // query = query.where('kind', 'in', filter.kinds); - // } - // if (typeof filter.since === 'number') { - // query = query.where('created_at', '>=', filter.since); - // } - // if (typeof filter.until === 'number') { - // query = query.where('created_at', '<=', filter.until); - // } - // if (typeof filter.limit === 'number') { - // query = query.limit(filter.limit); - // } - - // const rows = await query.execute(); - - // return rows.map((row) => ({ - // value: row.value, - // authors: Number(row.authors), - // uses: Number(row.uses), - // })); } /** Get trending tags and publish an event with them. */ From 8dc3b5d5a73287c80741b5ee6e78cf774ed468fe Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 11 Aug 2024 20:20:31 -0500 Subject: [PATCH 07/18] trends: maybe make postgres trends work? 
--- src/trends.ts | 49 +++++++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/trends.ts b/src/trends.ts index 6b038d3f..1e64c5b4 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -1,5 +1,6 @@ import { NostrFilter } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; +import { sql } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts'; @@ -18,26 +19,34 @@ export async function getTrendingTagValues( /** Filter of eligible events. */ filter: NostrFilter, ): Promise<{ value: string; authors: number; uses: number }[]> { - /* - SELECT - LOWER(element.value) AS value, - COUNT(DISTINCT nostr_events.pubkey) AS authors, - COUNT(*) as "uses" - FROM - nostr_events, - jsonb_each_text(nostr_events.tags_index) kv, - jsonb_array_elements_text(kv.value::jsonb) element - WHERE - kv.key = 't' - AND nostr_events.kind = 1 - AND nostr_events.created_at >= 1723325796 - AND nostr_events.created_at <= 1723412196 - GROUP BY - LOWER(element.value) - ORDER BY - COUNT(DISTINCT nostr_events.pubkey) DESC - LIMIT 20; - */ + if (dialect === 'postgres') { + const { rows } = await sql<{ value: string; authors: number; uses: number }>` + SELECT + LOWER(element.value) AS value, + COUNT(DISTINCT nostr_events.pubkey) AS authors, + COUNT(*) as "uses" + FROM + nostr_events, + jsonb_each_text(nostr_events.tags_index) kv, + jsonb_array_elements_text(kv.value::jsonb) element + WHERE + kv.key = ANY(${tagNames}) + ${filter.kinds ? sql`AND nostr_events.kind = ANY(${filter.kinds})` : sql``} + ${typeof filter.since === 'number' ? sql`AND nostr_events.created_at >= ${filter.since}` : sql``} + ${typeof filter.until === 'number' ? sql`AND nostr_events.created_at <= ${filter.until}` : sql``} + GROUP BY + LOWER(element.value) + ORDER BY + COUNT(DISTINCT nostr_events.pubkey) DESC + ${typeof filter.limit === 'number' ? 
sql`LIMIT ${filter.limit}` : sql``};` + .execute(kysely); + + return rows.map((row) => ({ + value: row.value, + authors: Number(row.authors), + uses: Number(row.uses), + })); + } if (dialect === 'sqlite') { let query = kysely From 5c23cb02c26d841f30a0e3a46225d89798e8bcda Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 14:37:41 -0500 Subject: [PATCH 08/18] Add NPostgres migration --- src/db/migrations/030_pg_events_jsonb.ts | 141 +++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 src/db/migrations/030_pg_events_jsonb.ts diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts new file mode 100644 index 00000000..7d7cc4d2 --- /dev/null +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -0,0 +1,141 @@ +import { Kysely, sql } from 'kysely'; + +import { Conf } from '@/config.ts'; + +export async function up(db: Kysely): Promise { + if (Conf.db.dialect !== 'postgres') return; + + // Create new table and indexes. 
+ await db.schema + .createTable('nostr_events_new') + .addColumn('id', 'char(64)', (col) => col.primaryKey()) + .addColumn('kind', 'integer', (col) => col.notNull()) + .addColumn('pubkey', 'char(64)', (col) => col.notNull()) + .addColumn('content', 'text', (col) => col.notNull()) + .addColumn('created_at', 'bigint', (col) => col.notNull()) + .addColumn('tags', 'jsonb', (col) => col.notNull()) + .addColumn('tags_index', 'jsonb', (col) => col.notNull()) + .addColumn('sig', 'char(128)', (col) => col.notNull()) + .addColumn('d', 'text') + .addColumn('search', sql`tsvector`) + .addCheckConstraint('nostr_events_kind_chk', sql`kind >= 0`) + .addCheckConstraint('nostr_events_created_chk', sql`created_at >= 0`) + .addCheckConstraint( + 'nostr_events_d_chk', + sql`(kind >= 30000 and kind < 40000 and d is not null) or ((kind < 30000 or kind >= 40000) and d is null)`, + ) + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_created_kind_idx') + .on('nostr_events_new') + .columns(['created_at desc', 'id asc', 'kind', 'pubkey']) + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_pubkey_created_idx') + .on('nostr_events_new') + .columns(['pubkey', 'created_at desc', 'id asc', 'kind']) + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_tags_idx').using('gin') + .on('nostr_events_new') + .column('tags_index') + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_replaceable_idx') + .on('nostr_events_new') + .columns(['kind', 'pubkey']) + .where(() => sql`kind >= 10000 and kind < 20000 or (kind in (0, 3))`) + .unique() + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_parameterized_idx') + .on('nostr_events_new') + .columns(['kind', 'pubkey', 'd']) + .where(() => sql`kind >= 30000 and kind < 40000`) + .unique() + .ifNotExists() + .execute(); + + await db.schema + .createIndex('nostr_events_search_idx').using('gin') + .on('nostr_events_new') + 
.column('search') + .ifNotExists() + .execute(); + + let iid: number | undefined; + const tid = setTimeout(() => { + console.warn(`Recreating the database to improve performance. +Depending on the size of your database, this could take a very long time, possibly days! + +If you don't want to wait, you can create a fresh database and then import your old events: + +1. Revert to a prior commit: e789e08c +2. Export your events: "deno task db:export > events.jsonl" +3. Drop your old database: "dropdb ditto" +4. Create a new database: "createdb ditto" +5. Start Ditto +6. While Ditto is running, import your events: "cat events.jsonl | deno task db:import"`); + + const emojis = ['⚡', '🐛', '🔎', '😂', '😅', '😬', '😭', '🙃', '🤔', '🧐', '🧐', '🫠']; + iid = setInterval(() => { + const emoji = emojis[Math.floor(Math.random() * emojis.length)]; + console.info(`Recreating the database... ${emoji}`); + }, 60_000); + }, 10_000); + + // Copy data to the new table. + await sql` + INSERT INTO nostr_events_new(id, kind, pubkey, content, created_at, tags, sig, d, tags_index, search) + SELECT + e.id, + e.kind, + e.pubkey, + e.content, + e.created_at, + e.tags::jsonb, + e.sig, + t_agg.tags_index->'d'->>0 as d, + COALESCE(t_agg.tags_index, '{}'::jsonb) as tags_index, + fts.search_vec + FROM + nostr_events AS e + LEFT JOIN + (SELECT event_id, jsonb_object_agg(name, values_array) as tags_index + FROM ( + SELECT event_id, name, jsonb_agg(value) as values_array + FROM nostr_tags + GROUP BY event_id, name + ) sub + GROUP BY event_id) AS t_agg ON e.id = t_agg.event_id + LEFT JOIN + nostr_pgfts AS fts ON e.id = fts.event_id + WHERE + (e.kind >= 30000 AND e.kind < 40000 AND t_agg.tags_index->'d'->>0 IS NOT NULL) + OR ((e.kind < 30000 OR e.kind >= 40000) AND t_agg.tags_index->'d'->>0 IS NULL) + ON CONFLICT DO NOTHING; + `.execute(db); + + clearTimeout(tid); + if (iid) clearInterval(iid); + + await db.schema.dropTable('nostr_events').execute(); + await db.schema.dropTable('nostr_tags').execute(); + await 
db.schema.dropTable('nostr_pgfts').execute(); + + await db.schema.alterTable('nostr_events_new').renameTo('nostr_events').execute(); +} + +export function down(_db: Kysely<unknown>): Promise<void> { + throw new Error("Sorry, you can't migrate back from here."); +} From 3a82c98c21d3dd6b647aeb2a78aa5f762b556c60 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 14:48:32 -0500 Subject: [PATCH 09/18] trends: make trends work in Postgres (probably) --- src/trends.ts | 52 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/trends.ts b/src/trends.ts index 1e64c5b4..91164143 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -20,26 +20,38 @@ export async function getTrendingTagValues( filter: NostrFilter, ): Promise<{ value: string; authors: number; uses: number }[]> { if (dialect === 'postgres') { - const { rows } = await sql<{ value: string; authors: number; uses: number }>` - SELECT - LOWER(element.value) AS value, - COUNT(DISTINCT nostr_events.pubkey) AS authors, - COUNT(*) as "uses" - FROM - nostr_events, - jsonb_each_text(nostr_events.tags_index) kv, - jsonb_array_elements_text(kv.value::jsonb) element - WHERE - kv.key = ANY(${tagNames}) - ${filter.kinds ? sql`AND nostr_events.kind = ANY(${filter.kinds})` : sql``} - ${typeof filter.since === 'number' ? sql`AND nostr_events.created_at >= ${filter.since}` : sql``} - ${typeof filter.until === 'number' ? sql`AND nostr_events.created_at <= ${filter.until}` : sql``} - GROUP BY - LOWER(element.value) - ORDER BY - COUNT(DISTINCT nostr_events.pubkey) DESC - ${typeof filter.limit === 'number' ?
sql`LIMIT ${filter.limit}` : sql``};` - .execute(kysely); + let query = kysely + .selectFrom([ + 'nostr_events', + sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'), + sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'), + ]) + .select(({ fn }) => [ + fn('lower', ['element.value']).as('value'), + fn.agg('count', ['nostr_events.pubkey']).distinct().as('authors'), + fn.countAll().as('uses'), + ]) + .where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames))) + .groupBy((eb) => eb.fn('lower', ['element.value'])) + .orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc'); + + if (filter.kinds) { + query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds))); + } + if (filter.authors) { + query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors))); + } + if (typeof filter.since === 'number') { + query = query.where('nostr_events.created_at', '>=', filter.since); + } + if (typeof filter.until === 'number') { + query = query.where('nostr_events.created_at', '<=', filter.until); + } + if (typeof filter.limit === 'number') { + query = query.limit(filter.limit); + } + + const rows = await query.execute(); return rows.map((row) => ({ value: row.value, From 5e4a94457fb7527240647428a66a4dc5266d48e8 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 14:57:17 -0500 Subject: [PATCH 10/18] Fix tests --- src/pipeline.test.ts | 2 +- src/test.ts | 13 +++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/pipeline.test.ts b/src/pipeline.test.ts index 3a196c16..2af2b8c3 100644 --- a/src/pipeline.test.ts +++ b/src/pipeline.test.ts @@ -37,7 +37,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab await handleZaps(kysely, event); await handleZaps(kysely, event); - const zapReceipts = await kysely.selectFrom('nostr_events').selectAll().execute(); + const 
zapReceipts = await db.store.query([{}]); const customEventZaps = await kysely.selectFrom('event_zaps').selectAll().execute(); assertEquals(zapReceipts.length, 1); // basic check diff --git a/src/test.ts b/src/test.ts index c3c0db7b..4d683d8b 100644 --- a/src/test.ts +++ b/src/test.ts @@ -2,7 +2,7 @@ import fs from 'node:fs/promises'; import path from 'node:path'; import { Database as Sqlite } from '@db/sqlite'; -import { NDatabase } from '@nostrify/db'; +import { NDatabase, NDatabaseSchema, NPostgresSchema } from '@nostrify/db'; import { NostrEvent } from '@nostrify/nostrify'; import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite'; import { finalizeEvent, generateSecretKey } from 'nostr-tools'; @@ -10,7 +10,7 @@ import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js'; import postgres from 'postgres'; -import { DittoDB } from '@/db/DittoDB.ts'; +import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; @@ -99,7 +99,7 @@ export const createTestDB = async (databaseUrl?: string) => { console.warn(`Using: ${dialect}`); - let kysely: Kysely; + let kysely: DittoDatabase['kysely']; if (dialect === 'sqlite') { // migration 021_pgfts_index.ts calls 'Conf.db.dialect', @@ -107,11 +107,11 @@ export const createTestDB = async (databaseUrl?: string) => { // The following line ensures to NOT use the DATABASE_URL that may exist in an .env file. Deno.env.set('DATABASE_URL', 'sqlite://:memory:'); - kysely = new Kysely({ + kysely = new Kysely({ dialect: new DenoSqlite3Dialect({ database: new Sqlite(':memory:'), }), - }); + }) as Kysely & Kysely; } else { kysely = new Kysely({ // @ts-ignore Kysely version mismatch. 
@@ -121,13 +121,14 @@ export const createTestDB = async (databaseUrl?: string) => { }) as unknown as PostgresJSDialectConfig['postgres'], }), log: KyselyLogger, - }); + }) as Kysely & Kysely; } await DittoDB.migrate(kysely); const store = new EventsDB(kysely); return { + dialect, store, kysely, [Symbol.asyncDispose]: async () => { From 617659c7fd60df794b40144f3a7b7513b408e901 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 15:13:49 -0500 Subject: [PATCH 11/18] Fix DittoDB+EventsDB relationship --- scripts/admin-event.ts | 4 ++-- scripts/admin-role.ts | 4 ++-- scripts/nostr-pull.ts | 4 ++-- src/db/DittoDB.ts | 5 ----- src/storages.ts | 3 ++- src/storages/EventsDB.ts | 29 +++++++++++++++++++---------- src/test.ts | 16 ++++++++-------- 7 files changed, 35 insertions(+), 30 deletions(-) diff --git a/scripts/admin-event.ts b/scripts/admin-event.ts index ca942512..71c957f4 100644 --- a/scripts/admin-event.ts +++ b/scripts/admin-event.ts @@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts'; const signer = new AdminSigner(); -const kysely = await DittoDB.getInstance(); -const eventsDB = new EventsDB(kysely); +const db = await DittoDB.getInstance(); +const eventsDB = new EventsDB(db); const readable = Deno.stdin.readable .pipeThrough(new TextDecoderStream()) diff --git a/scripts/admin-role.ts b/scripts/admin-role.ts index 503b20b0..3f3c53f2 100644 --- a/scripts/admin-role.ts +++ b/scripts/admin-role.ts @@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { nostrNow } from '@/utils.ts'; -const kysely = await DittoDB.getInstance(); -const eventsDB = new EventsDB(kysely); +const db = await DittoDB.getInstance(); +const eventsDB = new EventsDB(db); const [pubkeyOrNpub, role] = Deno.args; const pubkey = pubkeyOrNpub.startsWith('npub1') ? 
nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub; diff --git a/scripts/nostr-pull.ts b/scripts/nostr-pull.ts index 64c51956..68766c8f 100644 --- a/scripts/nostr-pull.ts +++ b/scripts/nostr-pull.ts @@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools'; import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; -const kysely = await DittoDB.getInstance(); -const eventsDB = new EventsDB(kysely); +const db = await DittoDB.getInstance(); +const eventsDB = new EventsDB(db); interface ImportEventsOpts { profilesOnly: boolean; diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index fadd4962..5ed5d15e 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -8,15 +8,12 @@ import { Conf } from '@/config.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoTables } from '@/db/DittoTables.ts'; -import { EventsDB } from '@/storages/EventsDB.ts'; export type DittoDatabase = { dialect: 'sqlite'; - store: EventsDB; kysely: Kysely<DittoTables> & Kysely<NDatabaseSchema>; } | { dialect: 'postgres'; - store: EventsDB; kysely: Kysely<DittoTables> & Kysely<NPostgresSchema>; }; @@ -37,12 +34,10 @@ export class DittoDB { case 'sqlite': result.dialect = 'sqlite'; result.kysely = await DittoSQLite.getInstance(); - result.store = new EventsDB(result.kysely as any); break; case 'postgres': result.dialect = 'postgres'; result.kysely = await DittoPostgres.getInstance(); - result.store = new EventsDB(result.kysely as any); break; default: throw new Error('Unsupported database URL.'); diff --git a/src/storages.ts b/src/storages.ts index 0bd79d40..c49b62cf 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -20,7 +20,8 @@ export class Storages { public static async db(): Promise<EventsDB> { if (!this._db) { this._db = (async () => { - const { store } = await DittoDB.getInstance(); + const db = await DittoDB.getInstance(); + const store = new EventsDB(db); await seedZapSplits(store); return store; })(); diff --git
a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index ae59a948..07be8067 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -1,6 +1,6 @@ // deno-lint-ignore-file require-await -import { NPostgres } from '@nostrify/db'; +import { NDatabase, NPostgres } from '@nostrify/db'; import { NIP50, NKinds, @@ -13,11 +13,10 @@ import { NStore, } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; -import { Kysely } from 'kysely'; import { nip27 } from 'nostr-tools'; import { Conf } from '@/config.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; +import { DittoDatabase } from '@/db/DittoDB.ts'; import { dbEventCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; @@ -33,7 +32,7 @@ type TagCondition = ({ event, count, value }: { /** SQLite database storage adapter for Nostr events. */ class EventsDB implements NStore { - private store: NPostgres; + private store: NDatabase | NPostgres; private console = new Stickynotes('ditto:db:events'); /** Conditions for when to index certain tags. */ @@ -53,11 +52,21 @@ class EventsDB implements NStore { 't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50, }; - constructor(private kysely: Kysely) { - this.store = new NPostgres(kysely, { - indexTags: EventsDB.indexTags, - indexSearch: EventsDB.searchText, - }); + constructor(private database: DittoDatabase) { + const { dialect, kysely } = database; + + if (dialect === 'postgres') { + this.store = new NPostgres(kysely, { + indexTags: EventsDB.indexTags, + indexSearch: EventsDB.searchText, + }); + } else { + this.store = new NDatabase(kysely, { + fts: 'sqlite', + indexTags: EventsDB.indexTags, + searchText: EventsDB.searchText, + }); + } } /** Insert an event (and its tags) into the database. 
*/ @@ -277,7 +286,7 @@ class EventsDB implements NStore { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain) { - const query = this.kysely + const query = this.database.kysely .selectFrom('pubkey_domains') .select('pubkey') .where('domain', '=', domain); diff --git a/src/test.ts b/src/test.ts index 4d683d8b..df6c84f6 100644 --- a/src/test.ts +++ b/src/test.ts @@ -99,7 +99,7 @@ export const createTestDB = async (databaseUrl?: string) => { console.warn(`Using: ${dialect}`); - let kysely: DittoDatabase['kysely']; + const db: DittoDatabase = { dialect } as DittoDatabase; if (dialect === 'sqlite') { // migration 021_pgfts_index.ts calls 'Conf.db.dialect', @@ -107,13 +107,13 @@ export const createTestDB = async (databaseUrl?: string) => { // The following line ensures to NOT use the DATABASE_URL that may exist in an .env file. Deno.env.set('DATABASE_URL', 'sqlite://:memory:'); - kysely = new Kysely({ + db.kysely = new Kysely({ dialect: new DenoSqlite3Dialect({ database: new Sqlite(':memory:'), }), }) as Kysely & Kysely; } else { - kysely = new Kysely({ + db.kysely = new Kysely({ // @ts-ignore Kysely version mismatch. 
dialect: new PostgresJSDialect({ postgres: postgres(Conf.databaseUrl, { @@ -123,14 +123,14 @@ export const createTestDB = async (databaseUrl?: string) => { log: KyselyLogger, }) as Kysely & Kysely; } - await DittoDB.migrate(kysely); - const store = new EventsDB(kysely); + await DittoDB.migrate(db.kysely); + const store = new EventsDB(db); return { dialect, store, - kysely, + kysely: db.kysely, [Symbol.asyncDispose]: async () => { if (dialect === 'postgres') { for ( @@ -149,9 +149,9 @@ export const createTestDB = async (databaseUrl?: string) => { 'event_zaps', ] ) { - await kysely.schema.dropTable(table).ifExists().cascade().execute(); + await db.kysely.schema.dropTable(table).ifExists().cascade().execute(); } - await kysely.destroy(); + await db.kysely.destroy(); } }, }; From 8a230113928fc63fc5ed4ff86293daeaa539fb64 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 15:17:42 -0500 Subject: [PATCH 12/18] pg migration: cascade --- src/db/migrations/030_pg_events_jsonb.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts index 7d7cc4d2..534f70c7 100644 --- a/src/db/migrations/030_pg_events_jsonb.ts +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -129,7 +129,7 @@ If you don't want to wait, you can create a fresh database and then import your clearTimeout(tid); if (iid) clearInterval(iid); - await db.schema.dropTable('nostr_events').execute(); + await db.schema.dropTable('nostr_events').cascade().execute(); await db.schema.dropTable('nostr_tags').execute(); await db.schema.dropTable('nostr_pgfts').execute(); From f116950a83219a4d81103a4a14bc0491a665ac98 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 15:25:51 -0500 Subject: [PATCH 13/18] Fix db:migrate script --- scripts/db-migrate.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/db-migrate.ts b/scripts/db-migrate.ts index b4a00d8c..ab0b9747 100644 --- 
a/scripts/db-migrate.ts +++ b/scripts/db-migrate.ts @@ -7,7 +7,10 @@ if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') { await sleep(1_000); } -const kysely = await DittoDB.getInstance(); +// This migrates kysely internally. +const { kysely } = await DittoDB.getInstance(); + +// Close the connection before exiting. await kysely.destroy(); Deno.exit(); From e743e14bb1e6cf37154f9e19e128b2b765c019f1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 15:45:47 -0500 Subject: [PATCH 14/18] Fix EventsDB test --- src/storages/EventsDB.test.ts | 57 ++++++++++++++--------------------- 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/src/storages/EventsDB.test.ts b/src/storages/EventsDB.test.ts index 82c4e34a..7a5f7b93 100644 --- a/src/storages/EventsDB.test.ts +++ b/src/storages/EventsDB.test.ts @@ -58,9 +58,11 @@ Deno.test('delete events', async () => { await using db = await createTestDB(); const { store } = db; + const sk = generateSecretKey(); + const [one, two] = [ - { id: '1', kind: 1, pubkey: 'abc', content: 'hello world', created_at: 1, sig: '', tags: [] }, - { id: '2', kind: 1, pubkey: 'abc', content: 'yolo fam', created_at: 2, sig: '', tags: [] }, + genEvent({ kind: 1, content: 'hello world', created_at: 1 }, sk), + genEvent({ kind: 1, content: 'yolo fam', created_at: 2 }, sk), ]; await store.event(one); @@ -69,15 +71,9 @@ Deno.test('delete events', async () => { // Sanity check assertEquals(await store.query([{ kinds: [1] }]), [two, one]); - await store.event({ - kind: 5, - pubkey: one.pubkey, - tags: [['e', one.id]], - created_at: 0, - content: '', - id: '', - sig: '', - }); + await store.event( + genEvent({ kind: 5, tags: [['e', one.id]] }, sk), + ); assertEquals(await store.query([{ kinds: [1] }]), [two]); }); @@ -86,21 +82,15 @@ Deno.test("user cannot delete another user's event", async () => { await using db = await createTestDB(); const { store } = db; - const event = { id: '1', kind: 1, pubkey: 'abc', content: 'hello 
world', created_at: 1, sig: '', tags: [] }; + const event = genEvent({ kind: 1, content: 'hello world', created_at: 1 }); await store.event(event); // Sanity check assertEquals(await store.query([{ kinds: [1] }]), [event]); - await store.event({ - kind: 5, - pubkey: 'def', // different pubkey - tags: [['e', event.id]], - created_at: 0, - content: '', - id: '', - sig: '', - }); + await store.event( + genEvent({ kind: 5, tags: [['e', event.id]] }), // different sk + ); assertEquals(await store.query([{ kinds: [1] }]), [event]); }); @@ -109,9 +99,11 @@ Deno.test('admin can delete any event', async () => { await using db = await createTestDB(); const { store } = db; + const sk = generateSecretKey(); + const [one, two] = [ - { id: '1', kind: 1, pubkey: 'abc', content: 'hello world', created_at: 1, sig: '', tags: [] }, - { id: '2', kind: 1, pubkey: 'abc', content: 'yolo fam', created_at: 2, sig: '', tags: [] }, + genEvent({ kind: 1, content: 'hello world', created_at: 1 }, sk), + genEvent({ kind: 1, content: 'yolo fam', created_at: 2 }, sk), ]; await store.event(one); @@ -120,15 +112,9 @@ Deno.test('admin can delete any event', async () => { // Sanity check assertEquals(await store.query([{ kinds: [1] }]), [two, one]); - await store.event({ - kind: 5, - pubkey: Conf.pubkey, // Admin pubkey - tags: [['e', one.id]], - created_at: 0, - content: '', - id: '', - sig: '', - }); + await store.event( + genEvent({ kind: 5, tags: [['e', one.id]] }, Conf.seckey), // admin sk + ); assertEquals(await store.query([{ kinds: [1] }]), [two]); }); @@ -173,14 +159,15 @@ Deno.test('inserting replaceable events', async () => { await using db = await createTestDB(); const { store } = db; - const event = await eventFixture('event-0'); + const sk = generateSecretKey(); + const event = genEvent({ kind: 0, created_at: 100 }, sk); await store.event(event); - const olderEvent = { ...event, id: '123', created_at: event.created_at - 1 }; + const olderEvent = genEvent({ kind: 0, created_at: 50 }, sk); 
await store.event(olderEvent); assertEquals(await store.query([{ kinds: [0], authors: [event.pubkey] }]), [event]); - const newerEvent = { ...event, id: '123', created_at: event.created_at + 1 }; + const newerEvent = genEvent({ kind: 0, created_at: 999 }, sk); await store.event(newerEvent); assertEquals(await store.query([{ kinds: [0] }]), [newerEvent]); }); From c4064d0fa609708c8b406522a2cf1b1edfc61474 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 15:54:56 -0500 Subject: [PATCH 15/18] pg migration: 3 days -> several hours --- src/db/migrations/030_pg_events_jsonb.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts index 534f70c7..894b89a0 100644 --- a/src/db/migrations/030_pg_events_jsonb.ts +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -75,8 +75,7 @@ export async function up(db: Kysely): Promise { let iid: number | undefined; const tid = setTimeout(() => { - console.warn(`Recreating the database to improve performance. -Depending on the size of your database, this could take a very long time, possibly days! + console.warn(`Recreating the database to improve performance. This will take several hours. If you don't want to wait, you can create a fresh database and then import your old events: From 27ba8b071aa4a6924e951a140e130b133873387d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 16:48:42 -0500 Subject: [PATCH 16/18] pg migration: advise checking out main branch --- src/db/migrations/030_pg_events_jsonb.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts index 894b89a0..7bfc6c17 100644 --- a/src/db/migrations/030_pg_events_jsonb.ts +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -81,10 +81,11 @@ If you don't want to wait, you can create a fresh database and then import your 1. 
Revert to a prior commit: e789e08c 2. Export your events: "deno task db:export > events.jsonl" -3. Drop your old database: "dropdb ditto" -4. Create a new database: "createdb ditto" -5. Start Ditto -6. While Ditto is running, import your events: "cat events.jsonl | deno task db:import"`); +3. Checkout the latest commit: "git checkout main && git pull" +4. Drop your old database: "dropdb ditto" +5. Create a new database: "createdb ditto" +6. Start Ditto +7. While Ditto is running, import your events: "cat events.jsonl | deno task db:import"`); const emojis = ['⚡', '🐛', '🔎', '😂', '😅', '😬', '😭', '🙃', '🤔', '🧐', '🧐', '🫠']; iid = setInterval(() => { From d3f4cd8d8c8e7cd83f15720974d7ef861f0fd42e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 21:57:09 -0500 Subject: [PATCH 17/18] Upgrade @nostrify/db to v0.31.2 --- deno.json | 2 +- deno.lock | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/deno.json b/deno.json index 275858ef..5f230172 100644 --- a/deno.json +++ b/deno.json @@ -30,7 +30,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.31.1", + "@nostrify/db": "jsr:@nostrify/db@^0.31.2", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.0", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", diff --git a/deno.lock b/deno.lock index d7649ddc..8379888a 100644 --- a/deno.lock +++ b/deno.lock @@ -9,9 +9,9 @@ "jsr:@gleasonator/policy@0.2.0": "jsr:@gleasonator/policy@0.2.0", "jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0", "jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1", - "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.4", + "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.5", "jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1", - "jsr:@nostrify/db@^0.31.1": "jsr:@nostrify/db@0.31.1", + 
"jsr:@nostrify/db@^0.31.2": "jsr:@nostrify/db@0.31.2", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", @@ -138,11 +138,14 @@ "@hono/hono@4.5.4": { "integrity": "3792780b8460d5df0959b07c059db9325e4fa1a49f8b5aff7ab9bc870bdec8e3" }, + "@hono/hono@4.5.5": { + "integrity": "e5a63b5f535475cd80974b65fed23a138d0cbb91fe1cc9a17a7c7278e835c308" + }, "@lambdalisue/async@2.1.1": { "integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4" }, - "@nostrify/db@0.31.1": { - "integrity": "b0e2542f2b02cb67e1f936d1def69c0facc0bdba3d8c1e396a0026929f3e2317", + "@nostrify/db@0.31.2": { + "integrity": "a906b64edbf84a6b482cd7c9f5df2d2237c4ec42589116097d99ceb41347b1f5", "dependencies": [ "jsr:@nostrify/nostrify@^0.30.0", "jsr:@nostrify/types@^0.30.0", @@ -1830,7 +1833,7 @@ "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", "jsr:@lambdalisue/async@^2.1.1", - "jsr:@nostrify/db@^0.31.1", + "jsr:@nostrify/db@^0.31.2", "jsr:@nostrify/nostrify@^0.30.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", From 1a69df1cd7702078805a1bf63bfdc5a8453243c0 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 12 Aug 2024 23:04:12 -0500 Subject: [PATCH 18/18] Add timeout message to the relay --- src/controllers/nostr/relay.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index bed6453f..583fd153 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -106,6 +106,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) { } catch (e) { if (e instanceof RelayError) { send(['CLOSED', subId, e.message]); + } else if (e.message.includes('timeout')) { + send(['CLOSED', subId, 'error: the relay could not respond fast enough']); } else { send(['CLOSED', subId, 'error: something went wrong']); }