From bac9a905dad601e21c3f158aa297f2c367d1fc2d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 31 Jul 2024 12:52:03 -0500 Subject: [PATCH] Use a vendored NDatabase with Postgres ANY clauses --- src/storages/EventsDB.ts | 3 +- src/storages/NDatabase.ts | 637 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 639 insertions(+), 1 deletion(-) create mode 100644 src/storages/NDatabase.ts diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index abf076c7..88ff50e5 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -1,6 +1,6 @@ // deno-lint-ignore-file require-await -import { NDatabase, NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify'; +import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely } from 'kysely'; import { nip27 } from 'nostr-tools'; @@ -10,6 +10,7 @@ import { DittoTables } from '@/db/DittoTables.ts'; import { dbEventCounter, dbQueryCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; +import { NDatabase } from '@/storages/NDatabase.ts'; import { isNostrId, isURL } from '@/utils.ts'; import { abortError } from '@/utils/abort.ts'; diff --git a/src/storages/NDatabase.ts b/src/storages/NDatabase.ts new file mode 100644 index 00000000..082ee133 --- /dev/null +++ b/src/storages/NDatabase.ts @@ -0,0 +1,637 @@ +import { NKinds, NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify'; +import { Kysely, type SelectQueryBuilder, sql } from 'kysely'; +import { getFilterLimit } from 'nostr-tools'; + +/** Kysely database schema for Nostr. 
/**
 * Table definitions shared by every query in this module.
 * Each key is a table name and each value describes one row of that table.
 */
export interface NDatabaseSchema {
  /** One row per stored event. `tags` holds the JSON-encoded tag array. */
  nostr_events: {
    id: string;
    kind: number;
    pubkey: string;
    content: string;
    created_at: number;
    tags: string;
    sig: string;
  };
  /** One row per indexed tag, carrying a copy of the owning event's `kind`, `pubkey` and `created_at`. */
  nostr_tags: {
    event_id: string;
    name: string;
    value: string;
    kind: number;
    pubkey: string;
    created_at: number;
  };
  /** Search index used when `fts` is `'sqlite'`. */
  nostr_fts5: {
    event_id: string;
    content: string;
  };
  /** Search index used when `fts` is `'postgres'`. */
  nostr_pgfts: {
    event_id: string;
    search_vec: unknown;
  };
}

/** Options accepted by the {@link NDatabase} constructor. */
export interface NDatabaseOpts {
  /** Enable full-text-search for Postgres or SQLite. Disabled by default. */
  fts?: 'sqlite' | 'postgres';
  /**
   * Function that returns which tags to index so tag queries like `{ "#p": ["123"] }` will work.
   * By default, all single-letter tags with a non-empty value shorter than 200 characters are indexed.
   */
  indexTags?(event: NostrEvent): string[][];
  /**
   * Build a search index from the event.
   * By default, only kind 0 and kind 1 events are indexed for search, and the search text is the
   * event content with tag values appended to it (truncated to 1000 characters).
   * Only applicable if `fts` is set.
   */
  searchText?(event: NostrEvent): string | undefined;
  /** Strategy to use for handling the `timeout` opt. */
  timeoutStrategy?: 'setStatementTimeout' | undefined;
}

/**
 * SQL database storage adapter for Nostr events.
 * It uses [Kysely](https://kysely.dev/) to make queries, making it flexible for a variety of use-cases.
 *
 * NOTE(review): list-valued filter fields are compiled to `= ANY(...)` clauses, which is Postgres
 * array syntax. Confirm dialect support before using this class with another database.
 *
 * ```ts
 * // Create a Kysely instance with the dialect of your choice.
 * const kysely = new Kysely({ dialect });
 *
 * // Pass Kysely into the constructor.
 * const db = new NDatabase(kysely);
 *
 * // Migrate the database before use.
 * await db.migrate();
 *
 * // Now it's just a regular storage.
 * await db.event(event1);
 * await db.event(event2);
 * const events = await db.query([{ kinds: [1] }]);
 * ```
 */
