mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 03:19:46 +00:00
DittoPgStore: support timeout in req, add special treatment for ephemeral events, yield event loop when processing many subscriptions
This commit is contained in:
parent
31044691e1
commit
6568dca191
3 changed files with 70 additions and 23 deletions
|
|
@ -61,7 +61,7 @@
|
||||||
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
|
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
|
||||||
"@negrel/webpush": "jsr:@negrel/webpush@^0.3.0",
|
"@negrel/webpush": "jsr:@negrel/webpush@^0.3.0",
|
||||||
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
|
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
|
||||||
"@nostrify/db": "jsr:@nostrify/db@^0.39.0",
|
"@nostrify/db": "jsr:@nostrify/db@^0.39.2",
|
||||||
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1",
|
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1",
|
||||||
"@nostrify/policies": "jsr:@nostrify/policies@^0.36.1",
|
"@nostrify/policies": "jsr:@nostrify/policies@^0.36.1",
|
||||||
"@nostrify/types": "jsr:@nostrify/types@^0.36.0",
|
"@nostrify/types": "jsr:@nostrify/types@^0.36.0",
|
||||||
|
|
|
||||||
8
deno.lock
generated
8
deno.lock
generated
|
|
@ -31,7 +31,7 @@
|
||||||
"jsr:@hono/hono@^4.4.6": "4.6.15",
|
"jsr:@hono/hono@^4.4.6": "4.6.15",
|
||||||
"jsr:@negrel/http-ece@0.6.0": "0.6.0",
|
"jsr:@negrel/http-ece@0.6.0": "0.6.0",
|
||||||
"jsr:@negrel/webpush@0.3": "0.3.0",
|
"jsr:@negrel/webpush@0.3": "0.3.0",
|
||||||
"jsr:@nostrify/db@0.39": "0.39.0",
|
"jsr:@nostrify/db@~0.39.2": "0.39.2",
|
||||||
"jsr:@nostrify/nostrify@0.31": "0.31.0",
|
"jsr:@nostrify/nostrify@0.31": "0.31.0",
|
||||||
"jsr:@nostrify/nostrify@0.32": "0.32.0",
|
"jsr:@nostrify/nostrify@0.32": "0.32.0",
|
||||||
"jsr:@nostrify/nostrify@0.36": "0.36.2",
|
"jsr:@nostrify/nostrify@0.36": "0.36.2",
|
||||||
|
|
@ -363,8 +363,8 @@
|
||||||
"jsr:@std/path@0.224.0"
|
"jsr:@std/path@0.224.0"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"@nostrify/db@0.39.0": {
|
"@nostrify/db@0.39.2": {
|
||||||
"integrity": "13a88c610eb15a5dd13848d5beec9170406376c9d05299ce5e5298452a5431ac",
|
"integrity": "65df8e636d172a62319060f77398f992541a674bcc0298d19608fdba639e0b13",
|
||||||
"dependencies": [
|
"dependencies": [
|
||||||
"jsr:@nostrify/nostrify@~0.38.1",
|
"jsr:@nostrify/nostrify@~0.38.1",
|
||||||
"jsr:@nostrify/types@0.36",
|
"jsr:@nostrify/types@0.36",
|
||||||
|
|
@ -2460,7 +2460,7 @@
|
||||||
"jsr:@gfx/canvas-wasm@~0.4.2",
|
"jsr:@gfx/canvas-wasm@~0.4.2",
|
||||||
"jsr:@hono/hono@^4.4.6",
|
"jsr:@hono/hono@^4.4.6",
|
||||||
"jsr:@negrel/webpush@0.3",
|
"jsr:@negrel/webpush@0.3",
|
||||||
"jsr:@nostrify/db@0.39",
|
"jsr:@nostrify/db@~0.39.2",
|
||||||
"jsr:@nostrify/nostrify@~0.38.1",
|
"jsr:@nostrify/nostrify@~0.38.1",
|
||||||
"jsr:@nostrify/policies@~0.36.1",
|
"jsr:@nostrify/policies@~0.36.1",
|
||||||
"jsr:@nostrify/types@0.36",
|
"jsr:@nostrify/types@0.36",
|
||||||
|
|
|
||||||
|
|
@ -57,14 +57,18 @@ interface DittoPgStoreOpts {
|
||||||
timeout: number;
|
timeout: number;
|
||||||
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
|
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
|
||||||
pure?: boolean;
|
pure?: boolean;
|
||||||
/** Chunk size for streaming events. Defaults to 100. */
|
/** Chunk size for streaming events. Defaults to 20. */
|
||||||
chunkSize?: number;
|
chunkSize?: number;
|
||||||
|
/** Batch size for fulfilling subscriptions. Defaults to 500. */
|
||||||
|
batchSize?: number;
|
||||||
|
/** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */
|
||||||
|
maxAge?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** SQL database storage adapter for Nostr events. */
|
/** SQL database storage adapter for Nostr events. */
|
||||||
export class DittoPgStore extends NPostgres {
|
export class DittoPgStore extends NPostgres {
|
||||||
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
||||||
readonly encounters = new LRUCache<string, boolean>({ max: 100 });
|
readonly encounters = new LRUCache<string, boolean>({ max: 1000 });
|
||||||
|
|
||||||
/** Conditions for when to index certain tags. */
|
/** Conditions for when to index certain tags. */
|
||||||
static tagConditions: Record<string, TagCondition> = {
|
static tagConditions: Record<string, TagCondition> = {
|
||||||
|
|
@ -103,7 +107,7 @@ export class DittoPgStore extends NPostgres {
|
||||||
const [event] = await this.query([{ ids: [id] }]);
|
const [event] = await this.query([{ ids: [id] }]);
|
||||||
|
|
||||||
if (event) {
|
if (event) {
|
||||||
this.streamOut(event);
|
await this.fulfill(event);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -117,6 +121,10 @@ export class DittoPgStore extends NPostgres {
|
||||||
this.encounters.set(event.id, true);
|
this.encounters.set(event.id, true);
|
||||||
dbEventsCounter.inc({ kind: event.kind });
|
dbEventsCounter.inc({ kind: event.kind });
|
||||||
|
|
||||||
|
if (NKinds.ephemeral(event.kind)) {
|
||||||
|
return await this.fulfill(event);
|
||||||
|
}
|
||||||
|
|
||||||
if (await this.isDeletedAdmin(event)) {
|
if (await this.isDeletedAdmin(event)) {
|
||||||
throw new RelayError('blocked', 'event deleted by admin');
|
throw new RelayError('blocked', 'event deleted by admin');
|
||||||
}
|
}
|
||||||
|
|
@ -125,7 +133,7 @@ export class DittoPgStore extends NPostgres {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
|
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
|
||||||
this.streamOut(event);
|
this.fulfill(event); // don't await or catch (should never reject)
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof Error && e.message === 'Cannot add a deleted event') {
|
if (e instanceof Error && e.message === 'Cannot add a deleted event') {
|
||||||
throw new RelayError('blocked', 'event deleted by user');
|
throw new RelayError('blocked', 'event deleted by user');
|
||||||
|
|
@ -137,21 +145,48 @@ export class DittoPgStore extends NPostgres {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean {
|
/** Fulfill active subscriptions with this event. */
|
||||||
// TODO: support streaming by search.
|
protected async fulfill(event: NostrEvent): Promise<void> {
|
||||||
return matchFilter(filter, event) && filter.search === undefined;
|
const { maxAge = 60, batchSize = 500 } = this.opts;
|
||||||
}
|
|
||||||
|
const now = Math.floor(Date.now() / 1000);
|
||||||
|
const age = now - event.created_at;
|
||||||
|
|
||||||
|
if (age > maxAge) {
|
||||||
|
// Ephemeral events must be fulfilled, or else return an error to the client.
|
||||||
|
if (NKinds.ephemeral(event.kind)) {
|
||||||
|
throw new RelayError('invalid', 'event too old');
|
||||||
|
} else {
|
||||||
|
// Silently ignore old events.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = 0;
|
||||||
|
|
||||||
protected streamOut(event: NostrEvent): void {
|
|
||||||
for (const { filters, machina } of this.subs.values()) {
|
for (const { filters, machina } of this.subs.values()) {
|
||||||
for (const filter of filters) {
|
for (const filter of filters) {
|
||||||
|
count++;
|
||||||
|
|
||||||
if (this.matchesFilter(event, filter)) {
|
if (this.matchesFilter(event, filter)) {
|
||||||
machina.push(event);
|
machina.push(event);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Yield to event loop.
|
||||||
|
if (count % batchSize === 0) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Check if the event fulfills the filter, according to Ditto criteria. */
|
||||||
|
protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean {
|
||||||
|
// TODO: support streaming by search.
|
||||||
|
return typeof filter.search !== 'string' && matchFilter(filter, event);
|
||||||
|
}
|
||||||
|
|
||||||
/** Check if an event has been deleted by the admin. */
|
/** Check if an event has been deleted by the admin. */
|
||||||
private async isDeletedAdmin(event: NostrEvent): Promise<boolean> {
|
private async isDeletedAdmin(event: NostrEvent): Promise<boolean> {
|
||||||
const filters: NostrFilter[] = [
|
const filters: NostrFilter[] = [
|
||||||
|
|
@ -215,23 +250,26 @@ export class DittoPgStore extends NPostgres {
|
||||||
|
|
||||||
override async *req(
|
override async *req(
|
||||||
filters: NostrFilter[],
|
filters: NostrFilter[],
|
||||||
opts?: { signal?: AbortSignal },
|
opts: { timeout?: number; signal?: AbortSignal } = {},
|
||||||
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||||
const subId = crypto.randomUUID();
|
const subId = crypto.randomUUID();
|
||||||
const normalFilters = this.normalizeFilters(filters);
|
const normalFilters = this.normalizeFilters(filters);
|
||||||
|
|
||||||
if (normalFilters.length) {
|
if (normalFilters.length) {
|
||||||
const { db, chunkSize = 100 } = this.opts;
|
const { db, timeout, chunkSize = 20 } = this.opts;
|
||||||
const rows = this.getEventsQuery(db.kysely as unknown as Kysely<NPostgresSchema>, normalFilters).stream(
|
|
||||||
chunkSize,
|
const rows = await this.withTimeout(
|
||||||
|
db.kysely as unknown as Kysely<NPostgresSchema>,
|
||||||
|
(trx) => this.getEventsQuery(trx, normalFilters).stream(chunkSize),
|
||||||
|
opts.timeout ?? timeout,
|
||||||
);
|
);
|
||||||
|
|
||||||
for await (const row of rows) {
|
for await (const row of rows) {
|
||||||
const event = this.parseEventRow(row);
|
const event = this.parseEventRow(row);
|
||||||
yield ['EVENT', subId, event];
|
yield ['EVENT', subId, event];
|
||||||
|
|
||||||
if (opts?.signal?.aborted) {
|
if (opts.signal?.aborted) {
|
||||||
yield ['CLOSED', subId, 'aborted'];
|
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -239,7 +277,12 @@ export class DittoPgStore extends NPostgres {
|
||||||
|
|
||||||
yield ['EOSE', subId];
|
yield ['EOSE', subId];
|
||||||
|
|
||||||
const machina = new Machina<DittoEvent>(opts?.signal);
|
if (opts.signal?.aborted) {
|
||||||
|
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const machina = new Machina<DittoEvent>(opts.signal);
|
||||||
|
|
||||||
this.subs.set(subId, { filters, machina });
|
this.subs.set(subId, { filters, machina });
|
||||||
internalSubscriptionsSizeGauge.set(this.subs.size);
|
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||||
|
|
@ -248,8 +291,12 @@ export class DittoPgStore extends NPostgres {
|
||||||
for await (const event of machina) {
|
for await (const event of machina) {
|
||||||
yield ['EVENT', subId, event];
|
yield ['EVENT', subId, event];
|
||||||
}
|
}
|
||||||
} catch {
|
} catch (e) {
|
||||||
yield ['CLOSED', subId, 'error: something went wrong'];
|
if (e instanceof Error && e.message.includes('timeout')) {
|
||||||
|
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
|
||||||
|
} else {
|
||||||
|
yield ['CLOSED', subId, 'error: something went wrong'];
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.subs.delete(subId);
|
this.subs.delete(subId);
|
||||||
internalSubscriptionsSizeGauge.set(this.subs.size);
|
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue