diff --git a/deno.json b/deno.json index 50f814fe..a3f06bd5 100644 --- a/deno.json +++ b/deno.json @@ -61,7 +61,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@negrel/webpush": "jsr:@negrel/webpush@^0.3.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/db": "jsr:@nostrify/db@^0.39.2", + "@nostrify/db": "jsr:@nostrify/db@^0.39.3", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1", "@nostrify/policies": "jsr:@nostrify/policies@^0.36.1", "@nostrify/types": "jsr:@nostrify/types@^0.36.0", diff --git a/deno.lock b/deno.lock index b7475e3a..19c7aba4 100644 --- a/deno.lock +++ b/deno.lock @@ -31,7 +31,7 @@ "jsr:@hono/hono@^4.4.6": "4.6.15", "jsr:@negrel/http-ece@0.6.0": "0.6.0", "jsr:@negrel/webpush@0.3": "0.3.0", - "jsr:@nostrify/db@~0.39.2": "0.39.2", + "jsr:@nostrify/db@~0.39.3": "0.39.3", "jsr:@nostrify/nostrify@0.31": "0.31.0", "jsr:@nostrify/nostrify@0.32": "0.32.0", "jsr:@nostrify/nostrify@0.36": "0.36.2", @@ -363,8 +363,8 @@ "jsr:@std/path@0.224.0" ] }, - "@nostrify/db@0.39.2": { - "integrity": "65df8e636d172a62319060f77398f992541a674bcc0298d19608fdba639e0b13", + "@nostrify/db@0.39.3": { + "integrity": "d1f1104316b33e0fd3c263086b325ee49f86859abc1a966b43bb9f9a21c15429", "dependencies": [ "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/types@0.36", @@ -2460,7 +2460,7 @@ "jsr:@gfx/canvas-wasm@~0.4.2", "jsr:@hono/hono@^4.4.6", "jsr:@negrel/webpush@0.3", - "jsr:@nostrify/db@~0.39.2", + "jsr:@nostrify/db@~0.39.3", "jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/policies@~0.36.1", "jsr:@nostrify/types@0.36", diff --git a/packages/ditto/storages/DittoPgStore.test.ts b/packages/ditto/storages/DittoPgStore.test.ts index e119a85f..756cd98b 100644 --- a/packages/ditto/storages/DittoPgStore.test.ts +++ b/packages/ditto/storages/DittoPgStore.test.ts @@ -1,4 +1,5 @@ import { assertEquals, assertRejects } from '@std/assert'; +import { NostrRelayMsg } from '@nostrify/nostrify'; import { genEvent } from '@nostrify/nostrify/test'; import { generateSecretKey } from 'nostr-tools'; @@ -12,19 +13,26 @@ Deno.test('req streaming', async () => { await using db = await createTestDB({ pure: true }); const { store: relay } = db; - const event1 = await eventFixture('event-1'); + const msgs: NostrRelayMsg[] = []; + const controller = new AbortController(); - const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); - - for await (const msg of relay.req([{ since: 0 }])) { - if (msg[0] === 'EVENT') { - assertEquals(relay.subs.size, 1); - assertEquals(msg[2], event1); - break; + const promise = (async () => { + for await (const msg of relay.req([{ since: 0 }], { signal: controller.signal })) { + msgs.push(msg); } - } + })(); + + const event = genEvent({ created_at: Math.floor(Date.now() / 1000) }); + await relay.event(event); + + controller.abort(); await promise; + + const verbs = msgs.map(([verb]) => verb); + + assertEquals(verbs, ['EOSE', 'EVENT', 'CLOSED']); + assertEquals(msgs[1][2], event); assertEquals(relay.subs.size, 0); // cleanup }); diff --git a/packages/ditto/storages/DittoPgStore.ts b/packages/ditto/storages/DittoPgStore.ts index 7bd22d00..000ef536 100644 --- a/packages/ditto/storages/DittoPgStore.ts +++ b/packages/ditto/storages/DittoPgStore.ts @@ -67,9 +67,15 @@ interface DittoPgStoreOpts { notify?: boolean; } +/** Realtime subscription. */ +interface Subscription { + filters: NostrFilter[]; + machina: Machina; +} + /** SQL database storage adapter for Nostr events. */ export class DittoPgStore extends NPostgres { - readonly subs = new Map }>(); + readonly subs = new Map(); readonly encounters = new LRUCache({ max: 1000 }); /** Conditions for when to index certain tags. */ @@ -170,12 +176,12 @@ export class DittoPgStore extends NPostgres { let count = 0; - for (const { filters, machina } of this.subs.values()) { + for (const [subId, { filters, machina }] of this.subs.entries()) { for (const filter of filters) { count++; if (this.matchesFilter(event, filter)) { - machina.push(event); + machina.push(['EVENT', subId, event]); break; } @@ -258,47 +264,60 @@ export class DittoPgStore extends NPostgres { filters: NostrFilter[], opts: { timeout?: number; signal?: AbortSignal } = {}, ): AsyncIterable { + const { db, chunkSize = 20 } = this.opts; + const { timeout = this.opts.timeout, signal } = opts; + const subId = crypto.randomUUID(); const normalFilters = this.normalizeFilters(filters); + const machina = new Machina(signal); if (normalFilters.length) { - const { db, timeout, chunkSize = 20 } = this.opts; + this.withTimeout(db.kysely as unknown as Kysely, timeout, async (trx) => { + const rows = this.getEventsQuery(trx, normalFilters).stream(chunkSize); - const rows = await this.withTimeout( - db.kysely as unknown as Kysely, - (trx) => this.getEventsQuery(trx, normalFilters).stream(chunkSize), - opts.timeout ?? timeout, - ); - - for await (const row of rows) { - const event = this.parseEventRow(row); - yield ['EVENT', subId, event]; - - if (opts.signal?.aborted) { - yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; - return; + for await (const row of rows) { + const event = this.parseEventRow(row); + machina.push(['EVENT', subId, event]); } + + machina.push(['EOSE', subId]); + }).catch((error) => { + if (error instanceof Error && error.message.includes('timeout')) { + machina.push(['CLOSED', subId, 'error: the relay could not respond fast enough']); + } else { + machina.push(['CLOSED', subId, 'error: something went wrong']); + } + }); + + try { + for await (const msg of machina) { + const [verb] = msg; + + yield msg; + + if (verb === 'EOSE') { + break; + } + + if (verb === 'CLOSED') { + return; + } + } + } catch { + yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; + return; } } - yield ['EOSE', subId]; - - if (opts.signal?.aborted) { - yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; - return; - } - - const machina = new Machina(opts.signal); - this.subs.set(subId, { filters, machina }); internalSubscriptionsSizeGauge.set(this.subs.size); try { - for await (const event of machina) { - yield ['EVENT', subId, event]; + for await (const msg of machina) { + yield msg; } } catch (e) { - if (e instanceof Error && e.message.includes('timeout')) { + if (e instanceof Error && e.name === 'AbortError') { yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; } else { yield ['CLOSED', subId, 'error: something went wrong']; diff --git a/packages/ditto/test.ts b/packages/ditto/test.ts index 38801093..84303d76 100644 --- a/packages/ditto/test.ts +++ b/packages/ditto/test.ts @@ -22,6 +22,7 @@ export async function createTestDB(opts?: { pure?: boolean }) { timeout: Conf.db.timeouts.default, pubkey: Conf.pubkey, pure: opts?.pure ?? false, + notify: true, }); return {