export class NDatabase implements NStore {
  /** Substrings that identify a unique-constraint violation in SQLite and Postgres error messages. */
  private static readonly DUPLICATE_MESSAGES: readonly string[] = [
    'UNIQUE constraint failed',
    'duplicate key value violates unique constraint',
  ];

  private db: Kysely<NDatabaseSchema>;
  private fts?: 'sqlite' | 'postgres';
  private indexTags: (event: NostrEvent) => string[][];
  private searchText: (event: NostrEvent) => string | undefined;
  private timeoutStrategy: 'setStatementTimeout' | undefined;

  /**
   * @param db Kysely instance (or transaction) whose database contains the `NDatabaseSchema` tables.
   * @param opts Optional search, tag-indexing and timeout settings.
   */
  // deno-lint-ignore no-explicit-any
  constructor(db: Kysely<any>, opts?: NDatabaseOpts) {
    this.db = db as Kysely<NDatabaseSchema>;
    this.fts = opts?.fts;
    this.timeoutStrategy = opts?.timeoutStrategy;
    this.indexTags = opts?.indexTags ?? NDatabase.indexTags;
    this.searchText = opts?.searchText ?? NDatabase.searchText;
  }

  /**
   * Default tag index function.
   * Keeps tags whose name is a single character and whose value is a non-empty string
   * shorter than 200 characters. Malformed tags (e.g. an empty array) are skipped.
   */
  static indexTags(event: NostrEvent): string[][] {
    return event.tags.filter(([name, value]) =>
      name?.length === 1 && typeof value === 'string' && value.length > 0 && value.length < 200
    );
  }

  /**
   * Default search content builder.
   * @returns The content followed by every tag value, capped at 1000 characters, for kinds 0 and 1;
   * `undefined` for every other kind.
   */
  static searchText(event: NostrEvent): string | undefined {
    if (event.kind === 0 || event.kind === 1) {
      return `${event.content} ${event.tags.map(([_name, value]) => value).join(' ')}`.substring(0, 1000);
    }
    return undefined;
  }

  /** Whether a thrown value looks like a unique-constraint violation from SQLite or Postgres. */
  private static isDuplicateError(error: unknown): boolean {
    if (typeof error !== 'object' || error === null) return false;
    const { message } = error as { message?: unknown };
    return typeof message === 'string' &&
      NDatabase.DUPLICATE_MESSAGES.some((fragment) => message.includes(fragment));
  }

  /**
   * Insert an event (and its tags) into the database.
   *
   * Ephemeral events are ignored, and inserting an event that is already stored is a no-op.
   *
   * @throws Error `'Cannot add a deleted event'` when a matching kind 5 event is already stored.
   * @throws Error `'Cannot replace an event with an older event'` when a replaceable event is not
   * newer than the stored one.
   */
  async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
    if (NKinds.ephemeral(event.kind)) return;

    if (await this.isDeleted(event)) {
      throw new Error('Cannot add a deleted event');
    }

