diff --git a/packages/api/middleware/requireVar.ts b/packages/api/middleware/requireVar.ts deleted file mode 100644 index d1527e9a..00000000 --- a/packages/api/middleware/requireVar.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { HTTPException } from '@hono/hono/http-exception'; - -import type { SetRequired } from 'type-fest'; -import type { DittoEnv } from '../DittoEnv.ts'; -import type { DittoMiddleware } from '../DittoMiddleware.ts'; - -type DittoVars = DittoEnv['Variables']; - -export function requireVar(key: K): DittoMiddleware> { - return (c, next) => { - if (!c.var[key]) { - throw new HTTPException(500, { message: `Missing required variable: ${key}` }); - } - - return next(); - }; -} diff --git a/packages/ditto/storages/EventsDB.test.ts b/packages/ditto/storages/EventsDB.test.ts index ce138b2f..232c85dc 100644 --- a/packages/ditto/storages/EventsDB.test.ts +++ b/packages/ditto/storages/EventsDB.test.ts @@ -7,6 +7,26 @@ import { eventFixture, genEvent } from '@/test.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { createTestDB } from '@/test.ts'; +Deno.test('InternalRelay', async () => { + await using db = await createTestDB({ pure: true }); + const { store: relay } = db; + + const event1 = await eventFixture('event-1'); + + const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); + + for await (const msg of relay.req([{}])) { + if (msg[0] === 'EVENT') { + assertEquals(relay.subs.size, 1); + assertEquals(msg[2], event1); + break; + } + } + + await promise; + assertEquals(relay.subs.size, 0); // cleanup +}); + Deno.test('count filters', async () => { await using db = await createTestDB({ pure: true }); const { store } = db; diff --git a/packages/ditto/storages/EventsDB.ts b/packages/ditto/storages/EventsDB.ts index 91ad8d6a..7116a6ec 100644 --- a/packages/ditto/storages/EventsDB.ts +++ b/packages/ditto/storages/EventsDB.ts @@ -1,15 +1,28 @@ // deno-lint-ignore-file require-await -import { DittoTables } from '@ditto/db'; +import { DittoEvent } from '@ditto/api'; +import { DittoDatabase, DittoTables } from '@ditto/db'; +import { detectLanguage } from '@ditto/lang'; import { NPostgres, NPostgresSchema } from '@nostrify/db'; -import { dbEventsCounter } from '@ditto/metrics'; -import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; +import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics'; +import { + NIP50, + NKinds, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NSchema as n, +} from '@nostrify/nostrify'; +import { Machina } from '@nostrify/nostrify/utils'; import { logi } from '@soapbox/logi'; import { JsonValue } from '@std/json'; import { LanguageCode } from 'iso-639-1'; import { Kysely } from 'kysely'; import linkify from 'linkifyjs'; -import { nip27 } from 'nostr-tools'; +import { LRUCache } from 'lru-cache'; +import { matchFilter, nip27 } from 'nostr-tools'; import tldts from 'tldts'; import { z } from 'zod'; @@ -17,8 +30,6 @@ import { RelayError } from '@/RelayError.ts'; import { isNostrId } from '@/utils.ts'; import { abortError } from '../../utils/abort.ts'; import { purifyEvent } from '../../utils/purify.ts'; -import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { detectLanguage } from '../../utils/language.ts'; import { getMediaLinks } from '../../utils/note.ts'; /** Function to decide whether or not to index a tag. */ @@ -37,30 +48,35 @@ interface TagConditionOpts { } /** Options for the EventsDB store. */ -interface EventsDBOpts { +interface DittoPgStoreOpts { /** Kysely instance to use. */ - kysely: Kysely; + db: DittoDatabase; /** Pubkey of the admin account. */ pubkey: string; /** Timeout in milliseconds for database queries. */ timeout: number; /** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */ pure?: boolean; + /** Chunk size for streaming events. Defaults to 100. */ + chunkSize?: number; } /** SQL database storage adapter for Nostr events. */ -class EventsDB extends NPostgres { +export class DittoPgStore extends NPostgres { + readonly subs = new Map }>(); + readonly encounters = new LRUCache({ max: 1000 }); + /** Conditions for when to index certain tags. */ static tagConditions: Record = { 'a': ({ count }) => count < 15, 'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind), - 'e': EventsDB.eTagCondition, + 'e': DittoPgStore.eTagCondition, 'k': ({ count, value }) => count === 0 && Number.isInteger(Number(value)), 'L': ({ event, count }) => event.kind === 1985 || count === 0, 'l': ({ event, count }) => event.kind === 1985 || count === 0, 'n': ({ count, value }) => count < 50 && value.length < 50, 'P': ({ count, value }) => count === 0 && isNostrId(value), - 'p': EventsDB.pTagCondition, + 'p': DittoPgStore.pTagCondition, 'proxy': ({ count, value }) => count === 0 && value.length < 256, 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), 'r': ({ event, count }) => (event.kind === 1985 ? count < 20 : count < 3), @@ -72,65 +88,48 @@ class EventsDB extends NPostgres { }, }; - static indexExtensions(event: NostrEvent): Record { - const ext: Record = {}; + constructor(private opts: DittoPgStoreOpts) { + super(opts.db.kysely, { + indexTags: DittoPgStore.indexTags, + indexSearch: DittoPgStore.searchText, + indexExtensions: DittoPgStore.indexExtensions, + chunkSize: opts.chunkSize, + }); - if (event.kind === 1) { - ext.reply = event.tags.some(([name]) => name === 'e').toString(); - } else if (event.kind === 1111) { - ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); - } else if (event.kind === 6) { - ext.reply = 'false'; - } + opts.db.listen('nostr_event', async (id) => { + if (this.encounters.has(id)) return; + this.encounters.set(id, true); - if ([1, 20, 30023].includes(event.kind)) { - const language = detectLanguage(event.content, 0.90); + const [event] = await this.query([{ ids: [id] }]); - if (language) { - ext.language = language; + if (event) { + this.streamOut(event); } - } - - const imeta: string[][][] = event.tags - .filter(([name]) => name === 'imeta') - .map(([_, ...entries]) => - entries.map((entry) => { - const split = entry.split(' '); - return [split[0], split.splice(1).join(' ')]; - }) - ); - - // quirks mode - if (!imeta.length && event.kind === 1) { - const links = linkify.find(event.content).filter(({ type }) => type === 'url'); - imeta.push(...getMediaLinks(links)); - } - - if (imeta.length) { - ext.media = 'true'; - - if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { - ext.video = 'true'; - } - } - - ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; - - return ext; + }); } - constructor(private opts: EventsDBOpts) { - super(opts.kysely, { - indexTags: EventsDB.indexTags, - indexSearch: EventsDB.searchText, - indexExtensions: EventsDB.indexExtensions, - }); + protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean { + // TODO: support streaming by search. + return matchFilter(filter, event) && filter.search === undefined; + } + + streamOut(event: NostrEvent): void { + for (const { filters, machina } of this.subs.values()) { + for (const filter of filters) { + if (this.matchesFilter(event, filter)) { + machina.push(event); + } + } + } } /** Insert an event (and its tags) into the database. */ override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { event = purifyEvent(event); + logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); + + this.encounters.set(event.id, true); dbEventsCounter.inc({ kind: event.kind }); if (await this.isDeletedAdmin(event)) { @@ -141,6 +140,7 @@ class EventsDB extends NPostgres { try { await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + this.streamOut(event); } catch (e) { if (e instanceof Error && e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -213,6 +213,47 @@ class EventsDB extends NPostgres { } } + override async *req( + filters: NostrFilter[], + opts?: { signal?: AbortSignal }, + ): AsyncIterable { + const subId = crypto.randomUUID(); + filters = this.normalizeFilters(filters); + + if (filters.length) { + const { db, chunkSize = 100 } = this.opts; + const rows = this.getEventsQuery(db.kysely as unknown as Kysely, filters).stream(chunkSize); + + for await (const row of rows) { + const event = this.parseEventRow(row); + yield ['EVENT', subId, event]; + + if (opts?.signal?.aborted) { + yield ['CLOSED', subId, 'aborted']; + return; + } + } + } + + yield ['EOSE', subId]; + + const machina = new Machina(opts?.signal); + + this.subs.set(subId, { filters, machina }); + internalSubscriptionsSizeGauge.set(this.subs.size); + + try { + for await (const event of machina) { + yield ['EVENT', subId, event]; + } + } catch { + yield ['CLOSED', subId, 'error: something went wrong']; + } finally { + this.subs.delete(subId); + internalSubscriptionsSizeGauge.set(this.subs.size); + } + } + /** Get events for filters from the database. */ override async query( filters: NostrFilter[], @@ -323,7 +364,7 @@ class EventsDB extends NPostgres { return event.tags.reduce((results, tag, index) => { const [name, value] = tag; - const condition = EventsDB.tagConditions[name] as TagCondition | undefined; + const condition = DittoPgStore.tagConditions[name] as TagCondition | undefined; if (value && condition && value.length < 200 && checkCondition(name, value, condition, index)) { results.push(tag); @@ -334,16 +375,63 @@ class EventsDB extends NPostgres { }, []); } + static indexExtensions(event: NostrEvent): Record { + const ext: Record = {}; + + if (event.kind === 1) { + ext.reply = event.tags.some(([name]) => name === 'e').toString(); + } else if (event.kind === 1111) { + ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); + } else if (event.kind === 6) { + ext.reply = 'false'; + } + + if ([1, 20, 30023].includes(event.kind)) { + const language = detectLanguage(event.content, 0.90); + + if (language) { + ext.language = language; + } + } + + const imeta: string[][][] = event.tags + .filter(([name]) => name === 'imeta') + .map(([_, ...entries]) => + entries.map((entry) => { + const split = entry.split(' '); + return [split[0], split.splice(1).join(' ')]; + }) + ); + + // quirks mode + if (!imeta.length && event.kind === 1) { + const links = linkify.find(event.content).filter(({ type }) => type === 'url'); + imeta.push(...getMediaLinks(links)); + } + + if (imeta.length) { + ext.media = 'true'; + + if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { + ext.video = 'true'; + } + } + + ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; + + return ext; + } + /** Build a search index from the event. */ static searchText(event: NostrEvent): string { switch (event.kind) { case 0: - return EventsDB.buildUserSearchContent(event); + return DittoPgStore.buildUserSearchContent(event); case 1: case 20: return nip27.replaceAll(event.content, () => ''); case 30009: - return EventsDB.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); + return DittoPgStore.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); case 30360: return event.tags.find(([name]) => name === 'd')?.[1] || ''; default: @@ -385,7 +473,7 @@ class EventsDB extends NPostgres { } if (domains.size || hostnames.size) { - let query = this.opts.kysely + let query = this.opts.db.kysely .selectFrom('author_stats') .select('pubkey') .where((eb) => { @@ -433,5 +521,3 @@ class EventsDB extends NPostgres { return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely)); } } - -export { EventsDB }; diff --git a/packages/ditto/storages/InternalRelay.test.ts b/packages/ditto/storages/InternalRelay.test.ts deleted file mode 100644 index c97dcd39..00000000 --- a/packages/ditto/storages/InternalRelay.test.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { assertEquals } from '@std/assert'; - -import { eventFixture } from '@/test.ts'; - -import { InternalRelay } from './InternalRelay.ts'; - -Deno.test('InternalRelay', async () => { - const relay = new InternalRelay(); - const event1 = await eventFixture('event-1'); - - const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); - - for await (const msg of relay.req([{}])) { - if (msg[0] === 'EVENT') { - assertEquals(relay.subs.size, 1); - assertEquals(msg[2], event1); - break; - } - } - - await promise; - assertEquals(relay.subs.size, 0); // cleanup -}); diff --git a/packages/ditto/storages/InternalRelay.ts b/packages/ditto/storages/InternalRelay.ts deleted file mode 100644 index b2acf58d..00000000 --- a/packages/ditto/storages/InternalRelay.ts +++ /dev/null @@ -1,86 +0,0 @@ -// deno-lint-ignore-file require-await -import { - NIP50, - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NRelay, -} from '@nostrify/nostrify'; -import { Machina } from '@nostrify/nostrify/utils'; -import { matchFilter } from 'nostr-tools'; -import { Gauge } from 'prom-client'; - -import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { purifyEvent } from '../../utils/purify.ts'; - -interface InternalRelayOpts { - gauge?: Gauge; -} - -/** - * PubSub event store for streaming events within the application. - * The pipeline should push events to it, then anything in the application can subscribe to it. - */ -export class InternalRelay implements NRelay { - readonly subs = new Map }>(); - - constructor(private opts: InternalRelayOpts = {}) {} - - async *req( - filters: NostrFilter[], - opts?: { signal?: AbortSignal }, - ): AsyncGenerator { - const id = crypto.randomUUID(); - const machina = new Machina(opts?.signal); - - yield ['EOSE', id]; - - this.subs.set(id, { filters, machina }); - this.opts.gauge?.set(this.subs.size); - - try { - for await (const event of machina) { - yield ['EVENT', id, event]; - } - } finally { - this.subs.delete(id); - this.opts.gauge?.set(this.subs.size); - } - } - - async event(event: DittoEvent): Promise { - for (const { filters, machina } of this.subs.values()) { - for (const filter of filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain === event.author_stats?.nip05_hostname) { - machina.push(purifyEvent(event)); - break; - } - } else { - machina.push(purifyEvent(event)); - break; - } - } - } - } - - return Promise.resolve(); - } - - async query(): Promise { - return []; - } - - async close(): Promise { - return Promise.resolve(); - } -}