Merge branch 'pipeline-source' into 'main'

pipeline: skip refetch of encountered events

See merge request soapbox-pub/ditto!620
Alex Gleason 2025-01-22 00:03:27 +00:00
commit 3fd311b929
7 changed files with 51 additions and 31 deletions

View file

@@ -0,0 +1,3 @@
+import { LRUCache } from 'lru-cache';
+
+export const pipelineEncounters = new LRUCache<string, true>({ max: 5000 });
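
The new shared cache simply remembers the ids of the last 5000 events the pipeline has seen. A minimal consumer-side sketch (the helper name is illustrative, not part of this diff), based on lru-cache's has()/set() API:

import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';

// Illustrative helper: returns false for ids already seen, otherwise marks them as seen.
// Once more than 5000 ids are stored, the least-recently-used id is evicted.
function shouldProcess(id: string): boolean {
  if (pipelineEncounters.has(id)) return false;
  pipelineEncounters.set(id, true);
  return true;
}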

View file

@@ -138,7 +138,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
       relayEventsCounter.inc({ kind: event.kind.toString() });
       try {
         // This will store it (if eligible) and run other side-effects.
-        await pipeline.handleEvent(purifyEvent(event), AbortSignal.timeout(1000));
+        await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) });
         send(['OK', event.id, true, '']);
       } catch (e) {
         if (e instanceof RelayError) {

View file

@@ -27,7 +27,7 @@ export async function startFirehose(): Promise<void> {
     sem.lock(async () => {
       try {
-        await pipeline.handleEvent(event, AbortSignal.timeout(5000));
+        await pipeline.handleEvent(event, { source: 'firehose', signal: AbortSignal.timeout(5000) });
       } catch (e) {
         console.warn(e);
       }

View file

@@ -1,24 +1,32 @@
 import { Semaphore } from '@lambdalisue/async';
+import { Stickynotes } from '@soapbox/stickynotes';
+
+import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
 import { Conf } from '@/config.ts';
 import * as pipeline from '@/pipeline.ts';
 import { Storages } from '@/storages.ts';
 
 const sem = new Semaphore(1);
+const console = new Stickynotes('ditto:notify');
 
 export async function startNotify(): Promise<void> {
   const { listen } = await Storages.database();
   const store = await Storages.db();
 
-  listen('nostr_event', (payload) => {
+  listen('nostr_event', (id) => {
+    if (pipelineEncounters.has(id)) {
+      console.debug(`Skip event ${id} because it was already in the pipeline`);
+      return;
+    }
+
     sem.lock(async () => {
       try {
-        const id = payload;
-        const timeout = Conf.db.timeouts.default;
+        const signal = AbortSignal.timeout(Conf.db.timeouts.default);
 
-        const [event] = await store.query([{ ids: [id], limit: 1 }], { signal: AbortSignal.timeout(timeout) });
+        const [event] = await store.query([{ ids: [id], limit: 1 }], { signal });
 
         if (event) {
-          await pipeline.handleEvent(event, AbortSignal.timeout(timeout));
+          await pipeline.handleEvent(event, { source: 'notify', signal });
         }
       } catch (e) {
         console.warn(e);
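
For context: the 'nostr_event' LISTEN channel delivers only the event id as its payload, so this handler has to refetch the full event from the database before piping it through handleEvent. The publishing side is not part of this diff; assuming it uses Postgres NOTIFY via Kysely, it would look roughly like this sketch (function name and types are illustrative, not the project's actual code):

import { Kysely, sql } from 'kysely';

// Hypothetical publisher: announce a stored event's id on the 'nostr_event' channel.
async function notifyStored(db: Kysely<any>, eventId: string): Promise<void> {
  await sql`SELECT pg_notify('nostr_event', ${eventId})`.execute(db);
}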

View file

@@ -1,7 +1,6 @@
 import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
 import { Stickynotes } from '@soapbox/stickynotes';
 import { Kysely, sql } from 'kysely';
-import { LRUCache } from 'lru-cache';
 import { z } from 'zod';
 
 import { Conf } from '@/config.ts';
@@ -23,14 +22,25 @@ import { getTagSet } from '@/utils/tags.ts';
 import { renderWebPushNotification } from '@/views/mastodon/push.ts';
 import { policyWorker } from '@/workers/policy.ts';
 import { verifyEventWorker } from '@/workers/verify.ts';
+import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
 
 const console = new Stickynotes('ditto:pipeline');
 
+interface PipelineOpts {
+  signal: AbortSignal;
+  source: 'relay' | 'api' | 'firehose' | 'pipeline' | 'notify' | 'internal';
+}
+
 /**
  * Common pipeline function to process (and maybe store) events.
  * It is idempotent, so it can be called multiple times for the same event.
  */
-async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
+async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise<void> {
+  // Skip events that have already been encountered.
+  if (pipelineEncounters.get(event.id)) {
+    throw new RelayError('duplicate', 'already have this event');
+  }
+
+  // Reject events that are too far in the future.
   if (eventAge(event) < -Time.minutes(1)) {
     throw new RelayError('invalid', 'event too far in the future');
   }
@@ -51,11 +61,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void>
   if (!(await verifyEventWorker(event))) {
     throw new RelayError('invalid', 'invalid signature');
   }
 
-  // Skip events that have been recently encountered.
-  // We must do this after verifying the signature.
-  if (encounterEvent(event)) {
+  // Recheck encountered after async ops.
+  if (pipelineEncounters.has(event.id)) {
     throw new RelayError('duplicate', 'already have this event');
   }
 
+  // Set the event as encountered after verifying the signature.
+  pipelineEncounters.set(event.id, true);
+
   // Log the event.
   console.info(`NostrEvent<${event.kind}> ${event.id}`);
@@ -71,12 +82,12 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void>
   // Ensure the event doesn't violate the policy.
   if (event.pubkey !== Conf.pubkey) {
-    await policyFilter(event, signal);
+    await policyFilter(event, opts.signal);
   }
 
   // Prepare the event for additional checks.
   // FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage.
-  await hydrateEvent(event, signal);
+  await hydrateEvent(event, opts.signal);
 
   // Ensure that the author is not banned.
   const n = getTagSet(event.user?.tags ?? [], 'n');
@@ -93,15 +104,24 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void>
     return;
   }
 
+  // Events received through notify are thought to already be in the database, so they only need to be streamed.
+  if (opts.source === 'notify') {
+    await Promise.all([
+      streamOut(event),
+      webPush(event),
+    ]);
+    return;
+  }
+
   const kysely = await Storages.kysely();
 
   try {
-    await storeEvent(purifyEvent(event), signal);
+    await storeEvent(purifyEvent(event), opts.signal);
   } finally {
     // This needs to run in steps, and should not block the API from responding.
     Promise.allSettled([
       handleZaps(kysely, event),
-      parseMetadata(event, signal),
+      parseMetadata(event, opts.signal),
       setLanguage(event),
       generateSetEvents(event),
     ])
@@ -132,17 +152,6 @@ async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<void>
   }
 }
 
-const encounters = new LRUCache<string, true>({ max: 1000 });
-
-/** Encounter the event, and return whether it has already been encountered. */
-function encounterEvent(event: NostrEvent): boolean {
-  const encountered = !!encounters.get(event.id);
-  if (!encountered) {
-    encounters.set(event.id, true);
-  }
-
-  return encountered;
-}
-
 /** Check whether the event has a NIP-70 `-` tag. */
 function isProtectedEvent(event: NostrEvent): boolean {
   return event.tags.some(([name]) => name === '-');
@@ -326,7 +335,7 @@ async function generateSetEvents(event: NostrEvent): Promise<void> {
       created_at: Math.floor(Date.now() / 1000),
     });
 
-    await handleEvent(rel, AbortSignal.timeout(1000));
+    await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) });
   }
 
   if (event.kind === 3036 && tagsAdmin) {
@@ -344,7 +353,7 @@ async function generateSetEvents(event: NostrEvent): Promise<void> {
       created_at: Math.floor(Date.now() / 1000),
     });
 
-    await handleEvent(rel, AbortSignal.timeout(1000));
+    await handleEvent(rel, { source: 'pipeline', signal: AbortSignal.timeout(1000) });
   }
 }
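
Why the encounter check appears twice in handleEvent: two copies of the same event can enter the pipeline concurrently (for example one from the relay and one from the firehose). Both may pass the cheap initial check before either has finished the awaited signature verification, and the id is deliberately written to the cache only after the signature is known to be valid, so a second check is needed after the awaits. A small self-contained sketch of that interleaving (names are illustrative, not the project's code):

import { LRUCache } from 'lru-cache';

const seen = new LRUCache<string, true>({ max: 5000 });

async function handleOnce(id: string): Promise<void> {
  if (seen.get(id)) throw new Error('duplicate');  // cheap early check
  await verify(id);                                // both copies can be suspended here
  if (seen.has(id)) throw new Error('duplicate');  // recheck after the await
  seen.set(id, true);                              // only verified ids are remembered
}

async function verify(_id: string): Promise<void> {
  await new Promise((resolve) => setTimeout(resolve, 10)); // stand-in for async signature verification
}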

View file

@@ -111,7 +111,7 @@ export async function updateTrendingTags(
         created_at: Math.floor(Date.now() / 1000),
       });
 
-      await handleEvent(label, signal);
+      await handleEvent(label, { source: 'internal', signal });
       console.info(`Trending ${l} updated.`);
     } catch (e) {
       console.error(`Error updating trending ${l}: ${e instanceof Error ? e.message : e}`);

View file

@@ -161,7 +161,7 @@ async function updateNames(k: number, d: string, n: Record<string, boolean>, c:
 async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEvent> {
   debug('EVENT', event);
   try {
-    await pipeline.handleEvent(event, c.req.raw.signal);
+    await pipeline.handleEvent(event, { source: 'api', signal: c.req.raw.signal });
     const client = await Storages.client();
     await client.event(purifyEvent(event));
   } catch (e) {