    try {
      await NDatabase.trx(this.db, (trx) => {
        return this.withTimeout(trx, async (trx) => {
          await Promise.all([
            this.deleteEvents(trx, event),
            this.replaceEvents(trx, event),
          ]);
          await this.insertEvent(trx, event);
          await Promise.all([
            this.insertTags(trx, event),
            this.indexSearch(trx, event),
          ]);
        }, opts.timeout);
      });
    } catch (error) {
      // Don't throw for duplicate events.
      if (NDatabase.isDuplicateError(error)) return;
      throw error;
    }
  }

  /** Check if an event has been deleted by a stored kind 5 event from the same author. */
  protected async isDeleted(event: NostrEvent): Promise<boolean> {
    const filters: NostrFilter[] = [
      { kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 },
    ];

    if (NKinds.replaceable(event.kind) || NKinds.parameterizedReplaceable(event.kind)) {
      const d = event.tags.find(([tag]) => tag === 'd')?.[1] ?? '';

      filters.push({
        kinds: [5],
        authors: [event.pubkey],
        '#a': [`${event.kind}:${event.pubkey}:${d}`],
        since: event.created_at,
        limit: 1,
      });
    }

    const events = await this.query(filters);
    return events.length > 0;
  }

  /**
   * Delete events referenced by kind 5.
   *
   * `e` tags delete events by id and `a` tags (`kind:pubkey:d`) delete by address; in both cases
   * only events authored by the deleting pubkey are affected. Does nothing for other kinds.
   */
  protected async deleteEvents(db: Kysely<NDatabaseSchema>, event: NostrEvent): Promise<void> {
    if (event.kind !== 5) return;

    const ids = new Set<string>();
    const addrs = new Set<string>();

    for (const [name, value] of event.tags) {
      // A tag without a value references nothing.
      if (typeof value !== 'string') continue;
      if (name === 'e') ids.add(value);
      if (name === 'a') addrs.add(value);
    }

    const filters: NostrFilter[] = [];

    if (ids.size) {
      filters.push({ ids: [...ids], authors: [event.pubkey] });
    }

    for (const addr of addrs) {
      // The identifier is everything after the second colon, so it may itself contain colons.
      const [k, pubkey, ...rest] = addr.split(':');

      // The d segment must be present, even if empty (`kind:pubkey:`).
      if (rest.length === 0) continue;
      if (pubkey !== event.pubkey) continue;
      // Require explicit digits: `Number('')` is 0, which would otherwise target kind 0.
      if (!/^\d+$/.test(k)) continue;

      const kind = Number(k);
      if (!Number.isSafeInteger(kind)) continue;

      const d = rest.join(':');

      const filter: NostrFilter = {
        kinds: [kind],
        authors: [event.pubkey],
        until: event.created_at,
      };

      if (d) {
        filter['#d'] = [d];
      }

      filters.push(filter);
    }

    if (filters.length) {
      await this.removeEvents(db, filters);
    }
  }

  /**
   * Replace events in NIP-01 replaceable ranges with the same kind and pubkey.
   * Parameterized replaceable events are only replaced when they carry a non-empty `d` tag.
   *
   * @throws Error when a stored event is not older than `event`.
   */
  protected async replaceEvents(trx: Kysely<NDatabaseSchema>, event: NostrEvent): Promise<void> {
    if (NKinds.replaceable(event.kind)) {
      await this.deleteReplaced(
        trx,
        event,
        { kinds: [event.kind], authors: [event.pubkey] },
        (event, prevEvent) => event.created_at > prevEvent.created_at,
        'Cannot replace an event with an older event',
      );
    }

    if (NKinds.parameterizedReplaceable(event.kind)) {
      const d = event.tags.find(([tag]) => tag === 'd')?.[1];
      if (d) {
        await this.deleteReplaced(
          trx,
          event,
          { kinds: [event.kind], authors: [event.pubkey], '#d': [d] },
          (event, prevEvent) => event.created_at > prevEvent.created_at,
          'Cannot replace an event with an older event',
        );
      }
    }
  }

  /** Insert the event into the database. */
  protected async insertEvent(trx: Kysely<NDatabaseSchema>, event: NostrEvent): Promise<void> {
    // Pick the columns explicitly so extra properties on the object never reach the INSERT.
    const { id, kind, pubkey, content, created_at, tags, sig } = event;

    await trx.insertInto('nostr_events')
      .values({ id, kind, pubkey, content, created_at, tags: JSON.stringify(tags), sig })
      .execute();
  }

  /** Insert event tags depending on the event and settings. */
  protected async insertTags(trx: Kysely<NDatabaseSchema>, event: NostrEvent): Promise<void> {
    const { id, kind, pubkey, created_at } = event;

    const tags = this.indexTags(event);
    if (!tags.length) return;

    const rows = tags.map(([name, value]) => ({ event_id: id, name, value, kind, pubkey, created_at }));

    await trx.insertInto('nostr_tags')
      .values(rows)
      .execute();
  }

  /** Add search data to the full-text-search table of the configured backend, if any. */
  protected async indexSearch(trx: Kysely<NDatabaseSchema>, event: NostrEvent): Promise<void> {
    if (!this.fts) return;

    const content = this.searchText(event);
    if (!content) return;

    if (this.fts === 'sqlite') {
      await trx.insertInto('nostr_fts5')
        .values({ event_id: event.id, content })
        .execute();
    }

    if (this.fts === 'postgres') {
      await trx.insertInto('nostr_pgfts')
        .values({
          event_id: event.id,
          search_vec: sql`to_tsvector(${content})`,
        })
        .execute();
    }
  }

  /**
   * Delete events that are replaced by the new event.
   *
   * @param filter Selects the previously stored events that `event` competes with.
   * @param replaces Returns whether `event` may replace the given stored row.
   * @param error Message of the error thrown when any stored row may not be replaced.
   */
  protected async deleteReplaced(
    trx: Kysely<NDatabaseSchema>,
    event: NostrEvent,
    filter: NostrFilter,
    replaces: (event: NostrEvent, prevEvent: NDatabaseSchema['nostr_events']) => boolean,
    error: string,
  ): Promise<void> {
    const prevEvents = await this.getFilterQuery(trx, filter).execute();
    for (const prevEvent of prevEvents) {
      if (!replaces(event, prevEvent)) {
        throw new Error(error);
      }
    }
    await this.removeEvents(trx, [filter]);
  }

  /**
   * Build the query for a filter.
   *
   * Without tag constraints the query runs against `nostr_events` only. With tag constraints, one
   * subquery per tag name is built against `nostr_tags`, the subqueries are intersected, and the
   * surviving event ids are used to select from `nostr_events`.
   *
   * NOTE(review): when a filter carries both `search` and tag constraints (and `fts` is set), the
   * tag branch is returned and `search` is not applied. Confirm whether that is intended.
   */
  protected getFilterQuery(
    trx: Kysely<NDatabaseSchema>,
    filter: NostrFilter,
  ): SelectQueryBuilder<NDatabaseSchema, 'nostr_events', NDatabaseSchema['nostr_events']> {
    let query = trx
      .selectFrom('nostr_events')
      .selectAll('nostr_events')
      .orderBy('nostr_events.created_at', 'desc')
      .orderBy('nostr_events.id', 'asc');

    if (filter.ids) {
      query = query.where('nostr_events.id', '=', (eb) => eb.fn.any(eb.val(filter.ids)));
    }
    if (filter.kinds) {
      query = query.where('nostr_events.kind', '=', (eb) => eb.fn.any(eb.val(filter.kinds)));
    }
    if (filter.authors) {
      query = query.where('nostr_events.pubkey', '=', (eb) => eb.fn.any(eb.val(filter.authors)));
    }
    if (typeof filter.since === 'number') {
      query = query.where('nostr_events.created_at', '>=', filter.since);
    }
    if (typeof filter.until === 'number') {
      query = query.where('nostr_events.created_at', '<=', filter.until);
    }
    if (typeof filter.limit === 'number') {
      query = query.limit(filter.limit);
    }

    if (filter.search) {
      if (this.fts === 'sqlite') {
        query = query
          .innerJoin('nostr_fts5', 'nostr_fts5.event_id', 'nostr_events.id')
          .where('nostr_fts5.content', 'match', JSON.stringify(filter.search));
      }

      if (this.fts === 'postgres') {
        query = query
          .innerJoin('nostr_pgfts', 'nostr_pgfts.event_id', 'nostr_events.id')
          .where(sql`phraseto_tsquery(${filter.search})`, '@@', sql`search_vec`);
      }

      if (!this.fts) {
        // Search was requested but no index exists: `id = NULL` never matches, so nothing is returned.
        return trx.selectFrom('nostr_events').selectAll('nostr_events').where('nostr_events.id', '=', null);
      }
    }

    const tagSubqueries = Object.entries(filter).reduce(
      (acc, [key, value]) => {
        if (key.startsWith('#') && Array.isArray(value)) {
          const name = key.replace(/^#/, '');

          let subquery = trx
            .selectFrom('nostr_tags')
            .select(['nostr_tags.event_id', 'nostr_tags.created_at'])
            .distinct()
            .where('nostr_tags.name', '=', name)
            .where('nostr_tags.value', '=', (eb) => eb.fn.any(eb.val(value)))
            .orderBy('nostr_tags.created_at', 'desc')
            .orderBy('nostr_tags.event_id', 'asc');

          // The event-level constraints are repeated on the tag rows themselves.
          if (filter.ids) {
            subquery = subquery.where('nostr_tags.event_id', '=', (eb) => eb.fn.any(eb.val(filter.ids)));
          }
          if (filter.kinds) {
            subquery = subquery.where('nostr_tags.kind', '=', (eb) => eb.fn.any(eb.val(filter.kinds)));
          }
          if (filter.authors) {
            subquery = subquery.where('nostr_tags.pubkey', '=', (eb) => eb.fn.any(eb.val(filter.authors)));
          }
          if (typeof filter.since === 'number') {
            subquery = subquery.where('nostr_tags.created_at', '>=', filter.since);
          }
          if (typeof filter.until === 'number') {
            subquery = subquery.where('nostr_tags.created_at', '<=', filter.until);
          }

          acc.push(subquery);
        }
        return acc;
      },
      [] as SelectQueryBuilder<NDatabaseSchema, 'nostr_tags', { event_id: string; created_at: number }>[],
    );

    if (tagSubqueries.length) {
      // An event must match every tag name in the filter, hence INTERSECT.
      const tagSubquery = trx.selectFrom(() =>
        tagSubqueries
          .map((query) => trx.selectFrom(() => query.as('nostr_tags')).selectAll('nostr_tags'))
          .reduce((result, query) => result.intersect(query))
          .as('nostr_tags')
      )
        .select(['nostr_tags.event_id', 'nostr_tags.created_at']);

      let tagQuery = trx
        .selectFrom('nostr_events')
        .selectAll('nostr_events')
        .where('nostr_events.id', 'in', (eb) => {
          let subquery = trx
            .selectFrom(() => tagSubquery.as('nostr_tags'))
            .select(['nostr_tags.event_id', 'nostr_tags.created_at'])
            .distinct()
            .orderBy('nostr_tags.created_at', 'desc')
            .orderBy('nostr_tags.event_id', 'asc');

          if (typeof filter.limit === 'number') {
            subquery = subquery.limit(filter.limit);
          }

          return eb
            .selectFrom(subquery.as('nostr_tags'))
            .select('nostr_tags.event_id');
        })
        .orderBy('nostr_events.created_at', 'desc')
        .orderBy('nostr_events.id', 'asc');

      if (typeof filter.limit === 'number') {
        tagQuery = tagQuery.limit(filter.limit);
      }

      return tagQuery;
    }

    return query;
  }

  /**
   * Combine filter queries into a single union query.
   * `filters` must not be empty; callers are expected to handle that case first.
   */
  protected getEventsQuery(
    trx: Kysely<NDatabaseSchema>,
    filters: NostrFilter[],
  ): SelectQueryBuilder<NDatabaseSchema, 'nostr_events', NDatabaseSchema['nostr_events']> {
    return filters
      .map((filter) =>
        trx
          .selectFrom(() => this.getFilterQuery(trx, filter).as('nostr_events'))
          .selectAll()
      )
      .reduce((result, query) => result.unionAll(query));
  }

  /**
   * Get events for filters from the database.
   * @param opts.limit Optional cap applied to the combined result of all filters.
   */
  async query(
    filters: NostrFilter[],
    opts: { timeout?: number; signal?: AbortSignal; limit?: number } = {},
  ): Promise<NostrEvent[]> {
    filters = this.normalizeFilters(filters);

    if (!filters.length) {
      return [];
    }

    return await this.withTimeout(this.db, async (trx) => {
      let query = this.getEventsQuery(trx, filters);

      if (typeof opts.limit === 'number') {
        query = query.limit(opts.limit);
      }

      const rows = await query.execute();

      return rows.map((row): NostrEvent => ({
        id: row.id,
        kind: row.kind,
        pubkey: row.pubkey,
        content: row.content,
        created_at: row.created_at,
        tags: JSON.parse(row.tags),
        sig: row.sig,
      }));
    }, opts.timeout);
  }

  /** Normalize the `limit` of each filter, and remove filters that can't produce any events. */
  protected normalizeFilters(filters: NostrFilter[]): NostrFilter[] {
    return filters.reduce<NostrFilter[]>((acc, filter) => {
      const limit = getFilterLimit(filter);
      if (limit > 0) {
        acc.push(limit === Infinity ? filter : { ...filter, limit });
      }
      return acc;
    }, []);
  }

  /** Remove events (and their search index rows) from the database. An empty filter list is a no-op. */
  protected async removeEvents(db: Kysely<NDatabaseSchema>, filters: NostrFilter[]): Promise<void> {
    if (!filters.length) return;

    return await NDatabase.trx(db, async (trx) => {
      // Select from the union as a subquery so the id projection covers every filter's rows.
      const ids = trx
        .selectFrom(() => this.getEventsQuery(trx, filters).as('nostr_events'))
        .select('nostr_events.id');

      if (this.fts === 'sqlite') {
        await trx.deleteFrom('nostr_fts5')
          .where('nostr_fts5.event_id', 'in', () => ids)
          .execute();
      }

      if (this.fts === 'postgres') {
        await trx.deleteFrom('nostr_pgfts')
          .where('nostr_pgfts.event_id', 'in', () => ids)
          .execute();
      }

      await trx.deleteFrom('nostr_events')
        .where('nostr_events.id', 'in', () => ids)
        .execute();
    });
  }

  /** Delete events based on filters from the database. */
  async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
    await this.withTimeout(this.db, (trx) => this.removeEvents(trx, filters), opts.timeout);
  }

  /** Get number of events that would be returned by filters. An empty filter list counts as zero. */
  async count(
    filters: NostrFilter[],
    opts: { signal?: AbortSignal; timeout?: number } = {},
  ): Promise<{ count: number; approximate: false }> {
    if (!filters.length) {
      return { count: 0, approximate: false };
    }

    return await this.withTimeout(this.db, async (trx) => {
      // Count over the union as a subquery so every filter's rows are included.
      const row = await trx
        .selectFrom(() => this.getEventsQuery(trx, filters).as('nostr_events'))
        .select((eb) => eb.fn.count('nostr_events.id').as('count'))
        .executeTakeFirst();

      return {
        count: Number(row?.count ?? 0),
        approximate: false,
      };
    }, opts.timeout);
  }

  /**
   * Execute NDatabase functions in a transaction.
   * The callback receives a store bound to the transaction, configured like this one.
   */
  // deno-lint-ignore no-explicit-any
  async transaction(callback: (store: NDatabase, kysely: Kysely<any>) => Promise<void>): Promise<void> {
    await NDatabase.trx(this.db, async (trx) => {
      const store = new NDatabase(trx as Kysely<NDatabaseSchema>, {
        fts: this.fts,
        indexTags: this.indexTags,
        searchText: this.searchText,
        timeoutStrategy: this.timeoutStrategy,
      });

      await callback(store, trx);
    });
  }

  /** Execute the callback in a new transaction, unless the Kysely instance is already a transaction. */
  private static async trx<T>(
    db: Kysely<NDatabaseSchema>,
    callback: (trx: Kysely<NDatabaseSchema>) => Promise<T>,
  ): Promise<T> {
    if (db.isTransaction) {
      return await callback(db);
    } else {
      return await db.transaction().execute((trx) => callback(trx));
    }
  }

  /** Maybe execute the callback in a transaction with a timeout, if a timeout is provided. */
  private async withTimeout<T>(
    db: Kysely<NDatabaseSchema>,
    callback: (trx: Kysely<NDatabaseSchema>) => Promise<T>,
    timeout: number | undefined,
  ): Promise<T> {
    if (typeof timeout === 'number') {
      return await NDatabase.trx(db, async (trx) => {
        await this.setTimeout(trx, timeout);
        return await callback(trx);
      });
    } else {
      return await callback(db);
    }
  }

  /** Set a timeout in the current database transaction, if applicable. */
  private async setTimeout(trx: Kysely<NDatabaseSchema>, timeout: number): Promise<void> {
    if (this.timeoutStrategy === 'setStatementTimeout') {
      await this.setLocal(trx, 'statement_timeout', timeout);
    }
  }

  /**
   * Set a local variable in the current database transaction (only works with Postgres).
   * Both `key` and `value` are interpolated as raw SQL, so they must never come from untrusted input.
   */
  private async setLocal(trx: Kysely<NDatabaseSchema>, key: string, value: string | number): Promise<void> {
    await sql`set local ${sql.raw(key)} = ${sql.raw(value.toString())}`.execute(trx);
  }

  /**
   * Migrate the database schema.
   * Every statement is guarded with "if not exists", so calling this more than once is safe.
   */
  async migrate(): Promise<void> {
    const schema = this.db.schema;

    await schema
      .createTable('nostr_events')
      .ifNotExists()
      .addColumn('id', 'text', (col) => col.primaryKey())
      .addColumn('kind', 'integer', (col) => col.notNull())
      .addColumn('pubkey', 'text', (col) => col.notNull())
      .addColumn('content', 'text', (col) => col.notNull())
      .addColumn('created_at', 'integer', (col) => col.notNull())
      .addColumn('tags', 'text', (col) => col.notNull())
      .addColumn('sig', 'text', (col) => col.notNull())
      .execute();

    await schema
      .createTable('nostr_tags')
      .ifNotExists()
      .addColumn('event_id', 'text', (col) => col.notNull().references('nostr_events.id').onDelete('cascade'))
      .addColumn('name', 'text', (col) => col.notNull())
      .addColumn('value', 'text', (col) => col.notNull())
      .addColumn('kind', 'integer', (col) => col.notNull())
      .addColumn('pubkey', 'text', (col) => col.notNull())
      .addColumn('created_at', 'integer', (col) => col.notNull())
      .execute();

    await schema
      .createIndex('nostr_events_kind')
      .on('nostr_events')
      .ifNotExists()
      .columns(['created_at desc', 'id asc', 'kind', 'pubkey'])
      .execute();
    await schema
      .createIndex('nostr_events_pubkey')
      .on('nostr_events')
      .ifNotExists()
      .columns(['created_at desc', 'id asc', 'pubkey', 'kind'])
      .execute();

    await schema
      .createIndex('nostr_tags_kind')
      .on('nostr_tags')
      .ifNotExists()
      .columns(['created_at desc', 'event_id asc', 'value', 'name', 'kind', 'pubkey'])
      .execute();
    await schema
      .createIndex('nostr_tags_pubkey')
      .on('nostr_tags')
      .ifNotExists()
      .columns(['created_at desc', 'event_id asc', 'value', 'name', 'pubkey', 'kind'])
      .execute();

    if (this.fts === 'sqlite') {
      await sql`CREATE VIRTUAL TABLE IF NOT EXISTS nostr_fts5 USING fts5(event_id, content)`.execute(this.db);
    }

    if (this.fts === 'postgres') {
      await schema.createTable('nostr_pgfts')
        .ifNotExists()
        .addColumn('event_id', 'text', (c) => c.primaryKey().references('nostr_events.id').onDelete('cascade'))
        .addColumn('search_vec', sql`tsvector`, (c) => c.notNull())
        .execute();

      await schema.createIndex('nostr_pgfts_gin_search_vec')
        .ifNotExists()
        .on('nostr_pgfts')
        .using('gin')
        .column('search_vec')
        .execute();
    }
  }
}