From 7fdfb806f46a4182344e1bf8a293b2039944b399 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 21 Jan 2025 17:30:59 -0600 Subject: [PATCH] pipeline: skip refetch of encountered events --- src/caches/pipelineEncounters.ts | 3 ++ src/controllers/nostr/relay.ts | 2 +- src/firehose.ts | 2 +- src/notify.ts | 18 ++++++++--- src/pipeline.ts | 53 +++++++++++++++++++------------- src/trends.ts | 2 +- src/utils/api.ts | 2 +- 7 files changed, 51 insertions(+), 31 deletions(-) create mode 100644 src/caches/pipelineEncounters.ts diff --git a/src/caches/pipelineEncounters.ts b/src/caches/pipelineEncounters.ts new file mode 100644 index 00000000..491a416f --- /dev/null +++ b/src/caches/pipelineEncounters.ts @@ -0,0 +1,3 @@ +import { LRUCache } from 'lru-cache'; + +export const pipelineEncounters = new LRUCache({ max: 5000 }); diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 71397ee8..74bd8a56 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -138,7 +138,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { relayEventsCounter.inc({ kind: event.kind.toString() }); try { // This will store it (if eligible) and run other side-effects. - await pipeline.handleEvent(purifyEvent(event), AbortSignal.timeout(1000)); + await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) }); send(['OK', event.id, true, '']); } catch (e) { if (e instanceof RelayError) { diff --git a/src/firehose.ts b/src/firehose.ts index da8ab9c1..0dd88ba2 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -27,7 +27,7 @@ export async function startFirehose(): Promise { sem.lock(async () => { try { - await pipeline.handleEvent(event, AbortSignal.timeout(5000)); + await pipeline.handleEvent(event, { source: 'firehose', signal: AbortSignal.timeout(5000) }); } catch (e) { console.warn(e); } diff --git a/src/notify.ts b/src/notify.ts index 69480875..cda22718 100644 --- a/src/notify.ts +++ b/src/notify.ts @@ -1,24 +1,32 @@ import { Semaphore } from '@lambdalisue/async'; +import { Stickynotes } from '@soapbox/stickynotes'; +import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; import { Conf } from '@/config.ts'; import * as pipeline from '@/pipeline.ts'; import { Storages } from '@/storages.ts'; const sem = new Semaphore(1); +const console = new Stickynotes('ditto:notify'); export async function startNotify(): Promise { const { listen } = await Storages.database(); const store = await Storages.db(); - listen('nostr_event', (payload) => { + listen('nostr_event', (id) => { + if (pipelineEncounters.has(id)) { + console.debug(`Skip event ${id} because it was already in the pipeline`); + return; + } + sem.lock(async () => { try { - const id = payload; - const timeout = Conf.db.timeouts.default; + const signal = AbortSignal.timeout(Conf.db.timeouts.default); + + const [event] = await store.query([{ ids: [id], limit: 1 }], { signal }); - const [event] = await store.query([{ ids: [id], limit: 1 }], { signal: AbortSignal.timeout(timeout) }); if (event) { - await pipeline.handleEvent(event, AbortSignal.timeout(timeout)); + await pipeline.handleEvent(event, { source: 'notify', signal }); } } catch (e) { console.warn(e); diff --git a/src/pipeline.ts b/src/pipeline.ts index 5becff20..688fe3bb 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,7 +1,6 @@ import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely, sql } from 'kysely'; -import { LRUCache } from 'lru-cache'; import { z } from 'zod'; import { Conf } from '@/config.ts'; @@ -23,14 +22,25 @@ import { getTagSet } from '@/utils/tags.ts'; import { renderWebPushNotification } from '@/views/mastodon/push.ts'; import { policyWorker } from '@/workers/policy.ts'; import { verifyEventWorker } from '@/workers/verify.ts'; +import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; const console = new Stickynotes('ditto:pipeline'); +interface PipelineOpts { + signal: AbortSignal; + source: 'relay' | 'api' | 'firehose' | 'pipeline' | 'notify' | 'internal'; +} + /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ -async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { +async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise { + // Skip events that have already been encountered. + if (pipelineEncounters.get(event.id)) { + throw new RelayError('duplicate', 'already have this event'); + } + // Reject events that are too far in the future. if (eventAge(event) < -Time.minutes(1)) { throw new RelayError('invalid', 'event too far in the future'); } @@ -51,11 +61,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise ${event.id}`); @@ -71,12 +82,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise({ max: 1000 }); - -/** Encounter the event, and return whether it has already been encountered. */ -function encounterEvent(event: NostrEvent): boolean { - const encountered = !!encounters.get(event.id); - if (!encountered) { - encounters.set(event.id, true); - } - return encountered; -} - /** Check whether the event has a NIP-70 `-` tag. */ function isProtectedEvent(event: NostrEvent): boolean { return event.tags.some(([name]) => name === '-'); @@ -326,7 +335,7 @@ async function generateSetEvents(event: NostrEvent): Promise { created_at: Math.floor(Date.now() / 1000), }); - await handleEvent(rel, AbortSignal.timeout(1000)); + await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) }); } if (event.kind === 3036 && tagsAdmin) { @@ -344,7 +353,7 @@ async function generateSetEvents(event: NostrEvent): Promise { created_at: Math.floor(Date.now() / 1000), }); - await handleEvent(rel, AbortSignal.timeout(1000)); + await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) }); } } diff --git a/src/trends.ts b/src/trends.ts index aced6800..cbe85c14 100644 --- a/src/trends.ts +++ b/src/trends.ts @@ -111,7 +111,7 @@ export async function updateTrendingTags( created_at: Math.floor(Date.now() / 1000), }); - await handleEvent(label, signal); + await handleEvent(label, { source: 'internal', signal }); console.info(`Trending ${l} updated.`); } catch (e) { console.error(`Error updating trending ${l}: ${e instanceof Error ? e.message : e}`); diff --git a/src/utils/api.ts b/src/utils/api.ts index 4bbd32fc..37cbbbf9 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -161,7 +161,7 @@ async function updateNames(k: number, d: string, n: Record, c: async function publishEvent(event: NostrEvent, c: AppContext): Promise { debug('EVENT', event); try { - await pipeline.handleEvent(event, c.req.raw.signal); + await pipeline.handleEvent(event, { source: 'api', signal: c.req.raw.signal }); const client = await Storages.client(); await client.event(purifyEvent(event)); } catch (e) {