mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge branch 'rm-sqlite' into 'main'
Remove SQLite support, simplify database code See merge request soapbox-pub/ditto!485
This commit is contained in:
commit
624b6b278e
33 changed files with 204 additions and 687 deletions
|
|
@ -35,11 +35,10 @@ test:
|
|||
|
||||
postgres:
|
||||
stage: test
|
||||
script: deno task db:migrate && deno task test
|
||||
script: sleep 1 && deno task test
|
||||
services:
|
||||
- postgres:16
|
||||
variables:
|
||||
DITTO_NSEC: nsec1zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zygs4rm7hz
|
||||
DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres
|
||||
TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres
|
||||
POSTGRES_HOST_AUTH_METHOD: trust
|
||||
ALLOW_TO_USE_DATABASE_URL: true
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@
|
|||
"@/": "./src/",
|
||||
"@b-fuze/deno-dom": "jsr:@b-fuze/deno-dom@^0.1.47",
|
||||
"@bradenmacdonald/s3-lite-client": "jsr:@bradenmacdonald/s3-lite-client@^0.7.4",
|
||||
"@db/sqlite": "jsr:@db/sqlite@^0.11.1",
|
||||
"@electric-sql/pglite": "npm:@soapbox.pub/pglite@^0.2.10",
|
||||
"@hono/hono": "jsr:@hono/hono@^4.4.6",
|
||||
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
|
||||
|
|
@ -37,7 +36,6 @@
|
|||
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.1",
|
||||
"@scure/base": "npm:@scure/base@^1.1.6",
|
||||
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
|
||||
"@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
|
||||
"@soapbox/kysely-pglite": "jsr:@soapbox/kysely-pglite@^0.0.1",
|
||||
"@soapbox/stickynotes": "jsr:@soapbox/stickynotes@^0.4.0",
|
||||
"@std/assert": "jsr:@std/assert@^0.225.1",
|
||||
|
|
|
|||
65
deno.lock
generated
65
deno.lock
generated
|
|
@ -4,8 +4,6 @@
|
|||
"specifiers": {
|
||||
"jsr:@b-fuze/deno-dom@^0.1.47": "jsr:@b-fuze/deno-dom@0.1.48",
|
||||
"jsr:@bradenmacdonald/s3-lite-client@^0.7.4": "jsr:@bradenmacdonald/s3-lite-client@0.7.6",
|
||||
"jsr:@db/sqlite@^0.11.1": "jsr:@db/sqlite@0.11.1",
|
||||
"jsr:@denosaurs/plug@1": "jsr:@denosaurs/plug@1.0.6",
|
||||
"jsr:@denosaurs/plug@1.0.3": "jsr:@denosaurs/plug@1.0.3",
|
||||
"jsr:@gleasonator/policy": "jsr:@gleasonator/policy@0.2.0",
|
||||
"jsr:@gleasonator/policy@0.2.0": "jsr:@gleasonator/policy@0.2.0",
|
||||
|
|
@ -25,12 +23,9 @@
|
|||
"jsr:@nostrify/policies@^0.33.0": "jsr:@nostrify/policies@0.33.0",
|
||||
"jsr:@nostrify/types@^0.30.0": "jsr:@nostrify/types@0.30.1",
|
||||
"jsr:@nostrify/types@^0.30.1": "jsr:@nostrify/types@0.30.1",
|
||||
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0",
|
||||
"jsr:@soapbox/kysely-pglite@^0.0.1": "jsr:@soapbox/kysely-pglite@0.0.1",
|
||||
"jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0",
|
||||
"jsr:@std/assert@^0.213.1": "jsr:@std/assert@0.213.1",
|
||||
"jsr:@std/assert@^0.217.0": "jsr:@std/assert@0.217.0",
|
||||
"jsr:@std/assert@^0.221.0": "jsr:@std/assert@0.221.0",
|
||||
"jsr:@std/assert@^0.224.0": "jsr:@std/assert@0.224.0",
|
||||
"jsr:@std/assert@^0.225.1": "jsr:@std/assert@0.225.3",
|
||||
"jsr:@std/bytes@^0.224.0": "jsr:@std/bytes@0.224.0",
|
||||
|
|
@ -41,22 +36,17 @@
|
|||
"jsr:@std/crypto@^0.224.0": "jsr:@std/crypto@0.224.0",
|
||||
"jsr:@std/dotenv@^0.224.0": "jsr:@std/dotenv@0.224.2",
|
||||
"jsr:@std/encoding@0.213.1": "jsr:@std/encoding@0.213.1",
|
||||
"jsr:@std/encoding@^0.221.0": "jsr:@std/encoding@0.221.0",
|
||||
"jsr:@std/encoding@^0.224.0": "jsr:@std/encoding@0.224.3",
|
||||
"jsr:@std/encoding@^0.224.1": "jsr:@std/encoding@0.224.3",
|
||||
"jsr:@std/fmt@0.213.1": "jsr:@std/fmt@0.213.1",
|
||||
"jsr:@std/fmt@^0.221.0": "jsr:@std/fmt@0.221.0",
|
||||
"jsr:@std/fs@0.213.1": "jsr:@std/fs@0.213.1",
|
||||
"jsr:@std/fs@^0.221.0": "jsr:@std/fs@0.221.0",
|
||||
"jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3",
|
||||
"jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.1",
|
||||
"jsr:@std/io@^0.224": "jsr:@std/io@0.224.7",
|
||||
"jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0",
|
||||
"jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1",
|
||||
"jsr:@std/path@0.213.1": "jsr:@std/path@0.213.1",
|
||||
"jsr:@std/path@0.217": "jsr:@std/path@0.217.0",
|
||||
"jsr:@std/path@^0.213.1": "jsr:@std/path@0.213.1",
|
||||
"jsr:@std/path@^0.221.0": "jsr:@std/path@0.221.0",
|
||||
"jsr:@std/streams@^0.223.0": "jsr:@std/streams@0.223.0",
|
||||
"npm:@isaacs/ttlcache@^1.4.1": "npm:@isaacs/ttlcache@1.4.1",
|
||||
"npm:@noble/hashes@^1.4.0": "npm:@noble/hashes@1.4.0",
|
||||
|
|
@ -116,13 +106,6 @@
|
|||
"jsr:@std/io@^0.224"
|
||||
]
|
||||
},
|
||||
"@db/sqlite@0.11.1": {
|
||||
"integrity": "546434e7ed762db07e6ade0f963540dd5e06723b802937bf260ff855b21ef9c5",
|
||||
"dependencies": [
|
||||
"jsr:@denosaurs/plug@1",
|
||||
"jsr:@std/path@0.217"
|
||||
]
|
||||
},
|
||||
"@denosaurs/plug@1.0.3": {
|
||||
"integrity": "b010544e386bea0ff3a1d05e0c88f704ea28cbd4d753439c2f1ee021a85d4640",
|
||||
"dependencies": [
|
||||
|
|
@ -132,15 +115,6 @@
|
|||
"jsr:@std/path@0.213.1"
|
||||
]
|
||||
},
|
||||
"@denosaurs/plug@1.0.6": {
|
||||
"integrity": "6cf5b9daba7799837b9ffbe89f3450510f588fafef8115ddab1ff0be9cb7c1a7",
|
||||
"dependencies": [
|
||||
"jsr:@std/encoding@^0.221.0",
|
||||
"jsr:@std/fmt@^0.221.0",
|
||||
"jsr:@std/fs@^0.221.0",
|
||||
"jsr:@std/path@^0.221.0"
|
||||
]
|
||||
},
|
||||
"@gleasonator/policy@0.2.0": {
|
||||
"integrity": "3fe58b853ab203b2b67e65b64391dbcf5c07bc1caaf46e97b2f8ed5b14f30fdf",
|
||||
"dependencies": [
|
||||
|
|
@ -293,12 +267,6 @@
|
|||
"@nostrify/types@0.30.1": {
|
||||
"integrity": "245da176f6893a43250697db51ad32bfa29bf9b1cdc1ca218043d9abf6de5ae5"
|
||||
},
|
||||
"@soapbox/kysely-deno-sqlite@2.2.0": {
|
||||
"integrity": "668ec94600bc4b4d7bd618dd7ca65d4ef30ee61c46ffcb379b6f45203c08517a",
|
||||
"dependencies": [
|
||||
"npm:kysely@^0.27.2"
|
||||
]
|
||||
},
|
||||
"@soapbox/kysely-pglite@0.0.1": {
|
||||
"integrity": "7a4221aa780aad6fba9747c45c59dfb1c62017ba8cad9db5607f6e5822c058d5",
|
||||
"dependencies": [
|
||||
|
|
@ -311,12 +279,6 @@
|
|||
"@std/assert@0.213.1": {
|
||||
"integrity": "24c28178b30c8e0782c18e8e94ea72b16282207569cdd10ffb9d1d26f2edebfe"
|
||||
},
|
||||
"@std/assert@0.217.0": {
|
||||
"integrity": "c98e279362ca6982d5285c3b89517b757c1e3477ee9f14eb2fdf80a45aaa9642"
|
||||
},
|
||||
"@std/assert@0.221.0": {
|
||||
"integrity": "a5f1aa6e7909dbea271754fd4ab3f4e687aeff4873b4cef9a320af813adb489a"
|
||||
},
|
||||
"@std/assert@0.224.0": {
|
||||
"integrity": "8643233ec7aec38a940a8264a6e3eed9bfa44e7a71cc6b3c8874213ff401967f"
|
||||
},
|
||||
|
|
@ -351,18 +313,12 @@
|
|||
"@std/encoding@0.213.1": {
|
||||
"integrity": "fcbb6928713dde941a18ca5db88ca1544d0755ec8fb20fe61e2dc8144b390c62"
|
||||
},
|
||||
"@std/encoding@0.221.0": {
|
||||
"integrity": "d1dd76ef0dc5d14088411e6dc1dede53bf8308c95d1537df1214c97137208e45"
|
||||
},
|
||||
"@std/encoding@0.224.3": {
|
||||
"integrity": "5e861b6d81be5359fad4155e591acf17c0207b595112d1840998bb9f476dbdaf"
|
||||
},
|
||||
"@std/fmt@0.213.1": {
|
||||
"integrity": "a06d31777566d874b9c856c10244ac3e6b660bdec4c82506cd46be052a1082c3"
|
||||
},
|
||||
"@std/fmt@0.221.0": {
|
||||
"integrity": "379fed69bdd9731110f26b9085aeb740606b20428ce6af31ef6bd45ef8efa62a"
|
||||
},
|
||||
"@std/fs@0.213.1": {
|
||||
"integrity": "fbcaf099f8a85c27ab0712b666262cda8fe6d02e9937bf9313ecaea39a22c501",
|
||||
"dependencies": [
|
||||
|
|
@ -370,13 +326,6 @@
|
|||
"jsr:@std/path@^0.213.1"
|
||||
]
|
||||
},
|
||||
"@std/fs@0.221.0": {
|
||||
"integrity": "028044450299de8ed5a716ade4e6d524399f035513b85913794f4e81f07da286",
|
||||
"dependencies": [
|
||||
"jsr:@std/assert@^0.221.0",
|
||||
"jsr:@std/path@^0.221.0"
|
||||
]
|
||||
},
|
||||
"@std/fs@0.229.3": {
|
||||
"integrity": "783bca21f24da92e04c3893c9e79653227ab016c48e96b3078377ebd5222e6eb"
|
||||
},
|
||||
|
|
@ -434,18 +383,6 @@
|
|||
"jsr:@std/assert@^0.213.1"
|
||||
]
|
||||
},
|
||||
"@std/path@0.217.0": {
|
||||
"integrity": "1217cc25534bca9a2f672d7fe7c6f356e4027df400c0e85c0ef3e4343bc67d11",
|
||||
"dependencies": [
|
||||
"jsr:@std/assert@^0.217.0"
|
||||
]
|
||||
},
|
||||
"@std/path@0.221.0": {
|
||||
"integrity": "0a36f6b17314ef653a3a1649740cc8db51b25a133ecfe838f20b79a56ebe0095",
|
||||
"dependencies": [
|
||||
"jsr:@std/assert@^0.221.0"
|
||||
]
|
||||
},
|
||||
"@std/streams@0.223.0": {
|
||||
"integrity": "d6b28e498ced3960b04dc5d251f2dcfc1df244b5ec5a48dc23a8f9b490be3b99"
|
||||
}
|
||||
|
|
@ -1972,12 +1909,10 @@
|
|||
"dependencies": [
|
||||
"jsr:@b-fuze/deno-dom@^0.1.47",
|
||||
"jsr:@bradenmacdonald/s3-lite-client@^0.7.4",
|
||||
"jsr:@db/sqlite@^0.11.1",
|
||||
"jsr:@hono/hono@^4.4.6",
|
||||
"jsr:@lambdalisue/async@^2.1.1",
|
||||
"jsr:@nostrify/db@^0.31.2",
|
||||
"jsr:@nostrify/nostrify@^0.30.1",
|
||||
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
|
||||
"jsr:@soapbox/kysely-pglite@^0.0.1",
|
||||
"jsr:@soapbox/stickynotes@^0.4.0",
|
||||
"jsr:@std/assert@^0.225.1",
|
||||
|
|
|
|||
|
|
@ -16,9 +16,9 @@ ssh -L 9229:localhost:9229 <user>@<host>
|
|||
|
||||
Then, in Chromium, go to `chrome://inspect` and the Ditto server should be available.
|
||||
|
||||
## SQLite performance
|
||||
## SQL performance
|
||||
|
||||
To track slow queries, first set `DEBUG=ditto:sqlite.worker` in the environment so only SQLite logs are shown.
|
||||
To track slow queries, first set `DEBUG=ditto:sql` in the environment so only SQL logs are shown.
|
||||
|
||||
Then, grep for any logs above 0.001s:
|
||||
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts';
|
|||
|
||||
const signer = new AdminSigner();
|
||||
|
||||
const db = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(db);
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(kysely);
|
||||
|
||||
const readable = Deno.stdin.readable
|
||||
.pipeThrough(new TextDecoderStream())
|
||||
|
|
|
|||
|
|
@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts';
|
|||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { nostrNow } from '@/utils.ts';
|
||||
|
||||
const db = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(db);
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(kysely);
|
||||
|
||||
const [pubkeyOrNpub, role] = Deno.args;
|
||||
const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub;
|
||||
|
|
|
|||
|
|
@ -1,11 +1,4 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { sleep } from '@/test.ts';
|
||||
|
||||
if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') {
|
||||
console.info('Waiting 1 second for postgres to start...');
|
||||
await sleep(1_000);
|
||||
}
|
||||
|
||||
// This migrates kysely internally.
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools';
|
|||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
|
||||
const db = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(db);
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
const eventsDB = new EventsDB(kysely);
|
||||
|
||||
interface ImportEventsOpts {
|
||||
profilesOnly: boolean;
|
||||
|
|
|
|||
|
|
@ -45,10 +45,10 @@ const DATABASE_URL = Deno.env.get('DATABASE_URL');
|
|||
if (DATABASE_URL) {
|
||||
vars.DATABASE_URL = await question('input', 'Database URL', DATABASE_URL);
|
||||
} else {
|
||||
const database = await question('list', 'Which database do you want to use?', ['postgres', 'sqlite']);
|
||||
if (database === 'sqlite') {
|
||||
const path = await question('input', 'Path to SQLite database', 'data/db.sqlite3');
|
||||
vars.DATABASE_URL = `sqlite://${path}`;
|
||||
const database = await question('list', 'Which database do you want to use?', ['postgres', 'pglite']);
|
||||
if (database === 'pglite') {
|
||||
const path = await question('input', 'Path to PGlite data directory', 'data/pgdata');
|
||||
vars.DATABASE_URL = `file://${path}`;
|
||||
}
|
||||
if (database === 'postgres') {
|
||||
const host = await question('input', 'Postgres host', 'localhost');
|
||||
|
|
|
|||
|
|
@ -18,6 +18,6 @@ try {
|
|||
}
|
||||
|
||||
const store = await Storages.db();
|
||||
const kysely = await DittoDB.getInstance();
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
|
||||
await refreshAuthorStats({ pubkey, kysely, store });
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
import url from 'node:url';
|
||||
|
||||
import * as dotenv from '@std/dotenv';
|
||||
import { getPublicKey, nip19 } from 'nostr-tools';
|
||||
import { z } from 'zod';
|
||||
|
|
@ -82,23 +80,13 @@ class Conf {
|
|||
* ```
|
||||
*/
|
||||
static get databaseUrl(): string {
|
||||
return Deno.env.get('DATABASE_URL') ?? 'pglite://data/pgdata';
|
||||
return Deno.env.get('DATABASE_URL') ?? 'file://data/pgdata';
|
||||
}
|
||||
/** Database to use in tests. */
|
||||
static get testDatabaseUrl(): string {
|
||||
return Deno.env.get('TEST_DATABASE_URL') ?? 'memory://';
|
||||
}
|
||||
static db = {
|
||||
get url(): url.UrlWithStringQuery {
|
||||
return url.parse(Conf.databaseUrl);
|
||||
},
|
||||
get dialect(): 'sqlite' | 'postgres' | undefined {
|
||||
switch (Conf.db.url.protocol) {
|
||||
case 'sqlite:':
|
||||
return 'sqlite';
|
||||
case 'pglite:':
|
||||
case 'postgres:':
|
||||
case 'postgresql:':
|
||||
return 'postgres';
|
||||
}
|
||||
return undefined;
|
||||
},
|
||||
/** Database query timeout configurations. */
|
||||
timeouts: {
|
||||
/** Default query timeout when another setting isn't more specific. */
|
||||
|
|
@ -217,21 +205,6 @@ class Conf {
|
|||
static get sentryDsn(): string | undefined {
|
||||
return Deno.env.get('SENTRY_DSN');
|
||||
}
|
||||
/** SQLite settings. */
|
||||
static sqlite = {
|
||||
/**
|
||||
* Number of bytes to use for memory-mapped IO.
|
||||
* https://www.sqlite.org/pragma.html#pragma_mmap_size
|
||||
*/
|
||||
get mmapSize(): number {
|
||||
const value = Deno.env.get('SQLITE_MMAP_SIZE');
|
||||
if (value) {
|
||||
return Number(value);
|
||||
} else {
|
||||
return 1024 * 1024 * 1024;
|
||||
}
|
||||
},
|
||||
};
|
||||
/** Postgres settings. */
|
||||
static pg = {
|
||||
/** Number of connections to use in the pool. */
|
||||
|
|
|
|||
|
|
@ -6,9 +6,11 @@ import { dbAvailableConnectionsGauge, dbPoolSizeGauge } from '@/metrics.ts';
|
|||
|
||||
/** Prometheus/OpenMetrics controller. */
|
||||
export const metricsController: AppController = async (c) => {
|
||||
const db = await DittoDB.getInstance();
|
||||
|
||||
// Update some metrics at request time.
|
||||
dbPoolSizeGauge.set(DittoDB.poolSize);
|
||||
dbAvailableConnectionsGauge.set(DittoDB.availableConnections);
|
||||
dbPoolSizeGauge.set(db.poolSize);
|
||||
dbAvailableConnectionsGauge.set(db.availableConnections);
|
||||
|
||||
const metrics = await register.metrics();
|
||||
|
||||
|
|
|
|||
|
|
@ -1,75 +1,44 @@
|
|||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
|
||||
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoPglite } from '@/db/adapters/DittoPglite.ts';
|
||||
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
|
||||
import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
|
||||
import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
|
||||
export type DittoDatabase = {
|
||||
dialect: 'sqlite';
|
||||
kysely: Kysely<DittoTables> & Kysely<NDatabaseSchema>;
|
||||
} | {
|
||||
dialect: 'postgres';
|
||||
kysely: Kysely<DittoTables> & Kysely<NPostgresSchema>;
|
||||
};
|
||||
|
||||
export class DittoDB {
|
||||
private static db: Promise<DittoDatabase> | undefined;
|
||||
private static db: DittoDatabase | undefined;
|
||||
|
||||
static getInstance(): Promise<DittoDatabase> {
|
||||
/** Create (and migrate) the database if it isn't been already, or return the existing connection. */
|
||||
static async getInstance(): Promise<DittoDatabase> {
|
||||
if (!this.db) {
|
||||
this.db = this._getInstance();
|
||||
this.db = this.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize });
|
||||
await this.migrate(this.db.kysely);
|
||||
}
|
||||
return this.db;
|
||||
}
|
||||
|
||||
static async _getInstance(): Promise<DittoDatabase> {
|
||||
const result = {} as DittoDatabase;
|
||||
/** Open a new database connection. */
|
||||
static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase {
|
||||
const { protocol } = new URL(databaseUrl);
|
||||
|
||||
switch (Conf.db.url.protocol) {
|
||||
case 'sqlite:':
|
||||
result.dialect = 'sqlite';
|
||||
result.kysely = await DittoSQLite.getInstance();
|
||||
break;
|
||||
case 'pglite:':
|
||||
result.dialect = 'postgres';
|
||||
result.kysely = await DittoPglite.getInstance();
|
||||
break;
|
||||
switch (protocol) {
|
||||
case 'file:':
|
||||
case 'memory:':
|
||||
return DittoPglite.create(databaseUrl);
|
||||
case 'postgres:':
|
||||
case 'postgresql:':
|
||||
result.dialect = 'postgres';
|
||||
result.kysely = await DittoPostgres.getInstance();
|
||||
break;
|
||||
return DittoPostgres.create(databaseUrl, opts);
|
||||
default:
|
||||
throw new Error('Unsupported database URL.');
|
||||
}
|
||||
|
||||
await this.migrate(result.kysely);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static get poolSize(): number {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
return DittoPostgres.poolSize;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static get availableConnections(): number {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
return DittoPostgres.availableConnections;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/** Migrate the database to the latest version. */
|
||||
static async migrate(kysely: DittoDatabase['kysely']) {
|
||||
static async migrate(kysely: Kysely<DittoTables>) {
|
||||
const migrator = new Migrator({
|
||||
db: kysely,
|
||||
provider: new FileMigrationProvider({
|
||||
|
|
|
|||
13
src/db/DittoDatabase.ts
Normal file
13
src/db/DittoDatabase.ts
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
import { Kysely } from 'kysely';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
|
||||
export interface DittoDatabase {
|
||||
readonly kysely: Kysely<DittoTables>;
|
||||
readonly poolSize: number;
|
||||
readonly availableConnections: number;
|
||||
}
|
||||
|
||||
export interface DittoDatabaseOpts {
|
||||
poolSize?: number;
|
||||
}
|
||||
|
|
@ -1,4 +1,6 @@
|
|||
export interface DittoTables {
|
||||
import { NPostgresSchema } from '@nostrify/db';
|
||||
|
||||
export interface DittoTables extends NPostgresSchema {
|
||||
nip46_tokens: NIP46TokenRow;
|
||||
author_stats: AuthorStatsRow;
|
||||
event_stats: EventStatsRow;
|
||||
|
|
|
|||
|
|
@ -1,56 +1,24 @@
|
|||
import { PGlite } from '@electric-sql/pglite';
|
||||
import { NPostgresSchema } from '@nostrify/db';
|
||||
import { PgliteDialect } from '@soapbox/kysely-pglite';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDatabase } from '@/db/DittoDatabase.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { KyselyLogger } from '@/db/KyselyLogger.ts';
|
||||
|
||||
export class DittoPglite {
|
||||
static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
|
||||
static create(databaseUrl: string): DittoDatabase {
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
dialect: new PgliteDialect({
|
||||
database: new PGlite(databaseUrl),
|
||||
}),
|
||||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
// deno-lint-ignore require-await
|
||||
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
|
||||
if (!this.db) {
|
||||
this.db = new Kysely({
|
||||
dialect: new PgliteDialect({
|
||||
database: new PGlite(this.path),
|
||||
}),
|
||||
log: KyselyLogger,
|
||||
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
|
||||
}
|
||||
|
||||
return this.db;
|
||||
}
|
||||
|
||||
static get poolSize() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
static get availableConnections(): number {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/** Get the relative or absolute path based on the `DATABASE_URL`. */
|
||||
static get path(): string | undefined {
|
||||
if (Conf.databaseUrl === 'pglite://:memory:') {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const { host, pathname } = Conf.db.url;
|
||||
|
||||
if (!pathname) return '';
|
||||
|
||||
// Get relative path.
|
||||
if (host === '') {
|
||||
return pathname;
|
||||
} else if (host === '.') {
|
||||
return pathname;
|
||||
} else if (host) {
|
||||
return host + pathname;
|
||||
}
|
||||
|
||||
return '';
|
||||
return {
|
||||
kysely,
|
||||
poolSize: 1,
|
||||
availableConnections: 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,3 @@
|
|||
import { NPostgresSchema } from '@nostrify/db';
|
||||
import {
|
||||
BinaryOperationNode,
|
||||
FunctionNode,
|
||||
|
|
@ -13,51 +12,43 @@ import {
|
|||
import { PostgresJSDialectConfig, PostgresJSDriver } from 'kysely-postgres-js';
|
||||
import postgres from 'postgres';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { KyselyLogger } from '@/db/KyselyLogger.ts';
|
||||
|
||||
export class DittoPostgres {
|
||||
static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
|
||||
static postgres?: postgres.Sql;
|
||||
static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase {
|
||||
const pg = postgres(databaseUrl, { max: opts?.poolSize });
|
||||
|
||||
// deno-lint-ignore require-await
|
||||
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
|
||||
if (!this.postgres) {
|
||||
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
|
||||
}
|
||||
|
||||
if (!this.db) {
|
||||
this.db = new Kysely({
|
||||
dialect: {
|
||||
createAdapter() {
|
||||
return new PostgresAdapter();
|
||||
},
|
||||
createDriver() {
|
||||
return new PostgresJSDriver({
|
||||
postgres: DittoPostgres.postgres as unknown as PostgresJSDialectConfig['postgres'],
|
||||
});
|
||||
},
|
||||
createIntrospector(db) {
|
||||
return new PostgresIntrospector(db);
|
||||
},
|
||||
createQueryCompiler() {
|
||||
return new DittoPostgresQueryCompiler();
|
||||
},
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
dialect: {
|
||||
createAdapter() {
|
||||
return new PostgresAdapter();
|
||||
},
|
||||
log: KyselyLogger,
|
||||
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
|
||||
}
|
||||
createDriver() {
|
||||
return new PostgresJSDriver({
|
||||
postgres: pg as unknown as PostgresJSDialectConfig['postgres'],
|
||||
});
|
||||
},
|
||||
createIntrospector(db) {
|
||||
return new PostgresIntrospector(db);
|
||||
},
|
||||
createQueryCompiler() {
|
||||
return new DittoPostgresQueryCompiler();
|
||||
},
|
||||
},
|
||||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
return this.db;
|
||||
}
|
||||
|
||||
static get poolSize() {
|
||||
return this.postgres?.connections.open ?? 0;
|
||||
}
|
||||
|
||||
static get availableConnections(): number {
|
||||
return this.postgres?.connections.idle ?? 0;
|
||||
return {
|
||||
kysely,
|
||||
get poolSize() {
|
||||
return pg.connections.open;
|
||||
},
|
||||
get availableConnections() {
|
||||
return pg.connections.idle;
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,59 +0,0 @@
|
|||
import { NDatabaseSchema } from '@nostrify/db';
|
||||
import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { KyselyLogger } from '@/db/KyselyLogger.ts';
|
||||
import SqliteWorker from '@/workers/sqlite.ts';
|
||||
|
||||
export class DittoSQLite {
|
||||
static db: Kysely<DittoTables> & Kysely<NDatabaseSchema> | undefined;
|
||||
|
||||
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NDatabaseSchema>> {
|
||||
if (!this.db) {
|
||||
const sqliteWorker = new SqliteWorker();
|
||||
await sqliteWorker.open(this.path);
|
||||
|
||||
this.db = new Kysely({
|
||||
dialect: new PolySqliteDialect({
|
||||
database: sqliteWorker,
|
||||
}),
|
||||
log: KyselyLogger,
|
||||
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
|
||||
|
||||
// Set PRAGMA values.
|
||||
await Promise.all([
|
||||
sql`PRAGMA synchronous = normal`.execute(this.db),
|
||||
sql`PRAGMA temp_store = memory`.execute(this.db),
|
||||
sql`PRAGMA foreign_keys = ON`.execute(this.db),
|
||||
sql`PRAGMA auto_vacuum = FULL`.execute(this.db),
|
||||
sql`PRAGMA journal_mode = WAL`.execute(this.db),
|
||||
sql.raw(`PRAGMA mmap_size = ${Conf.sqlite.mmapSize}`).execute(this.db),
|
||||
]);
|
||||
}
|
||||
return this.db;
|
||||
}
|
||||
|
||||
/** Get the relative or absolute path based on the `DATABASE_URL`. */
|
||||
static get path() {
|
||||
if (Conf.databaseUrl === 'sqlite://:memory:') {
|
||||
return ':memory:';
|
||||
}
|
||||
|
||||
const { host, pathname } = Conf.db.url;
|
||||
|
||||
if (!pathname) return '';
|
||||
|
||||
// Get relative path.
|
||||
if (host === '') {
|
||||
return pathname;
|
||||
} else if (host === '.') {
|
||||
return pathname;
|
||||
} else if (host) {
|
||||
return host + pathname;
|
||||
}
|
||||
|
||||
return '';
|
||||
}
|
||||
}
|
||||
|
|
@ -1,13 +1,8 @@
|
|||
import { Kysely, sql } from 'kysely';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect === 'sqlite') {
|
||||
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
|
||||
}
|
||||
export async function up(_db: Kysely<any>): Promise<void> {
|
||||
// This migration used to create an FTS table for SQLite, but SQLite support was removed.
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await db.schema.dropTable('events_fts').ifExists().execute();
|
||||
export async function down(_db: Kysely<any>): Promise<void> {
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,25 +1,13 @@
|
|||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await db.schema.alterTable('events').renameTo('nostr_events').execute();
|
||||
await db.schema.alterTable('tags').renameTo('nostr_tags').execute();
|
||||
await db.schema.alterTable('nostr_tags').renameColumn('tag', 'name').execute();
|
||||
|
||||
if (Conf.db.dialect === 'sqlite') {
|
||||
await db.schema.dropTable('events_fts').execute();
|
||||
await sql`CREATE VIRTUAL TABLE nostr_fts5 USING fts5(event_id, content)`.execute(db);
|
||||
}
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await db.schema.alterTable('nostr_events').renameTo('events').execute();
|
||||
await db.schema.alterTable('nostr_tags').renameTo('tags').execute();
|
||||
await db.schema.alterTable('tags').renameColumn('name', 'tag').execute();
|
||||
|
||||
if (Conf.db.dialect === 'sqlite') {
|
||||
await db.schema.dropTable('nostr_fts5').execute();
|
||||
await sql`CREATE VIRTUAL TABLE events_fts USING fts5(id, content)`.execute(db);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,19 +1,13 @@
|
|||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
await db.schema.createTable('nostr_pgfts')
|
||||
.ifNotExists()
|
||||
.addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
|
||||
.addColumn('search_vec', sql`tsvector`, (c) => c.notNull())
|
||||
.execute();
|
||||
}
|
||||
await db.schema.createTable('nostr_pgfts')
|
||||
.ifNotExists()
|
||||
.addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
|
||||
.addColumn('search_vec', sql`tsvector`, (c) => c.notNull())
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
await db.schema.dropTable('nostr_pgfts').ifExists().execute();
|
||||
}
|
||||
await db.schema.dropTable('nostr_pgfts').ifExists().execute();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,21 +1,15 @@
|
|||
import { Kysely } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
await db.schema
|
||||
.createIndex('nostr_pgfts_gin_search_vec')
|
||||
.ifNotExists()
|
||||
.on('nostr_pgfts')
|
||||
.using('gin')
|
||||
.column('search_vec')
|
||||
.execute();
|
||||
}
|
||||
await db.schema
|
||||
.createIndex('nostr_pgfts_gin_search_vec')
|
||||
.ifNotExists()
|
||||
.on('nostr_pgfts')
|
||||
.using('gin')
|
||||
.column('search_vec')
|
||||
.execute();
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute();
|
||||
}
|
||||
await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,6 @@
|
|||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
if (Conf.db.dialect !== 'postgres') return;
|
||||
|
||||
// Create new table and indexes.
|
||||
await db.schema
|
||||
.createTable('nostr_events_new')
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { assertEquals } from '@std/assert';
|
||||
import { generateSecretKey } from 'nostr-tools';
|
||||
|
||||
import { createTestDB, genEvent, getTestDB } from '@/test.ts';
|
||||
import { createTestDB, genEvent } from '@/test.ts';
|
||||
import { handleZaps } from '@/pipeline.ts';
|
||||
|
||||
Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => {
|
||||
|
|
@ -58,7 +58,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab
|
|||
// If no error happens = ok
|
||||
|
||||
Deno.test('zap receipt does not have a "description" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
await using db = await createTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
|
@ -71,7 +71,7 @@ Deno.test('zap receipt does not have a "description" tag', async () => {
|
|||
});
|
||||
|
||||
Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
await using db = await createTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
|
@ -84,7 +84,7 @@ Deno.test('zap receipt does not have a zap request stringified value in the "des
|
|||
});
|
||||
|
||||
Deno.test('zap receipt does not have a "bolt11" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
await using db = await createTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
|
@ -103,7 +103,7 @@ Deno.test('zap receipt does not have a "bolt11" tag', async () => {
|
|||
});
|
||||
|
||||
Deno.test('zap request inside zap receipt does not have an "e" tag', async () => {
|
||||
await using db = await getTestDB();
|
||||
await using db = await createTestDB();
|
||||
const kysely = db.kysely;
|
||||
|
||||
const sk = generateSecretKey();
|
||||
|
|
|
|||
|
|
@ -16,12 +16,12 @@ export class Storages {
|
|||
private static _pubsub: Promise<InternalRelay> | undefined;
|
||||
private static _search: Promise<SearchStore> | undefined;
|
||||
|
||||
/** SQLite database to store events this Ditto server cares about. */
|
||||
/** SQL database to store events this Ditto server cares about. */
|
||||
public static async db(): Promise<EventsDB> {
|
||||
if (!this._db) {
|
||||
this._db = (async () => {
|
||||
const db = await DittoDB.getInstance();
|
||||
const store = new EventsDB(db);
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
const store = new EventsDB(kysely);
|
||||
await seedZapSplits(store);
|
||||
return store;
|
||||
})();
|
||||
|
|
|
|||
|
|
@ -13,10 +13,11 @@ import {
|
|||
NStore,
|
||||
} from '@nostrify/nostrify';
|
||||
import { Stickynotes } from '@soapbox/stickynotes';
|
||||
import { Kysely } from 'kysely';
|
||||
import { nip27 } from 'nostr-tools';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDatabase } from '@/db/DittoDB.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { dbEventsCounter } from '@/metrics.ts';
|
||||
import { RelayError } from '@/RelayError.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
|
|
@ -30,7 +31,7 @@ type TagCondition = ({ event, count, value }: {
|
|||
value: string;
|
||||
}) => boolean;
|
||||
|
||||
/** SQLite database storage adapter for Nostr events. */
|
||||
/** SQL database storage adapter for Nostr events. */
|
||||
class EventsDB implements NStore {
|
||||
private store: NDatabase | NPostgres;
|
||||
private console = new Stickynotes('ditto:db:events');
|
||||
|
|
@ -52,21 +53,11 @@ class EventsDB implements NStore {
|
|||
't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50,
|
||||
};
|
||||
|
||||
constructor(private database: DittoDatabase) {
|
||||
const { dialect, kysely } = database;
|
||||
|
||||
if (dialect === 'postgres') {
|
||||
this.store = new NPostgres(kysely, {
|
||||
indexTags: EventsDB.indexTags,
|
||||
indexSearch: EventsDB.searchText,
|
||||
});
|
||||
} else {
|
||||
this.store = new NDatabase(kysely, {
|
||||
fts: 'sqlite',
|
||||
indexTags: EventsDB.indexTags,
|
||||
searchText: EventsDB.searchText,
|
||||
});
|
||||
}
|
||||
constructor(private kysely: Kysely<DittoTables>) {
|
||||
this.store = new NPostgres(kysely, {
|
||||
indexTags: EventsDB.indexTags,
|
||||
indexSearch: EventsDB.searchText,
|
||||
});
|
||||
}
|
||||
|
||||
/** Insert an event (and its tags) into the database. */
|
||||
|
|
@ -273,7 +264,7 @@ class EventsDB implements NStore {
|
|||
return tags.map(([_tag, value]) => value).join('\n');
|
||||
}
|
||||
|
||||
/** Converts filters to more performant, simpler filters that are better for SQLite. */
|
||||
/** Converts filters to more performant, simpler filters. */
|
||||
async expandFilters(filters: NostrFilter[]): Promise<NostrFilter[]> {
|
||||
filters = structuredClone(filters);
|
||||
|
||||
|
|
@ -286,7 +277,7 @@ class EventsDB implements NStore {
|
|||
) as { key: 'domain'; value: string } | undefined)?.value;
|
||||
|
||||
if (domain) {
|
||||
const query = this.database.kysely
|
||||
const query = this.kysely
|
||||
.selectFrom('pubkey_domains')
|
||||
.select('pubkey')
|
||||
.where('domain', '=', domain);
|
||||
|
|
|
|||
121
src/test.ts
121
src/test.ts
|
|
@ -1,21 +1,10 @@
|
|||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import { Database as Sqlite } from '@db/sqlite';
|
||||
import { NDatabase, NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
|
||||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
|
||||
import { finalizeEvent, generateSecretKey } from 'nostr-tools';
|
||||
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
|
||||
import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js';
|
||||
import postgres from 'postgres';
|
||||
|
||||
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { KyselyLogger } from '@/db/KyselyLogger.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
|
||||
/** Import an event fixture by name in tests. */
|
||||
export async function eventFixture(name: string): Promise<NostrEvent> {
|
||||
|
|
@ -42,97 +31,22 @@ export function genEvent(t: Partial<NostrEvent> = {}, sk: Uint8Array = generateS
|
|||
return purifyEvent(event);
|
||||
}
|
||||
|
||||
/** Get an in-memory SQLite database to use for testing. It's automatically destroyed when it goes out of scope. */
|
||||
export async function getTestDB() {
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
dialect: new DenoSqlite3Dialect({
|
||||
database: new Sqlite(':memory:'),
|
||||
}),
|
||||
});
|
||||
/** Create a database for testing. It uses `TEST_DATABASE_URL`, or creates an in-memory database by default. */
|
||||
export async function createTestDB() {
|
||||
const { testDatabaseUrl } = Conf;
|
||||
const { protocol } = new URL(testDatabaseUrl);
|
||||
const { kysely } = DittoDB.create(testDatabaseUrl, { poolSize: 1 });
|
||||
|
||||
const migrator = new Migrator({
|
||||
db: kysely,
|
||||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: new URL(import.meta.resolve('./db/migrations')).pathname,
|
||||
}),
|
||||
});
|
||||
|
||||
await migrator.migrateToLatest();
|
||||
|
||||
const store = new NDatabase(kysely);
|
||||
await DittoDB.migrate(kysely);
|
||||
const store = new EventsDB(kysely);
|
||||
|
||||
return {
|
||||
store,
|
||||
kysely,
|
||||
[Symbol.asyncDispose]: () => kysely.destroy(),
|
||||
};
|
||||
}
|
||||
|
||||
/** Create an database for testing. */
|
||||
export const createTestDB = async (databaseUrl?: string) => {
|
||||
databaseUrl ??= Deno.env.get('DATABASE_URL') ?? 'sqlite://:memory:';
|
||||
|
||||
let dialect: 'sqlite' | 'postgres' = (() => {
|
||||
const protocol = databaseUrl.split(':')[0];
|
||||
switch (protocol) {
|
||||
case 'sqlite':
|
||||
return 'sqlite';
|
||||
case 'postgres':
|
||||
return protocol;
|
||||
case 'postgresql':
|
||||
return 'postgres';
|
||||
default:
|
||||
throw new Error(`Unsupported protocol: ${protocol}`);
|
||||
}
|
||||
})();
|
||||
|
||||
const allowToUseDATABASE_URL = Deno.env.get('ALLOW_TO_USE_DATABASE_URL')?.toLowerCase() ?? '';
|
||||
if (allowToUseDATABASE_URL !== 'true' && dialect === 'postgres') {
|
||||
console.warn(
|
||||
'%cRunning tests with sqlite, if you meant to use Postgres, run again with ALLOW_TO_USE_DATABASE_URL environment variable set to true',
|
||||
'color: yellow;',
|
||||
);
|
||||
dialect = 'sqlite';
|
||||
}
|
||||
|
||||
console.warn(`Using: ${dialect}`);
|
||||
|
||||
const db: DittoDatabase = { dialect } as DittoDatabase;
|
||||
|
||||
if (dialect === 'sqlite') {
|
||||
// migration 021_pgfts_index.ts calls 'Conf.db.dialect',
|
||||
// and this calls the DATABASE_URL environment variable.
|
||||
// The following line ensures to NOT use the DATABASE_URL that may exist in an .env file.
|
||||
Deno.env.set('DATABASE_URL', 'sqlite://:memory:');
|
||||
|
||||
db.kysely = new Kysely({
|
||||
dialect: new DenoSqlite3Dialect({
|
||||
database: new Sqlite(':memory:'),
|
||||
}),
|
||||
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
|
||||
} else {
|
||||
db.kysely = new Kysely({
|
||||
// @ts-ignore Kysely version mismatch.
|
||||
dialect: new PostgresJSDialect({
|
||||
postgres: postgres(Conf.databaseUrl, {
|
||||
max: Conf.pg.poolSize,
|
||||
}) as unknown as PostgresJSDialectConfig['postgres'],
|
||||
}),
|
||||
log: KyselyLogger,
|
||||
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
|
||||
}
|
||||
|
||||
await DittoDB.migrate(db.kysely);
|
||||
const store = new EventsDB(db);
|
||||
|
||||
return {
|
||||
dialect,
|
||||
store,
|
||||
kysely: db.kysely,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
if (dialect === 'postgres') {
|
||||
// If we're testing against real Postgres, we will reuse the database
|
||||
// between tests, so we should drop the tables to keep each test fresh.
|
||||
if (['postgres:', 'postgresql:'].includes(protocol)) {
|
||||
for (
|
||||
const table of [
|
||||
'author_stats',
|
||||
|
|
@ -142,20 +56,17 @@ export const createTestDB = async (databaseUrl?: string) => {
|
|||
'kysely_migration_lock',
|
||||
'nip46_tokens',
|
||||
'pubkey_domains',
|
||||
'unattached_media',
|
||||
'nostr_events',
|
||||
'nostr_tags',
|
||||
'nostr_pgfts',
|
||||
'event_zaps',
|
||||
]
|
||||
) {
|
||||
await db.kysely.schema.dropTable(table).ifExists().cascade().execute();
|
||||
await kysely.schema.dropTable(table).ifExists().cascade().execute();
|
||||
}
|
||||
await db.kysely.destroy();
|
||||
await kysely.destroy();
|
||||
}
|
||||
},
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
export function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
|
|
|||
117
src/trends.ts
117
src/trends.ts
|
|
@ -1,9 +1,10 @@
|
|||
import { NostrFilter } from '@nostrify/nostrify';
|
||||
import { Stickynotes } from '@soapbox/stickynotes';
|
||||
import { sql } from 'kysely';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { handleEvent } from '@/pipeline.ts';
|
||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
|
@ -13,88 +14,50 @@ const console = new Stickynotes('ditto:trends');
|
|||
/** Get trending tag values for a given tag in the given time frame. */
|
||||
export async function getTrendingTagValues(
|
||||
/** Kysely instance to execute queries on. */
|
||||
{ dialect, kysely }: DittoDatabase,
|
||||
kysely: Kysely<DittoTables>,
|
||||
/** Tag name to filter by, eg `t` or `r`. */
|
||||
tagNames: string[],
|
||||
/** Filter of eligible events. */
|
||||
filter: NostrFilter,
|
||||
): Promise<{ value: string; authors: number; uses: number }[]> {
|
||||
if (dialect === 'postgres') {
|
||||
let query = kysely
|
||||
.selectFrom([
|
||||
'nostr_events',
|
||||
sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'),
|
||||
sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'),
|
||||
])
|
||||
.select(({ fn }) => [
|
||||
fn<string>('lower', ['element.value']).as('value'),
|
||||
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames)))
|
||||
.groupBy((eb) => eb.fn<string>('lower', ['element.value']))
|
||||
.orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
|
||||
let query = kysely
|
||||
.selectFrom([
|
||||
'nostr_events',
|
||||
sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'),
|
||||
sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'),
|
||||
])
|
||||
.select(({ fn }) => [
|
||||
fn<string>('lower', ['element.value']).as('value'),
|
||||
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames)))
|
||||
.groupBy((eb) => eb.fn<string>('lower', ['element.value']))
|
||||
.orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
|
||||
|
||||
if (filter.kinds) {
|
||||
query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds)));
|
||||
}
|
||||
if (filter.authors) {
|
||||
query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors)));
|
||||
}
|
||||
if (typeof filter.since === 'number') {
|
||||
query = query.where('nostr_events.created_at', '>=', filter.since);
|
||||
}
|
||||
if (typeof filter.until === 'number') {
|
||||
query = query.where('nostr_events.created_at', '<=', filter.until);
|
||||
}
|
||||
if (typeof filter.limit === 'number') {
|
||||
query = query.limit(filter.limit);
|
||||
}
|
||||
|
||||
const rows = await query.execute();
|
||||
|
||||
return rows.map((row) => ({
|
||||
value: row.value,
|
||||
authors: Number(row.authors),
|
||||
uses: Number(row.uses),
|
||||
}));
|
||||
if (filter.kinds) {
|
||||
query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds)));
|
||||
}
|
||||
if (filter.authors) {
|
||||
query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors)));
|
||||
}
|
||||
if (typeof filter.since === 'number') {
|
||||
query = query.where('nostr_events.created_at', '>=', filter.since);
|
||||
}
|
||||
if (typeof filter.until === 'number') {
|
||||
query = query.where('nostr_events.created_at', '<=', filter.until);
|
||||
}
|
||||
if (typeof filter.limit === 'number') {
|
||||
query = query.limit(filter.limit);
|
||||
}
|
||||
|
||||
if (dialect === 'sqlite') {
|
||||
let query = kysely
|
||||
.selectFrom('nostr_tags')
|
||||
.select(({ fn }) => [
|
||||
'nostr_tags.value',
|
||||
fn.agg<number>('count', ['nostr_tags.pubkey']).distinct().as('authors'),
|
||||
fn.countAll<number>().as('uses'),
|
||||
])
|
||||
.where('nostr_tags.name', 'in', tagNames)
|
||||
.groupBy('nostr_tags.value')
|
||||
.orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc');
|
||||
const rows = await query.execute();
|
||||
|
||||
if (filter.kinds) {
|
||||
query = query.where('nostr_tags.kind', 'in', filter.kinds);
|
||||
}
|
||||
if (typeof filter.since === 'number') {
|
||||
query = query.where('nostr_tags.created_at', '>=', filter.since);
|
||||
}
|
||||
if (typeof filter.until === 'number') {
|
||||
query = query.where('nostr_tags.created_at', '<=', filter.until);
|
||||
}
|
||||
if (typeof filter.limit === 'number') {
|
||||
query = query.limit(filter.limit);
|
||||
}
|
||||
|
||||
const rows = await query.execute();
|
||||
|
||||
return rows.map((row) => ({
|
||||
value: row.value,
|
||||
authors: Number(row.authors),
|
||||
uses: Number(row.uses),
|
||||
}));
|
||||
}
|
||||
|
||||
return [];
|
||||
return rows.map((row) => ({
|
||||
value: row.value,
|
||||
authors: Number(row.authors),
|
||||
uses: Number(row.uses),
|
||||
}));
|
||||
}
|
||||
|
||||
/** Get trending tags and publish an event with them. */
|
||||
|
|
@ -107,7 +70,7 @@ export async function updateTrendingTags(
|
|||
aliases?: string[],
|
||||
) {
|
||||
console.info(`Updating trending ${l}...`);
|
||||
const db = await DittoDB.getInstance();
|
||||
const { kysely } = await DittoDB.getInstance();
|
||||
const signal = AbortSignal.timeout(1000);
|
||||
|
||||
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
|
||||
|
|
@ -116,7 +79,7 @@ export async function updateTrendingTags(
|
|||
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
|
||||
|
||||
try {
|
||||
const trends = await getTrendingTagValues(db, tagNames, {
|
||||
const trends = await getTrendingTagValues(kysely, tagNames, {
|
||||
kinds,
|
||||
since: yesterday,
|
||||
until: now,
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ export class SimpleLRU<
|
|||
|
||||
constructor(fetchFn: FetchFn<K, V, { signal: AbortSignal }>, opts: LRUCache.Options<K, V, void>) {
|
||||
this.cache = new LRUCache({
|
||||
fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as AbortSignal }),
|
||||
fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as unknown as AbortSignal }),
|
||||
...opts,
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ import { z } from 'zod';
|
|||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
import { findQuoteTag, findReplyTag, getTagSet } from '@/utils/tags.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
|
||||
interface UpdateStatsOpts {
|
||||
kysely: Kysely<DittoTables>;
|
||||
|
|
@ -197,16 +196,13 @@ export async function updateAuthorStats(
|
|||
notes_count: 0,
|
||||
};
|
||||
|
||||
let query = kysely
|
||||
const prev = await kysely
|
||||
.selectFrom('author_stats')
|
||||
.selectAll()
|
||||
.where('pubkey', '=', pubkey);
|
||||
.forUpdate()
|
||||
.where('pubkey', '=', pubkey)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
query = query.forUpdate();
|
||||
}
|
||||
|
||||
const prev = await query.executeTakeFirst();
|
||||
const stats = fn(prev ?? empty);
|
||||
|
||||
if (prev) {
|
||||
|
|
@ -249,16 +245,13 @@ export async function updateEventStats(
|
|||
reactions: '{}',
|
||||
};
|
||||
|
||||
let query = kysely
|
||||
const prev = await kysely
|
||||
.selectFrom('event_stats')
|
||||
.selectAll()
|
||||
.where('event_id', '=', eventId);
|
||||
.forUpdate()
|
||||
.where('event_id', '=', eventId)
|
||||
.executeTakeFirst();
|
||||
|
||||
if (Conf.db.dialect === 'postgres') {
|
||||
query = query.forUpdate();
|
||||
}
|
||||
|
||||
const prev = await query.executeTakeFirst();
|
||||
const stats = fn(prev ?? empty);
|
||||
|
||||
if (prev) {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,9 @@
|
|||
/// <reference lib="webworker" />
|
||||
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import './handlers/abortsignal.ts';
|
||||
import '@/workers/handlers/abortsignal.ts';
|
||||
import '@/sentry.ts';
|
||||
|
||||
const debug = Debug('ditto:fetch.worker');
|
||||
|
|
|
|||
|
|
@ -1,52 +0,0 @@
|
|||
import * as Comlink from 'comlink';
|
||||
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
|
||||
import { CompiledQuery, QueryResult } from 'kysely';
|
||||
|
||||
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
|
||||
|
||||
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
|
||||
|
||||
class SqliteWorker {
|
||||
#worker: Worker;
|
||||
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
|
||||
#ready: Promise<void>;
|
||||
|
||||
constructor() {
|
||||
this.#worker = new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' });
|
||||
this.#client = Comlink.wrap<typeof _SqliteWorker>(this.#worker);
|
||||
|
||||
this.#ready = new Promise<void>((resolve) => {
|
||||
const handleEvent = (event: MessageEvent) => {
|
||||
if (event.data[0] === 'ready') {
|
||||
this.#worker.removeEventListener('message', handleEvent);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
this.#worker.addEventListener('message', handleEvent);
|
||||
});
|
||||
}
|
||||
|
||||
async open(path: string): Promise<void> {
|
||||
await this.#ready;
|
||||
return this.#client.open(path);
|
||||
}
|
||||
|
||||
async executeQuery<R>(query: CompiledQuery): Promise<QueryResult<R>> {
|
||||
await this.#ready;
|
||||
return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
|
||||
}
|
||||
|
||||
async *streamQuery<R>(query: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
|
||||
await this.#ready;
|
||||
|
||||
for await (const result of await this.#client.streamQuery(query)) {
|
||||
yield result as QueryResult<R>;
|
||||
}
|
||||
}
|
||||
|
||||
destroy(): Promise<void> {
|
||||
return this.#client.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
export default SqliteWorker;
|
||||
|
|
@ -1,42 +0,0 @@
|
|||
/// <reference lib="webworker" />
|
||||
import { Database as SQLite } from '@db/sqlite';
|
||||
import * as Comlink from 'comlink';
|
||||
import { CompiledQuery, QueryResult } from 'kysely';
|
||||
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
|
||||
|
||||
import '@/sentry.ts';
|
||||
|
||||
let db: SQLite | undefined;
|
||||
|
||||
export const SqliteWorker = {
|
||||
open(path: string): void {
|
||||
db = new SQLite(path);
|
||||
},
|
||||
executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
|
||||
if (!db) throw new Error('Database not open');
|
||||
|
||||
return {
|
||||
rows: db!.prepare(sql).all(...parameters as any[]) as R[],
|
||||
numAffectedRows: BigInt(db!.changes),
|
||||
insertId: BigInt(db!.lastInsertRowId),
|
||||
};
|
||||
},
|
||||
async *streamQuery<R>({ sql, parameters }: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
|
||||
if (!db) throw new Error('Database not open');
|
||||
|
||||
const stmt = db.prepare(sql).bind(...parameters as any[]);
|
||||
for (const row of stmt) {
|
||||
yield {
|
||||
rows: [row],
|
||||
};
|
||||
}
|
||||
},
|
||||
destroy() {
|
||||
db?.close();
|
||||
},
|
||||
};
|
||||
|
||||
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
|
||||
Comlink.expose(SqliteWorker);
|
||||
|
||||
self.postMessage(['ready']);
|
||||
Loading…
Add table
Reference in a new issue