diff --git a/scripts/db-populate-nip05.ts b/scripts/db-populate-nip05.ts index 48e792f6..ec74847b 100644 --- a/scripts/db-populate-nip05.ts +++ b/scripts/db-populate-nip05.ts @@ -1,49 +1,25 @@ import { Semaphore } from '@lambdalisue/async'; -import { NSchema as n } from '@nostrify/nostrify'; +import { updateAuthorData } from '@/pipeline.ts'; import { Storages } from '@/storages.ts'; -import { faviconCache } from '@/utils/favicon.ts'; -import { nip05Cache } from '@/utils/nip05.ts'; +import { NostrEvent } from '@nostrify/nostrify'; const kysely = await Storages.kysely(); const sem = new Semaphore(5); const query = kysely .selectFrom('nostr_events') - .select('content') + .select(['id', 'kind', 'content', 'pubkey', 'tags', 'created_at', 'sig']) .where('kind', '=', 0); -for await (const { content } of query.stream(100)) { +for await (const row of query.stream(100)) { while (sem.locked) { await new Promise((resolve) => setTimeout(resolve, 0)); } sem.lock(async () => { - const signal = AbortSignal.timeout(30_000); // generous timeout - - // Parse metadata. - const metadata = n.json().pipe(n.metadata()).catch({}).safeParse(content); - if (!metadata.success) return; - - // Update nip05. - const { nip05 } = metadata.data; - if (nip05) { - try { - await nip05Cache.fetch(nip05, { signal }); - } catch { - // Ignore. - } - } - - // Update favicon. - const domain = nip05?.split('@')[1].toLowerCase(); - if (domain) { - try { - await faviconCache.fetch(domain, { signal }); - } catch { - // Ignore. - } - } + const event: NostrEvent = { ...row, created_at: Number(row.created_at) }; + await updateAuthorData(event, AbortSignal.timeout(3000)); }); } diff --git a/src/pipeline.ts b/src/pipeline.ts index 9f9b8365..32fe353e 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,6 +1,7 @@ import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify'; import { logi } from '@soapbox/logi'; -import { Kysely } from 'kysely'; +import { Kysely, UpdateObject } from 'kysely'; +import tldts from 'tldts'; import { z } from 'zod'; import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; @@ -120,7 +121,7 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise // This needs to run in steps, and should not block the API from responding. Promise.allSettled([ handleZaps(kysely, event), - parseMetadata(event, opts.signal), + updateAuthorData(event, opts.signal), generateSetEvents(event), ]) .then(() => @@ -190,18 +191,47 @@ async function storeEvent(event: NostrEvent, signal?: AbortSignal): Promise { +async function updateAuthorData(event: NostrEvent, signal: AbortSignal): Promise { if (event.kind !== 0) return; // Parse metadata. const metadata = n.json().pipe(n.metadata()).catch({}).safeParse(event.content); if (!metadata.success) return; + const { name, nip05 } = metadata.data; + const kysely = await Storages.kysely(); - // Get nip05. - const { name, nip05 } = metadata.data; - const result = nip05 ? await nip05Cache.fetch(nip05, { signal }).catch(() => undefined) : undefined; + const updates: UpdateObject = {}; + + const authorStats = await kysely + .selectFrom('author_stats') + .selectAll() + .where('pubkey', '=', event.pubkey) + .executeTakeFirst(); + + const lastVerified = authorStats?.nip05_last_verified_at; + const eventNewer = !lastVerified || event.created_at > lastVerified; + + if (nip05 !== authorStats?.nip05 && eventNewer) { + if (nip05) { + const tld = tldts.parse(nip05); + if (tld.isIcann && !tld.isIp && !tld.isPrivate) { + const pointer = await nip05Cache.fetch(nip05.toLowerCase(), { signal }); + if (pointer.pubkey === event.pubkey) { + updates.nip05 = nip05; + updates.nip05_domain = tld.domain; + updates.nip05_hostname = tld.hostname; + updates.nip05_last_verified_at = event.created_at; + } + } + } else { + updates.nip05 = null; + updates.nip05_domain = null; + updates.nip05_hostname = null; + updates.nip05_last_verified_at = event.created_at; + } + } // Fetch favicon. const domain = nip05?.split('@')[1].toLowerCase(); @@ -209,18 +239,24 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise oc.column('pubkey').doUpdateSet({ search })) - .execute(); - } - } catch { - // do nothing + if (search !== authorStats?.search) { + updates.search = search; + } + + if (Object.keys(updates).length) { + await kysely.insertInto('author_stats') + .values({ + pubkey: event.pubkey, + followers_count: 0, + following_count: 0, + notes_count: 0, + search, + ...updates, + }) + .onConflict((oc) => oc.column('pubkey').doUpdateSet(updates)) + .execute(); } } @@ -353,4 +389,4 @@ async function handleZaps(kysely: Kysely, event: NostrEvent) { } } -export { handleEvent, handleZaps }; +export { handleEvent, handleZaps, updateAuthorData }; diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts index 6bed4c6d..66ccb16e 100644 --- a/src/utils/nip05.ts +++ b/src/utils/nip05.ts @@ -1,29 +1,24 @@ import { nip19 } from 'nostr-tools'; import { NIP05, NStore } from '@nostrify/nostrify'; import { logi } from '@soapbox/logi'; -import { Kysely } from 'kysely'; import tldts from 'tldts'; import { Conf } from '@/config.ts'; -import { DittoTables } from '@/db/DittoTables.ts'; import { cachedNip05sSizeGauge } from '@/metrics.ts'; import { Storages } from '@/storages.ts'; import { errorJson } from '@/utils/log.ts'; import { SimpleLRU } from '@/utils/SimpleLRU.ts'; -import { Nip05, nostrNow, parseNip05 } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; export const nip05Cache = new SimpleLRU( async (nip05, { signal }) => { const store = await Storages.db(); - const kysely = await Storages.kysely(); - return getNip05(kysely, store, nip05, signal); + return getNip05(store, nip05, signal); }, { ...Conf.caches.nip05, gauge: cachedNip05sSizeGauge }, ); async function getNip05( - kysely: Kysely, store: NStore, nip05: string, signal?: AbortSignal, @@ -36,88 +31,26 @@ async function getNip05( logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'started' }); - let pointer: nip19.ProfilePointer | undefined = await queryNip05(kysely, nip05); - - if (pointer) { - logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'found', source: 'db', pubkey: pointer.pubkey }); - return pointer; - } - const [name, domain] = nip05.split('@'); try { if (domain === Conf.url.host) { - pointer = await localNip05Lookup(store, name); + const pointer = await localNip05Lookup(store, name); if (pointer) { logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'found', source: 'local', pubkey: pointer.pubkey }); + return pointer; } else { throw new Error(`Not found: ${nip05}`); } } else { - pointer = await NIP05.lookup(nip05, { fetch: fetchWorker, signal }); + const pointer = await NIP05.lookup(nip05, { fetch: fetchWorker, signal }); logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'found', source: 'fetch', pubkey: pointer.pubkey }); + return pointer; } } catch (e) { logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'failed', error: errorJson(e) }); throw e; } - - insertNip05(kysely, nip05, pointer.pubkey).catch((e) => { - logi({ level: 'error', ns: 'ditto.nip05', nip05, state: 'insert_failed', error: errorJson(e) }); - }); - - return pointer; -} - -async function queryNip05(kysely: Kysely, nip05: string): Promise { - const row = await kysely - .selectFrom('author_stats') - .select('pubkey') - .where('nip05', '=', nip05) - .executeTakeFirst(); - - if (row) { - return { pubkey: row.pubkey }; - } -} - -async function insertNip05(kysely: Kysely, nip05: string, pubkey: string, ts = nostrNow()): Promise { - const tld = tldts.parse(nip05); - - if (!tld.isIcann || tld.isIp || tld.isPrivate) { - throw new Error(`Invalid NIP-05: ${nip05}`); - } - - await kysely - .insertInto('author_stats') - .values({ - pubkey, - nip05, - nip05_domain: tld.domain, - nip05_hostname: tld.hostname, - nip05_last_verified_at: ts, - followers_count: 0, // TODO: fix `author_stats` types so setting these aren't required - following_count: 0, - notes_count: 0, - search: nip05, - }) - .onConflict((oc) => - oc - .column('pubkey') - .doUpdateSet({ - nip05, - nip05_domain: tld.domain, - nip05_hostname: tld.hostname, - nip05_last_verified_at: ts, - }) - .where((eb) => - eb.or([ - eb('author_stats.nip05_last_verified_at', '<', ts), - eb('author_stats.nip05_last_verified_at', 'is', null), - ]) - ) - ) - .execute(); } export async function localNip05Lookup(store: NStore, localpart: string): Promise { @@ -134,19 +67,3 @@ export async function localNip05Lookup(store: NStore, localpart: string): Promis return { pubkey, relays: [Conf.relay] }; } } - -export async function parseAndVerifyNip05( - nip05: string | undefined, - pubkey: string, - signal = AbortSignal.timeout(3000), -): Promise { - if (!nip05) return; - try { - const result = await nip05Cache.fetch(nip05, { signal }); - if (result.pubkey === pubkey) { - return parseNip05(nip05); - } - } catch (_e) { - // do nothing - } -}