From 6568dca19109148d09d2444f78f8bdef6f1db0bc Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 19 Feb 2025 15:55:09 -0600 Subject: [PATCH] DittoPgStore: support timeout in req, add special treatment for ephemeral events, yield event loop when processing many subscriptions --- deno.json | 2 +- deno.lock | 8 +-- packages/ditto/storages/DittoPgStore.ts | 83 +++++++++++++++++++------ 3 files changed, 70 insertions(+), 23 deletions(-) diff --git a/deno.json b/deno.json index 2601e28d..50f814fe 100644 --- a/deno.json +++ b/deno.json @@ -61,7 +61,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@negrel/webpush": "jsr:@negrel/webpush@^0.3.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.39.0", + "@nostrify/db": "jsr:@nostrify/db@^0.39.2", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1", "@nostrify/policies": "jsr:@nostrify/policies@^0.36.1", "@nostrify/types": "jsr:@nostrify/types@^0.36.0", diff --git a/deno.lock b/deno.lock index b46ce6da..b7475e3a 100644 --- a/deno.lock +++ b/deno.lock @@ -31,7 +31,7 @@ "jsr:@hono/hono@^4.4.6": "4.6.15", "jsr:@negrel/http-ece@0.6.0": "0.6.0", "jsr:@negrel/webpush@0.3": "0.3.0", - "jsr:@nostrify/db@0.39": "0.39.0", + "jsr:@nostrify/db@~0.39.2": "0.39.2", "jsr:@nostrify/nostrify@0.31": "0.31.0", "jsr:@nostrify/nostrify@0.32": "0.32.0", "jsr:@nostrify/nostrify@0.36": "0.36.2", @@ -363,8 +363,8 @@ "jsr:@std/path@0.224.0" ] }, - "@nostrify/db@0.39.0": { - "integrity": "13a88c610eb15a5dd13848d5beec9170406376c9d05299ce5e5298452a5431ac", + "@nostrify/db@0.39.2": { + "integrity": "65df8e636d172a62319060f77398f992541a674bcc0298d19608fdba639e0b13", "dependencies": [ "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/types@0.36", @@ -2460,7 +2460,7 @@ "jsr:@gfx/canvas-wasm@~0.4.2", "jsr:@hono/hono@^4.4.6", "jsr:@negrel/webpush@0.3", - "jsr:@nostrify/db@0.39", + "jsr:@nostrify/db@~0.39.2", "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/policies@~0.36.1", "jsr:@nostrify/types@0.36", diff --git a/packages/ditto/storages/DittoPgStore.ts b/packages/ditto/storages/DittoPgStore.ts index 4c966c8a..22671185 100644 --- a/packages/ditto/storages/DittoPgStore.ts +++ b/packages/ditto/storages/DittoPgStore.ts @@ -57,14 +57,18 @@ interface DittoPgStoreOpts { timeout: number; /** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */ pure?: boolean; - /** Chunk size for streaming events. Defaults to 100. */ + /** Chunk size for streaming events. Defaults to 20. */ chunkSize?: number; + /** Batch size for fulfilling subscriptions. Defaults to 500. */ + batchSize?: number; + /** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */ + maxAge?: number; } /** SQL database storage adapter for Nostr events. */ export class DittoPgStore extends NPostgres { readonly subs = new Map }>(); - readonly encounters = new LRUCache({ max: 100 }); + readonly encounters = new LRUCache({ max: 1000 }); /** Conditions for when to index certain tags. */ static tagConditions: Record = { @@ -103,7 +107,7 @@ export class DittoPgStore extends NPostgres { const [event] = await this.query([{ ids: [id] }]); if (event) { - this.streamOut(event); + await this.fulfill(event); } }); } @@ -117,6 +121,10 @@ export class DittoPgStore extends NPostgres { this.encounters.set(event.id, true); dbEventsCounter.inc({ kind: event.kind }); + if (NKinds.ephemeral(event.kind)) { + return await this.fulfill(event); + } + if (await this.isDeletedAdmin(event)) { throw new RelayError('blocked', 'event deleted by admin'); } @@ -125,7 +133,7 @@ export class DittoPgStore extends NPostgres { try { await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); - this.streamOut(event); + this.fulfill(event); // don't await or catch (should never reject) } catch (e) { if (e instanceof Error && e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -137,21 +145,48 @@ export class DittoPgStore extends NPostgres { } } - protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean { - // TODO: support streaming by search. - return matchFilter(filter, event) && filter.search === undefined; - } + /** Fulfill active subscriptions with this event. */ + protected async fulfill(event: NostrEvent): Promise { + const { maxAge = 60, batchSize = 500 } = this.opts; + + const now = Math.floor(Date.now() / 1000); + const age = now - event.created_at; + + if (age > maxAge) { + // Ephemeral events must be fulfilled, or else return an error to the client. + if (NKinds.ephemeral(event.kind)) { + throw new RelayError('invalid', 'event too old'); + } else { + // Silently ignore old events. + return; + } + } + + let count = 0; - protected streamOut(event: NostrEvent): void { for (const { filters, machina } of this.subs.values()) { for (const filter of filters) { + count++; + if (this.matchesFilter(event, filter)) { machina.push(event); + break; + } + + // Yield to event loop. + if (count % batchSize === 0) { + await new Promise((resolve) => setTimeout(resolve, 0)); } } } } + /** Check if the event fulfills the filter, according to Ditto criteria. */ + protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean { + // TODO: support streaming by search. + return typeof filter.search !== 'string' && matchFilter(filter, event); + } + /** Check if an event has been deleted by the admin. */ private async isDeletedAdmin(event: NostrEvent): Promise { const filters: NostrFilter[] = [ @@ -215,23 +250,26 @@ export class DittoPgStore extends NPostgres { override async *req( filters: NostrFilter[], - opts?: { signal?: AbortSignal }, + opts: { timeout?: number; signal?: AbortSignal } = {}, ): AsyncIterable { const subId = crypto.randomUUID(); const normalFilters = this.normalizeFilters(filters); if (normalFilters.length) { - const { db, chunkSize = 100 } = this.opts; - const rows = this.getEventsQuery(db.kysely as unknown as Kysely, normalFilters).stream( - chunkSize, + const { db, timeout, chunkSize = 20 } = this.opts; + + const rows = await this.withTimeout( + db.kysely as unknown as Kysely, + (trx) => this.getEventsQuery(trx, normalFilters).stream(chunkSize), + opts.timeout ?? timeout, ); for await (const row of rows) { const event = this.parseEventRow(row); yield ['EVENT', subId, event]; - if (opts?.signal?.aborted) { - yield ['CLOSED', subId, 'aborted']; + if (opts.signal?.aborted) { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; return; } } @@ -239,7 +277,12 @@ export class DittoPgStore extends NPostgres { yield ['EOSE', subId]; - const machina = new Machina(opts?.signal); + if (opts.signal?.aborted) { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; + return; + } + + const machina = new Machina(opts.signal); this.subs.set(subId, { filters, machina }); internalSubscriptionsSizeGauge.set(this.subs.size); @@ -248,8 +291,12 @@ export class DittoPgStore extends NPostgres { for await (const event of machina) { yield ['EVENT', subId, event]; } - } catch { - yield ['CLOSED', subId, 'error: something went wrong']; + } catch (e) { + if (e instanceof Error && e.message.includes('timeout')) { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; + } else { + yield ['CLOSED', subId, 'error: something went wrong']; + } } finally { this.subs.delete(subId); internalSubscriptionsSizeGauge.set(this.subs.size);