Merge branch 'main' into feat-set-kind-0-api

This commit is contained in:
P. Reis 2024-11-01 23:19:53 -03:00
commit c411a871ef

View file

@ -33,31 +33,65 @@ const console = new Stickynotes('ditto:pipeline');
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> { async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
// Integer max value for Postgres. TODO: switch to a bigint in 2038. // Integer max value for Postgres. TODO: switch to a bigint in 2038.
if (event.created_at >= 2_147_483_647) { if (event.created_at >= 2_147_483_647) {
throw new RelayError('blocked', 'event too far in the future'); throw new RelayError('invalid', 'event too far in the future');
} }
// Integer max value for Postgres.
if (event.kind >= 2_147_483_647) { if (event.kind >= 2_147_483_647) {
throw new RelayError('blocked', 'event kind too large'); throw new RelayError('invalid', 'event kind too large');
} }
if (!(await verifyEventWorker(event))) return; // The only point of ephemeral events is to stream them,
if (encounterEvent(event)) return; // so throw an error if we're not even going to do that.
if (NKinds.ephemeral(event.kind) && !isFresh(event)) {
console.info(`NostrEvent<${event.kind}> ${event.id}`); throw new RelayError('invalid', 'event too old');
pipelineEventsCounter.inc({ kind: event.kind }); }
// Block NIP-70 events, because we have no way to `AUTH`.
if (isProtectedEvent(event)) { if (isProtectedEvent(event)) {
throw new RelayError('invalid', 'protected event'); throw new RelayError('invalid', 'protected event');
} }
// Validate the event's signature.
if (!(await verifyEventWorker(event))) {
throw new RelayError('invalid', 'invalid signature');
}
// Skip events that have been recently encountered.
// We must do this after verifying the signature.
if (encounterEvent(event)) {
throw new RelayError('duplicate', 'already have this event');
}
if (event.kind !== 24133 && event.pubkey !== Conf.pubkey) { // Log the event.
console.info(`NostrEvent<${event.kind}> ${event.id}`);
pipelineEventsCounter.inc({ kind: event.kind });
// NIP-46 events get special treatment.
// They are exempt from policies and other side-effects, and should be streamed out immediately.
// If streaming fails, an error should be returned.
if (event.kind === 24133) {
await streamOut(event);
return;
}
// Ensure the event doesn't violate the policy.
if (event.pubkey !== Conf.pubkey) {
await policyFilter(event, signal); await policyFilter(event, signal);
} }
// Prepare the event for additional checks.
// FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage.
await hydrateEvent(event, signal); await hydrateEvent(event, signal);
// Ensure that the author is not banned.
const n = getTagSet(event.user?.tags ?? [], 'n'); const n = getTagSet(event.user?.tags ?? [], 'n');
if (n.has('disabled')) { if (n.has('disabled')) {
throw new RelayError('blocked', 'user is disabled'); throw new RelayError('blocked', 'author is blocked');
}
// Ephemeral events must throw if they are not streamed out.
if (NKinds.ephemeral(event.kind)) {
await Promise.all([
streamOut(event),
webPush(event),
]);
return;
} }
const kysely = await Storages.kysely(); const kysely = await Storages.kysely();
@ -130,7 +164,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
} }
/** Maybe store the event, if eligible. */ /** Maybe store the event, if eligible. */
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> { async function storeEvent(event: NostrEvent, signal?: AbortSignal): Promise<undefined> {
if (NKinds.ephemeral(event.kind)) return; if (NKinds.ephemeral(event.kind)) return;
const store = await Storages.db(); const store = await Storages.db();
@ -217,20 +251,22 @@ async function setLanguage(event: NostrEvent): Promise<void> {
/** Determine if the event is being received in a timely manner. */ /** Determine if the event is being received in a timely manner. */
function isFresh(event: NostrEvent): boolean { function isFresh(event: NostrEvent): boolean {
return eventAge(event) < Time.seconds(10); return eventAge(event) < Time.minutes(1);
} }
/** Distribute the event through active subscriptions. */ /** Distribute the event through active subscriptions. */
async function streamOut(event: NostrEvent): Promise<void> { async function streamOut(event: NostrEvent): Promise<void> {
if (isFresh(event)) { if (!isFresh(event)) {
throw new RelayError('invalid', 'event too old');
}
const pubsub = await Storages.pubsub(); const pubsub = await Storages.pubsub();
await pubsub.event(event); await pubsub.event(event);
} }
}
async function webPush(event: NostrEvent): Promise<void> { async function webPush(event: NostrEvent): Promise<void> {
if (!isFresh(event)) { if (!isFresh(event)) {
return; throw new RelayError('invalid', 'event too old');
} }
const kysely = await Storages.kysely(); const kysely = await Storages.kysely();