mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
ditto/db: make adapters use classes instead of static classes
This commit is contained in:
parent
398d79b45e
commit
63c0f8b032
9 changed files with 155 additions and 103 deletions
|
|
@ -6,6 +6,7 @@ export interface DittoDB extends AsyncDisposable {
|
|||
readonly kysely: Kysely<DittoTables>;
|
||||
readonly poolSize: number;
|
||||
readonly availableConnections: number;
|
||||
migrate(): Promise<void>;
|
||||
listen(channel: string, callback: (payload: string) => void): void;
|
||||
}
|
||||
|
||||
|
|
|
|||
52
packages/db/DittoPgMigrator.ts
Normal file
52
packages/db/DittoPgMigrator.ts
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { FileMigrationProvider, type Kysely, Migrator } from 'kysely';
|
||||
|
||||
import type { JsonValue } from '@std/json';
|
||||
|
||||
export class DittoPgMigrator {
|
||||
private migrator: Migrator;
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
constructor(private kysely: Kysely<any>) {
|
||||
this.migrator = new Migrator({
|
||||
db: this.kysely,
|
||||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: new URL(import.meta.resolve('./migrations')).pathname,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
logi({ level: 'info', ns: 'ditto.db.migration', msg: 'Running migrations...', state: 'started' });
|
||||
const { results, error } = await this.migrator.migrateToLatest();
|
||||
|
||||
if (error) {
|
||||
logi({
|
||||
level: 'fatal',
|
||||
ns: 'ditto.db.migration',
|
||||
msg: 'Migration failed.',
|
||||
state: 'failed',
|
||||
results: results as unknown as JsonValue,
|
||||
error: error instanceof Error ? error : null,
|
||||
});
|
||||
throw new Error('Migration failed.');
|
||||
} else {
|
||||
if (!results?.length) {
|
||||
logi({ level: 'info', ns: 'ditto.db.migration', msg: 'Everything up-to-date.', state: 'skipped' });
|
||||
} else {
|
||||
logi({
|
||||
level: 'info',
|
||||
ns: 'ditto.db.migration',
|
||||
msg: 'Migrations finished!',
|
||||
state: 'migrated',
|
||||
results: results as unknown as JsonValue,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2,8 +2,9 @@ import { assertEquals } from '@std/assert';
|
|||
|
||||
import { DittoPglite } from './DittoPglite.ts';
|
||||
|
||||
Deno.test('DittoPglite.create', async () => {
|
||||
const db = DittoPglite.create('memory://');
|
||||
Deno.test('DittoPglite', async () => {
|
||||
const db = new DittoPglite('memory://');
|
||||
await db.migrate();
|
||||
|
||||
assertEquals(db.poolSize, 1);
|
||||
assertEquals(db.availableConnections, 1);
|
||||
|
|
|
|||
|
|
@ -4,42 +4,50 @@ import { PgliteDialect } from '@soapbox/kysely-pglite';
|
|||
import { Kysely } from 'kysely';
|
||||
|
||||
import { KyselyLogger } from '../KyselyLogger.ts';
|
||||
import { DittoPgMigrator } from '../DittoPgMigrator.ts';
|
||||
import { isWorker } from '../utils/worker.ts';
|
||||
|
||||
import type { DittoDB, DittoDBOpts } from '../DittoDB.ts';
|
||||
import type { DittoTables } from '../DittoTables.ts';
|
||||
|
||||
export class DittoPglite {
|
||||
static create(databaseUrl: string, opts?: DittoDBOpts): DittoDB {
|
||||
export class DittoPglite implements DittoDB {
|
||||
readonly poolSize = 1;
|
||||
readonly availableConnections = 1;
|
||||
readonly kysely: Kysely<DittoTables>;
|
||||
|
||||
private pglite: PGlite;
|
||||
private migrator: DittoPgMigrator;
|
||||
|
||||
constructor(databaseUrl: string, opts?: DittoDBOpts) {
|
||||
const url = new URL(databaseUrl);
|
||||
|
||||
if (url.protocol === 'file:' && isWorker()) {
|
||||
throw new Error('PGlite is not supported in worker threads.');
|
||||
}
|
||||
|
||||
const pglite = new PGlite(databaseUrl, {
|
||||
this.pglite = new PGlite(databaseUrl, {
|
||||
extensions: { pg_trgm },
|
||||
debug: opts?.debug,
|
||||
});
|
||||
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
dialect: new PgliteDialect({ database: pglite }),
|
||||
this.kysely = new Kysely<DittoTables>({
|
||||
dialect: new PgliteDialect({ database: this.pglite }),
|
||||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
const listen = (channel: string, callback: (payload: string) => void): void => {
|
||||
pglite.listen(channel, callback);
|
||||
};
|
||||
this.migrator = new DittoPgMigrator(this.kysely);
|
||||
}
|
||||
|
||||
return {
|
||||
kysely,
|
||||
poolSize: 1,
|
||||
availableConnections: 1,
|
||||
listen,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
await pglite.close();
|
||||
await kysely.destroy();
|
||||
},
|
||||
};
|
||||
listen(channel: string, callback: (payload: string) => void): void {
|
||||
this.pglite.listen(channel, callback);
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
await this.migrator.migrate();
|
||||
}
|
||||
|
||||
async [Symbol.asyncDispose](): Promise<void> {
|
||||
await this.pglite.close();
|
||||
await this.kysely.destroy();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { DittoPolyPg } from './DittoPolyPg.ts';
|
||||
|
||||
Deno.test('DittoPolyPg', async () => {
|
||||
const db = DittoPolyPg.create('memory://');
|
||||
await DittoPolyPg.migrate(db.kysely);
|
||||
const db = new DittoPolyPg('memory://');
|
||||
await db.migrate();
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,70 +1,53 @@
|
|||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { FileMigrationProvider, type Kysely, Migrator } from 'kysely';
|
||||
|
||||
import { DittoPglite } from './DittoPglite.ts';
|
||||
import { DittoPostgres } from './DittoPostgres.ts';
|
||||
|
||||
import type { JsonValue } from '@std/json';
|
||||
import type { Kysely } from 'kysely';
|
||||
import type { DittoDB, DittoDBOpts } from '../DittoDB.ts';
|
||||
import type { DittoTables } from '../DittoTables.ts';
|
||||
|
||||
/** Creates either a PGlite or Postgres connection depending on the databaseUrl. */
|
||||
export class DittoPolyPg {
|
||||
export class DittoPolyPg implements DittoDB {
|
||||
private adapter: DittoDB;
|
||||
|
||||
/** Open a new database connection. */
|
||||
static create(databaseUrl: string, opts?: DittoDBOpts): DittoDB {
|
||||
constructor(databaseUrl: string, opts?: DittoDBOpts) {
|
||||
const { protocol } = new URL(databaseUrl);
|
||||
|
||||
switch (protocol) {
|
||||
case 'file:':
|
||||
case 'memory:':
|
||||
return DittoPglite.create(databaseUrl, opts);
|
||||
this.adapter = new DittoPglite(databaseUrl, opts);
|
||||
break;
|
||||
case 'postgres:':
|
||||
case 'postgresql:':
|
||||
return DittoPostgres.create(databaseUrl, opts);
|
||||
this.adapter = new DittoPostgres(databaseUrl, opts);
|
||||
break;
|
||||
default:
|
||||
throw new Error('Unsupported database URL.');
|
||||
}
|
||||
}
|
||||
|
||||
/** Migrate the database to the latest version. */
|
||||
static async migrate(kysely: Kysely<DittoTables>) {
|
||||
const migrator = new Migrator({
|
||||
db: kysely,
|
||||
provider: new FileMigrationProvider({
|
||||
fs,
|
||||
path,
|
||||
migrationFolder: new URL(import.meta.resolve('../migrations')).pathname,
|
||||
}),
|
||||
});
|
||||
|
||||
logi({ level: 'info', ns: 'ditto.db.migration', msg: 'Running migrations...', state: 'started' });
|
||||
const { results, error } = await migrator.migrateToLatest();
|
||||
|
||||
if (error) {
|
||||
logi({
|
||||
level: 'fatal',
|
||||
ns: 'ditto.db.migration',
|
||||
msg: 'Migration failed.',
|
||||
state: 'failed',
|
||||
results: results as unknown as JsonValue,
|
||||
error: error instanceof Error ? error : null,
|
||||
});
|
||||
throw new Error('Migration failed.');
|
||||
} else {
|
||||
if (!results?.length) {
|
||||
logi({ level: 'info', ns: 'ditto.db.migration', msg: 'Everything up-to-date.', state: 'skipped' });
|
||||
} else {
|
||||
logi({
|
||||
level: 'info',
|
||||
ns: 'ditto.db.migration',
|
||||
msg: 'Migrations finished!',
|
||||
state: 'migrated',
|
||||
results: results as unknown as JsonValue,
|
||||
});
|
||||
get kysely(): Kysely<DittoTables> {
|
||||
return this.adapter.kysely;
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
await this.adapter.migrate();
|
||||
}
|
||||
|
||||
listen(channel: string, callback: (payload: string) => void): void {
|
||||
this.adapter.listen(channel, callback);
|
||||
}
|
||||
|
||||
get poolSize(): number {
|
||||
return this.adapter.poolSize;
|
||||
}
|
||||
|
||||
get availableConnections(): number {
|
||||
return this.adapter.availableConnections;
|
||||
}
|
||||
|
||||
async [Symbol.asyncDispose](): Promise<void> {
|
||||
await this.adapter[Symbol.asyncDispose]();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,53 +12,54 @@ import {
|
|||
import { type PostgresJSDialectConfig, PostgresJSDriver } from 'kysely-postgres-js';
|
||||
import postgres from 'postgres';
|
||||
|
||||
import { DittoPgMigrator } from '../DittoPgMigrator.ts';
|
||||
import { KyselyLogger } from '../KyselyLogger.ts';
|
||||
|
||||
import type { DittoDB, DittoDBOpts } from '../DittoDB.ts';
|
||||
import type { DittoTables } from '../DittoTables.ts';
|
||||
|
||||
export class DittoPostgres {
|
||||
static create(databaseUrl: string, opts?: DittoDBOpts): DittoDB {
|
||||
const pg = postgres(databaseUrl, { max: opts?.poolSize });
|
||||
export class DittoPostgres implements DittoDB {
|
||||
private pg: ReturnType<typeof postgres>;
|
||||
private migrator: DittoPgMigrator;
|
||||
|
||||
const kysely = new Kysely<DittoTables>({
|
||||
readonly kysely: Kysely<DittoTables>;
|
||||
|
||||
constructor(databaseUrl: string, opts?: DittoDBOpts) {
|
||||
this.pg = postgres(databaseUrl, { max: opts?.poolSize });
|
||||
|
||||
this.kysely = new Kysely<DittoTables>({
|
||||
dialect: {
|
||||
createAdapter() {
|
||||
return new PostgresAdapter();
|
||||
},
|
||||
createDriver() {
|
||||
return new PostgresJSDriver({
|
||||
postgres: pg as unknown as PostgresJSDialectConfig['postgres'],
|
||||
});
|
||||
},
|
||||
createIntrospector(db) {
|
||||
return new PostgresIntrospector(db);
|
||||
},
|
||||
createQueryCompiler() {
|
||||
return new DittoPostgresQueryCompiler();
|
||||
},
|
||||
createAdapter: () => new PostgresAdapter(),
|
||||
createDriver: () =>
|
||||
new PostgresJSDriver({ postgres: this.pg as unknown as PostgresJSDialectConfig['postgres'] }),
|
||||
createIntrospector: (db) => new PostgresIntrospector(db),
|
||||
createQueryCompiler: () => new DittoPostgresQueryCompiler(),
|
||||
},
|
||||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
const listen = (channel: string, callback: (payload: string) => void): void => {
|
||||
pg.listen(channel, callback);
|
||||
};
|
||||
this.migrator = new DittoPgMigrator(this.kysely);
|
||||
}
|
||||
|
||||
return {
|
||||
kysely,
|
||||
get poolSize() {
|
||||
return pg.connections.open;
|
||||
},
|
||||
get availableConnections() {
|
||||
return pg.connections.idle;
|
||||
},
|
||||
listen,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
await pg.end();
|
||||
await kysely.destroy();
|
||||
},
|
||||
};
|
||||
listen(channel: string, callback: (payload: string) => void): void {
|
||||
this.pg.listen(channel, callback);
|
||||
}
|
||||
|
||||
async migrate(): Promise<void> {
|
||||
await this.migrator.migrate();
|
||||
}
|
||||
|
||||
get poolSize(): number {
|
||||
return this.pg.connections.open;
|
||||
}
|
||||
|
||||
get availableConnections(): number {
|
||||
return this.pg.connections.idle;
|
||||
}
|
||||
|
||||
async [Symbol.asyncDispose](): Promise<void> {
|
||||
await this.pg.end();
|
||||
await this.kysely.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ import { DummyDB } from './DummyDB.ts';
|
|||
|
||||
Deno.test('DummyDB', async () => {
|
||||
const db = new DummyDB();
|
||||
await db.migrate();
|
||||
|
||||
const rows = await db.kysely.selectFrom('nostr_events').selectAll().execute();
|
||||
|
||||
assertEquals(rows, []);
|
||||
|
|
|
|||
|
|
@ -23,6 +23,10 @@ export class DummyDB implements DittoDB {
|
|||
// noop
|
||||
}
|
||||
|
||||
migrate(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
[Symbol.asyncDispose](): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue