DittoPgStore: rework realtime streaming so it actually works

This commit is contained in:
Alex Gleason 2025-02-19 20:17:53 -06:00
parent bc0830785a
commit f87f19d06c
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
5 changed files with 71 additions and 43 deletions

View file

@ -61,7 +61,7 @@
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@negrel/webpush": "jsr:@negrel/webpush@^0.3.0", "@negrel/webpush": "jsr:@negrel/webpush@^0.3.0",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/db": "jsr:@nostrify/db@^0.39.2", "@nostrify/db": "jsr:@nostrify/db@^0.39.3",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1",
"@nostrify/policies": "jsr:@nostrify/policies@^0.36.1", "@nostrify/policies": "jsr:@nostrify/policies@^0.36.1",
"@nostrify/types": "jsr:@nostrify/types@^0.36.0", "@nostrify/types": "jsr:@nostrify/types@^0.36.0",

8
deno.lock generated
View file

@ -31,7 +31,7 @@
"jsr:@hono/hono@^4.4.6": "4.6.15", "jsr:@hono/hono@^4.4.6": "4.6.15",
"jsr:@negrel/http-ece@0.6.0": "0.6.0", "jsr:@negrel/http-ece@0.6.0": "0.6.0",
"jsr:@negrel/webpush@0.3": "0.3.0", "jsr:@negrel/webpush@0.3": "0.3.0",
"jsr:@nostrify/db@~0.39.2": "0.39.2", "jsr:@nostrify/db@~0.39.3": "0.39.3",
"jsr:@nostrify/nostrify@0.31": "0.31.0", "jsr:@nostrify/nostrify@0.31": "0.31.0",
"jsr:@nostrify/nostrify@0.32": "0.32.0", "jsr:@nostrify/nostrify@0.32": "0.32.0",
"jsr:@nostrify/nostrify@0.36": "0.36.2", "jsr:@nostrify/nostrify@0.36": "0.36.2",
@ -363,8 +363,8 @@
"jsr:@std/path@0.224.0" "jsr:@std/path@0.224.0"
] ]
}, },
"@nostrify/db@0.39.2": { "@nostrify/db@0.39.3": {
"integrity": "65df8e636d172a62319060f77398f992541a674bcc0298d19608fdba639e0b13", "integrity": "d1f1104316b33e0fd3c263086b325ee49f86859abc1a966b43bb9f9a21c15429",
"dependencies": [ "dependencies": [
"jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/nostrify@~0.38.1",
"jsr:@nostrify/types@0.36", "jsr:@nostrify/types@0.36",
@ -2460,7 +2460,7 @@
"jsr:@gfx/canvas-wasm@~0.4.2", "jsr:@gfx/canvas-wasm@~0.4.2",
"jsr:@hono/hono@^4.4.6", "jsr:@hono/hono@^4.4.6",
"jsr:@negrel/webpush@0.3", "jsr:@negrel/webpush@0.3",
"jsr:@nostrify/db@~0.39.2", "jsr:@nostrify/db@~0.39.3",
"jsr:@nostrify/nostrify@~0.38.1", "jsr:@nostrify/nostrify@~0.38.1",
"jsr:@nostrify/policies@~0.36.1", "jsr:@nostrify/policies@~0.36.1",
"jsr:@nostrify/types@0.36", "jsr:@nostrify/types@0.36",

View file

@ -1,4 +1,5 @@
import { assertEquals, assertRejects } from '@std/assert'; import { assertEquals, assertRejects } from '@std/assert';
import { NostrRelayMsg } from '@nostrify/nostrify';
import { genEvent } from '@nostrify/nostrify/test'; import { genEvent } from '@nostrify/nostrify/test';
import { generateSecretKey } from 'nostr-tools'; import { generateSecretKey } from 'nostr-tools';
@ -12,19 +13,26 @@ Deno.test('req streaming', async () => {
await using db = await createTestDB({ pure: true }); await using db = await createTestDB({ pure: true });
const { store: relay } = db; const { store: relay } = db;
const event1 = await eventFixture('event-1'); const msgs: NostrRelayMsg[] = [];
const controller = new AbortController();
const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); const promise = (async () => {
for await (const msg of relay.req([{ since: 0 }], { signal: controller.signal })) {
msgs.push(msg);
}
})();
for await (const msg of relay.req([{ since: 0 }])) { const event = genEvent({ created_at: Math.floor(Date.now() / 1000) });
if (msg[0] === 'EVENT') { await relay.event(event);
assertEquals(relay.subs.size, 1);
assertEquals(msg[2], event1); controller.abort();
break;
}
}
await promise; await promise;
const verbs = msgs.map(([verb]) => verb);
assertEquals(verbs, ['EOSE', 'EVENT', 'CLOSED']);
assertEquals(msgs[1][2], event);
assertEquals(relay.subs.size, 0); // cleanup assertEquals(relay.subs.size, 0); // cleanup
}); });

View file

@ -67,9 +67,15 @@ interface DittoPgStoreOpts {
notify?: boolean; notify?: boolean;
} }
/** Realtime subscription. */
interface Subscription {
filters: NostrFilter[];
machina: Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>;
}
/** SQL database storage adapter for Nostr events. */ /** SQL database storage adapter for Nostr events. */
export class DittoPgStore extends NPostgres { export class DittoPgStore extends NPostgres {
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>(); readonly subs = new Map<string, Subscription>();
readonly encounters = new LRUCache<string, boolean>({ max: 1000 }); readonly encounters = new LRUCache<string, boolean>({ max: 1000 });
/** Conditions for when to index certain tags. */ /** Conditions for when to index certain tags. */
@ -170,12 +176,12 @@ export class DittoPgStore extends NPostgres {
let count = 0; let count = 0;
for (const { filters, machina } of this.subs.values()) { for (const [subId, { filters, machina }] of this.subs.entries()) {
for (const filter of filters) { for (const filter of filters) {
count++; count++;
if (this.matchesFilter(event, filter)) { if (this.matchesFilter(event, filter)) {
machina.push(event); machina.push(['EVENT', subId, event]);
break; break;
} }
@ -258,47 +264,60 @@ export class DittoPgStore extends NPostgres {
filters: NostrFilter[], filters: NostrFilter[],
opts: { timeout?: number; signal?: AbortSignal } = {}, opts: { timeout?: number; signal?: AbortSignal } = {},
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> { ): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
const { db, chunkSize = 20 } = this.opts;
const { timeout = this.opts.timeout, signal } = opts;
const subId = crypto.randomUUID(); const subId = crypto.randomUUID();
const normalFilters = this.normalizeFilters(filters); const normalFilters = this.normalizeFilters(filters);
const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(signal);
if (normalFilters.length) { if (normalFilters.length) {
const { db, timeout, chunkSize = 20 } = this.opts; this.withTimeout(db.kysely as unknown as Kysely<NPostgresSchema>, timeout, async (trx) => {
const rows = this.getEventsQuery(trx, normalFilters).stream(chunkSize);
const rows = await this.withTimeout(
db.kysely as unknown as Kysely<NPostgresSchema>,
(trx) => this.getEventsQuery(trx, normalFilters).stream(chunkSize),
opts.timeout ?? timeout,
);
for await (const row of rows) { for await (const row of rows) {
const event = this.parseEventRow(row); const event = this.parseEventRow(row);
yield ['EVENT', subId, event]; machina.push(['EVENT', subId, event]);
}
if (opts.signal?.aborted) { machina.push(['EOSE', subId]);
}).catch((error) => {
if (error instanceof Error && error.message.includes('timeout')) {
machina.push(['CLOSED', subId, 'error: the relay could not respond fast enough']);
} else {
machina.push(['CLOSED', subId, 'error: something went wrong']);
}
});
try {
for await (const msg of machina) {
const [verb] = msg;
yield msg;
if (verb === 'EOSE') {
break;
}
if (verb === 'CLOSED') {
return;
}
}
} catch {
yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
return; return;
} }
} }
}
yield ['EOSE', subId];
if (opts.signal?.aborted) {
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
return;
}
const machina = new Machina<DittoEvent>(opts.signal);
this.subs.set(subId, { filters, machina }); this.subs.set(subId, { filters, machina });
internalSubscriptionsSizeGauge.set(this.subs.size); internalSubscriptionsSizeGauge.set(this.subs.size);
try { try {
for await (const event of machina) { for await (const msg of machina) {
yield ['EVENT', subId, event]; yield msg;
} }
} catch (e) { } catch (e) {
if (e instanceof Error && e.message.includes('timeout')) { if (e instanceof Error && e.name === 'AbortError') {
yield ['CLOSED', subId, 'error: the relay could not respond fast enough']; yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
} else { } else {
yield ['CLOSED', subId, 'error: something went wrong']; yield ['CLOSED', subId, 'error: something went wrong'];

View file

@ -22,6 +22,7 @@ export async function createTestDB(opts?: { pure?: boolean }) {
timeout: Conf.db.timeouts.default, timeout: Conf.db.timeouts.default,
pubkey: Conf.pubkey, pubkey: Conf.pubkey,
pure: opts?.pure ?? false, pure: opts?.pure ?? false,
notify: true,
}); });
return { return {