mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge branch 'ephemeral' into 'main'
pipeline: improve error handling, include special treatment for ephemeral and NIP-46 events See merge request soapbox-pub/ditto!576
This commit is contained in:
commit
d46af51604
1 changed files with 53 additions and 17 deletions
|
|
@ -33,31 +33,65 @@ const console = new Stickynotes('ditto:pipeline');
|
||||||
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
|
async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
|
||||||
// Integer max value for Postgres. TODO: switch to a bigint in 2038.
|
// Integer max value for Postgres. TODO: switch to a bigint in 2038.
|
||||||
if (event.created_at >= 2_147_483_647) {
|
if (event.created_at >= 2_147_483_647) {
|
||||||
throw new RelayError('blocked', 'event too far in the future');
|
throw new RelayError('invalid', 'event too far in the future');
|
||||||
}
|
}
|
||||||
|
// Integer max value for Postgres.
|
||||||
if (event.kind >= 2_147_483_647) {
|
if (event.kind >= 2_147_483_647) {
|
||||||
throw new RelayError('blocked', 'event kind too large');
|
throw new RelayError('invalid', 'event kind too large');
|
||||||
}
|
}
|
||||||
if (!(await verifyEventWorker(event))) return;
|
// The only point of ephemeral events is to stream them,
|
||||||
if (encounterEvent(event)) return;
|
// so throw an error if we're not even going to do that.
|
||||||
|
if (NKinds.ephemeral(event.kind) && !isFresh(event)) {
|
||||||
console.info(`NostrEvent<${event.kind}> ${event.id}`);
|
throw new RelayError('invalid', 'event too old');
|
||||||
pipelineEventsCounter.inc({ kind: event.kind });
|
}
|
||||||
|
// Block NIP-70 events, because we have no way to `AUTH`.
|
||||||
if (isProtectedEvent(event)) {
|
if (isProtectedEvent(event)) {
|
||||||
throw new RelayError('invalid', 'protected event');
|
throw new RelayError('invalid', 'protected event');
|
||||||
}
|
}
|
||||||
|
// Validate the event's signature.
|
||||||
|
if (!(await verifyEventWorker(event))) {
|
||||||
|
throw new RelayError('invalid', 'invalid signature');
|
||||||
|
}
|
||||||
|
// Skip events that have been recently encountered.
|
||||||
|
// We must do this after verifying the signature.
|
||||||
|
if (encounterEvent(event)) {
|
||||||
|
throw new RelayError('duplicate', 'already have this event');
|
||||||
|
}
|
||||||
|
|
||||||
if (event.kind !== 24133 && event.pubkey !== Conf.pubkey) {
|
// Log the event.
|
||||||
|
console.info(`NostrEvent<${event.kind}> ${event.id}`);
|
||||||
|
pipelineEventsCounter.inc({ kind: event.kind });
|
||||||
|
|
||||||
|
// NIP-46 events get special treatment.
|
||||||
|
// They are exempt from policies and other side-effects, and should be streamed out immediately.
|
||||||
|
// If streaming fails, an error should be returned.
|
||||||
|
if (event.kind === 24133) {
|
||||||
|
await streamOut(event);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the event doesn't violate the policy.
|
||||||
|
if (event.pubkey !== Conf.pubkey) {
|
||||||
await policyFilter(event, signal);
|
await policyFilter(event, signal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare the event for additional checks.
|
||||||
|
// FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage.
|
||||||
await hydrateEvent(event, signal);
|
await hydrateEvent(event, signal);
|
||||||
|
|
||||||
|
// Ensure that the author is not banned.
|
||||||
const n = getTagSet(event.user?.tags ?? [], 'n');
|
const n = getTagSet(event.user?.tags ?? [], 'n');
|
||||||
|
|
||||||
if (n.has('disabled')) {
|
if (n.has('disabled')) {
|
||||||
throw new RelayError('blocked', 'user is disabled');
|
throw new RelayError('blocked', 'author is blocked');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ephemeral events must throw if they are not streamed out.
|
||||||
|
if (NKinds.ephemeral(event.kind)) {
|
||||||
|
await Promise.all([
|
||||||
|
streamOut(event),
|
||||||
|
webPush(event),
|
||||||
|
]);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const kysely = await Storages.kysely();
|
const kysely = await Storages.kysely();
|
||||||
|
|
@ -130,7 +164,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Maybe store the event, if eligible. */
|
/** Maybe store the event, if eligible. */
|
||||||
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
|
async function storeEvent(event: NostrEvent, signal?: AbortSignal): Promise<undefined> {
|
||||||
if (NKinds.ephemeral(event.kind)) return;
|
if (NKinds.ephemeral(event.kind)) return;
|
||||||
const store = await Storages.db();
|
const store = await Storages.db();
|
||||||
|
|
||||||
|
|
@ -217,20 +251,22 @@ async function setLanguage(event: NostrEvent): Promise<void> {
|
||||||
|
|
||||||
/** Determine if the event is being received in a timely manner. */
|
/** Determine if the event is being received in a timely manner. */
|
||||||
function isFresh(event: NostrEvent): boolean {
|
function isFresh(event: NostrEvent): boolean {
|
||||||
return eventAge(event) < Time.seconds(10);
|
return eventAge(event) < Time.minutes(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Distribute the event through active subscriptions. */
|
/** Distribute the event through active subscriptions. */
|
||||||
async function streamOut(event: NostrEvent): Promise<void> {
|
async function streamOut(event: NostrEvent): Promise<void> {
|
||||||
if (isFresh(event)) {
|
if (!isFresh(event)) {
|
||||||
|
throw new RelayError('invalid', 'event too old');
|
||||||
|
}
|
||||||
|
|
||||||
const pubsub = await Storages.pubsub();
|
const pubsub = await Storages.pubsub();
|
||||||
await pubsub.event(event);
|
await pubsub.event(event);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function webPush(event: NostrEvent): Promise<void> {
|
async function webPush(event: NostrEvent): Promise<void> {
|
||||||
if (!isFresh(event)) {
|
if (!isFresh(event)) {
|
||||||
return;
|
throw new RelayError('invalid', 'event too old');
|
||||||
}
|
}
|
||||||
|
|
||||||
const kysely = await Storages.kysely();
|
const kysely = await Storages.kysely();
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue