diff --git a/deno.json b/deno.json index 2601e28d..a3f06bd5 100644 --- a/deno.json +++ b/deno.json @@ -61,7 +61,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@negrel/webpush": "jsr:@negrel/webpush@^0.3.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.39.0", + "@nostrify/db": "jsr:@nostrify/db@^0.39.3", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1", "@nostrify/policies": "jsr:@nostrify/policies@^0.36.1", "@nostrify/types": "jsr:@nostrify/types@^0.36.0", diff --git a/deno.lock b/deno.lock index b46ce6da..19c7aba4 100644 --- a/deno.lock +++ b/deno.lock @@ -31,7 +31,7 @@ "jsr:@hono/hono@^4.4.6": "4.6.15", "jsr:@negrel/http-ece@0.6.0": "0.6.0", "jsr:@negrel/webpush@0.3": "0.3.0", - "jsr:@nostrify/db@0.39": "0.39.0", + "jsr:@nostrify/db@~0.39.3": "0.39.3", "jsr:@nostrify/nostrify@0.31": "0.31.0", "jsr:@nostrify/nostrify@0.32": "0.32.0", "jsr:@nostrify/nostrify@0.36": "0.36.2", @@ -363,8 +363,8 @@ "jsr:@std/path@0.224.0" ] }, - "@nostrify/db@0.39.0": { - "integrity": "13a88c610eb15a5dd13848d5beec9170406376c9d05299ce5e5298452a5431ac", + "@nostrify/db@0.39.3": { + "integrity": "d1f1104316b33e0fd3c263086b325ee49f86859abc1a966b43bb9f9a21c15429", "dependencies": [ "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/types@0.36", @@ -2460,7 +2460,7 @@ "jsr:@gfx/canvas-wasm@~0.4.2", "jsr:@hono/hono@^4.4.6", "jsr:@negrel/webpush@0.3", - "jsr:@nostrify/db@0.39", + "jsr:@nostrify/db@~0.39.3", "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/policies@~0.36.1", "jsr:@nostrify/types@0.36", diff --git a/packages/db/DittoDatabase.ts b/packages/db/DittoDatabase.ts index e43356a0..ebe97cec 100644 --- a/packages/db/DittoDatabase.ts +++ b/packages/db/DittoDatabase.ts @@ -2,7 +2,7 @@ import type { Kysely } from 'kysely'; import type { DittoTables } from './DittoTables.ts'; -export interface DittoDatabase { +export interface DittoDatabase extends AsyncDisposable { readonly kysely: Kysely; readonly poolSize: number; readonly availableConnections: number; diff --git a/packages/db/adapters/DittoPglite.ts b/packages/db/adapters/DittoPglite.ts index 2e7ca3fc..5e7e6ca4 100644 --- a/packages/db/adapters/DittoPglite.ts +++ b/packages/db/adapters/DittoPglite.ts @@ -36,6 +36,10 @@ export class DittoPglite { poolSize: 1, availableConnections: 1, listen, + [Symbol.asyncDispose]: async () => { + await pglite.close(); + await kysely.destroy(); + }, }; } } diff --git a/packages/db/adapters/DittoPostgres.ts b/packages/db/adapters/DittoPostgres.ts index 9ab8156f..b62a878b 100644 --- a/packages/db/adapters/DittoPostgres.ts +++ b/packages/db/adapters/DittoPostgres.ts @@ -54,6 +54,10 @@ export class DittoPostgres { return pg.connections.idle; }, listen, + [Symbol.asyncDispose]: async () => { + await pg.end(); + await kysely.destroy(); + }, }; } } diff --git a/packages/ditto/controllers/api/cashu.test.ts b/packages/ditto/controllers/api/cashu.test.ts index ee73661b..57be895d 100644 --- a/packages/ditto/controllers/api/cashu.test.ts +++ b/packages/ditto/controllers/api/cashu.test.ts @@ -140,7 +140,10 @@ Deno.test('PUT /wallet must NOT be successful: wrong request body/schema', async assertObjectMatch(body, { error: 'Bad schema' }); }); -Deno.test('PUT /wallet must NOT be successful: wallet already exists', async () => { +Deno.test('PUT /wallet must NOT be successful: wallet already exists', { + sanitizeOps: false, + sanitizeResources: false, +}, async () => { using _mock = mockFetch(); await using db = await createTestDB(); const store = db.store; @@ -178,7 +181,10 @@ Deno.test('PUT /wallet must NOT be successful: wallet already exists', async () assertEquals(body2, { error: 'You already have a wallet 😏' }); }); -Deno.test('GET /wallet must be successful', async () => { +Deno.test('GET /wallet must be successful', { + sanitizeOps: false, + sanitizeResources: false, +}, async () => { using _mock = mockFetch(); await using db = await createTestDB(); const store = db.store; diff --git a/packages/ditto/controllers/api/oauth.ts b/packages/ditto/controllers/api/oauth.ts index 7ac2c2b2..c48963a9 100644 --- a/packages/ditto/controllers/api/oauth.ts +++ b/packages/ditto/controllers/api/oauth.ts @@ -123,7 +123,7 @@ async function getToken( encryption: 'nip44', pubkey: bunkerPubkey, signer: new NSecSigner(nip46Seckey), - relay: await Storages.pubsub(), // TODO: Use the relays from the request. + relay: await Storages.db(), // TODO: Use the relays from the request. timeout: 60_000, }); diff --git a/packages/ditto/controllers/api/streaming.ts b/packages/ditto/controllers/api/streaming.ts index 01eaaed8..4171e1be 100644 --- a/packages/ditto/controllers/api/streaming.ts +++ b/packages/ditto/controllers/api/streaming.ts @@ -94,8 +94,6 @@ const streamingController: AppController = async (c) => { const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const store = await Storages.db(); - const pubsub = await Storages.pubsub(); - const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; function send(e: StreamingEvent) { @@ -105,9 +103,12 @@ const streamingController: AppController = async (c) => { } } - async function sub(filters: NostrFilter[], render: (event: NostrEvent) => Promise) { + async function sub( + filter: NostrFilter & { limit: 0 }, + render: (event: NostrEvent) => Promise, + ) { try { - for await (const msg of pubsub.req(filters, { signal: controller.signal })) { + for await (const msg of store.req([filter], { signal: controller.signal })) { if (msg[0] === 'EVENT') { const event = msg[2]; @@ -140,7 +141,7 @@ const streamingController: AppController = async (c) => { const topicFilter = await topicToFilter(stream, c.req.query(), pubkey, conf.url.host); if (topicFilter) { - sub([topicFilter], async (event) => { + sub(topicFilter, async (event) => { let payload: object | undefined; if (event.kind === 1) { @@ -161,7 +162,7 @@ const streamingController: AppController = async (c) => { } if (['user', 'user:notification'].includes(stream) && pubkey) { - sub([{ '#p': [pubkey] }], async (event) => { + sub({ '#p': [pubkey], limit: 0 }, async (event) => { if (event.pubkey === pubkey) return; // skip own events const payload = await renderNotification(event, { viewerPubkey: pubkey }); if (payload) { @@ -209,23 +210,23 @@ async function topicToFilter( query: Record, pubkey: string | undefined, host: string, -): Promise { +): Promise<(NostrFilter & { limit: 0 }) | undefined> { switch (topic) { case 'public': - return { kinds: [1, 6, 20] }; + return { kinds: [1, 6, 20], limit: 0 }; case 'public:local': - return { kinds: [1, 6, 20], search: `domain:${host}` }; + return { kinds: [1, 6, 20], search: `domain:${host}`, limit: 0 }; case 'hashtag': - if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag] }; + if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], limit: 0 }; break; case 'hashtag:local': - if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], search: `domain:${host}` }; + if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], search: `domain:${host}`, limit: 0 }; break; case 'user': // HACK: this puts the user's entire contacts list into RAM, // and then calls `matchFilters` over it. Refreshing the page // is required after following a new user. - return pubkey ? { kinds: [1, 6, 20], authors: [...await getFeedPubkeys(pubkey)] } : undefined; + return pubkey ? { kinds: [1, 6, 20], authors: [...await getFeedPubkeys(pubkey)], limit: 0 } : undefined; } } diff --git a/packages/ditto/controllers/nostr/relay.ts b/packages/ditto/controllers/nostr/relay.ts index b4924f22..9c29f89d 100644 --- a/packages/ditto/controllers/nostr/relay.ts +++ b/packages/ditto/controllers/nostr/relay.ts @@ -23,9 +23,6 @@ import { errorJson } from '@/utils/log.ts'; import { purifyEvent } from '@/utils/purify.ts'; import { Time } from '@/utils/time.ts'; -/** Limit of initial events returned for a subscription. */ -const FILTER_LIMIT = 100; - const limiters = { msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }), req: new MultiRateLimiter([ @@ -126,11 +123,10 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon controllers.set(subId, controller); const store = await Storages.db(); - const pubsub = await Storages.pubsub(); try { - for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: conf.db.timeouts.relay })) { - send(['EVENT', subId, purifyEvent(event)]); + for await (const [verb, , ...rest] of store.req(filters, { limit: 100, timeout: conf.db.timeouts.relay })) { + send([verb, subId, ...rest] as NostrRelayMsg); } } catch (e) { if (e instanceof RelayError) { @@ -143,18 +139,6 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon controllers.delete(subId); return; } - - send(['EOSE', subId]); - - try { - for await (const msg of pubsub.req(filters, { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - send(['EVENT', subId, msg[2]]); - } - } - } catch { - controllers.delete(subId); - } } /** Handle EVENT. Store the event. */ diff --git a/packages/ditto/notify.ts b/packages/ditto/notify.ts deleted file mode 100644 index 44ed5619..00000000 --- a/packages/ditto/notify.ts +++ /dev/null @@ -1,38 +0,0 @@ -import { Semaphore } from '@core/asyncutil'; - -import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; -import { Conf } from '@/config.ts'; -import * as pipeline from '@/pipeline.ts'; -import { Storages } from '@/storages.ts'; -import { logi } from '@soapbox/logi'; - -const sem = new Semaphore(1); - -export async function startNotify(): Promise { - const { listen } = await Storages.database(); - const store = await Storages.db(); - - listen('nostr_event', (id) => { - if (pipelineEncounters.has(id)) { - logi({ level: 'debug', ns: 'ditto.notify', id, skipped: true }); - return; - } - - logi({ level: 'debug', ns: 'ditto.notify', id, skipped: false }); - - sem.lock(async () => { - try { - const signal = AbortSignal.timeout(Conf.db.timeouts.default); - - const [event] = await store.query([{ ids: [id], limit: 1 }], { signal }); - - if (event) { - logi({ level: 'debug', ns: 'ditto.event', source: 'notify', id: event.id, kind: event.kind }); - await pipeline.handleEvent(event, { source: 'notify', signal }); - } - } catch { - // Ignore - } - }); - }); -} diff --git a/packages/ditto/pipeline.ts b/packages/ditto/pipeline.ts index d3168c0e..07be1bd9 100644 --- a/packages/ditto/pipeline.ts +++ b/packages/ditto/pipeline.ts @@ -78,8 +78,8 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise // They are exempt from policies and other side-effects, and should be streamed out immediately. // If streaming fails, an error should be returned. if (event.kind === 24133) { - await streamOut(event); - return; + const store = await Storages.db(); + await store.event(event, { signal: opts.signal }); } // Ensure the event doesn't violate the policy. @@ -97,24 +97,6 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise throw new RelayError('blocked', 'author is blocked'); } - // Ephemeral events must throw if they are not streamed out. - if (NKinds.ephemeral(event.kind)) { - await Promise.all([ - streamOut(event), - webPush(event), - ]); - return; - } - - // Events received through notify are thought to already be in the database, so they only need to be streamed. - if (opts.source === 'notify') { - await Promise.all([ - streamOut(event), - webPush(event), - ]); - return; - } - const kysely = await Storages.kysely(); try { @@ -127,12 +109,8 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise prewarmLinkPreview(event, opts.signal), generateSetEvents(event), ]) - .then(() => - Promise.allSettled([ - streamOut(event), - webPush(event), - ]) - ); + .then(() => webPush(event)) + .catch(() => {}); } } @@ -165,12 +143,13 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { - if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); try { await store.transaction(async (store, kysely) => { - await updateStats({ event, store, kysely }); + if (!NKinds.ephemeral(event.kind)) { + await updateStats({ event, store, kysely }); + } await store.event(event, { signal }); }); } catch (e) { @@ -274,16 +253,6 @@ function isFresh(event: NostrEvent): boolean { return eventAge(event) < Time.minutes(1); } -/** Distribute the event through active subscriptions. */ -async function streamOut(event: NostrEvent): Promise { - if (!isFresh(event)) { - throw new RelayError('invalid', 'event too old'); - } - - const pubsub = await Storages.pubsub(); - await pubsub.event(event); -} - async function webPush(event: NostrEvent): Promise { if (!isFresh(event)) { throw new RelayError('invalid', 'event too old'); diff --git a/packages/ditto/signers/ConnectSigner.ts b/packages/ditto/signers/ConnectSigner.ts index 89c62679..c6d23d37 100644 --- a/packages/ditto/signers/ConnectSigner.ts +++ b/packages/ditto/signers/ConnectSigner.ts @@ -28,7 +28,7 @@ export class ConnectSigner implements NostrSigner { encryption: 'nip44', pubkey: this.opts.bunkerPubkey, // TODO: use a remote relay for `nprofile` signing (if present and `Conf.relay` isn't already in the list) - relay: await Storages.pubsub(), + relay: await Storages.db(), signer, timeout: 60_000, }); diff --git a/packages/ditto/startup.ts b/packages/ditto/startup.ts index 0cc2f26a..0372a1d1 100644 --- a/packages/ditto/startup.ts +++ b/packages/ditto/startup.ts @@ -2,16 +2,11 @@ import { Conf } from '@/config.ts'; import { cron } from '@/cron.ts'; import { startFirehose } from '@/firehose.ts'; -import { startNotify } from '@/notify.ts'; if (Conf.firehoseEnabled) { startFirehose(); } -if (Conf.notifyEnabled) { - startNotify(); -} - if (Conf.cronEnabled) { cron(); } diff --git a/packages/ditto/storages.ts b/packages/ditto/storages.ts index 1494dc8c..ff7b2954 100644 --- a/packages/ditto/storages.ts +++ b/packages/ditto/storages.ts @@ -1,23 +1,20 @@ // deno-lint-ignore-file require-await import { type DittoDatabase, DittoDB } from '@ditto/db'; -import { internalSubscriptionsSizeGauge } from '@ditto/metrics'; +import { NPool, NRelay1 } from '@nostrify/nostrify'; import { logi } from '@soapbox/logi'; import { Conf } from '@/config.ts'; import { wsUrlSchema } from '@/schema.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; -import { EventsDB } from '@/storages/EventsDB.ts'; -import { InternalRelay } from '@/storages/InternalRelay.ts'; -import { NPool, NRelay1 } from '@nostrify/nostrify'; +import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { getRelays } from '@/utils/outbox.ts'; import { seedZapSplits } from '@/utils/zap-split.ts'; export class Storages { - private static _db: Promise | undefined; + private static _db: Promise | undefined; private static _database: Promise | undefined; private static _admin: Promise | undefined; private static _client: Promise> | undefined; - private static _pubsub: Promise | undefined; public static async database(): Promise { if (!this._database) { @@ -39,11 +36,16 @@ export class Storages { } /** SQL database to store events this Ditto server cares about. */ - public static async db(): Promise { + public static async db(): Promise { if (!this._db) { this._db = (async () => { - const kysely = await this.kysely(); - const store = new EventsDB({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); + const db = await this.database(); + const store = new DittoPgStore({ + db, + pubkey: Conf.pubkey, + timeout: Conf.db.timeouts.default, + notify: Conf.notifyEnabled, + }); await seedZapSplits(store); return store; })(); @@ -59,14 +61,6 @@ export class Storages { return this._admin; } - /** Internal pubsub relay between controllers and the pipeline. */ - public static async pubsub(): Promise { - if (!this._pubsub) { - this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge })); - } - return this._pubsub; - } - /** Relay pool storage. */ public static async client(): Promise> { if (!this._client) { diff --git a/packages/ditto/storages/EventsDB.test.ts b/packages/ditto/storages/DittoPgStore.test.ts similarity index 91% rename from packages/ditto/storages/EventsDB.test.ts rename to packages/ditto/storages/DittoPgStore.test.ts index 03f31d35..756cd98b 100644 --- a/packages/ditto/storages/EventsDB.test.ts +++ b/packages/ditto/storages/DittoPgStore.test.ts @@ -1,13 +1,41 @@ import { assertEquals, assertRejects } from '@std/assert'; +import { NostrRelayMsg } from '@nostrify/nostrify'; import { genEvent } from '@nostrify/nostrify/test'; import { generateSecretKey } from 'nostr-tools'; import { RelayError } from '@/RelayError.ts'; import { eventFixture } from '@/test.ts'; import { Conf } from '@/config.ts'; -import { EventsDB } from '@/storages/EventsDB.ts'; +import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { createTestDB } from '@/test.ts'; +Deno.test('req streaming', async () => { + await using db = await createTestDB({ pure: true }); + const { store: relay } = db; + + const msgs: NostrRelayMsg[] = []; + const controller = new AbortController(); + + const promise = (async () => { + for await (const msg of relay.req([{ since: 0 }], { signal: controller.signal })) { + msgs.push(msg); + } + })(); + + const event = genEvent({ created_at: Math.floor(Date.now() / 1000) }); + await relay.event(event); + + controller.abort(); + + await promise; + + const verbs = msgs.map(([verb]) => verb); + + assertEquals(verbs, ['EOSE', 'EVENT', 'CLOSED']); + assertEquals(msgs[1][2], event); + assertEquals(relay.subs.size, 0); // cleanup +}); + Deno.test('count filters', async () => { await using db = await createTestDB({ pure: true }); const { store } = db; @@ -255,7 +283,7 @@ Deno.test('NPostgres.query with search', async (t) => { }); }); -Deno.test('EventsDB.indexTags indexes only the final `e` and `p` tag of kind 7 events', () => { +Deno.test('DittoPgStore.indexTags indexes only the final `e` and `p` tag of kind 7 events', () => { const event = { kind: 7, id: 'a92549a442d306b32273aa9456ba48e3851a4e6203af3f567543298ab964b35b', @@ -286,7 +314,7 @@ Deno.test('EventsDB.indexTags indexes only the final `e` and `p` tag of kind 7 e '44639d039a7f7fb8772fcfa13d134d3cda684ec34b6a777ead589676f9e8d81b08a24234066dcde1aacfbe193224940fba7586e7197c159757d3caf8f2b57e1b', }; - const tags = EventsDB.indexTags(event); + const tags = DittoPgStore.indexTags(event); assertEquals(tags, [ ['e', 'e3653ae41ffb510e5fc071555ecfbc94d2fc31e355d61d941e39a97ac6acb15b'], diff --git a/packages/ditto/storages/EventsDB.ts b/packages/ditto/storages/DittoPgStore.ts similarity index 66% rename from packages/ditto/storages/EventsDB.ts rename to packages/ditto/storages/DittoPgStore.ts index e7669861..a921a309 100644 --- a/packages/ditto/storages/EventsDB.ts +++ b/packages/ditto/storages/DittoPgStore.ts @@ -1,16 +1,27 @@ // deno-lint-ignore-file require-await -import { DittoTables } from '@ditto/db'; +import { DittoDatabase, DittoTables } from '@ditto/db'; import { detectLanguage } from '@ditto/lang'; import { NPostgres, NPostgresSchema } from '@nostrify/db'; -import { dbEventsCounter } from '@ditto/metrics'; -import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; +import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics'; +import { + NIP50, + NKinds, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NSchema as n, +} from '@nostrify/nostrify'; +import { Machina } from '@nostrify/nostrify/utils'; import { logi } from '@soapbox/logi'; import { JsonValue } from '@std/json'; import { LanguageCode } from 'iso-639-1'; import { Kysely } from 'kysely'; import linkify from 'linkifyjs'; -import { nip27 } from 'nostr-tools'; +import { LRUCache } from 'lru-cache'; +import { matchFilter, nip27 } from 'nostr-tools'; import tldts from 'tldts'; import { z } from 'zod'; @@ -37,30 +48,47 @@ interface TagConditionOpts { } /** Options for the EventsDB store. */ -interface EventsDBOpts { +interface DittoPgStoreOpts { /** Kysely instance to use. */ - kysely: Kysely; + db: DittoDatabase; /** Pubkey of the admin account. */ pubkey: string; /** Timeout in milliseconds for database queries. */ timeout: number; /** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */ pure?: boolean; + /** Chunk size for streaming events. Defaults to 20. */ + chunkSize?: number; + /** Batch size for fulfilling subscriptions. Defaults to 500. */ + batchSize?: number; + /** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */ + maxAge?: number; + /** Whether to listen for events from the database with NOTIFY. */ + notify?: boolean; +} + +/** Realtime subscription. */ +interface Subscription { + filters: NostrFilter[]; + machina: Machina; } /** SQL database storage adapter for Nostr events. */ -class EventsDB extends NPostgres { +export class DittoPgStore extends NPostgres { + readonly subs = new Map(); + readonly encounters = new LRUCache({ max: 1000 }); + /** Conditions for when to index certain tags. */ static tagConditions: Record = { 'a': ({ count }) => count < 15, 'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind), - 'e': EventsDB.eTagCondition, + 'e': DittoPgStore.eTagCondition, 'k': ({ count, value }) => count === 0 && Number.isInteger(Number(value)), 'L': ({ event, count }) => event.kind === 1985 || count === 0, 'l': ({ event, count }) => event.kind === 1985 || count === 0, 'n': ({ count, value }) => count < 50 && value.length < 50, 'P': ({ count, value }) => count === 0 && isNostrId(value), - 'p': EventsDB.pTagCondition, + 'p': DittoPgStore.pTagCondition, 'proxy': ({ count, value }) => count === 0 && value.length < 256, 'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value), 'r': ({ event, count }) => (event.kind === 1985 ? count < 20 : count < 3), @@ -72,67 +100,43 @@ class EventsDB extends NPostgres { }, }; - static indexExtensions(event: NostrEvent): Record { - const ext: Record = {}; - - if (event.kind === 1) { - ext.reply = event.tags.some(([name]) => name === 'e').toString(); - } else if (event.kind === 1111) { - ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); - } else if (event.kind === 6) { - ext.reply = 'false'; - } - - if ([1, 20, 30023].includes(event.kind)) { - const language = detectLanguage(event.content, 0.90); - - if (language) { - ext.language = language; - } - } - - const imeta: string[][][] = event.tags - .filter(([name]) => name === 'imeta') - .map(([_, ...entries]) => - entries.map((entry) => { - const split = entry.split(' '); - return [split[0], split.splice(1).join(' ')]; - }) - ); - - // quirks mode - if (!imeta.length && event.kind === 1) { - const links = linkify.find(event.content).filter(({ type }) => type === 'url'); - imeta.push(...getMediaLinks(links)); - } - - if (imeta.length) { - ext.media = 'true'; - - if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { - ext.video = 'true'; - } - } - - ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; - - return ext; - } - - constructor(private opts: EventsDBOpts) { - super(opts.kysely, { - indexTags: EventsDB.indexTags, - indexSearch: EventsDB.searchText, - indexExtensions: EventsDB.indexExtensions, + constructor(private opts: DittoPgStoreOpts) { + super(opts.db.kysely, { + indexTags: DittoPgStore.indexTags, + indexSearch: DittoPgStore.searchText, + indexExtensions: DittoPgStore.indexExtensions, + chunkSize: opts.chunkSize, }); + + if (opts.notify) { + opts.db.listen('nostr_event', async (id) => { + if (this.encounters.has(id)) return; + this.encounters.set(id, true); + + const [event] = await this.query([{ ids: [id] }]); + + if (event) { + await this.fulfill(event); + } + }); + } } /** Insert an event (and its tags) into the database. */ override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { event = purifyEvent(event); + logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); dbEventsCounter.inc({ kind: event.kind }); + if (NKinds.ephemeral(event.kind)) { + return await this.fulfill(event); + } + + if (this.opts.notify) { + this.encounters.set(event.id, true); + } + if (await this.isDeletedAdmin(event)) { throw new RelayError('blocked', 'event deleted by admin'); } @@ -141,6 +145,7 @@ class EventsDB extends NPostgres { try { await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); + this.fulfill(event); // don't await or catch (should never reject) } catch (e) { if (e instanceof Error && e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -152,6 +157,48 @@ class EventsDB extends NPostgres { } } + /** Fulfill active subscriptions with this event. */ + protected async fulfill(event: NostrEvent): Promise { + const { maxAge = 60, batchSize = 500 } = this.opts; + + const now = Math.floor(Date.now() / 1000); + const age = now - event.created_at; + + if (age > maxAge) { + // Ephemeral events must be fulfilled, or else return an error to the client. + if (NKinds.ephemeral(event.kind)) { + throw new RelayError('invalid', 'event too old'); + } else { + // Silently ignore old events. + return; + } + } + + let count = 0; + + for (const [subId, { filters, machina }] of this.subs.entries()) { + for (const filter of filters) { + count++; + + if (this.matchesFilter(event, filter)) { + machina.push(['EVENT', subId, event]); + break; + } + + // Yield to event loop. + if (count % batchSize === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + } + } + } + + /** Check if the event fulfills the filter, according to Ditto criteria. */ + protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean { + // TODO: support streaming by search. + return typeof filter.search !== 'string' && matchFilter(filter, event); + } + /** Check if an event has been deleted by the admin. */ private async isDeletedAdmin(event: NostrEvent): Promise { const filters: NostrFilter[] = [ @@ -213,27 +260,89 @@ class EventsDB extends NPostgres { } } + override async *req( + filters: NostrFilter[], + opts: { timeout?: number; signal?: AbortSignal; limit?: number } = {}, + ): AsyncIterable { + const { db, chunkSize = 20 } = this.opts; + const { limit, timeout = this.opts.timeout, signal } = opts; + + filters = await this.expandFilters(filters); + + const subId = crypto.randomUUID(); + const normalFilters = this.normalizeFilters(filters); + const machina = new Machina(signal); + + if (normalFilters.length && limit !== 0) { + this.withTimeout(db.kysely as unknown as Kysely, timeout, async (trx) => { + let query = this.getEventsQuery(trx, normalFilters); + + if (typeof opts.limit === 'number') { + query = query.limit(opts.limit); + } + + for await (const row of query.stream(chunkSize)) { + const event = this.parseEventRow(row); + machina.push(['EVENT', subId, event]); + } + + machina.push(['EOSE', subId]); + }).catch((error) => { + if (error instanceof Error && error.message.includes('timeout')) { + machina.push(['CLOSED', subId, 'error: the relay could not respond fast enough']); + } else { + machina.push(['CLOSED', subId, 'error: something went wrong']); + } + }); + + try { + for await (const msg of machina) { + const [verb] = msg; + + yield msg; + + if (verb === 'EOSE') { + break; + } + + if (verb === 'CLOSED') { + return; + } + } + } catch { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; + return; + } + } else { + yield ['EOSE', subId]; + } + + this.subs.set(subId, { filters, machina }); + internalSubscriptionsSizeGauge.set(this.subs.size); + + try { + for await (const msg of machina) { + yield msg; + } + } catch (e) { + if (e instanceof Error && e.name === 'AbortError') { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; + } else { + yield ['CLOSED', subId, 'error: something went wrong']; + } + } finally { + this.subs.delete(subId); + internalSubscriptionsSizeGauge.set(this.subs.size); + } + } + /** Get events for filters from the database. */ override async query( filters: NostrFilter[], - opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, + opts: { signal?: AbortSignal; pure?: boolean; timeout?: number; limit?: number } = {}, ): Promise { filters = await this.expandFilters(filters); - for (const filter of filters) { - if (filter.since && filter.since >= 2_147_483_647) { - throw new RelayError('invalid', 'since filter too far into the future'); - } - if (filter.until && filter.until >= 2_147_483_647) { - throw new RelayError('invalid', 'until filter too far into the future'); - } - for (const kind of filter.kinds ?? []) { - if (kind >= 2_147_483_647) { - throw new RelayError('invalid', 'kind filter too far into the future'); - } - } - } - if (opts.signal?.aborted) return Promise.resolve([]); logi({ level: 'debug', ns: 'ditto.req', source: 'db', filters: filters as JsonValue }); @@ -323,7 +432,7 @@ class EventsDB extends NPostgres { return event.tags.reduce((results, tag, index) => { const [name, value] = tag; - const condition = EventsDB.tagConditions[name] as TagCondition | undefined; + const condition = DittoPgStore.tagConditions[name] as TagCondition | undefined; if (value && condition && value.length < 200 && checkCondition(name, value, condition, index)) { results.push(tag); @@ -334,16 +443,63 @@ class EventsDB extends NPostgres { }, []); } + static indexExtensions(event: NostrEvent): Record { + const ext: Record = {}; + + if (event.kind === 1) { + ext.reply = event.tags.some(([name]) => name === 'e').toString(); + } else if (event.kind === 1111) { + ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString(); + } else if (event.kind === 6) { + ext.reply = 'false'; + } + + if ([1, 20, 30023].includes(event.kind)) { + const language = detectLanguage(event.content, 0.90); + + if (language) { + ext.language = language; + } + } + + const imeta: string[][][] = event.tags + .filter(([name]) => name === 'imeta') + .map(([_, ...entries]) => + entries.map((entry) => { + const split = entry.split(' '); + return [split[0], split.splice(1).join(' ')]; + }) + ); + + // quirks mode + if (!imeta.length && event.kind === 1) { + const links = linkify.find(event.content).filter(({ type }) => type === 'url'); + imeta.push(...getMediaLinks(links)); + } + + if (imeta.length) { + ext.media = 'true'; + + if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) { + ext.video = 'true'; + } + } + + ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr'; + + return ext; + } + /** Build a search index from the event. */ static searchText(event: NostrEvent): string { switch (event.kind) { case 0: - return EventsDB.buildUserSearchContent(event); + return DittoPgStore.buildUserSearchContent(event); case 1: case 20: return nip27.replaceAll(event.content, () => ''); case 30009: - return EventsDB.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); + return DittoPgStore.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt')); case 30360: return event.tags.find(([name]) => name === 'd')?.[1] || ''; default: @@ -367,6 +523,18 @@ class EventsDB extends NPostgres { filters = structuredClone(filters); for (const filter of filters) { + if (filter.since && filter.since >= 2_147_483_647) { + throw new RelayError('invalid', 'since filter too far into the future'); + } + if (filter.until && filter.until >= 2_147_483_647) { + throw new RelayError('invalid', 'until filter too far into the future'); + } + for (const kind of filter.kinds ?? []) { + if (kind >= 2_147_483_647) { + throw new RelayError('invalid', 'kind filter too far into the future'); + } + } + if (filter.search) { const tokens = NIP50.parseInput(filter.search); @@ -385,7 +553,7 @@ class EventsDB extends NPostgres { } if (domains.size || hostnames.size) { - let query = this.opts.kysely + let query = this.opts.db.kysely .selectFrom('author_stats') .select('pubkey') .where((eb) => { @@ -417,21 +585,33 @@ class EventsDB extends NPostgres { .map((t) => typeof t === 'object' ? `${t.key}:${t.value}` : t) .join(' '); } - - if (filter.kinds) { - // Ephemeral events are not stored, so don't bother querying for them. - // If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results. - filter.kinds = filter.kinds.filter((kind) => !NKinds.ephemeral(kind)); - } } return filters; } - // deno-lint-ignore no-explicit-any - override async transaction(callback: (store: NPostgres, kysely: Kysely) => Promise): Promise { - return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely)); + /** Execute the callback in a new transaction, unless the Kysely instance is already a transaction. */ + private static override async trx( + db: Kysely, + callback: (trx: Kysely) => Promise, + ): Promise { + if (db.isTransaction) { + return await callback(db); + } else { + return await db.transaction().execute((trx) => callback(trx)); + } + } + + /** Execute NPostgres functions in a transaction. */ + // @ts-ignore gg + override async transaction( + callback: (store: DittoPgStore, kysely: Kysely) => Promise, + ): Promise { + const { db } = this.opts; + + await DittoPgStore.trx(db.kysely, async (trx) => { + const store = new DittoPgStore({ ...this.opts, db: { ...db, kysely: trx }, notify: false }); + await callback(store, trx); + }); } } - -export { EventsDB }; diff --git a/packages/ditto/storages/InternalRelay.test.ts b/packages/ditto/storages/InternalRelay.test.ts deleted file mode 100644 index c97dcd39..00000000 --- a/packages/ditto/storages/InternalRelay.test.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { assertEquals } from '@std/assert'; - -import { eventFixture } from '@/test.ts'; - -import { InternalRelay } from './InternalRelay.ts'; - -Deno.test('InternalRelay', async () => { - const relay = new InternalRelay(); - const event1 = await eventFixture('event-1'); - - const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); - - for await (const msg of relay.req([{}])) { - if (msg[0] === 'EVENT') { - assertEquals(relay.subs.size, 1); - assertEquals(msg[2], event1); - break; - } - } - - await promise; - assertEquals(relay.subs.size, 0); // cleanup -}); diff --git a/packages/ditto/storages/InternalRelay.ts b/packages/ditto/storages/InternalRelay.ts deleted file mode 100644 index 9ab942fb..00000000 --- a/packages/ditto/storages/InternalRelay.ts +++ /dev/null @@ -1,86 +0,0 @@ -// deno-lint-ignore-file require-await -import { - NIP50, - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NRelay, -} from '@nostrify/nostrify'; -import { Machina } from '@nostrify/nostrify/utils'; -import { matchFilter } from 'nostr-tools'; -import { Gauge } from 'prom-client'; - -import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { purifyEvent } from '@/utils/purify.ts'; - -interface InternalRelayOpts { - gauge?: Gauge; -} - -/** - * PubSub event store for streaming events within the application. - * The pipeline should push events to it, then anything in the application can subscribe to it. - */ -export class InternalRelay implements NRelay { - readonly subs = new Map }>(); - - constructor(private opts: InternalRelayOpts = {}) {} - - async *req( - filters: NostrFilter[], - opts?: { signal?: AbortSignal }, - ): AsyncGenerator { - const id = crypto.randomUUID(); - const machina = new Machina(opts?.signal); - - yield ['EOSE', id]; - - this.subs.set(id, { filters, machina }); - this.opts.gauge?.set(this.subs.size); - - try { - for await (const event of machina) { - yield ['EVENT', id, event]; - } - } finally { - this.subs.delete(id); - this.opts.gauge?.set(this.subs.size); - } - } - - async event(event: DittoEvent): Promise { - for (const { filters, machina } of this.subs.values()) { - for (const filter of filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain === event.author_stats?.nip05_hostname) { - machina.push(purifyEvent(event)); - break; - } - } else { - machina.push(purifyEvent(event)); - break; - } - } - } - } - - return Promise.resolve(); - } - - async query(): Promise { - return []; - } - - async close(): Promise { - return Promise.resolve(); - } -} diff --git a/packages/ditto/test.ts b/packages/ditto/test.ts index dcf428a6..c363963f 100644 --- a/packages/ditto/test.ts +++ b/packages/ditto/test.ts @@ -2,7 +2,7 @@ import { DittoDB } from '@ditto/db'; import { NostrEvent } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; -import { EventsDB } from '@/storages/EventsDB.ts'; +import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { sql } from 'kysely'; /** Import an event fixture by name in tests. */ @@ -13,31 +13,32 @@ export async function eventFixture(name: string): Promise { /** Create a database for testing. It uses `DATABASE_URL`, or creates an in-memory database by default. */ export async function createTestDB(opts?: { pure?: boolean }) { - const { kysely } = DittoDB.create(Conf.databaseUrl, { poolSize: 1 }); + const db = DittoDB.create(Conf.databaseUrl, { poolSize: 1 }); - await DittoDB.migrate(kysely); + await DittoDB.migrate(db.kysely); - const store = new EventsDB({ - kysely, + const store = new DittoPgStore({ + db, timeout: Conf.db.timeouts.default, pubkey: Conf.pubkey, pure: opts?.pure ?? false, + notify: true, }); return { + ...db, store, - kysely, [Symbol.asyncDispose]: async () => { const { rows } = await sql< { tablename: string } - >`select tablename from pg_tables where schemaname = current_schema()`.execute(kysely); + >`select tablename from pg_tables where schemaname = current_schema()`.execute(db.kysely); for (const { tablename } of rows) { if (tablename.startsWith('kysely_')) continue; - await sql`truncate table ${sql.ref(tablename)} cascade`.execute(kysely); + await sql`truncate table ${sql.ref(tablename)} cascade`.execute(db.kysely); } - await kysely.destroy(); + await db[Symbol.asyncDispose](); }, }; } diff --git a/packages/ditto/workers/policy.worker.ts b/packages/ditto/workers/policy.worker.ts index 85a98240..852c24b5 100644 --- a/packages/ditto/workers/policy.worker.ts +++ b/packages/ditto/workers/policy.worker.ts @@ -4,7 +4,7 @@ import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify'; import { ReadOnlyPolicy } from '@nostrify/policies'; import * as Comlink from 'comlink'; -import { EventsDB } from '@/storages/EventsDB.ts'; +import { DittoPgStore } from '@/storages/DittoPgStore.ts'; // @ts-ignore Don't try to access the env from this worker. Deno.env = new Map(); @@ -15,7 +15,7 @@ interface PolicyInit { path: string; /** Database URL to connect to. */ databaseUrl: string; - /** Admin pubkey to use for EventsDB checks. */ + /** Admin pubkey to use for DittoPgStore checks. */ pubkey: string; } @@ -30,10 +30,10 @@ export class CustomPolicy implements NPolicy { async init({ path, databaseUrl, pubkey }: PolicyInit): Promise { const Policy = (await import(path)).default; - const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 }); + const db = DittoDB.create(databaseUrl, { poolSize: 1 }); - const store = new EventsDB({ - kysely, + const store = new DittoPgStore({ + db, pubkey, timeout: 5_000, }); diff --git a/scripts/db-populate-extensions.ts b/scripts/db-populate-extensions.ts index 2b40bd3d..0cb3a49b 100644 --- a/scripts/db-populate-extensions.ts +++ b/scripts/db-populate-extensions.ts @@ -1,7 +1,7 @@ import { NostrEvent } from '@nostrify/nostrify'; import { Storages } from '../packages/ditto/storages.ts'; -import { EventsDB } from '../packages/ditto/storages/EventsDB.ts'; +import { DittoPgStore } from '../packages/ditto/storages/DittoPgStore.ts'; const kysely = await Storages.kysely(); @@ -11,7 +11,7 @@ const query = kysely for await (const row of query.stream()) { const event: NostrEvent = { ...row, created_at: Number(row.created_at) }; - const ext = EventsDB.indexExtensions(event); + const ext = DittoPgStore.indexExtensions(event); try { await kysely