Rework DittoDB to return an object

This commit is contained in:
Alex Gleason 2024-08-11 19:02:27 -05:00
parent f830271ce2
commit 552de01a17
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
13 changed files with 86 additions and 74 deletions

View file

@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => {
async function getToken( async function getToken(
{ pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] }, { pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] },
): Promise<`token1${string}`> { ): Promise<`token1${string}`> {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const token = generateToken(); const token = generateToken();
const serverSeckey = generateSecretKey(); const serverSeckey = generateSecretKey();

View file

@ -63,7 +63,7 @@ const statusController: AppController = async (c) => {
const createStatusController: AppController = async (c) => { const createStatusController: AppController = async (c) => {
const body = await parseBody(c.req.raw); const body = await parseBody(c.req.raw);
const result = createStatusSchema.safeParse(body); const result = createStatusSchema.safeParse(body);
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const store = c.get('store'); const store = c.get('store');
if (!result.success) { if (!result.success) {
@ -565,9 +565,9 @@ const zappedByController: AppController = async (c) => {
const id = c.req.param('id'); const id = c.req.param('id');
const params = c.get('listPagination'); const params = c.get('listPagination');
const store = await Storages.db(); const store = await Storages.db();
const db = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const zaps = await db.selectFrom('event_zaps') const zaps = await kysely.selectFrom('event_zaps')
.selectAll() .selectAll()
.where('target_event_id', '=', id) .where('target_event_id', '=', id)
.orderBy('amount_millisats', 'desc') .orderBy('amount_millisats', 'desc')

View file

@ -189,7 +189,7 @@ async function topicToFilter(
async function getTokenPubkey(token: string): Promise<string | undefined> { async function getTokenPubkey(token: string): Promise<string | undefined> {
if (token.startsWith('token1')) { if (token.startsWith('token1')) {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const { user_pubkey } = await kysely const { user_pubkey } = await kysely
.selectFrom('nip46_tokens') .selectFrom('nip46_tokens')

View file

@ -1,40 +1,56 @@
import fs from 'node:fs/promises'; import fs from 'node:fs/promises';
import path from 'node:path'; import path from 'node:path';
import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
import { FileMigrationProvider, Kysely, Migrator } from 'kysely'; import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts'; import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts'; import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
import { DittoTables } from '@/db/DittoTables.ts'; import { DittoTables } from '@/db/DittoTables.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
export type DittoDatabase = {
dialect: 'sqlite';
store: EventsDB;
kysely: Kysely<DittoTables> & Kysely<NDatabaseSchema>;
} | {
dialect: 'postgres';
store: EventsDB;
kysely: Kysely<DittoTables> & Kysely<NPostgresSchema>;
};
export class DittoDB { export class DittoDB {
private static kysely: Promise<Kysely<DittoTables>> | undefined; private static db: Promise<DittoDatabase> | undefined;
static getInstance(): Promise<Kysely<DittoTables>> { static getInstance(): Promise<DittoDatabase> {
if (!this.kysely) { if (!this.db) {
this.kysely = this._getInstance(); this.db = this._getInstance();
} }
return this.kysely; return this.db;
} }
static async _getInstance(): Promise<Kysely<DittoTables>> { static async _getInstance(): Promise<DittoDatabase> {
let kysely: Kysely<DittoTables>; const result = {} as DittoDatabase;
switch (Conf.db.dialect) { switch (Conf.db.dialect) {
case 'sqlite': case 'sqlite':
kysely = await DittoSQLite.getInstance(); result.dialect = 'sqlite';
result.kysely = await DittoSQLite.getInstance();
result.store = new EventsDB(result.kysely as any);
break; break;
case 'postgres': case 'postgres':
kysely = await DittoPostgres.getInstance(); result.dialect = 'postgres';
result.kysely = await DittoPostgres.getInstance();
result.store = new EventsDB(result.kysely as any);
break; break;
default: default:
throw new Error('Unsupported database URL.'); throw new Error('Unsupported database URL.');
} }
await this.migrate(kysely); await this.migrate(result.kysely);
return kysely; return result;
} }
static get poolSize(): number { static get poolSize(): number {
@ -52,7 +68,7 @@ export class DittoDB {
} }
/** Migrate the database to the latest version. */ /** Migrate the database to the latest version. */
static async migrate(kysely: Kysely<DittoTables>) { static async migrate(kysely: DittoDatabase['kysely']) {
const migrator = new Migrator({ const migrator = new Migrator({
db: kysely, db: kysely,
provider: new FileMigrationProvider({ provider: new FileMigrationProvider({

View file

@ -1,7 +1,4 @@
export interface DittoTables { export interface DittoTables {
nostr_events: EventRow;
nostr_tags: TagRow;
nostr_fts5: EventFTSRow;
nip46_tokens: NIP46TokenRow; nip46_tokens: NIP46TokenRow;
unattached_media: UnattachedMediaRow; unattached_media: UnattachedMediaRow;
author_stats: AuthorStatsRow; author_stats: AuthorStatsRow;
@ -27,30 +24,6 @@ interface EventStatsRow {
zaps_amount: number; zaps_amount: number;
} }
interface EventRow {
id: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
tags: string;
sig: string;
}
interface EventFTSRow {
event_id: string;
content: string;
}
interface TagRow {
event_id: string;
name: string;
value: string;
kind: number;
pubkey: string;
created_at: number;
}
interface NIP46TokenRow { interface NIP46TokenRow {
api_token: string; api_token: string;
user_pubkey: string; user_pubkey: string;

View file

@ -1,3 +1,4 @@
import { NPostgresSchema } from '@nostrify/db';
import { import {
BinaryOperationNode, BinaryOperationNode,
FunctionNode, FunctionNode,
@ -17,11 +18,11 @@ import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPostgres { export class DittoPostgres {
static db: Kysely<DittoTables> | undefined; static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
static postgres?: postgres.Sql; static postgres?: postgres.Sql;
// deno-lint-ignore require-await // deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> { static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
if (!this.postgres) { if (!this.postgres) {
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize }); this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
} }
@ -45,7 +46,7 @@ export class DittoPostgres {
}, },
}, },
log: KyselyLogger, log: KyselyLogger,
}); }) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
} }
return this.db; return this.db;

View file

@ -1,3 +1,4 @@
import { NDatabaseSchema } from '@nostrify/db';
import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite'; import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite';
import { Kysely, sql } from 'kysely'; import { Kysely, sql } from 'kysely';
@ -7,19 +8,19 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts';
import SqliteWorker from '@/workers/sqlite.ts'; import SqliteWorker from '@/workers/sqlite.ts';
export class DittoSQLite { export class DittoSQLite {
static db: Kysely<DittoTables> | undefined; static db: Kysely<DittoTables> & Kysely<NDatabaseSchema> | undefined;
static async getInstance(): Promise<Kysely<DittoTables>> { static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NDatabaseSchema>> {
if (!this.db) { if (!this.db) {
const sqliteWorker = new SqliteWorker(); const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(this.path); await sqliteWorker.open(this.path);
this.db = new Kysely<DittoTables>({ this.db = new Kysely({
dialect: new PolySqliteDialect({ dialect: new PolySqliteDialect({
database: sqliteWorker, database: sqliteWorker,
}), }),
log: KyselyLogger, log: KyselyLogger,
}); }) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
// Set PRAGMA values. // Set PRAGMA values.
await Promise.all([ await Promise.all([

View file

@ -14,7 +14,7 @@ interface UnattachedMedia {
/** Add unattached media into the database. */ /** Add unattached media into the database. */
async function insertUnattachedMedia(media: UnattachedMedia) { async function insertUnattachedMedia(media: UnattachedMedia) {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
await kysely.insertInto('unattached_media') await kysely.insertInto('unattached_media')
.values({ ...media, data: JSON.stringify(media.data) }) .values({ ...media, data: JSON.stringify(media.data) })
.execute(); .execute();
@ -44,7 +44,7 @@ function getUnattachedMedia(kysely: Kysely<DittoTables>, until: Date) {
/** Delete unattached media by URL. */ /** Delete unattached media by URL. */
async function deleteUnattachedMediaByUrl(url: string) { async function deleteUnattachedMediaByUrl(url: string) {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
return kysely.deleteFrom('unattached_media') return kysely.deleteFrom('unattached_media')
.where('url', '=', url) .where('url', '=', url)
.execute(); .execute();
@ -67,7 +67,7 @@ async function getUnattachedMediaByIds(kysely: Kysely<DittoTables>, ids: string[
/** Delete rows as an event with media is being created. */ /** Delete rows as an event with media is being created. */
async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void> { async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void> {
if (!urls.length) return; if (!urls.length) return;
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
await kysely.deleteFrom('unattached_media') await kysely.deleteFrom('unattached_media')
.where('pubkey', '=', pubkey) .where('pubkey', '=', pubkey)
.where('url', 'in', urls) .where('url', 'in', urls)

View file

@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => {
if (bech32.startsWith('token1')) { if (bech32.startsWith('token1')) {
try { try {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const { user_pubkey, server_seckey, relays } = await kysely const { user_pubkey, server_seckey, relays } = await kysely
.selectFrom('nip46_tokens') .selectFrom('nip46_tokens')

View file

@ -19,8 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts';
import { nip05Cache } from '@/utils/nip05.ts'; import { nip05Cache } from '@/utils/nip05.ts';
import { updateStats } from '@/utils/stats.ts'; import { updateStats } from '@/utils/stats.ts';
import { getTagSet } from '@/utils/tags.ts'; import { getTagSet } from '@/utils/tags.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { getAmount } from '@/utils/bolt11.ts'; import { getAmount } from '@/utils/bolt11.ts';
import { DittoTables } from '@/db/DittoTables.ts';
const debug = Debug('ditto:pipeline'); const debug = Debug('ditto:pipeline');
@ -54,7 +54,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
throw new RelayError('blocked', 'user is disabled'); throw new RelayError('blocked', 'user is disabled');
} }
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
await Promise.all([ await Promise.all([
storeEvent(event, signal), storeEvent(event, signal),
@ -106,7 +106,7 @@ async function existsInDB(event: DittoEvent): Promise<boolean> {
async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> { async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
await hydrateEvents({ events: [event], store: await Storages.db(), signal }); await hydrateEvents({ events: [event], store: await Storages.db(), signal });
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const domain = await kysely const domain = await kysely
.selectFrom('pubkey_domains') .selectFrom('pubkey_domains')
.select('domain') .select('domain')
@ -120,7 +120,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> { async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
if (NKinds.ephemeral(event.kind)) return; if (NKinds.ephemeral(event.kind)) return;
const store = await Storages.db(); const store = await Storages.db();
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
await updateStats({ event, store, kysely }).catch(debug); await updateStats({ event, store, kysely }).catch(debug);
await store.event(event, { signal }); await store.event(event, { signal });
@ -148,7 +148,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
// Track pubkey domain. // Track pubkey domain.
try { try {
const kysely = await DittoDB.getInstance(); const { kysely } = await DittoDB.getInstance();
const { domain } = parseNip05(nip05); const { domain } = parseNip05(nip05);
await sql` await sql`

View file

@ -20,8 +20,7 @@ export class Storages {
public static async db(): Promise<EventsDB> { public static async db(): Promise<EventsDB> {
if (!this._db) { if (!this._db) {
this._db = (async () => { this._db = (async () => {
const kysely = await DittoDB.getInstance(); const { store } = await DittoDB.getInstance();
const store = new EventsDB(kysely);
await seedZapSplits(store); await seedZapSplits(store);
return store; return store;
})(); })();

View file

@ -18,7 +18,7 @@ interface HydrateOpts {
/** Hydrate events using the provided storage. */ /** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> { async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, store, signal, kysely = await DittoDB.getInstance() } = opts; const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts;
if (!events.length) { if (!events.length) {
return events; return events;
@ -59,8 +59,8 @@ async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
} }
const stats = { const stats = {
authors: await gatherAuthorStats(cache, kysely), authors: await gatherAuthorStats(cache, kysely as Kysely<DittoTables>),
events: await gatherEventStats(cache, kysely), events: await gatherEventStats(cache, kysely as Kysely<DittoTables>),
}; };
// Dedupe events. // Dedupe events.

View file

@ -1,10 +1,10 @@
import { NPostgresSchema } from '@nostrify/db';
import { NostrFilter } from '@nostrify/nostrify'; import { NostrFilter } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes'; import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { handleEvent } from '@/pipeline.ts'; import { handleEvent } from '@/pipeline.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
@ -14,31 +14,53 @@ const console = new Stickynotes('ditto:trends');
/** Get trending tag values for a given tag in the given time frame. */ /** Get trending tag values for a given tag in the given time frame. */
export async function getTrendingTagValues( export async function getTrendingTagValues(
/** Kysely instance to execute queries on. */ /** Kysely instance to execute queries on. */
kysely: Kysely<DittoTables>, kysely: Kysely<NPostgresSchema>,
/** Tag name to filter by, eg `t` or `r`. */ /** Tag name to filter by, eg `t` or `r`. */
tagNames: string[], tagNames: string[],
/** Filter of eligible events. */ /** Filter of eligible events. */
filter: NostrFilter, filter: NostrFilter,
): Promise<{ value: string; authors: number; uses: number }[]> { ): Promise<{ value: string; authors: number; uses: number }[]> {
/*
SELECT
LOWER(element.value) AS value,
COUNT(DISTINCT nostr_events.pubkey) AS authors,
COUNT(*) as "uses"
FROM
nostr_events,
jsonb_each_text(nostr_events.tags_index) kv,
jsonb_array_elements_text(kv.value::jsonb) element
WHERE
kv.key = 't'
AND nostr_events.kind = 1
AND nostr_events.created_at >= 1723325796
AND nostr_events.created_at <= 1723412196
GROUP BY
LOWER(element.value)
ORDER BY
COUNT(DISTINCT nostr_events.pubkey) DESC
LIMIT 20;
*/
let query = kysely let query = kysely
.selectFrom('nostr_tags') .selectFrom((eb) => [
'nostr_events',
eb.from('jsonb_each_text', ['nostr_events.tags_index'], 'kv'),
eb.from('jsonb_array_elements_text', ['kv.value::jsonb'], 'element'),
])
.select(({ fn }) => [ .select(({ fn }) => [
'nostr_tags.value', fn('lower', ['element.value']).as('value'),
fn.agg<number>('count', ['nostr_tags.pubkey']).distinct().as('authors'),
fn.countAll<number>().as('uses'),
]) ])
.where('nostr_tags.name', 'in', tagNames) .where('nostr_tags.name', 'in', tagNames)
.groupBy('nostr_tags.value') .groupBy('nostr_tags.value')
.orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc'); .orderBy((c) => c.fn.agg('count', ['nostr_tags.pubkey']).distinct(), 'desc');
if (filter.kinds) { if (filter.kinds) {
query = query.where('nostr_tags.kind', 'in', filter.kinds); query = query.where('kind', 'in', filter.kinds);
} }
if (typeof filter.since === 'number') { if (typeof filter.since === 'number') {
query = query.where('nostr_tags.created_at', '>=', filter.since); query = query.where('created_at', '>=', filter.since);
} }
if (typeof filter.until === 'number') { if (typeof filter.until === 'number') {
query = query.where('nostr_tags.created_at', '<=', filter.until); query = query.where('created_at', '<=', filter.until);
} }
if (typeof filter.limit === 'number') { if (typeof filter.limit === 'number') {
query = query.limit(filter.limit); query = query.limit(filter.limit);