Simplify database interfaces, make tests use pglite

This commit is contained in:
Alex Gleason 2024-09-11 11:48:31 -05:00
parent dc8d09a9da
commit f3ae200833
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
25 changed files with 117 additions and 193 deletions

View file

@ -35,10 +35,10 @@ test:
postgres:
stage: test
script: deno task db:migrate && deno task test
script: sleep 1 && deno task test
services:
- postgres:16
variables:
DITTO_NSEC: nsec1zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zyg3zygs4rm7hz
DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres
TEST_DATABASE_URL: postgres://postgres:postgres@postgres:5432/postgres
POSTGRES_HOST_AUTH_METHOD: trust

View file

@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts';
const signer = new AdminSigner();
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
const { kysely } = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
const readable = Deno.stdin.readable
.pipeThrough(new TextDecoderStream())

View file

@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
import { nostrNow } from '@/utils.ts';
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
const { kysely } = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
const [pubkeyOrNpub, role] = Deno.args;
const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub;

View file

@ -1,11 +1,4 @@
import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { sleep } from '@/test.ts';
if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') {
console.info('Waiting 1 second for postgres to start...');
await sleep(1_000);
}
// This migrates kysely internally.
const { kysely } = await DittoDB.getInstance();

View file

@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools';
import { DittoDB } from '@/db/DittoDB.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
const { kysely } = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
interface ImportEventsOpts {
profilesOnly: boolean;

View file

@ -18,6 +18,6 @@ try {
}
const store = await Storages.db();
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await refreshAuthorStats({ pubkey, kysely, store });

View file

@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => {
async function getToken(
{ pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] },
): Promise<`token1${string}`> {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const token = generateToken();
const serverSeckey = generateSecretKey();

View file

@ -578,7 +578,7 @@ const zappedByController: AppController = async (c) => {
const id = c.req.param('id');
const params = c.get('listPagination');
const store = await Storages.db();
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const zaps = await kysely.selectFrom('event_zaps')
.selectAll()

View file

@ -222,7 +222,7 @@ async function topicToFilter(
async function getTokenPubkey(token: string): Promise<string | undefined> {
if (token.startsWith('token1')) {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { user_pubkey } = await kysely
.selectFrom('nip46_tokens')

View file

@ -6,9 +6,11 @@ import { dbAvailableConnectionsGauge, dbPoolSizeGauge } from '@/metrics.ts';
/** Prometheus/OpenMetrics controller. */
export const metricsController: AppController = async (c) => {
const db = await DittoDB.getInstance();
// Update some metrics at request time.
dbPoolSizeGauge.set(DittoDB.poolSize);
dbAvailableConnectionsGauge.set(DittoDB.availableConnections);
dbPoolSizeGauge.set(db.poolSize);
dbAvailableConnectionsGauge.set(db.availableConnections);
const metrics = await register.metrics();

View file

@ -6,57 +6,35 @@ import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoPglite } from '@/db/adapters/DittoPglite.ts';
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts';
import { DittoTables } from '@/db/DittoTables.ts';
export class DittoDB {
private static kysely: Promise<Kysely<DittoTables>> | undefined;
private static db: DittoDatabase | undefined;
static getInstance(): Promise<Kysely<DittoTables>> {
if (!this.kysely) {
this.kysely = this._getInstance();
/** Create (and migrate) the database if it isn't been already, or return the existing connection. */
static async getInstance(): Promise<DittoDatabase> {
if (!this.db) {
this.db = this.create(Conf.databaseUrl, { poolSize: Conf.pg.poolSize });
await this.migrate(this.db.kysely);
}
return this.kysely;
return this.db;
}
static async _getInstance(): Promise<Kysely<DittoTables>> {
const { protocol } = new URL(Conf.databaseUrl);
let kysely: Kysely<DittoTables>;
/** Open a new database connection. */
static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase {
const { protocol } = new URL(databaseUrl);
switch (protocol) {
case 'file:':
case 'memory:':
kysely = await DittoPglite.getInstance();
break;
return DittoPglite.create(databaseUrl);
case 'postgres:':
case 'postgresql:':
kysely = await DittoPostgres.getInstance();
break;
return DittoPostgres.create(databaseUrl, opts);
default:
throw new Error('Unsupported database URL.');
}
await this.migrate(kysely);
return kysely;
}
static get poolSize(): number {
const { protocol } = new URL(Conf.databaseUrl);
if (['postgres:', 'postgresql:'].includes(protocol)) {
return DittoPostgres.poolSize;
}
return 1;
}
static get availableConnections(): number {
const { protocol } = new URL(Conf.databaseUrl);
if (['postgres:', 'postgresql:'].includes(protocol)) {
return DittoPostgres.availableConnections;
}
return 1;
}
/** Migrate the database to the latest version. */

13
src/db/DittoDatabase.ts Normal file
View file

@ -0,0 +1,13 @@
import { Kysely } from 'kysely';
import { DittoTables } from '@/db/DittoTables.ts';
export interface DittoDatabase {
readonly kysely: Kysely<DittoTables>;
readonly poolSize: number;
readonly availableConnections: number;
}
export interface DittoDatabaseOpts {
poolSize?: number;
}

View file

@ -2,32 +2,23 @@ import { PGlite } from '@electric-sql/pglite';
import { PgliteDialect } from '@soapbox/kysely-pglite';
import { Kysely } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoDatabase } from '@/db/DittoDatabase.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPglite {
static db: Kysely<DittoTables> | undefined;
static create(databaseUrl: string): DittoDatabase {
const kysely = new Kysely<DittoTables>({
dialect: new PgliteDialect({
database: new PGlite(databaseUrl),
}),
log: KyselyLogger,
});
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.db) {
this.db = new Kysely<DittoTables>({
dialect: new PgliteDialect({
database: new PGlite(Conf.databaseUrl),
}),
log: KyselyLogger,
}) as Kysely<DittoTables>;
}
return this.db;
}
static get poolSize() {
return 1;
}
static get availableConnections(): number {
return 1;
return {
kysely,
poolSize: 1,
availableConnections: 1,
};
}
}

View file

@ -12,51 +12,43 @@ import {
import { PostgresJSDialectConfig, PostgresJSDriver } from 'kysely-postgres-js';
import postgres from 'postgres';
import { Conf } from '@/config.ts';
import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPostgres {
static kysely: Kysely<DittoTables> | undefined;
static postgres?: postgres.Sql;
static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase {
const pg = postgres(databaseUrl, { max: opts?.poolSize });
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> {
if (!this.postgres) {
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
}
if (!this.kysely) {
this.kysely = new Kysely<DittoTables>({
dialect: {
createAdapter() {
return new PostgresAdapter();
},
createDriver() {
return new PostgresJSDriver({
postgres: DittoPostgres.postgres as unknown as PostgresJSDialectConfig['postgres'],
});
},
createIntrospector(db) {
return new PostgresIntrospector(db);
},
createQueryCompiler() {
return new DittoPostgresQueryCompiler();
},
const kysely = new Kysely<DittoTables>({
dialect: {
createAdapter() {
return new PostgresAdapter();
},
log: KyselyLogger,
});
}
createDriver() {
return new PostgresJSDriver({
postgres: pg as unknown as PostgresJSDialectConfig['postgres'],
});
},
createIntrospector(db) {
return new PostgresIntrospector(db);
},
createQueryCompiler() {
return new DittoPostgresQueryCompiler();
},
},
log: KyselyLogger,
});
return this.kysely;
}
static get poolSize() {
return this.postgres?.connections.open ?? 0;
}
static get availableConnections(): number {
return this.postgres?.connections.idle ?? 0;
return {
kysely,
get poolSize() {
return pg.connections.open;
},
get availableConnections() {
return pg.connections.idle;
},
};
}
}

View file

@ -1,19 +1,13 @@
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect === 'postgres') {
await db.schema.createTable('nostr_pgfts')
.ifNotExists()
.addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
.addColumn('search_vec', sql`tsvector`, (c) => c.notNull())
.execute();
}
await db.schema.createTable('nostr_pgfts')
.ifNotExists()
.addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
.addColumn('search_vec', sql`tsvector`, (c) => c.notNull())
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect === 'postgres') {
await db.schema.dropTable('nostr_pgfts').ifExists().execute();
}
await db.schema.dropTable('nostr_pgfts').ifExists().execute();
}

View file

@ -1,21 +1,15 @@
import { Kysely } from 'kysely';
import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect === 'postgres') {
await db.schema
.createIndex('nostr_pgfts_gin_search_vec')
.ifNotExists()
.on('nostr_pgfts')
.using('gin')
.column('search_vec')
.execute();
}
await db.schema
.createIndex('nostr_pgfts_gin_search_vec')
.ifNotExists()
.on('nostr_pgfts')
.using('gin')
.column('search_vec')
.execute();
}
export async function down(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect === 'postgres') {
await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute();
}
await db.schema.dropIndex('nostr_pgfts_gin_search_vec').ifExists().execute();
}

View file

@ -1,10 +1,6 @@
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect !== 'postgres') return;
// Create new table and indexes.
await db.schema
.createTable('nostr_events_new')

View file

@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => {
if (bech32.startsWith('token1')) {
try {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { user_pubkey, server_seckey, relays } = await kysely
.selectFrom('nip46_tokens')

View file

@ -1,7 +1,7 @@
import { assertEquals } from '@std/assert';
import { generateSecretKey } from 'nostr-tools';
import { createTestDB, genEvent, getTestDB } from '@/test.ts';
import { createTestDB, genEvent } from '@/test.ts';
import { handleZaps } from '@/pipeline.ts';
Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => {
@ -58,7 +58,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab
// If no error happens = ok
Deno.test('zap receipt does not have a "description" tag', async () => {
await using db = await getTestDB();
await using db = await createTestDB();
const kysely = db.kysely;
const sk = generateSecretKey();
@ -71,7 +71,7 @@ Deno.test('zap receipt does not have a "description" tag', async () => {
});
Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => {
await using db = await getTestDB();
await using db = await createTestDB();
const kysely = db.kysely;
const sk = generateSecretKey();
@ -84,7 +84,7 @@ Deno.test('zap receipt does not have a zap request stringified value in the "des
});
Deno.test('zap receipt does not have a "bolt11" tag', async () => {
await using db = await getTestDB();
await using db = await createTestDB();
const kysely = db.kysely;
const sk = generateSecretKey();
@ -103,7 +103,7 @@ Deno.test('zap receipt does not have a "bolt11" tag', async () => {
});
Deno.test('zap request inside zap receipt does not have an "e" tag', async () => {
await using db = await getTestDB();
await using db = await createTestDB();
const kysely = db.kysely;
const sk = generateSecretKey();

View file

@ -53,7 +53,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
throw new RelayError('blocked', 'user is disabled');
}
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await Promise.all([
storeEvent(event, signal),
@ -104,7 +104,7 @@ async function existsInDB(event: DittoEvent): Promise<boolean> {
async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
await hydrateEvents({ events: [event], store: await Storages.db(), signal });
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const domain = await kysely
.selectFrom('pubkey_domains')
.select('domain')
@ -118,7 +118,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
if (NKinds.ephemeral(event.kind)) return;
const store = await Storages.db();
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await updateStats({ event, store, kysely }).catch(debug);
await store.event(event, { signal });
@ -146,7 +146,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
// Track pubkey domain.
try {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { domain } = parseNip05(nip05);
await sql`

View file

@ -20,8 +20,8 @@ export class Storages {
public static async db(): Promise<EventsDB> {
if (!this._db) {
this._db = (async () => {
const db = await DittoDB.getInstance();
const store = new EventsDB(db);
const { kysely } = await DittoDB.getInstance();
const store = new EventsDB(kysely);
await seedZapSplits(store);
return store;
})();

View file

@ -18,7 +18,7 @@ interface HydrateOpts {
/** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, store, signal, kysely = await DittoDB.getInstance() } = opts;
const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts;
if (!events.length) {
return events;

View file

@ -1,16 +1,9 @@
import { PGlite } from '@electric-sql/pglite';
import { NostrEvent } from '@nostrify/nostrify';
import { PgliteDialect } from '@soapbox/kysely-pglite';
import { finalizeEvent, generateSecretKey } from 'nostr-tools';
import { Kysely } from 'kysely';
import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js';
import postgres from 'postgres';
import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
/** Import an event fixture by name in tests. */
@ -41,31 +34,7 @@ export function genEvent(t: Partial<NostrEvent> = {}, sk: Uint8Array = generateS
/** Create an database for testing. */
export const createTestDB = async (databaseUrl = Conf.testDatabaseUrl) => {
const { protocol } = new URL(databaseUrl);
const kysely: Kysely<DittoTables> = (() => {
switch (protocol) {
case 'postgres:':
case 'postgresql:':
return new Kysely({
// @ts-ignore Kysely version mismatch.
dialect: new PostgresJSDialect({
postgres: postgres(databaseUrl, {
max: Conf.pg.poolSize,
}) as unknown as PostgresJSDialectConfig['postgres'],
}),
log: KyselyLogger,
});
case 'file:':
case 'memory:':
return new Kysely({
dialect: new PgliteDialect({
database: new PGlite(databaseUrl),
}),
});
default:
throw new Error(`Unsupported database URL protocol: ${protocol}`);
}
})();
const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 });
await DittoDB.migrate(kysely);
const store = new EventsDB(kysely);

View file

@ -70,7 +70,7 @@ export async function updateTrendingTags(
aliases?: string[],
) {
console.info(`Updating trending ${l}...`);
const db = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const signal = AbortSignal.timeout(1000);
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
@ -79,7 +79,7 @@ export async function updateTrendingTags(
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
try {
const trends = await getTrendingTagValues(db, tagNames, {
const trends = await getTrendingTagValues(kysely, tagNames, {
kinds,
since: yesterday,
until: now,

View file

@ -1,3 +1,5 @@
/// <reference lib="webworker" />
import Debug from '@soapbox/stickynotes/debug';
import * as Comlink from 'comlink';