diff --git a/src/pipeline.ts b/src/pipeline.ts index a4161233..a99ec988 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -121,6 +121,7 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise handleZaps(kysely, event), parseMetadata(event, opts.signal), generateSetEvents(event), + updateStreak(event, opts), ]) .then(() => Promise.allSettled([ @@ -335,6 +336,47 @@ async function generateSetEvents(event: NostrEvent): Promise { } } +async function updateStreak(event: DittoEvent, opts: PipelineOpts): Promise { + const { pubkey, user } = event; + + if (event.kind !== 1) { + return; // Only kind 1 events contribute to streaks. + } + + if (!event.user && opts.source !== 'api') { + return; // Create new user events only if the event was created through the API. + } + + const ts = Math.floor(Date.now() / 1000); + const signer = new AdminSigner(); + + const t = user ?? { kind: 30382, content: '', tags: [['d', pubkey]], created_at: ts }; + + const start = parseInt(t.tags.find(([name]) => name === 'ditto.streak.start')?.[1]!); + const end = parseInt(t.tags.find(([name]) => name === 'ditto.streak.end')?.[1]!); + + if (end - start > 86400) { + return; // Streak is broken. + } + + if (event.created_at <= end) { + return; // Streak cannot go backwards in time. + } + + const tags = t.tags.filter(([name]) => !['ditto.streak.start', 'ditto.streak.end'].includes(name)); + + tags.push(['ditto.streak.start', (start || event.created_at).toString()]); + tags.push(['ditto.streak.end', event.created_at.toString(), event.id]); + + const updated = await signer.signEvent({ + ...t, + tags, + created_at: ts, + }); + + await handleEvent(updated, { source: 'pipeline', signal: AbortSignal.timeout(1000) }); +} + /** Stores the event in the 'event_zaps' table */ async function handleZaps(kysely: Kysely, event: NostrEvent) { if (event.kind !== 9735) return;