diff --git a/deno.json b/deno.json index 0867a999..aa9729e1 100644 --- a/deno.json +++ b/deno.json @@ -26,6 +26,7 @@ "@db/sqlite": "jsr:@db/sqlite@^0.11.1", "@hono/hono": "jsr:@hono/hono@^4.4.6", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", + "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.28.0", "@scure/base": "npm:@scure/base@^1.1.6", @@ -42,8 +43,8 @@ "@std/media-types": "jsr:@std/media-types@^0.224.1", "@std/streams": "jsr:@std/streams@^0.223.0", "comlink": "npm:comlink@^4.4.1", - "deno.json": "./deno.json", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", + "deno.json": "./deno.json", "entities": "npm:entities@^4.5.0", "fast-stable-stringify": "npm:fast-stable-stringify@^1.0.0", "formdata-helper": "npm:formdata-helper@^0.3.0", @@ -51,7 +52,6 @@ "iso-639-1": "npm:iso-639-1@2.1.15", "isomorphic-dompurify": "npm:isomorphic-dompurify@^2.11.0", "kysely": "npm:kysely@^0.27.4", - "postgres": "https://raw.githubusercontent.com/xyzshantaram/postgres.js/8a9bbce88b3f6425ecaacd99a80372338b157a53/deno/mod.js", "kysely-postgres-js": "npm:kysely-postgres-js@2.0.0", "light-bolt11-decoder": "npm:light-bolt11-decoder", "linkify-plugin-hashtag": "npm:linkify-plugin-hashtag@^4.1.1", @@ -61,6 +61,7 @@ "nostr-relaypool": "npm:nostr-relaypool2@0.6.34", "nostr-tools": "npm:nostr-tools@2.5.1", "nostr-wasm": "npm:nostr-wasm@^0.1.0", + "postgres": "https://raw.githubusercontent.com/xyzshantaram/postgres.js/8a9bbce88b3f6425ecaacd99a80372338b157a53/deno/mod.js", "prom-client": "npm:prom-client@^15.1.2", "question-deno": "https://raw.githubusercontent.com/ocpu/question-deno/10022b8e52555335aa510adb08b0a300df3cf904/mod.ts", "tldts": "npm:tldts@^6.0.14", diff --git a/deno.lock b/deno.lock index b33fa3ed..2f1f6090 100644 --- a/deno.lock +++ b/deno.lock @@ -10,6 +10,7 @@ "jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0", "jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1", "jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3", + "jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", @@ -126,6 +127,9 @@ "@hono/hono@4.5.3": { "integrity": "429923b2b3c6586a1450862328d61a1346fee5841e8ae86c494250475057213c" }, + "@lambdalisue/async@2.1.1": { + "integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4" + }, "@nostrify/nostrify@0.22.4": { "integrity": "1c8a7847e5773213044b491e85fd7cafae2ad194ce59da4d957d2b27c776b42d", "dependencies": [ @@ -1780,6 +1784,7 @@ "jsr:@bradenmacdonald/s3-lite-client@^0.7.4", "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", + "jsr:@lambdalisue/async@^2.1.1", "jsr:@nostrify/nostrify@^0.28.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", diff --git a/src/config.ts b/src/config.ts index dfb84618..56825cc0 100644 --- a/src/config.ts +++ b/src/config.ts @@ -242,6 +242,10 @@ class Conf { static get firehoseEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('FIREHOSE_ENABLED')) ?? true; } + /** Number of events the firehose is allowed to process at one time before they have to wait in a queue. */ + static get firehoseConcurrency(): number { + return Math.ceil(Number(Deno.env.get('FIREHOSE_CONCURRENCY') ?? (Conf.pg.poolSize * 0.25))); + } /** Whether to enable Ditto cron jobs. */ static get cronEnabled(): boolean { return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; diff --git a/src/firehose.ts b/src/firehose.ts index 8b61c784..86d19f74 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,5 +1,7 @@ +import { Semaphore } from '@lambdalisue/async'; import { Stickynotes } from '@soapbox/stickynotes'; +import { Conf } from '@/config.ts'; import { firehoseEventCounter } from '@/metrics.ts'; import { Storages } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; @@ -7,6 +9,7 @@ import { nostrNow } from '@/utils.ts'; import * as pipeline from '@/pipeline.ts'; const console = new Stickynotes('ditto:firehose'); +const sem = new Semaphore(Conf.firehoseConcurrency); /** * This function watches events on all known relays and performs @@ -22,9 +25,13 @@ export async function startFirehose(): Promise { console.debug(`NostrEvent<${event.kind}> ${event.id}`); firehoseEventCounter.inc({ kind: event.kind }); - pipeline - .handleEvent(event, AbortSignal.timeout(5000)) - .catch(() => {}); + sem.lock(async () => { + try { + await pipeline.handleEvent(event, AbortSignal.timeout(5000)); + } catch (e) { + console.warn(e); + } + }); } } }