Merge branch 'firehose-concurrency' into 'main'

Limit firehose concurrency

See merge request soapbox-pub/ditto!440
This commit is contained in:
Alex Gleason 2024-08-01 17:07:08 +00:00
commit 975294b8a4
4 changed files with 22 additions and 5 deletions

View file

@ -26,6 +26,7 @@
"@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@db/sqlite": "jsr:@db/sqlite@^0.11.1",
"@hono/hono": "jsr:@hono/hono@^4.4.6", "@hono/hono": "jsr:@hono/hono@^4.4.6",
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.28.0", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.28.0",
"@scure/base": "npm:@scure/base@^1.1.6", "@scure/base": "npm:@scure/base@^1.1.6",
@ -42,8 +43,8 @@
"@std/media-types": "jsr:@std/media-types@^0.224.1", "@std/media-types": "jsr:@std/media-types@^0.224.1",
"@std/streams": "jsr:@std/streams@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0",
"comlink": "npm:comlink@^4.4.1", "comlink": "npm:comlink@^4.4.1",
"deno.json": "./deno.json",
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
"deno.json": "./deno.json",
"entities": "npm:entities@^4.5.0", "entities": "npm:entities@^4.5.0",
"fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0", "fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0",
"formdata-helper": "npm:formdata-helper@^0.3.0", "formdata-helper": "npm:formdata-helper@^0.3.0",
@ -51,7 +52,6 @@
"iso-639-1": "npm:iso-639-1@2.1.15", "iso-639-1": "npm:iso-639-1@2.1.15",
"isomorphic-dompurify": "npm:isomorphic-dompurify@^2.11.0", "isomorphic-dompurify": "npm:isomorphic-dompurify@^2.11.0",
"kysely": "npm:kysely@^0.27.4", "kysely": "npm:kysely@^0.27.4",
"postgres": "https://raw.githubusercontent.com/xyzshantaram/postgres.js/8a9bbce88b3f6425ecaacd99a80372338b157a53/deno/mod.js",
"kysely-postgres-js": "npm:kysely-postgres-js@2.0.0", "kysely-postgres-js": "npm:kysely-postgres-js@2.0.0",
"light-bolt11-decoder": "npm:light-bolt11-decoder", "light-bolt11-decoder": "npm:light-bolt11-decoder",
"linkify-plugin-hashtag": "npm:linkify-plugin-hashtag@^4.1.1", "linkify-plugin-hashtag": "npm:linkify-plugin-hashtag@^4.1.1",
@ -61,6 +61,7 @@
"nostr-relaypool": "npm:nostr-relaypool2@0.6.34", "nostr-relaypool": "npm:nostr-relaypool2@0.6.34",
"nostr-tools": "npm:nostr-tools@2.5.1", "nostr-tools": "npm:nostr-tools@2.5.1",
"nostr-wasm": "npm:nostr-wasm@^0.1.0", "nostr-wasm": "npm:nostr-wasm@^0.1.0",
"postgres": "https://raw.githubusercontent.com/xyzshantaram/postgres.js/8a9bbce88b3f6425ecaacd99a80372338b157a53/deno/mod.js",
"prom-client": "npm:prom-client@^15.1.2", "prom-client": "npm:prom-client@^15.1.2",
"question-deno": "https://raw.githubusercontent.com/ocpu/question-deno/10022b8e52555335aa510adb08b0a300df3cf904/mod.ts", "question-deno": "https://raw.githubusercontent.com/ocpu/question-deno/10022b8e52555335aa510adb08b0a300df3cf904/mod.ts",
"tldts": "npm:tldts@^6.0.14", "tldts": "npm:tldts@^6.0.14",

5
deno.lock generated
View file

@ -10,6 +10,7 @@
"jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0", "jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0",
"jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1", "jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1",
"jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3", "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3",
"jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1",
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
@ -126,6 +127,9 @@
"@hono/hono@4.5.3": { "@hono/hono@4.5.3": {
"integrity": "429923b2b3c6586a1450862328d61a1346fee5841e8ae86c494250475057213c" "integrity": "429923b2b3c6586a1450862328d61a1346fee5841e8ae86c494250475057213c"
}, },
"@lambdalisue/async@2.1.1": {
"integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4"
},
"@nostrify/nostrify@0.22.4": { "@nostrify/nostrify@0.22.4": {
"integrity": "1c8a7847e5773213044b491e85fd7cafae2ad194ce59da4d957d2b27c776b42d", "integrity": "1c8a7847e5773213044b491e85fd7cafae2ad194ce59da4d957d2b27c776b42d",
"dependencies": [ "dependencies": [
@ -1780,6 +1784,7 @@
"jsr:@bradenmacdonald/s3-lite-client@^0.7.4", "jsr:@bradenmacdonald/s3-lite-client@^0.7.4",
"jsr:@db/sqlite@^0.11.1", "jsr:@db/sqlite@^0.11.1",
"jsr:@hono/hono@^4.4.6", "jsr:@hono/hono@^4.4.6",
"jsr:@lambdalisue/async@^2.1.1",
"jsr:@nostrify/nostrify@^0.28.0", "jsr:@nostrify/nostrify@^0.28.0",
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
"jsr:@soapbox/stickynotes@^0.4.0", "jsr:@soapbox/stickynotes@^0.4.0",

View file

@ -242,6 +242,10 @@ class Conf {
static get firehoseEnabled(): boolean { static get firehoseEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true;
} }
/** Number of events the firehose is allowed to process at one time before they have to wait in a queue. */
static get firehoseConcurrency(): number {
return Math.ceil(Number(Deno.env.get('FIREHOSE_CONCURRENCY') ?? (Conf.pg.poolSize * 0.25)));
}
/** Whether to enable Ditto cron jobs. */ /** Whether to enable Ditto cron jobs. */
static get cronEnabled(): boolean { static get cronEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;

View file

@ -1,5 +1,7 @@
import { Semaphore } from '@lambdalisue/async';
import { Stickynotes } from '@soapbox/stickynotes'; import { Stickynotes } from '@soapbox/stickynotes';
import { Conf } from '@/config.ts';
import { firehoseEventCounter } from '@/metrics.ts'; import { firehoseEventCounter } from '@/metrics.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { nostrNow } from '@/utils.ts'; import { nostrNow } from '@/utils.ts';
@ -7,6 +9,7 @@ import { nostrNow } from '@/utils.ts';
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
const console = new Stickynotes('ditto:firehose'); const console = new Stickynotes('ditto:firehose');
const sem = new Semaphore(Conf.firehoseConcurrency);
/** /**
* This function watches events on all known relays and performs * This function watches events on all known relays and performs
@ -22,9 +25,13 @@ export async function startFirehose(): Promise<void> {
console.debug(`NostrEvent<${event.kind}> ${event.id}`); console.debug(`NostrEvent<${event.kind}> ${event.id}`);
firehoseEventCounter.inc({ kind: event.kind }); firehoseEventCounter.inc({ kind: event.kind });
pipeline sem.lock(async () => {
.handleEvent(event, AbortSignal.timeout(5000)) try {
.catch(() => {}); await pipeline.handleEvent(event, AbortSignal.timeout(5000));
} catch (e) {
console.warn(e);
}
});
} }
} }
} }