diff --git a/packages/ditto/storages.ts b/packages/ditto/storages.ts index 4bd7fa30..c99e4252 100644 --- a/packages/ditto/storages.ts +++ b/packages/ditto/storages.ts @@ -44,8 +44,8 @@ export class Storages { public static async db(): Promise { if (!this._db) { this._db = (async () => { - const kysely = await this.kysely(); - const store = new DittoPgStore({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); + const db = await this.database(); + const store = new DittoPgStore({ db, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); await seedZapSplits(store); return store; })(); diff --git a/packages/ditto/storages/DittoPgStore.test.ts b/packages/ditto/storages/DittoPgStore.test.ts index b74e91b9..3d2ee611 100644 --- a/packages/ditto/storages/DittoPgStore.test.ts +++ b/packages/ditto/storages/DittoPgStore.test.ts @@ -7,6 +7,26 @@ import { Conf } from '@/config.ts'; import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { createTestDB } from '@/test.ts'; +Deno.test('req streaming', async () => { + await using db = await createTestDB({ pure: true }); + const { store: relay } = db; + + const event1 = await eventFixture('event-1'); + + const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); + + for await (const msg of relay.req([{ since: 0 }])) { + if (msg[0] === 'EVENT') { + assertEquals(relay.subs.size, 1); + assertEquals(msg[2], event1); + break; + } + } + + await promise; + assertEquals(relay.subs.size, 0); // cleanup +}); + Deno.test('count filters', async () => { await using db = await createTestDB({ pure: true }); const { store } = db; diff --git a/packages/ditto/storages/DittoPgStore.ts b/packages/ditto/storages/DittoPgStore.ts index 36040f4c..4c966c8a 100644 --- a/packages/ditto/storages/DittoPgStore.ts +++ b/packages/ditto/storages/DittoPgStore.ts @@ -1,16 +1,27 @@ // deno-lint-ignore-file require-await -import { DittoTables } from '@ditto/db'; +import { DittoDatabase, DittoTables } from '@ditto/db'; import { detectLanguage } from '@ditto/lang'; import { NPostgres, NPostgresSchema } from '@nostrify/db'; -import { dbEventsCounter } from '@ditto/metrics'; -import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; +import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics'; +import { + NIP50, + NKinds, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NSchema as n, +} from '@nostrify/nostrify'; +import { Machina } from '@nostrify/nostrify/utils'; import { logi } from '@soapbox/logi'; import { JsonValue } from '@std/json'; import { LanguageCode } from 'iso-639-1'; import { Kysely } from 'kysely'; import linkify from 'linkifyjs'; -import { nip27 } from 'nostr-tools'; +import { LRUCache } from 'lru-cache'; +import { matchFilter, nip27 } from 'nostr-tools'; import tldts from 'tldts'; import { z } from 'zod'; @@ -36,20 +47,25 @@ interface TagConditionOpts { value: string; } -/** Options for the DittoPgStore store. */ +/** Options for the EventsDB store. */ interface DittoPgStoreOpts { /** Kysely instance to use. */ - kysely: Kysely; + db: DittoDatabase; /** Pubkey of the admin account. */ pubkey: string; /** Timeout in milliseconds for database queries. */ timeout: number; /** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */ pure?: boolean; + /** Chunk size for streaming events. Defaults to 100. */ + chunkSize?: number; } /** SQL database storage adapter for Nostr events. */ -class DittoPgStore extends NPostgres { +export class DittoPgStore extends NPostgres { + readonly subs = new Map }>(); + readonly encounters = new LRUCache({ max: 100 }); + /** Conditions for when to index certain tags. */ static tagConditions: Record = { 'a': ({ count }) => count < 15, @@ -72,65 +88,33 @@ class DittoPgStore extends NPostgres { }, }; - static indexExtensions(event: NostrEvent): Record { - const ext: Record = {}; - - if (event.kind === 1) { - ext.reply = event.tags.some(([name]) => name === 'e').toString(); - } else if (event.kind === 1111) { - ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); - } else if (event.kind === 6) { - ext.reply = 'false'; - } - - if ([1, 20, 30023].includes(event.kind)) { - const language = detectLanguage(event.content, 0.90); - - if (language) { - ext.language = language; - } - } - - const imeta: string[][][] = event.tags - .filter(([name]) => name === 'imeta') - .map(([_, ...entries]) => - entries.map((entry) => { - const split = entry.split(' '); - return [split[0], split.splice(1).join(' ')]; - }) - ); - - // quirks mode - if (!imeta.length && event.kind === 1) { - const links = linkify.find(event.content).filter(({ type }) => type === 'url'); - imeta.push(...getMediaLinks(links)); - } - - if (imeta.length) { - ext.media = 'true'; - - if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { - ext.video = 'true'; - } - } - - ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; - - return ext; - } - constructor(private opts: DittoPgStoreOpts) { - super(opts.kysely, { + super(opts.db.kysely, { indexTags: DittoPgStore.indexTags, indexSearch: DittoPgStore.searchText, indexExtensions: DittoPgStore.indexExtensions, + chunkSize: opts.chunkSize, + }); + + opts.db.listen('nostr_event', async (id) => { + if (this.encounters.has(id)) return; + this.encounters.set(id, true); + + const [event] = await this.query([{ ids: [id] }]); + + if (event) { + this.streamOut(event); + } }); } /** Insert an event (and its tags) into the database. */ override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { event = purifyEvent(event); + logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); + + this.encounters.set(event.id, true); dbEventsCounter.inc({ kind: event.kind }); if (await this.isDeletedAdmin(event)) { @@ -141,6 +125,7 @@ class DittoPgStore extends NPostgres { try { await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + this.streamOut(event); } catch (e) { if (e instanceof Error && e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -152,6 +137,21 @@ class DittoPgStore extends NPostgres { } } + protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean { + // TODO: support streaming by search. + return matchFilter(filter, event) && filter.search === undefined; + } + + protected streamOut(event: NostrEvent): void { + for (const { filters, machina } of this.subs.values()) { + for (const filter of filters) { + if (this.matchesFilter(event, filter)) { + machina.push(event); + } + } + } + } + /** Check if an event has been deleted by the admin. */ private async isDeletedAdmin(event: NostrEvent): Promise { const filters: NostrFilter[] = [ @@ -213,10 +213,53 @@ class DittoPgStore extends NPostgres { } } + override async *req( + filters: NostrFilter[], + opts?: { signal?: AbortSignal }, + ): AsyncIterable { + const subId = crypto.randomUUID(); + const normalFilters = this.normalizeFilters(filters); + + if (normalFilters.length) { + const { db, chunkSize = 100 } = this.opts; + const rows = this.getEventsQuery(db.kysely as unknown as Kysely, normalFilters).stream( + chunkSize, + ); + + for await (const row of rows) { + const event = this.parseEventRow(row); + yield ['EVENT', subId, event]; + + if (opts?.signal?.aborted) { + yield ['CLOSED', subId, 'aborted']; + return; + } + } + } + + yield ['EOSE', subId]; + + const machina = new Machina(opts?.signal); + + this.subs.set(subId, { filters, machina }); + internalSubscriptionsSizeGauge.set(this.subs.size); + + try { + for await (const event of machina) { + yield ['EVENT', subId, event]; + } + } catch { + yield ['CLOSED', subId, 'error: something went wrong']; + } finally { + this.subs.delete(subId); + internalSubscriptionsSizeGauge.set(this.subs.size); + } + } + /** Get events for filters from the database. */ override async query( filters: NostrFilter[], - opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, + opts: { signal?: AbortSignal; pure?: boolean; timeout?: number; limit?: number } = {}, ): Promise { filters = await this.expandFilters(filters); @@ -334,6 +377,53 @@ class DittoPgStore extends NPostgres { }, []); } + static indexExtensions(event: NostrEvent): Record { + const ext: Record = {}; + + if (event.kind === 1) { + ext.reply = event.tags.some(([name]) => name === 'e').toString(); + } else if (event.kind === 1111) { + ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); + } else if (event.kind === 6) { + ext.reply = 'false'; + } + + if ([1, 20, 30023].includes(event.kind)) { + const language = detectLanguage(event.content, 0.90); + + if (language) { + ext.language = language; + } + } + + const imeta: string[][][] = event.tags + .filter(([name]) => name === 'imeta') + .map(([_, ...entries]) => + entries.map((entry) => { + const split = entry.split(' '); + return [split[0], split.splice(1).join(' ')]; + }) + ); + + // quirks mode + if (!imeta.length && event.kind === 1) { + const links = linkify.find(event.content).filter(({ type }) => type === 'url'); + imeta.push(...getMediaLinks(links)); + } + + if (imeta.length) { + ext.media = 'true'; + + if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { + ext.video = 'true'; + } + } + + ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; + + return ext; + } + /** Build a search index from the event. */ static searchText(event: NostrEvent): string { switch (event.kind) { @@ -385,7 +475,7 @@ class DittoPgStore extends NPostgres { } if (domains.size || hostnames.size) { - let query = this.opts.kysely + let query = this.opts.db.kysely .selectFrom('author_stats') .select('pubkey') .where((eb) => { @@ -433,5 +523,3 @@ class DittoPgStore extends NPostgres { return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely)); } } - -export { DittoPgStore }; diff --git a/packages/ditto/test.ts b/packages/ditto/test.ts index 3c6a555b..dd1ae6cb 100644 --- a/packages/ditto/test.ts +++ b/packages/ditto/test.ts @@ -34,31 +34,31 @@ export function genEvent(t: Partial = {}, sk: Uint8Array = generateS /** Create a database for testing. It uses `DATABASE_URL`, or creates an in-memory database by default. */ export async function createTestDB(opts?: { pure?: boolean }) { - const { kysely } = DittoDB.create(Conf.databaseUrl, { poolSize: 1 }); + const db = DittoDB.create(Conf.databaseUrl, { poolSize: 1 }); - await DittoDB.migrate(kysely); + await DittoDB.migrate(db.kysely); const store = new DittoPgStore({ - kysely, + db, timeout: Conf.db.timeouts.default, pubkey: Conf.pubkey, pure: opts?.pure ?? false, }); return { + ...db, store, - kysely, [Symbol.asyncDispose]: async () => { const { rows } = await sql< { tablename: string } - >`select tablename from pg_tables where schemaname = current_schema()`.execute(kysely); + >`select tablename from pg_tables where schemaname = current_schema()`.execute(db.kysely); for (const { tablename } of rows) { if (tablename.startsWith('kysely_')) continue; - await sql`truncate table ${sql.ref(tablename)} cascade`.execute(kysely); + await sql`truncate table ${sql.ref(tablename)} cascade`.execute(db.kysely); } - await kysely.destroy(); + await db.kysely.destroy(); }, }; } diff --git a/packages/ditto/workers/policy.worker.ts b/packages/ditto/workers/policy.worker.ts index acf7b2f1..852c24b5 100644 --- a/packages/ditto/workers/policy.worker.ts +++ b/packages/ditto/workers/policy.worker.ts @@ -30,10 +30,10 @@ export class CustomPolicy implements NPolicy { async init({ path, databaseUrl, pubkey }: PolicyInit): Promise { const Policy = (await import(path)).default; - const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 }); + const db = DittoDB.create(databaseUrl, { poolSize: 1 }); const store = new DittoPgStore({ - kysely, + db, pubkey, timeout: 5_000, });