Compare commits

..

No commits in common. "789b6c7e932a760f717bb0855c96294d0e4f71cc" and "fd87bad2382fc5685191aa6c9954eb7def97ae16" have entirely different histories.

View file

@ -61,6 +61,8 @@ interface DittoPgStoreOpts {
pure?: boolean;
/** Chunk size for streaming events. Defaults to 20. */
chunkSize?: number;
/** Batch size for fulfilling subscriptions. Defaults to 500. */
batchSize?: number;
/** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */
maxAge?: number;
/** Whether to listen for events from the database with NOTIFY. */
@ -131,9 +133,6 @@ export class DittoPgStore extends NPostgres {
dbEventsCounter.inc({ kind: event.kind });
if (NKinds.ephemeral(event.kind)) {
if (this.encounters.has(event.id)) return;
this.encounters.set(event.id, true);
return await this.fulfill(event);
}
@ -191,7 +190,7 @@ export class DittoPgStore extends NPostgres {
/** Fulfill active subscriptions with this event. */
protected async fulfill(event: NostrEvent): Promise<void> {
const { maxAge = 60 } = this.opts;
const { maxAge = 60, batchSize = 500 } = this.opts;
const now = Math.floor(Date.now() / 1000);
const age = now - event.created_at;
@ -206,12 +205,21 @@ export class DittoPgStore extends NPostgres {
}
}
let count = 0;
for (const [subId, { filters, machina }] of this.subs.entries()) {
for (const filter of filters) {
count++;
if (this.matchesFilter(event, filter)) {
machina.push(['EVENT', subId, event]);
break;
}
// Yield to event loop.
if (count % batchSize === 0) {
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
}
}