diff --git a/src/pipeline.ts b/src/pipeline.ts index 867f2c22..ab303709 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -33,31 +33,65 @@ const console = new Stickynotes('ditto:pipeline'); async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { // Integer max value for Postgres. TODO: switch to a bigint in 2038. if (event.created_at >= 2_147_483_647) { - throw new RelayError('blocked', 'event too far in the future'); + throw new RelayError('invalid', 'event too far in the future'); } + // Integer max value for Postgres. if (event.kind >= 2_147_483_647) { - throw new RelayError('blocked', 'event kind too large'); + throw new RelayError('invalid', 'event kind too large'); } - if (!(await verifyEventWorker(event))) return; - if (encounterEvent(event)) return; - - console.info(`NostrEvent<${event.kind}> ${event.id}`); - pipelineEventsCounter.inc({ kind: event.kind }); - + // The only point of ephemeral events is to stream them, + // so throw an error if we're not even going to do that. + if (NKinds.ephemeral(event.kind) && !isFresh(event)) { + throw new RelayError('invalid', 'event too old'); + } + // Block NIP-70 events, because we have no way to `AUTH`. if (isProtectedEvent(event)) { throw new RelayError('invalid', 'protected event'); } + // Validate the event's signature. + if (!(await verifyEventWorker(event))) { + throw new RelayError('invalid', 'invalid signature'); + } + // Skip events that have been recently encountered. + // We must do this after verifying the signature. + if (encounterEvent(event)) { + throw new RelayError('duplicate', 'already have this event'); + } - if (event.kind !== 24133 && event.pubkey !== Conf.pubkey) { + // Log the event. + console.info(`NostrEvent<${event.kind}> ${event.id}`); + pipelineEventsCounter.inc({ kind: event.kind }); + + // NIP-46 events get special treatment. + // They are exempt from policies and other side-effects, and should be streamed out immediately. + // If streaming fails, an error should be returned. + if (event.kind === 24133) { + await streamOut(event); + return; + } + + // Ensure the event doesn't violate the policy. + if (event.pubkey !== Conf.pubkey) { await policyFilter(event, signal); } + // Prepare the event for additional checks. + // FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage. await hydrateEvent(event, signal); + // Ensure that the author is not banned. const n = getTagSet(event.user?.tags ?? [], 'n'); - if (n.has('disabled')) { - throw new RelayError('blocked', 'user is disabled'); + throw new RelayError('blocked', 'author is blocked'); + } + + // Ephemeral events must throw if they are not streamed out. + if (NKinds.ephemeral(event.kind)) { + await Promise.all([ + streamOut(event), + webPush(event), + ]); + return; } const kysely = await Storages.kysely(); @@ -130,7 +164,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { +async function storeEvent(event: NostrEvent, signal?: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); @@ -217,20 +251,22 @@ async function setLanguage(event: NostrEvent): Promise { /** Determine if the event is being received in a timely manner. */ function isFresh(event: NostrEvent): boolean { - return eventAge(event) < Time.seconds(10); + return eventAge(event) < Time.minutes(1); } /** Distribute the event through active subscriptions. */ async function streamOut(event: NostrEvent): Promise { - if (isFresh(event)) { - const pubsub = await Storages.pubsub(); - await pubsub.event(event); + if (!isFresh(event)) { + throw new RelayError('invalid', 'event too old'); } + + const pubsub = await Storages.pubsub(); + await pubsub.event(event); } async function webPush(event: NostrEvent): Promise { if (!isFresh(event)) { - return; + throw new RelayError('invalid', 'event too old'); } const kysely = await Storages.kysely();