From 4f79b7ec29d75a56062c1bbff6517481a893df8c Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sun, 10 Dec 2023 17:42:44 -0600 Subject: [PATCH] stats: handle follow/following counts --- src/deps.ts | 1 + src/pipeline.ts | 2 +- src/stats.ts | 94 +++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 86 insertions(+), 11 deletions(-) diff --git a/src/deps.ts b/src/deps.ts index f1cedc56..a6b893cd 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -63,6 +63,7 @@ export { type CompiledQuery, FileMigrationProvider, type Insertable, + type InsertQueryBuilder, Kysely, Migrator, type NullableInsertKeys, diff --git a/src/pipeline.ts b/src/pipeline.ts index d4811e40..4f0c51ab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -71,7 +71,7 @@ async function storeEvent(event: Event, data: EventData): Promise { } else { await Promise.all([ eventsDB.insertEvent(event, data).catch(console.warn), - updateStats(event), + updateStats(event).catch(console.warn), ]); } } else { diff --git a/src/stats.ts b/src/stats.ts index a0d32d70..578e2a35 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -1,5 +1,6 @@ -import { type AuthorStatsRow, db, type EventStatsRow } from '@/db.ts'; -import { Event, findReplyTag } from '@/deps.ts'; +import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; +import * as eventsDB from '@/db/events.ts'; +import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; type AuthorStat = keyof Omit; type EventStat = keyof Omit; @@ -9,21 +10,29 @@ type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: num type StatDiff = AuthorStatDiff | EventStatDiff; /** Store stats for the event in LMDB. */ -async function updateStats(event: Event) { - const statDiffs = getStatsDiff(event); - if (!statDiffs.length) return; +async function updateStats(event: Event & { prev?: Event }) { + const queries: InsertQueryBuilder[] = []; + // Kind 3 is a special case - replace the count with the new list. + if (event.kind === 3) { + await maybeSetPrev(event); + queries.push(updateFollowingCountQuery(event as Event<3>)); + } + + const statDiffs = getStatsDiff(event); const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[]; const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[]; - await Promise.all([ - pubkeyDiffs.length ? authorStatsQuery(pubkeyDiffs).execute() : undefined, - eventDiffs.length ? eventStatsQuery(eventDiffs).execute() : undefined, - ]); + if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs)); + if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs)); + + if (queries.length) { + await Promise.all(queries.map((query) => query.execute())); + } } /** Calculate stats changes ahead of time so we can build an efficient query. */ -function getStatsDiff(event: Event): StatDiff[] { +function getStatsDiff(event: Event & { prev?: Event }): StatDiff[] { const statDiffs: StatDiff[] = []; const firstTaggedId = event.tags.find(([name]) => name === 'e')?.[1]; @@ -36,6 +45,9 @@ function getStatsDiff(event: Event): StatDiff[] { statDiffs.push(['event_stats', inReplyToId, 'replies_count', 1]); } break; + case 3: + statDiffs.push(...getFollowDiff(event as Event<3>, event.prev as Event<3> | undefined)); + break; case 6: if (firstTaggedId) { statDiffs.push(['event_stats', firstTaggedId, 'reposts_count', 1]); @@ -50,6 +62,7 @@ function getStatsDiff(event: Event): StatDiff[] { return statDiffs; } +/** Create an author stats query from the list of diffs. */ function authorStatsQuery(diffs: AuthorStatDiff[]) { const values: AuthorStatsRow[] = diffs.map(([_, pubkey, stat, diff]) => { const row: AuthorStatsRow = { @@ -75,6 +88,7 @@ function authorStatsQuery(diffs: AuthorStatDiff[]) { ); } +/** Create an event stats query from the list of diffs. */ function eventStatsQuery(diffs: EventStatDiff[]) { const values: EventStatsRow[] = diffs.map(([_, event_id, stat, diff]) => { const row: EventStatsRow = { @@ -100,4 +114,64 @@ function eventStatsQuery(diffs: EventStatDiff[]) { ); } +/** Set the `prev` value on the event to the last version of the event, if any. */ +async function maybeSetPrev(event: Event & { prev?: Event }): Promise { + if (event.prev?.kind === event.kind) return; + + const [prev] = await eventsDB.getFilters([ + { kinds: [event.kind], authors: [event.pubkey], limit: 1 }, + ]); + + if (prev.created_at < event.created_at) { + event.prev = prev; + } +} + +/** Set the following count to the total number of unique "p" tags in the follow list. */ +function updateFollowingCountQuery({ pubkey, tags }: Event<3>) { + const following_count = new Set( + tags + .filter(([name]) => name === 'p') + .map(([_, value]) => value), + ).size; + + return db.insertInto('author_stats') + .values({ + pubkey, + following_count, + followers_count: 0, + notes_count: 0, + }) + .onConflict((oc) => + oc + .column('pubkey') + .doUpdateSet({ following_count }) + ); +} + +/** Compare the old and new follow events (if any), and return a diff array. */ +function getFollowDiff(event: Event<3>, prev?: Event<3>): AuthorStatDiff[] { + const prevTags = prev?.tags ?? []; + + const prevPubkeys = new Set( + prevTags + .filter(([name]) => name === 'p') + .map(([_, value]) => value), + ); + + const pubkeys = new Set( + event.tags + .filter(([name]) => name === 'p') + .map(([_, value]) => value), + ); + + const added = [...pubkeys].filter((pubkey) => !prevPubkeys.has(pubkey)); + const removed = [...prevPubkeys].filter((pubkey) => !pubkeys.has(pubkey)); + + return [ + ...added.map((pubkey): AuthorStatDiff => ['author_stats', pubkey, 'followers_count', 1]), + ...removed.map((pubkey): AuthorStatDiff => ['author_stats', pubkey, 'followers_count', -1]), + ]; +} + export { updateStats };