diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index bb7838bb..48c5b253 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -35,11 +35,10 @@ test: postgres: stage: test - script: deno task db:migrate && deno task test + script: sleep 1 && deno task test services: - postgres:16 variables: DITTO_NSEC: nsec1zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zygs4rm7hz - DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres + TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres POSTGRES_HOST_AUTH_METHOD: trust - ALLOW_TO_USE_DATABASE_URL: true diff --git a/deno.json b/deno.json index 840caf77..4897cff4 100644 --- a/deno.json +++ b/deno.json @@ -27,7 +27,6 @@ "@/": "./src/", "@b-fuze/deno-dom": "jsr:@b-fuze/deno-dom@^0.1.47", "@bradenmacdonald/s3-lite-client": "jsr:@bradenmacdonald/s3-lite-client@^0.7.4", - "@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@electric-sql/pglite": "npm:@soapbox.pub/pglite@^0.2.10", "@hono/hono": "jsr:@hono/hono@^4.4.6", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", @@ -37,7 +36,6 @@ "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.1", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", - "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "@soapbox/kysely-pglite": "jsr:@soapbox/kysely-pglite@^0.0.1", "@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0", "@std/assert": "jsr:@std/assert@^0.225.1", diff --git a/deno.lock b/deno.lock index d9eee2d8..ca1f8be3 100644 --- a/deno.lock +++ b/deno.lock @@ -4,8 +4,6 @@ "specifiers": { "jsr:@b-fuze/deno-dom@^0.1.47": "jsr:@b-fuze/deno-dom@0.1.48", "jsr:@bradenmacdonald/s3-lite-client@^0.7.4": "jsr:@bradenmacdonald/s3-lite-client@0.7.6", - "jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1", - "jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6", "jsr:@denosaurs/plug@1.0.3": "jsr:@denosaurs/plug@1.0.3", "jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0", "jsr:@gleasonator/policy@0.2.0": "jsr:@gleasonator/policy@0.2.0", @@ -25,12 +23,9 @@ "jsr:@nostrify/policies@^0.33.0": "jsr:@nostrify/policies@0.33.0", "jsr:@nostrify/types@^0.30.0": "jsr:@nostrify/types@0.30.1", "jsr:@nostrify/types@^0.30.1": "jsr:@nostrify/types@0.30.1", - "jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0", "jsr:@soapbox/kysely-pglite@^0.0.1": "jsr:@soapbox/kysely-pglite@0.0.1", "jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0", "jsr:@std/assert@^0.213.1": "jsr:@std/assert@0.213.1", - "jsr:@std/assert@^0.217.0": "jsr:@std/assert@0.217.0", - "jsr:@std/assert@^0.221.0": "jsr:@std/assert@0.221.0", "jsr:@std/assert@^0.224.0": "jsr:@std/assert@0.224.0", "jsr:@std/assert@^0.225.1": "jsr:@std/assert@0.225.3", "jsr:@std/bytes@^0.224.0": "jsr:@std/bytes@0.224.0", @@ -41,22 +36,17 @@ "jsr:@std/crypto@^0.224.0": "jsr:@std/crypto@0.224.0", "jsr:@std/dotenv@^0.224.0": "jsr:@std/dotenv@0.224.2", "jsr:@std/encoding@0.213.1": "jsr:@std/encoding@0.213.1", - "jsr:@std/encoding@^0.221.0": "jsr:@std/encoding@0.221.0", "jsr:@std/encoding@^0.224.0": "jsr:@std/encoding@0.224.3", "jsr:@std/encoding@^0.224.1": "jsr:@std/encoding@0.224.3", "jsr:@std/fmt@0.213.1": "jsr:@std/fmt@0.213.1", - "jsr:@std/fmt@^0.221.0": "jsr:@std/fmt@0.221.0", "jsr:@std/fs@0.213.1": "jsr:@std/fs@0.213.1", - "jsr:@std/fs@^0.221.0": "jsr:@std/fs@0.221.0", "jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3", "jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.1", "jsr:@std/io@^0.224": "jsr:@std/io@0.224.7", "jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0", "jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1", "jsr:@std/path@0.213.1": "jsr:@std/path@0.213.1", - "jsr:@std/path@0.217": "jsr:@std/path@0.217.0", "jsr:@std/path@^0.213.1": "jsr:@std/path@0.213.1", - "jsr:@std/path@^0.221.0": "jsr:@std/path@0.221.0", "jsr:@std/streams@^0.223.0": "jsr:@std/streams@0.223.0", "npm:@isaacs/ttlcache@^1.4.1": "npm:@isaacs/ttlcache@1.4.1", "npm:@noble/hashes@^1.4.0": "npm:@noble/hashes@1.4.0", @@ -116,13 +106,6 @@ "jsr:@std/io@^0.224" ] }, - "@db/sqlite@0.11.1": { - "integrity": "546434e7ed762db07e6ade0f963540dd5e06723b802937bf260ff855b21ef9c5", - "dependencies": [ - "jsr:@denosaurs/plug@1", - "jsr:@std/path@0.217" - ] - }, "@denosaurs/plug@1.0.3": { "integrity": "b010544e386bea0ff3a1d05e0c88f704ea28cbd4d753439c2f1ee021a85d4640", "dependencies": [ @@ -132,15 +115,6 @@ "jsr:@std/path@0.213.1" ] }, - "@denosaurs/plug@1.0.6": { - "integrity": "6cf5b9daba7799837b9ffbe89f3450510f588fafef8115ddab1ff0be9cb7c1a7", - "dependencies": [ - "jsr:@std/encoding@^0.221.0", - "jsr:@std/fmt@^0.221.0", - "jsr:@std/fs@^0.221.0", - "jsr:@std/path@^0.221.0" - ] - }, "@gleasonator/policy@0.2.0": { "integrity": "3fe58b853ab203b2b67e65b64391dbcf5c07bc1caaf46e97b2f8ed5b14f30fdf", "dependencies": [ @@ -293,12 +267,6 @@ "@nostrify/types@0.30.1": { "integrity": "245da176f6893a43250697db51ad32bfa29bf9b1cdc1ca218043d9abf6de5ae5" }, - "@soapbox/kysely-deno-sqlite@2.2.0": { - "integrity": "668ec94600bc4b4d7bd618dd7ca65d4ef30ee61c46ffcb379b6f45203c08517a", - "dependencies": [ - "npm:kysely@^0.27.2" - ] - }, "@soapbox/kysely-pglite@0.0.1": { "integrity": "7a4221aa780aad6fba9747c45c59dfb1c62017ba8cad9db5607f6e5822c058d5", "dependencies": [ @@ -311,12 +279,6 @@ "@std/assert@0.213.1": { "integrity": "24c28178b30c8e0782c18e8e94ea72b16282207569cdd10ffb9d1d26f2edebfe" }, - "@std/assert@0.217.0": { - "integrity": "c98e279362ca6982d5285c3b89517b757c1e3477ee9f14eb2fdf80a45aaa9642" - }, - "@std/assert@0.221.0": { - "integrity": "a5f1aa6e7909dbea271754fd4ab3f4e687aeff4873b4cef9a320af813adb489a" - }, "@std/assert@0.224.0": { "integrity": "8643233ec7aec38a940a8264a6e3eed9bfa44e7a71cc6b3c8874213ff401967f" }, @@ -351,18 +313,12 @@ "@std/encoding@0.213.1": { "integrity": "fcbb6928713dde941a18ca5db88ca1544d0755ec8fb20fe61e2dc8144b390c62" }, - "@std/encoding@0.221.0": { - "integrity": "d1dd76ef0dc5d14088411e6dc1dede53bf8308c95d1537df1214c97137208e45" - }, "@std/encoding@0.224.3": { "integrity": "5e861b6d81be5359fad4155e591acf17c0207b595112d1840998bb9f476dbdaf" }, "@std/fmt@0.213.1": { "integrity": "a06d31777566d874b9c856c10244ac3e6b660bdec4c82506cd46be052a1082c3" }, - "@std/fmt@0.221.0": { - "integrity": "379fed69bdd9731110f26b9085aeb740606b20428ce6af31ef6bd45ef8efa62a" - }, "@std/fs@0.213.1": { "integrity": "fbcaf099f8a85c27ab0712b666262cda8fe6d02e9937bf9313ecaea39a22c501", "dependencies": [ @@ -370,13 +326,6 @@ "jsr:@std/path@^0.213.1" ] }, - "@std/fs@0.221.0": { - "integrity": "028044450299de8ed5a716ade4e6d524399f035513b85913794f4e81f07da286", - "dependencies": [ - "jsr:@std/assert@^0.221.0", - "jsr:@std/path@^0.221.0" - ] - }, "@std/fs@0.229.3": { "integrity": "783bca21f24da92e04c3893c9e79653227ab016c48e96b3078377ebd5222e6eb" }, @@ -434,18 +383,6 @@ "jsr:@std/assert@^0.213.1" ] }, - "@std/path@0.217.0": { - "integrity": "1217cc25534bca9a2f672d7fe7c6f356e4027df400c0e85c0ef3e4343bc67d11", - "dependencies": [ - "jsr:@std/assert@^0.217.0" - ] - }, - "@std/path@0.221.0": { - "integrity": "0a36f6b17314ef653a3a1649740cc8db51b25a133ecfe838f20b79a56ebe0095", - "dependencies": [ - "jsr:@std/assert@^0.221.0" - ] - }, "@std/streams@0.223.0": { "integrity": "d6b28e498ced3960b04dc5d251f2dcfc1df244b5ec5a48dc23a8f9b490be3b99" } @@ -1972,12 +1909,10 @@ "dependencies": [ "jsr:@b-fuze/deno-dom@^0.1.47", "jsr:@bradenmacdonald/s3-lite-client@^0.7.4", - "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", "jsr:@lambdalisue/async@^2.1.1", "jsr:@nostrify/db@^0.31.2", "jsr:@nostrify/nostrify@^0.30.1", - "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/kysely-pglite@^0.0.1", "jsr:@soapbox/stickynotes@^0.4.0", "jsr:@std/assert@^0.225.1", diff --git a/docs/debugging.md b/docs/debugging.md index 6abc513d..879f36cd 100644 --- a/docs/debugging.md +++ b/docs/debugging.md @@ -16,12 +16,12 @@ ssh -L 9229:localhost:9229 @ Then, in Chromium, go to `chrome://inspect` and the Ditto server should be available. -## SQLite performance +## SQL performance -To track slow queries, first set `DEBUG=ditto:sqlite.worker` in the environment so only SQLite logs are shown. +To track slow queries, first set `DEBUG=ditto:sql` in the environment so only SQL logs are shown. Then, grep for any logs above 0.001s: ```sh journalctl -fu ditto | grep -v '(0.00s)' -``` \ No newline at end of file +``` diff --git a/scripts/admin-event.ts b/scripts/admin-event.ts index 71c957f4..313aa051 100644 --- a/scripts/admin-event.ts +++ b/scripts/admin-event.ts @@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts'; const signer = new AdminSigner(); -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); const readable = Deno.stdin.readable .pipeThrough(new TextDecoderStream()) diff --git a/scripts/admin-role.ts b/scripts/admin-role.ts index 3f3c53f2..99986817 100644 --- a/scripts/admin-role.ts +++ b/scripts/admin-role.ts @@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { nostrNow } from '@/utils.ts'; -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); const [pubkeyOrNpub, role] = Deno.args; const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub; diff --git a/scripts/db-migrate.ts b/scripts/db-migrate.ts index ab0b9747..0e1d694d 100644 --- a/scripts/db-migrate.ts +++ b/scripts/db-migrate.ts @@ -1,11 +1,4 @@ -import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { sleep } from '@/test.ts'; - -if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') { - console.info('Waiting 1 second for postgres to start...'); - await sleep(1_000); -} // This migrates kysely internally. const { kysely } = await DittoDB.getInstance(); diff --git a/scripts/nostr-pull.ts b/scripts/nostr-pull.ts index 0556b64a..4b9d51db 100644 --- a/scripts/nostr-pull.ts +++ b/scripts/nostr-pull.ts @@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools'; import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; -const db = await DittoDB.getInstance(); -const eventsDB = new EventsDB(db); +const { kysely } = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); interface ImportEventsOpts { profilesOnly: boolean; diff --git a/scripts/setup.ts b/scripts/setup.ts index 9a6d6f34..1365fec5 100644 --- a/scripts/setup.ts +++ b/scripts/setup.ts @@ -45,10 +45,10 @@ const DATABASE_URL = Deno.env.get('DATABASE_URL'); if (DATABASE_URL) { vars.DATABASE_URL = await question('input', 'Database URL', DATABASE_URL); } else { - const database = await question('list', 'Which database do you want to use?', ['postgres', 'sqlite']); - if (database === 'sqlite') { - const path = await question('input', 'Path to SQLite database', 'data/db.sqlite3'); - vars.DATABASE_URL = `sqlite://${path}`; + const database = await question('list', 'Which database do you want to use?', ['postgres', 'pglite']); + if (database === 'pglite') { + const path = await question('input', 'Path to PGlite data directory', 'data/pgdata'); + vars.DATABASE_URL = `file://${path}`; } if (database === 'postgres') { const host = await question('input', 'Postgres host', 'localhost'); diff --git a/scripts/stats-recompute.ts b/scripts/stats-recompute.ts index 107a3167..7d6f721f 100644 --- a/scripts/stats-recompute.ts +++ b/scripts/stats-recompute.ts @@ -18,6 +18,6 @@ try { } const store = await Storages.db(); -const kysely = await DittoDB.getInstance(); +const { kysely } = await DittoDB.getInstance(); await refreshAuthorStats({ pubkey, kysely, store }); diff --git a/src/config.ts b/src/config.ts index 145ed9e4..5fa7be9a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,3 @@ -import url from 'node:url'; - import * as dotenv from '@std/dotenv'; import { getPublicKey, nip19 } from 'nostr-tools'; import { z } from 'zod'; @@ -82,23 +80,13 @@ class Conf { * ``` */ static get databaseUrl(): string { - return Deno.env.get('DATABASE_URL') ?? 'pglite://data/pgdata'; + return Deno.env.get('DATABASE_URL') ?? 'file://data/pgdata'; + } + /** Database to use in tests. */ + static get testDatabaseUrl(): string { + return Deno.env.get('TEST_DATABASE_URL') ?? 'memory://'; } static db = { - get url(): url.UrlWithStringQuery { - return url.parse(Conf.databaseUrl); - }, - get dialect(): 'sqlite' | 'postgres' | undefined { - switch (Conf.db.url.protocol) { - case 'sqlite:': - return 'sqlite'; - case 'pglite:': - case 'postgres:': - case 'postgresql:': - return 'postgres'; - } - return undefined; - }, /** Database query timeout configurations. */ timeouts: { /** Default query timeout when another setting isn't more specific. */ @@ -217,21 +205,6 @@ class Conf { static get sentryDsn(): string | undefined { return Deno.env.get('SENTRY_DSN'); } - /** SQLite settings. */ - static sqlite = { - /** - * Number of bytes to use for memory-mapped IO. - * https://www.sqlite.org/pragma.html#pragma_mmap_size - */ - get mmapSize(): number { - const value = Deno.env.get('SQLITE_MMAP_SIZE'); - if (value) { - return Number(value); - } else { - return 1024 * 1024 * 1024; - } - }, - }; /** Postgres settings. */ static pg = { /** Number of connections to use in the pool. */ diff --git a/src/controllers/metrics.ts b/src/controllers/metrics.ts index e85294c0..e25522ff 100644 --- a/src/controllers/metrics.ts +++ b/src/controllers/metrics.ts @@ -6,9 +6,11 @@ import { dbAvailableConnectionsGauge, dbPoolSizeGauge } from '@/metrics.ts'; /** Prometheus/OpenMetrics controller. */ export const metricsController: AppController = async (c) => { + const db = await DittoDB.getInstance(); + // Update some metrics at request time. - dbPoolSizeGauge.set(DittoDB.poolSize); - dbAvailableConnectionsGauge.set(DittoDB.availableConnections); + dbPoolSizeGauge.set(db.poolSize); + dbAvailableConnectionsGauge.set(db.availableConnections); const metrics = await register.metrics(); diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index 40d03e5f..63d2bfb1 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -1,75 +1,44 @@ import fs from 'node:fs/promises'; import path from 'node:path'; -import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db'; import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { Conf } from '@/config.ts'; import { DittoPglite } from '@/db/adapters/DittoPglite.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; -import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; +import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; -export type DittoDatabase = { - dialect: 'sqlite'; - kysely: Kysely & Kysely; -} | { - dialect: 'postgres'; - kysely: Kysely & Kysely; -}; - export class DittoDB { - private static db: Promise | undefined; + private static db: DittoDatabase | undefined; - static getInstance(): Promise { + /** Create (and migrate) the database if it isn't been already, or return the existing connection. */ + static async getInstance(): Promise { if (!this.db) { - this.db = this._getInstance(); + this.db = this.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize }); + await this.migrate(this.db.kysely); } return this.db; } - static async _getInstance(): Promise { - const result = {} as DittoDatabase; + /** Open a new database connection. */ + static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase { + const { protocol } = new URL(databaseUrl); - switch (Conf.db.url.protocol) { - case 'sqlite:': - result.dialect = 'sqlite'; - result.kysely = await DittoSQLite.getInstance(); - break; - case 'pglite:': - result.dialect = 'postgres'; - result.kysely = await DittoPglite.getInstance(); - break; + switch (protocol) { + case 'file:': + case 'memory:': + return DittoPglite.create(databaseUrl); case 'postgres:': case 'postgresql:': - result.dialect = 'postgres'; - result.kysely = await DittoPostgres.getInstance(); - break; + return DittoPostgres.create(databaseUrl, opts); default: throw new Error('Unsupported database URL.'); } - - await this.migrate(result.kysely); - - return result; - } - - static get poolSize(): number { - if (Conf.db.dialect === 'postgres') { - return DittoPostgres.poolSize; - } - return 1; - } - - static get availableConnections(): number { - if (Conf.db.dialect === 'postgres') { - return DittoPostgres.availableConnections; - } - return 1; } /** Migrate the database to the latest version. */ - static async migrate(kysely: DittoDatabase['kysely']) { + static async migrate(kysely: Kysely) { const migrator = new Migrator({ db: kysely, provider: new FileMigrationProvider({ diff --git a/src/db/DittoDatabase.ts b/src/db/DittoDatabase.ts new file mode 100644 index 00000000..530d9391 --- /dev/null +++ b/src/db/DittoDatabase.ts @@ -0,0 +1,13 @@ +import { Kysely } from 'kysely'; + +import { DittoTables } from '@/db/DittoTables.ts'; + +export interface DittoDatabase { + readonly kysely: Kysely; + readonly poolSize: number; + readonly availableConnections: number; +} + +export interface DittoDatabaseOpts { + poolSize?: number; +} diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index 09bf3e43..a62c485d 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -1,4 +1,6 @@ -export interface DittoTables { +import { NPostgresSchema } from '@nostrify/db'; + +export interface DittoTables extends NPostgresSchema { nip46_tokens: NIP46TokenRow; author_stats: AuthorStatsRow; event_stats: EventStatsRow; diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index ef035a3b..4ec7d8a5 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -1,56 +1,24 @@ import { PGlite } from '@electric-sql/pglite'; -import { NPostgresSchema } from '@nostrify/db'; import { PgliteDialect } from '@soapbox/kysely-pglite'; import { Kysely } from 'kysely'; -import { Conf } from '@/config.ts'; +import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPglite { - static db: Kysely & Kysely | undefined; + static create(databaseUrl: string): DittoDatabase { + const kysely = new Kysely({ + dialect: new PgliteDialect({ + database: new PGlite(databaseUrl), + }), + log: KyselyLogger, + }); - // deno-lint-ignore require-await - static async getInstance(): Promise & Kysely> { - if (!this.db) { - this.db = new Kysely({ - dialect: new PgliteDialect({ - database: new PGlite(this.path), - }), - log: KyselyLogger, - }) as Kysely & Kysely; - } - - return this.db; - } - - static get poolSize() { - return 1; - } - - static get availableConnections(): number { - return 1; - } - - /** Get the relative or absolute path based on the `DATABASE_URL`. */ - static get path(): string | undefined { - if (Conf.databaseUrl === 'pglite://:memory:') { - return undefined; - } - - const { host, pathname } = Conf.db.url; - - if (!pathname) return ''; - - // Get relative path. - if (host === '') { - return pathname; - } else if (host === '.') { - return pathname; - } else if (host) { - return host + pathname; - } - - return ''; + return { + kysely, + poolSize: 1, + availableConnections: 1, + }; } } diff --git a/src/db/adapters/DittoPostgres.ts b/src/db/adapters/DittoPostgres.ts index d1127117..f1a5bcc9 100644 --- a/src/db/adapters/DittoPostgres.ts +++ b/src/db/adapters/DittoPostgres.ts @@ -1,4 +1,3 @@ -import { NPostgresSchema } from '@nostrify/db'; import { BinaryOperationNode, FunctionNode, @@ -13,51 +12,43 @@ import { import { PostgresJSDialectConfig, PostgresJSDriver } from 'kysely-postgres-js'; import postgres from 'postgres'; -import { Conf } from '@/config.ts'; +import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; export class DittoPostgres { - static db: Kysely & Kysely | undefined; - static postgres?: postgres.Sql; + static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase { + const pg = postgres(databaseUrl, { max: opts?.poolSize }); - // deno-lint-ignore require-await - static async getInstance(): Promise & Kysely> { - if (!this.postgres) { - this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize }); - } - - if (!this.db) { - this.db = new Kysely({ - dialect: { - createAdapter() { - return new PostgresAdapter(); - }, - createDriver() { - return new PostgresJSDriver({ - postgres: DittoPostgres.postgres as unknown as PostgresJSDialectConfig['postgres'], - }); - }, - createIntrospector(db) { - return new PostgresIntrospector(db); - }, - createQueryCompiler() { - return new DittoPostgresQueryCompiler(); - }, + const kysely = new Kysely({ + dialect: { + createAdapter() { + return new PostgresAdapter(); }, - log: KyselyLogger, - }) as Kysely & Kysely; - } + createDriver() { + return new PostgresJSDriver({ + postgres: pg as unknown as PostgresJSDialectConfig['postgres'], + }); + }, + createIntrospector(db) { + return new PostgresIntrospector(db); + }, + createQueryCompiler() { + return new DittoPostgresQueryCompiler(); + }, + }, + log: KyselyLogger, + }); - return this.db; - } - - static get poolSize() { - return this.postgres?.connections.open ?? 0; - } - - static get availableConnections(): number { - return this.postgres?.connections.idle ?? 0; + return { + kysely, + get poolSize() { + return pg.connections.open; + }, + get availableConnections() { + return pg.connections.idle; + }, + }; } } diff --git a/src/db/adapters/DittoSQLite.ts b/src/db/adapters/DittoSQLite.ts deleted file mode 100644 index e54292dd..00000000 --- a/src/db/adapters/DittoSQLite.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { NDatabaseSchema } from '@nostrify/db'; -import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite'; -import { Kysely, sql } from 'kysely'; - -import { Conf } from '@/config.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; -import { KyselyLogger } from '@/db/KyselyLogger.ts'; -import SqliteWorker from '@/workers/sqlite.ts'; - -export class DittoSQLite { - static db: Kysely & Kysely | undefined; - - static async getInstance(): Promise & Kysely> { - if (!this.db) { - const sqliteWorker = new SqliteWorker(); - await sqliteWorker.open(this.path); - - this.db = new Kysely({ - dialect: new PolySqliteDialect({ - database: sqliteWorker, - }), - log: KyselyLogger, - }) as Kysely & Kysely; - - // Set PRAGMA values. - await Promise.all([ - sql`PRAGMA synchronous = normal`.execute(this.db), - sql`PRAGMA temp_store = memory`.execute(this.db), - sql`PRAGMA foreign_keys = ON`.execute(this.db), - sql`PRAGMA auto_vacuum = FULL`.execute(this.db), - sql`PRAGMA journal_mode = WAL`.execute(this.db), - sql.raw(`PRAGMA mmap_size = ${Conf.sqlite.mmapSize}`).execute(this.db), - ]); - } - return this.db; - } - - /** Get the relative or absolute path based on the `DATABASE_URL`. */ - static get path() { - if (Conf.databaseUrl === 'sqlite://:memory:') { - return ':memory:'; - } - - const { host, pathname } = Conf.db.url; - - if (!pathname) return ''; - - // Get relative path. - if (host === '') { - return pathname; - } else if (host === '.') { - return pathname; - } else if (host) { - return host + pathname; - } - - return ''; - } -} diff --git a/src/db/migrations/002_events_fts.ts b/src/db/migrations/002_events_fts.ts index 56abab5f..45ad03e4 100644 --- a/src/db/migrations/002_events_fts.ts +++ b/src/db/migrations/002_events_fts.ts @@ -1,13 +1,8 @@ -import { Kysely, sql } from 'kysely'; +import { Kysely } from 'kysely'; -import { Conf } from '@/config.ts'; - -export async function up(db: Kysely): Promise { - if (Conf.db.dialect === 'sqlite') { - await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); - } +export async function up(_db: Kysely): Promise { + // This migration used to create an FTS table for SQLite, but SQLite support was removed. } -export async function down(db: Kysely): Promise { - await db.schema.dropTable('events_fts').ifExists().execute(); +export async function down(_db: Kysely): Promise { } diff --git a/src/db/migrations/019_ndatabase_schema.ts b/src/db/migrations/019_ndatabase_schema.ts index 31b86cd3..79d8cbc9 100644 --- a/src/db/migrations/019_ndatabase_schema.ts +++ b/src/db/migrations/019_ndatabase_schema.ts @@ -1,25 +1,13 @@ -import { Kysely, sql } from 'kysely'; - -import { Conf } from '@/config.ts'; +import { Kysely } from 'kysely'; export async function up(db: Kysely): Promise { await db.schema.alterTable('events').renameTo('nostr_events').execute(); await db.schema.alterTable('tags').renameTo('nostr_tags').execute(); await db.schema.alterTable('nostr_tags').renameColumn('tag', 'name').execute(); - - if (Conf.db.dialect === 'sqlite') { - await db.schema.dropTable('events_fts').execute(); - await sql`CREATE VIRTUAL TABLE nostr_fts5 USING fts5(event_id, content)`.execute(db); - } } export async function down(db: Kysely): Promise { await db.schema.alterTable('nostr_events').renameTo('events').execute(); await db.schema.alterTable('nostr_tags').renameTo('tags').execute(); await db.schema.alterTable('tags').renameColumn('name', 'tag').execute(); - - if (Conf.db.dialect === 'sqlite') { - await db.schema.dropTable('nostr_fts5').execute(); - await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db); - } } diff --git a/src/db/migrations/020_pgfts.ts b/src/db/migrations/020_pgfts.ts index 835de117..26e320ec 100644 --- a/src/db/migrations/020_pgfts.ts +++ b/src/db/migrations/020_pgfts.ts @@ -1,19 +1,13 @@ import { Kysely, sql } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.createTable('nostr_pgfts') - .ifNotExists() - .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade')) - .addColumn('search_vec', sql`tsvector`, (c) => c.notNull()) - .execute(); - } + await db.schema.createTable('nostr_pgfts') + .ifNotExists() + .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade')) + .addColumn('search_vec', sql`tsvector`, (c) => c.notNull()) + .execute(); } export async function down(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.dropTable('nostr_pgfts').ifExists().execute(); - } + await db.schema.dropTable('nostr_pgfts').ifExists().execute(); } diff --git a/src/db/migrations/021_pgfts_index.ts b/src/db/migrations/021_pgfts_index.ts index 4b834995..7ad24546 100644 --- a/src/db/migrations/021_pgfts_index.ts +++ b/src/db/migrations/021_pgfts_index.ts @@ -1,21 +1,15 @@ import { Kysely } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema - .createIndex('nostr_pgfts_gin_search_vec') - .ifNotExists() - .on('nostr_pgfts') - .using('gin') - .column('search_vec') - .execute(); - } + await db.schema + .createIndex('nostr_pgfts_gin_search_vec') + .ifNotExists() + .on('nostr_pgfts') + .using('gin') + .column('search_vec') + .execute(); } export async function down(db: Kysely): Promise { - if (Conf.db.dialect === 'postgres') { - await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute(); - } + await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute(); } diff --git a/src/db/migrations/030_pg_events_jsonb.ts b/src/db/migrations/030_pg_events_jsonb.ts index 7bfc6c17..dcd6ad85 100644 --- a/src/db/migrations/030_pg_events_jsonb.ts +++ b/src/db/migrations/030_pg_events_jsonb.ts @@ -1,10 +1,6 @@ import { Kysely, sql } from 'kysely'; -import { Conf } from '@/config.ts'; - export async function up(db: Kysely): Promise { - if (Conf.db.dialect !== 'postgres') return; - // Create new table and indexes. await db.schema .createTable('nostr_events_new') diff --git a/src/pipeline.test.ts b/src/pipeline.test.ts index 2af2b8c3..76b1fe51 100644 --- a/src/pipeline.test.ts +++ b/src/pipeline.test.ts @@ -1,7 +1,7 @@ import { assertEquals } from '@std/assert'; import { generateSecretKey } from 'nostr-tools'; -import { createTestDB, genEvent, getTestDB } from '@/test.ts'; +import { createTestDB, genEvent } from '@/test.ts'; import { handleZaps } from '@/pipeline.ts'; Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => { @@ -58,7 +58,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab // If no error happens = ok Deno.test('zap receipt does not have a "description" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -71,7 +71,7 @@ Deno.test('zap receipt does not have a "description" tag', async () => { }); Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -84,7 +84,7 @@ Deno.test('zap receipt does not have a zap request stringified value in the "des }); Deno.test('zap receipt does not have a "bolt11" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); @@ -103,7 +103,7 @@ Deno.test('zap receipt does not have a "bolt11" tag', async () => { }); Deno.test('zap request inside zap receipt does not have an "e" tag', async () => { - await using db = await getTestDB(); + await using db = await createTestDB(); const kysely = db.kysely; const sk = generateSecretKey(); diff --git a/src/storages.ts b/src/storages.ts index c49b62cf..7114a3d6 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -16,12 +16,12 @@ export class Storages { private static _pubsub: Promise | undefined; private static _search: Promise | undefined; - /** SQLite database to store events this Ditto server cares about. */ + /** SQL database to store events this Ditto server cares about. */ public static async db(): Promise { if (!this._db) { this._db = (async () => { - const db = await DittoDB.getInstance(); - const store = new EventsDB(db); + const { kysely } = await DittoDB.getInstance(); + const store = new EventsDB(kysely); await seedZapSplits(store); return store; })(); diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index 7cbd9c22..6c006dc2 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -13,10 +13,11 @@ import { NStore, } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; +import { Kysely } from 'kysely'; import { nip27 } from 'nostr-tools'; import { Conf } from '@/config.ts'; -import { DittoDatabase } from '@/db/DittoDB.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { dbEventsCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; @@ -30,7 +31,7 @@ type TagCondition = ({ event, count, value }: { value: string; }) => boolean; -/** SQLite database storage adapter for Nostr events. */ +/** SQL database storage adapter for Nostr events. */ class EventsDB implements NStore { private store: NDatabase | NPostgres; private console = new Stickynotes('ditto:db:events'); @@ -52,21 +53,11 @@ class EventsDB implements NStore { 't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50, }; - constructor(private database: DittoDatabase) { - const { dialect, kysely } = database; - - if (dialect === 'postgres') { - this.store = new NPostgres(kysely, { - indexTags: EventsDB.indexTags, - indexSearch: EventsDB.searchText, - }); - } else { - this.store = new NDatabase(kysely, { - fts: 'sqlite', - indexTags: EventsDB.indexTags, - searchText: EventsDB.searchText, - }); - } + constructor(private kysely: Kysely) { + this.store = new NPostgres(kysely, { + indexTags: EventsDB.indexTags, + indexSearch: EventsDB.searchText, + }); } /** Insert an event (and its tags) into the database. */ @@ -273,7 +264,7 @@ class EventsDB implements NStore { return tags.map(([_tag, value]) => value).join('\n'); } - /** Converts filters to more performant, simpler filters that are better for SQLite. */ + /** Converts filters to more performant, simpler filters. */ async expandFilters(filters: NostrFilter[]): Promise { filters = structuredClone(filters); @@ -286,7 +277,7 @@ class EventsDB implements NStore { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain) { - const query = this.database.kysely + const query = this.kysely .selectFrom('pubkey_domains') .select('pubkey') .where('domain', '=', domain); diff --git a/src/test.ts b/src/test.ts index df6c84f6..8b3dad80 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,21 +1,10 @@ -import fs from 'node:fs/promises'; -import path from 'node:path'; - -import { Database as Sqlite } from '@db/sqlite'; -import { NDatabase, NDatabaseSchema, NPostgresSchema } from '@nostrify/db'; import { NostrEvent } from '@nostrify/nostrify'; -import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite'; import { finalizeEvent, generateSecretKey } from 'nostr-tools'; -import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; -import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js'; -import postgres from 'postgres'; -import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { KyselyLogger } from '@/db/KyselyLogger.ts'; -import { EventsDB } from '@/storages/EventsDB.ts'; import { Conf } from '@/config.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; +import { EventsDB } from '@/storages/EventsDB.ts'; /** Import an event fixture by name in tests. */ export async function eventFixture(name: string): Promise { @@ -42,97 +31,22 @@ export function genEvent(t: Partial = {}, sk: Uint8Array = generateS return purifyEvent(event); } -/** Get an in-memory SQLite database to use for testing. It's automatically destroyed when it goes out of scope. */ -export async function getTestDB() { - const kysely = new Kysely({ - dialect: new DenoSqlite3Dialect({ - database: new Sqlite(':memory:'), - }), - }); +/** Create a database for testing. It uses `TEST_DATABASE_URL`, or creates an in-memory database by default. */ +export async function createTestDB() { + const { testDatabaseUrl } = Conf; + const { protocol } = new URL(testDatabaseUrl); + const { kysely } = DittoDB.create(testDatabaseUrl, { poolSize: 1 }); - const migrator = new Migrator({ - db: kysely, - provider: new FileMigrationProvider({ - fs, - path, - migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname, - }), - }); - - await migrator.migrateToLatest(); - - const store = new NDatabase(kysely); + await DittoDB.migrate(kysely); + const store = new EventsDB(kysely); return { store, kysely, - [Symbol.asyncDispose]: () => kysely.destroy(), - }; -} - -/** Create an database for testing. */ -export const createTestDB = async (databaseUrl?: string) => { - databaseUrl ??= Deno.env.get('DATABASE_URL') ?? 'sqlite://:memory:'; - - let dialect: 'sqlite' | 'postgres' = (() => { - const protocol = databaseUrl.split(':')[0]; - switch (protocol) { - case 'sqlite': - return 'sqlite'; - case 'postgres': - return protocol; - case 'postgresql': - return 'postgres'; - default: - throw new Error(`Unsupported protocol: ${protocol}`); - } - })(); - - const allowToUseDATABASE_URL = Deno.env.get('ALLOW_TO_USE_DATABASE_URL')?.toLowerCase() ?? ''; - if (allowToUseDATABASE_URL !== 'true' && dialect === 'postgres') { - console.warn( - '%cRunning tests with sqlite, if you meant to use Postgres, run again with ALLOW_TO_USE_DATABASE_URL environment variable set to true', - 'color: yellow;', - ); - dialect = 'sqlite'; - } - - console.warn(`Using: ${dialect}`); - - const db: DittoDatabase = { dialect } as DittoDatabase; - - if (dialect === 'sqlite') { - // migration 021_pgfts_index.ts calls 'Conf.db.dialect', - // and this calls the DATABASE_URL environment variable. - // The following line ensures to NOT use the DATABASE_URL that may exist in an .env file. - Deno.env.set('DATABASE_URL', 'sqlite://:memory:'); - - db.kysely = new Kysely({ - dialect: new DenoSqlite3Dialect({ - database: new Sqlite(':memory:'), - }), - }) as Kysely & Kysely; - } else { - db.kysely = new Kysely({ - // @ts-ignore Kysely version mismatch. - dialect: new PostgresJSDialect({ - postgres: postgres(Conf.databaseUrl, { - max: Conf.pg.poolSize, - }) as unknown as PostgresJSDialectConfig['postgres'], - }), - log: KyselyLogger, - }) as Kysely & Kysely; - } - - await DittoDB.migrate(db.kysely); - const store = new EventsDB(db); - - return { - dialect, - store, - kysely: db.kysely, [Symbol.asyncDispose]: async () => { - if (dialect === 'postgres') { + // If we're testing against real Postgres, we will reuse the database + // between tests, so we should drop the tables to keep each test fresh. + if (['postgres:', 'postgresql:'].includes(protocol)) { for ( const table of [ 'author_stats', @@ -142,20 +56,17 @@ export const createTestDB = async (databaseUrl?: string) => { 'kysely_migration_lock', 'nip46_tokens', 'pubkey_domains', - 'unattached_media', 'nostr_events', - 'nostr_tags', - 'nostr_pgfts', 'event_zaps', ] ) { - await db.kysely.schema.dropTable(table).ifExists().cascade().execute(); + await kysely.schema.dropTable(table).ifExists().cascade().execute(); } - await db.kysely.destroy(); + await kysely.destroy(); } }, }; -}; +} export function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/src/trends.ts b/src/trends.ts index 91164143..337ee5c7 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -1,9 +1,10 @@ import { NostrFilter } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; -import { sql } from 'kysely'; +import { Kysely, sql } from 'kysely'; import { Conf } from '@/config.ts'; -import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts'; +import { DittoDB } from '@/db/DittoDB.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; import { handleEvent } from '@/pipeline.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Time } from '@/utils/time.ts'; @@ -13,88 +14,50 @@ const console = new Stickynotes('ditto:trends'); /** Get trending tag values for a given tag in the given time frame. */ export async function getTrendingTagValues( /** Kysely instance to execute queries on. */ - { dialect, kysely }: DittoDatabase, + kysely: Kysely, /** Tag name to filter by, eg `t` or `r`. */ tagNames: string[], /** Filter of eligible events. */ filter: NostrFilter, ): Promise<{ value: string; authors: number; uses: number }[]> { - if (dialect === 'postgres') { - let query = kysely - .selectFrom([ - 'nostr_events', - sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'), - sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'), - ]) - .select(({ fn }) => [ - fn('lower', ['element.value']).as('value'), - fn.agg('count', ['nostr_events.pubkey']).distinct().as('authors'), - fn.countAll().as('uses'), - ]) - .where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames))) - .groupBy((eb) => eb.fn('lower', ['element.value'])) - .orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc'); + let query = kysely + .selectFrom([ + 'nostr_events', + sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'), + sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'), + ]) + .select(({ fn }) => [ + fn('lower', ['element.value']).as('value'), + fn.agg('count', ['nostr_events.pubkey']).distinct().as('authors'), + fn.countAll().as('uses'), + ]) + .where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames))) + .groupBy((eb) => eb.fn('lower', ['element.value'])) + .orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc'); - if (filter.kinds) { - query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds))); - } - if (filter.authors) { - query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors))); - } - if (typeof filter.since === 'number') { - query = query.where('nostr_events.created_at', '>=', filter.since); - } - if (typeof filter.until === 'number') { - query = query.where('nostr_events.created_at', '<=', filter.until); - } - if (typeof filter.limit === 'number') { - query = query.limit(filter.limit); - } - - const rows = await query.execute(); - - return rows.map((row) => ({ - value: row.value, - authors: Number(row.authors), - uses: Number(row.uses), - })); + if (filter.kinds) { + query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds))); + } + if (filter.authors) { + query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors))); + } + if (typeof filter.since === 'number') { + query = query.where('nostr_events.created_at', '>=', filter.since); + } + if (typeof filter.until === 'number') { + query = query.where('nostr_events.created_at', '<=', filter.until); + } + if (typeof filter.limit === 'number') { + query = query.limit(filter.limit); } - if (dialect === 'sqlite') { - let query = kysely - .selectFrom('nostr_tags') - .select(({ fn }) => [ - 'nostr_tags.value', - fn.agg('count', ['nostr_tags.pubkey']).distinct().as('authors'), - fn.countAll().as('uses'), - ]) - .where('nostr_tags.name', 'in', tagNames) - .groupBy('nostr_tags.value') - .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); + const rows = await query.execute(); - if (filter.kinds) { - query = query.where('nostr_tags.kind', 'in', filter.kinds); - } - if (typeof filter.since === 'number') { - query = query.where('nostr_tags.created_at', '>=', filter.since); - } - if (typeof filter.until === 'number') { - query = query.where('nostr_tags.created_at', '<=', filter.until); - } - if (typeof filter.limit === 'number') { - query = query.limit(filter.limit); - } - - const rows = await query.execute(); - - return rows.map((row) => ({ - value: row.value, - authors: Number(row.authors), - uses: Number(row.uses), - })); - } - - return []; + return rows.map((row) => ({ + value: row.value, + authors: Number(row.authors), + uses: Number(row.uses), + })); } /** Get trending tags and publish an event with them. */ @@ -107,7 +70,7 @@ export async function updateTrendingTags( aliases?: string[], ) { console.info(`Updating trending ${l}...`); - const db = await DittoDB.getInstance(); + const { kysely } = await DittoDB.getInstance(); const signal = AbortSignal.timeout(1000); const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000); @@ -116,7 +79,7 @@ export async function updateTrendingTags( const tagNames = aliases ? [tagName, ...aliases] : [tagName]; try { - const trends = await getTrendingTagValues(db, tagNames, { + const trends = await getTrendingTagValues(kysely, tagNames, { kinds, since: yesterday, until: now, diff --git a/src/utils/SimpleLRU.ts b/src/utils/SimpleLRU.ts index f1bf6512..0f2b5b37 100644 --- a/src/utils/SimpleLRU.ts +++ b/src/utils/SimpleLRU.ts @@ -17,7 +17,7 @@ export class SimpleLRU< constructor(fetchFn: FetchFn, opts: LRUCache.Options) { this.cache = new LRUCache({ - fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as AbortSignal }), + fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as unknown as AbortSignal }), ...opts, }); } diff --git a/src/utils/stats.ts b/src/utils/stats.ts index ccba0a5b..e4d4d3f2 100644 --- a/src/utils/stats.ts +++ b/src/utils/stats.ts @@ -5,7 +5,6 @@ import { z } from 'zod'; import { DittoTables } from '@/db/DittoTables.ts'; import { findQuoteTag, findReplyTag, getTagSet } from '@/utils/tags.ts'; -import { Conf } from '@/config.ts'; interface UpdateStatsOpts { kysely: Kysely; @@ -197,16 +196,13 @@ export async function updateAuthorStats( notes_count: 0, }; - let query = kysely + const prev = await kysely .selectFrom('author_stats') .selectAll() - .where('pubkey', '=', pubkey); + .forUpdate() + .where('pubkey', '=', pubkey) + .executeTakeFirst(); - if (Conf.db.dialect === 'postgres') { - query = query.forUpdate(); - } - - const prev = await query.executeTakeFirst(); const stats = fn(prev ?? empty); if (prev) { @@ -249,16 +245,13 @@ export async function updateEventStats( reactions: '{}', }; - let query = kysely + const prev = await kysely .selectFrom('event_stats') .selectAll() - .where('event_id', '=', eventId); + .forUpdate() + .where('event_id', '=', eventId) + .executeTakeFirst(); - if (Conf.db.dialect === 'postgres') { - query = query.forUpdate(); - } - - const prev = await query.executeTakeFirst(); const stats = fn(prev ?? empty); if (prev) { diff --git a/src/workers/fetch.worker.ts b/src/workers/fetch.worker.ts index 0012088b..e6f98455 100644 --- a/src/workers/fetch.worker.ts +++ b/src/workers/fetch.worker.ts @@ -1,7 +1,9 @@ +/// + import Debug from '@soapbox/stickynotes/debug'; import * as Comlink from 'comlink'; -import './handlers/abortsignal.ts'; +import '@/workers/handlers/abortsignal.ts'; import '@/sentry.ts'; const debug = Debug('ditto:fetch.worker'); diff --git a/src/workers/sqlite.ts b/src/workers/sqlite.ts deleted file mode 100644 index 154ec556..00000000 --- a/src/workers/sqlite.ts +++ /dev/null @@ -1,52 +0,0 @@ -import * as Comlink from 'comlink'; -import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; -import { CompiledQuery, QueryResult } from 'kysely'; - -import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts'; - -Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); - -class SqliteWorker { - #worker: Worker; - #client: ReturnType>; - #ready: Promise; - - constructor() { - this.#worker = new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' }); - this.#client = Comlink.wrap(this.#worker); - - this.#ready = new Promise((resolve) => { - const handleEvent = (event: MessageEvent) => { - if (event.data[0] === 'ready') { - this.#worker.removeEventListener('message', handleEvent); - resolve(); - } - }; - this.#worker.addEventListener('message', handleEvent); - }); - } - - async open(path: string): Promise { - await this.#ready; - return this.#client.open(path); - } - - async executeQuery(query: CompiledQuery): Promise> { - await this.#ready; - return this.#client.executeQuery(query) as Promise>; - } - - async *streamQuery(query: CompiledQuery): AsyncIterableIterator> { - await this.#ready; - - for await (const result of await this.#client.streamQuery(query)) { - yield result as QueryResult; - } - } - - destroy(): Promise { - return this.#client.destroy(); - } -} - -export default SqliteWorker; diff --git a/src/workers/sqlite.worker.ts b/src/workers/sqlite.worker.ts deleted file mode 100644 index 23839dbd..00000000 --- a/src/workers/sqlite.worker.ts +++ /dev/null @@ -1,42 +0,0 @@ -/// -import { Database as SQLite } from '@db/sqlite'; -import * as Comlink from 'comlink'; -import { CompiledQuery, QueryResult } from 'kysely'; -import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; - -import '@/sentry.ts'; - -let db: SQLite | undefined; - -export const SqliteWorker = { - open(path: string): void { - db = new SQLite(path); - }, - executeQuery({ sql, parameters }: CompiledQuery): QueryResult { - if (!db) throw new Error('Database not open'); - - return { - rows: db!.prepare(sql).all(...parameters as any[]) as R[], - numAffectedRows: BigInt(db!.changes), - insertId: BigInt(db!.lastInsertRowId), - }; - }, - async *streamQuery({ sql, parameters }: CompiledQuery): AsyncIterableIterator> { - if (!db) throw new Error('Database not open'); - - const stmt = db.prepare(sql).bind(...parameters as any[]); - for (const row of stmt) { - yield { - rows: [row], - }; - } - }, - destroy() { - db?.close(); - }, -}; - -Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); -Comlink.expose(SqliteWorker); - -self.postMessage(['ready']);