pipeline: skip refetch of encountered events

Alex Gleason 2025-01-21 17:30:59 -06:00
parent 93a035e3ff
commit 7fdfb806f4
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
7 changed files with 51 additions and 31 deletions

View file

@@ -0,0 +1,3 @@
+import { LRUCache } from 'lru-cache';
+
+export const pipelineEncounters = new LRUCache<string, true>({ max: 5000 });
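
The new module is a process-wide set of recently seen event ids, capped at 5,000 entries with least-recently-used eviction, shared between the pipeline and the notify listener. The sketch below is not part of the commit; it only illustrates the check-then-mark pattern the rest of the diff builds on (the alreadySeen helper is hypothetical):

import { LRUCache } from 'lru-cache';

// Same shape as pipelineEncounters: event id -> true, oldest ids evicted past 5000.
const encounters = new LRUCache<string, true>({ max: 5000 });

// Hypothetical helper: report whether an id was seen before, and record it.
function alreadySeen(id: string): boolean {
  if (encounters.has(id)) return true;
  encounters.set(id, true);
  return false;
}

console.log(alreadySeen('abc')); // false - first encounter
console.log(alreadySeen('abc')); // true  - duplicate, safe to skip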

View file

@@ -138,7 +138,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
relayEventsCounter.inc({ kind: event.kind.toString() });
try {
// This will store it (if eligible) and run other side-effects.
-await pipeline.handleEvent(purifyEvent(event), AbortSignal.timeout(1000));
+await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) });
send(['OK', event.id, true, '']);
} catch (e) {
if (e instanceof RelayError) {
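
Call sites like the relay handler above now pass a small options object instead of a bare AbortSignal, so the pipeline can tell where an event entered. A minimal sketch of that call shape, mirroring the PipelineOpts interface added in pipeline.ts further down (the stand-in handleEvent below is only illustrative):

type PipelineSource = 'relay' | 'api' | 'firehose' | 'pipeline' | 'notify' | 'internal';

interface PipelineOpts {
  source: PipelineSource;
  signal: AbortSignal;
}

// Stand-in for pipeline.handleEvent, just to show the new signature.
async function handleEvent(event: { id: string; kind: number }, opts: PipelineOpts): Promise<void> {
  console.log(`handling kind ${event.kind} from ${opts.source}`);
}

// A relayed event gets a one-second budget and is tagged with its origin.
await handleEvent({ id: 'a'.repeat(64), kind: 1 }, {
  source: 'relay',
  signal: AbortSignal.timeout(1000),
});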

View file

@@ -27,7 +27,7 @@ export async function startFirehose(): Promise<void> {
sem.lock(async () => {
try {
-await pipeline.handleEvent(event, AbortSignal.timeout(5000));
+await pipeline.handleEvent(event, { source: 'firehose', signal: AbortSignal.timeout(5000) });
} catch (e) {
console.warn(e);
}

View file

@@ -1,24 +1,32 @@
import { Semaphore } from '@lambdalisue/async';
import { Stickynotes } from '@soapbox/stickynotes';
+import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
import { Conf } from '@/config.ts';
import * as pipeline from '@/pipeline.ts';
import { Storages } from '@/storages.ts';
const sem = new Semaphore(1);
const console = new Stickynotes('ditto:notify');
export async function startNotify(): Promise<void> {
const { listen } = await Storages.database();
const store = await Storages.db();
-listen('nostr_event', (payload) => {
+listen('nostr_event', (id) => {
+if (pipelineEncounters.has(id)) {
+console.debug(`Skip event ${id} because it was already in the pipeline`);
+return;
+}
sem.lock(async () => {
try {
-const id = payload;
-const timeout = Conf.db.timeouts.default;
-const [event] = await store.query([{ ids: [id], limit: 1 }], { signal: AbortSignal.timeout(timeout) });
+const signal = AbortSignal.timeout(Conf.db.timeouts.default);
+const [event] = await store.query([{ ids: [id], limit: 1 }], { signal });
if (event) {
-await pipeline.handleEvent(event, AbortSignal.timeout(timeout));
+await pipeline.handleEvent(event, { source: 'notify', signal });
}
} catch (e) {
console.warn(e);
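
With the encounter check moved in front of the semaphore and the database query, the listener only refetches events written by some other process; anything this process already ran through the pipeline is dropped as soon as the NOTIFY payload (the event id) arrives. A rough, self-contained sketch of that decision, with a local cache standing in for the shared pipelineEncounters and a hypothetical fetchAndProcess callback:

import { LRUCache } from 'lru-cache';

const pipelineEncounters = new LRUCache<string, true>({ max: 5000 });

// Stand-in for the 'nostr_event' listener body: the payload is an event id.
function onNostrEvent(id: string, fetchAndProcess: (id: string) => Promise<void>): void {
  if (pipelineEncounters.has(id)) {
    console.debug(`Skip event ${id} because it was already in the pipeline`);
    return;
  }
  void fetchAndProcess(id);
}

// An id this process already handled never reaches the database again.
pipelineEncounters.set('abc123', true);
onNostrEvent('abc123', async () => {});                              // skipped
onNostrEvent('def456', async (id) => console.log(`refetch ${id}`));  // queried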

View file

@@ -1,7 +1,6 @@
import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely, sql } from 'kysely';
-import { LRUCache } from 'lru-cache';
import { z } from 'zod';
import { Conf } from '@/config.ts';
@@ -23,14 +22,25 @@ import { getTagSet } from '@/utils/tags.ts';
import { renderWebPushNotification } from '@/views/mastodon/push.ts';
import { policyWorker } from '@/workers/policy.ts';
import { verifyEventWorker } from '@/workers/verify.ts';
+import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
const console = new Stickynotes('ditto:pipeline');
+interface PipelineOpts {
+signal: AbortSignal;
+source: 'relay' | 'api' | 'firehose' | 'pipeline' | 'notify' | 'internal';
+}
/**
* Common pipeline function to process (and maybe store) events.
* It is idempotent, so it can be called multiple times for the same event.
*/
-async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
+async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise<void> {
+// Skip events that have already been encountered.
+if (pipelineEncounters.get(event.id)) {
+throw new RelayError('duplicate', 'already have this event');
+}
// Reject events that are too far in the future.
if (eventAge(event) < -Time.minutes(1)) {
throw new RelayError('invalid', 'event too far in the future');
}
@@ -51,11 +61,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
if (!(await verifyEventWorker(event))) {
throw new RelayError('invalid', 'invalid signature');
}
-// Skip events that have been recently encountered.
-// We must do this after verifying the signature.
-if (encounterEvent(event)) {
+// Recheck encountered after async ops.
+if (pipelineEncounters.has(event.id)) {
throw new RelayError('duplicate', 'already have this event');
}
+// Set the event as encountered after verifying the signature.
+pipelineEncounters.set(event.id, true);
// Log the event.
console.info(`NostrEvent<${event.kind}> ${event.id}`);
@@ -71,12 +82,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
// Ensure the event doesn't violate the policy.
if (event.pubkey !== Conf.pubkey) {
-await policyFilter(event, signal);
+await policyFilter(event, opts.signal);
}
// Prepare the event for additional checks.
// FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage.
-await hydrateEvent(event, signal);
+await hydrateEvent(event, opts.signal);
// Ensure that the author is not banned.
const n = getTagSet(event.user?.tags ?? [], 'n');
@@ -93,15 +104,24 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
return;
}
+// Events received through notify are thought to already be in the database, so they only need to be streamed.
+if (opts.source === 'notify') {
+await Promise.all([
+streamOut(event),
+webPush(event),
+]);
+return;
+}
const kysely = await Storages.kysely();
try {
-await storeEvent(purifyEvent(event), signal);
+await storeEvent(purifyEvent(event), opts.signal);
} finally {
// This needs to run in steps, and should not block the API from responding.
Promise.allSettled([
handleZaps(kysely, event),
-parseMetadata(event, signal),
+parseMetadata(event, opts.signal),
setLanguage(event),
generateSetEvents(event),
])
@@ -132,17 +152,6 @@ async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<voi
}
}
-const encounters = new LRUCache<string, true>({ max: 1000 });
-/** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: NostrEvent): boolean {
-const encountered = !!encounters.get(event.id);
-if (!encountered) {
-encounters.set(event.id, true);
-}
-return encountered;
-}
/** Check whether the event has a NIP-70 `-` tag. */
function isProtectedEvent(event: NostrEvent): boolean {
return event.tags.some(([name]) => name === '-');
@@ -326,7 +335,7 @@ async function generateSetEvents(event: NostrEvent): Promise<void> {
created_at: Math.floor(Date.now() / 1000),
});
-await handleEvent(rel, AbortSignal.timeout(1000));
+await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) });
}
if (event.kind === 3036 && tagsAdmin) {
@@ -344,7 +353,7 @@ async function generateSetEvents(event: NostrEvent): Promise<void> {
created_at: Math.floor(Date.now() / 1000),
});
-await handleEvent(rel, AbortSignal.timeout(1000));
+await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) });
}
}
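
Taken together, the pipeline hunks pin down a specific ordering: a cheap duplicate check before any work, a recheck after the asynchronous signature verification (another call may have processed the same id while this one was awaiting), marking the id only once the signature is known good, and a fast path for 'notify' events that are already stored and only need to be streamed. A condensed sketch of that ordering, not the actual function body; RelayError is replaced with a plain Error so the snippet stands alone:

import { LRUCache } from 'lru-cache';

const pipelineEncounters = new LRUCache<string, true>({ max: 5000 });

interface Opts {
  source: 'relay' | 'api' | 'firehose' | 'pipeline' | 'notify' | 'internal';
  signal: AbortSignal;
}

async function handleEventSketch(event: { id: string }, opts: Opts): Promise<void> {
  // 1. Cheap early exit before validation or any async work.
  if (pipelineEncounters.get(event.id)) throw new Error('duplicate');

  // 2. ...schema checks, NIP-70 handling, async signature verification...

  // 3. Recheck: a concurrent call may have marked the same id during the awaits.
  if (pipelineEncounters.has(event.id)) throw new Error('duplicate');

  // 4. Only a verified event is marked as encountered.
  pipelineEncounters.set(event.id, true);

  // 5. ...policy filter, hydration, ban checks...

  // 6. Notify events are already in the database; stream them and stop.
  if (opts.source === 'notify') {
    return;
  }

  // 7. Everything else is stored, then side effects run without blocking.
}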

View file

@@ -111,7 +111,7 @@ export async function updateTrendingTags(
created_at: Math.floor(Date.now() / 1000),
});
-await handleEvent(label, signal);
+await handleEvent(label, { source: 'internal', signal });
console.info(`Trending ${l} updated.`);
} catch (e) {
console.error(`Error updating trending ${l}: ${e instanceof Error ? e.message : e}`);

View file

@@ -161,7 +161,7 @@ async function updateNames(k: number, d: string, n: Record<string, boolean>, c:
async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
debug('EVENT', event);
try {
-await pipeline.handleEvent(event, c.req.raw.signal);
+await pipeline.handleEvent(event, { source: 'api', signal: c.req.raw.signal });
const client = await Storages.client();
await client.event(purifyEvent(event));
} catch (e) {