diff --git a/.gitignore b/.gitignore index 2eea525d..17f06fa0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -.env \ No newline at end of file +.env +*.cpuprofile \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..71abdef3 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "request": "launch", + "name": "Launch Program", + "type": "node", + "program": "${workspaceFolder}/src/server.ts", + "cwd": "${workspaceFolder}", + "runtimeExecutable": "deno", + "runtimeArgs": [ + "run", + "--inspect-wait", + "--allow-all", + "--unstable" + ], + "attachSimplePort": 9229 + } + ] +} \ No newline at end of file diff --git a/deno.json b/deno.json index 8d078610..3716529d 100644 --- a/deno.json +++ b/deno.json @@ -3,8 +3,7 @@ "lock": false, "tasks": { "start": "deno run -A --unstable src/server.ts", - "dev": "deno run -A --unstable --watch src/server.ts", - "debug": "deno run -A --unstable --inspect src/server.ts", + "dev": "deno run -A --unstable --watch --inspect src/server.ts", "test": "DB_PATH=\":memory:\" deno test -A --unstable", "check": "deno check src/server.ts", "relays:sync": "deno run -A --unstable scripts/relays.ts sync" diff --git a/fixtures/events/event-0.json b/fixtures/events/event-0.json new file mode 100644 index 00000000..907e1a11 --- /dev/null +++ b/fixtures/events/event-0.json @@ -0,0 +1,15 @@ +{ + "id": "63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35", + "pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6", + "created_at": 1699398376, + "kind": 0, + "tags": [ + [ + "proxy", + "https://gleasonator.com/users/alex", + "activitypub" + ] + ], + "content": "{\"name\":\"Alex Gleason\",\"about\":\"I create Fediverse software that empowers people online.\\n\\nI'm vegan btw.\\n\\nNote: If you have a question for me, please tag me publicly. This gives the opportunity for others to chime in, and bystanders to learn.\",\"picture\":\"https://media.gleasonator.com/aae0071188681629f200ab41502e03b9861d2754a44c008d3869c8a08b08d1f1.png\",\"banner\":\"https://media.gleasonator.com/e5f6e0e380536780efa774e8d3c8a5a040e3f9f99dbb48910b261c32872ee3a3.gif\",\"nip05\":\"alex_at_gleasonator.com@mostr.pub\",\"lud16\":\"alex@alexgleason.me\"}", + "sig": "9d48bbb600aab44abaeee11c97f1753f1d7de08378e9b33d84f9be893a09270aeceecfde3cfb698c555ae1bde3e4e54b3463a61bb99bdf673d64c2202f98b0e9" +} \ No newline at end of file diff --git a/fixtures/events/event-1.json b/fixtures/events/event-1.json new file mode 100644 index 00000000..f902786c --- /dev/null +++ b/fixtures/events/event-1.json @@ -0,0 +1,15 @@ +{ + "kind": 1, + "content": "I'm vegan btw", + "tags": [ + [ + "proxy", + "https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79", + "activitypub" + ] + ], + "pubkey": "79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6", + "created_at": 1691091365, + "id": "55920b758b9c7b17854b6e3d44e6a02a83d1cb49e1227e75a30426dea94d4cb2", + "sig": "a72f12c08f18e85d98fb92ae89e2fe63e48b8864c5e10fbdd5335f3c9f936397a6b0a7350efe251f8168b1601d7012d4a6d0ee6eec958067cf22a14f5a5ea579" +} \ No newline at end of file diff --git a/src/app.ts b/src/app.ts index 5ac26360..b8a96950 100644 --- a/src/app.ts +++ b/src/app.ts @@ -4,6 +4,7 @@ import { type User } from '@/db/users.ts'; import { type Context, cors, + Debug, type Event, type Handler, Hono, @@ -90,7 +91,14 @@ if (Conf.sentryDsn) { app.use('*', sentryMiddleware({ dsn: Conf.sentryDsn })); } -app.use('*', logger()); +const debug = Debug('ditto:http'); + +app.use('/api', logger(debug)); +app.use('/relay', logger(debug)); +app.use('/.well-known', logger(debug)); +app.use('/users', logger(debug)); +app.use('/nodeinfo', logger(debug)); +app.use('/oauth', logger(debug)); app.get('/api/v1/streaming', streamingController); app.get('/api/v1/streaming/', streamingController); diff --git a/src/client.ts b/src/client.ts index c3d7cd9a..85e430ae 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,19 +1,23 @@ -import { type Event, type Filter, matchFilters } from '@/deps.ts'; +import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; -import { allRelays, pool } from '@/pool.ts'; +import { activeRelays, pool } from '@/pool.ts'; import type { GetFiltersOpts } from '@/filter.ts'; +const debug = Debug('ditto:client'); + /** Get events from a NIP-01 filter. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); if (!filters.length) return Promise.resolve([]); + debug('REQ', JSON.stringify(filters)); + return new Promise((resolve) => { - let tid: number; const results: Event[] = []; const unsub = pool.subscribe( filters, - allRelays, + opts.relays ?? activeRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); @@ -29,24 +33,20 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts } if (typeof opts.limit === 'number' && results.length >= opts.limit) { unsub(); - clearTimeout(tid); resolve(results as Event[]); } }, undefined, () => { unsub(); - clearTimeout(tid); resolve(results as Event[]); }, ); - if (typeof opts.timeout === 'number') { - tid = setTimeout(() => { - unsub(); - resolve(results as Event[]); - }, opts.timeout); - } + opts.signal?.addEventListener('abort', () => { + unsub(); + resolve(results as Event[]); + }); }); } diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 22c9a63f..0a4430ca 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -8,7 +8,7 @@ import { getAuthor, getFollowedPubkeys, getFollows } from '@/queries.ts'; import { booleanParamSchema, fileSchema } from '@/schema.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { uploadFile } from '@/upload.ts'; -import { isFollowing, lookupAccount, nostrNow, Time } from '@/utils.ts'; +import { isFollowing, lookupAccount, nostrNow } from '@/utils.ts'; import { paginated, paginationSchema, parseBody } from '@/utils/web.ts'; import { createEvent } from '@/utils/web.ts'; import { renderEventAccounts } from '@/views.ts'; @@ -258,7 +258,7 @@ const favouritesController: AppController = async (c) => { const events7 = await mixer.getFilters( [{ kinds: [7], authors: [pubkey], ...params }], - { timeout: Time.seconds(1) }, + { signal: AbortSignal.timeout(1000) }, ); const ids = events7 @@ -266,7 +266,7 @@ const favouritesController: AppController = async (c) => { .filter((id): id is string => !!id); const events1 = await mixer.getFilters([{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }], { - timeout: Time.seconds(1), + signal: AbortSignal.timeout(1000), }); const statuses = await Promise.all(events1.map((event) => renderStatus(event, c.get('pubkey')))); diff --git a/src/controllers/api/instance.ts b/src/controllers/api/instance.ts index 10b359b2..ebc32694 100644 --- a/src/controllers/api/instance.ts +++ b/src/controllers/api/instance.ts @@ -10,8 +10,8 @@ const instanceController: AppController = (c) => { return c.json({ uri: host, title: 'Ditto', - description: 'An efficient and flexible social media server.', - short_description: 'An efficient and flexible social media server.', + description: 'Nostr and the Fediverse', + short_description: 'Nostr and the Fediverse', registrations: Conf.registrations, max_toot_chars: Conf.postCharLimit, configuration: { diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index 89f59f79..e7d66011 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -1,6 +1,5 @@ import { type AppController } from '@/app.ts'; import * as mixer from '@/mixer.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; @@ -10,7 +9,7 @@ const notificationsController: AppController = async (c) => { const events = await mixer.getFilters( [{ kinds: [1], '#p': [pubkey], since, until }], - { timeout: Time.seconds(3) }, + { signal: AbortSignal.timeout(3000) }, ); const statuses = await Promise.all(events.map((event) => renderNotification(event, pubkey))); diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts index e5f1778c..aed19fec 100644 --- a/src/controllers/api/search.ts +++ b/src/controllers/api/search.ts @@ -5,7 +5,7 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { booleanParamSchema } from '@/schema.ts'; import { nostrIdSchema } from '@/schemas/nostr.ts'; -import { dedupeEvents, Time } from '@/utils.ts'; +import { dedupeEvents } from '@/utils.ts'; import { lookupNip05Cached } from '@/utils/nip05.ts'; import { renderAccount } from '@/views/mastodon/accounts.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -93,9 +93,9 @@ function typeToKinds(type: SearchQuery['type']): number[] { } /** Resolve a searched value into an event, if applicable. */ -async function lookupEvent(query: SearchQuery): Promise { +async function lookupEvent(query: SearchQuery, signal = AbortSignal.timeout(1000)): Promise { const filters = await getLookupFilters(query); - const [event] = await mixer.getFilters(filters, { limit: 1, timeout: Time.seconds(1) }); + const [event] = await mixer.getFilters(filters, { limit: 1, signal }); return event; } diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 8cdd7d47..07512608 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,11 +1,13 @@ import { type AppController } from '@/app.ts'; -import { z } from '@/deps.ts'; +import { Debug, z } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { getAuthor, getFeedPubkeys } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; +const debug = Debug('ditto:streaming'); + /** * Streaming timelines/categories. * https://docs.joinmastodon.org/methods/streaming/#streams @@ -49,6 +51,7 @@ const streamingController: AppController = (c) => { function send(name: string, payload: object) { if (socket.readyState === WebSocket.OPEN) { + debug('send', name, JSON.stringify(payload)); socket.send(JSON.stringify({ event: name, payload: JSON.stringify(payload), diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index a29cdc89..c430ed92 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -3,7 +3,6 @@ import { type DittoFilter } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { booleanParamSchema } from '@/schema.ts'; -import { Time } from '@/utils.ts'; import { paginated, paginationSchema } from '@/utils/web.ts'; import { renderStatus } from '@/views/mastodon/statuses.ts'; @@ -33,10 +32,10 @@ const hashtagTimelineController: AppController = (c) => { }; /** Render statuses for timelines. */ -async function renderStatuses(c: AppContext, filters: DittoFilter<1>[]) { +async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) { const events = await mixer.getFilters( filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })), - { timeout: Time.seconds(1) }, + { signal }, ); if (!events.length) { diff --git a/src/controllers/nostr/relay-info.ts b/src/controllers/nostr/relay-info.ts new file mode 100644 index 00000000..0edafbd3 --- /dev/null +++ b/src/controllers/nostr/relay-info.ts @@ -0,0 +1,19 @@ +import { AppController } from '@/app.ts'; +import { Conf } from '@/config.ts'; + +const relayInfoController: AppController = (c) => { + return c.json({ + name: 'Ditto', + description: 'Nostr and the Fediverse.', + pubkey: Conf.pubkey, + contact: `mailto:${Conf.adminEmail}`, + supported_nips: [1, 5, 9, 11, 45, 46, 98], + software: 'Ditto', + version: '0.0.0', + limitation: { + // TODO. + }, + }); +}; + +export { relayInfoController }; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 9f5cd596..f4b32b38 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,3 +1,4 @@ +import { relayInfoController } from '@/controllers/nostr/relay-info.ts'; import * as eventsDB from '@/db/events.ts'; import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; @@ -116,9 +117,14 @@ function prepareFilters(filters: ClientREQ[2][]): Filter[] { })); } -const relayController: AppController = (c) => { +const relayController: AppController = (c, next) => { const upgrade = c.req.header('upgrade'); + // NIP-11: https://github.com/nostr-protocol/nips/blob/master/11.md + if (c.req.header('accept') === 'application/nostr+json') { + return relayInfoController(c, next); + } + if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use a Nostr client to connect.', 400); } diff --git a/src/cron.ts b/src/cron.ts index b29ab19b..bfaf773d 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -1,25 +1,12 @@ -import * as eventsDB from '@/db/events.ts'; import { deleteUnattachedMediaByUrl, getUnattachedMedia } from '@/db/unattached-media.ts'; import { cron } from '@/deps.ts'; import { Time } from '@/utils/time.ts'; import { configUploader as uploader } from '@/uploaders/config.ts'; import { cidFromUrl } from '@/utils/ipfs.ts'; -/** Clean up old remote events. */ -async function cleanupEvents() { - console.log('Cleaning up old remote events...'); - - const [result] = await eventsDB.deleteFilters([{ - until: Math.floor((Date.now() - Time.days(7)) / 1000), - local: false, - }]); - - console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`); -} - /** Delete files that aren't attached to any events. */ async function cleanupMedia() { - console.log('Deleting orphaned media files...'); + console.info('Deleting orphaned media files...'); const until = new Date(Date.now() - Time.minutes(15)); const media = await getUnattachedMedia(until); @@ -35,11 +22,8 @@ async function cleanupMedia() { } } - console.log(`Removed ${media?.length ?? 0} orphaned media files.`); + console.info(`Removed ${media?.length ?? 0} orphaned media files.`); } -await cleanupEvents(); await cleanupMedia(); - -cron.every15Minute(cleanupEvents); cron.every15Minute(cleanupMedia); diff --git a/src/db.ts b/src/db.ts index 4f1a8d91..25d6d783 100644 --- a/src/db.ts +++ b/src/db.ts @@ -3,7 +3,7 @@ import path from 'node:path'; import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts'; import { Conf } from '@/config.ts'; -import { getPragma, setPragma } from '@/pragma.ts'; +import { setPragma } from '@/pragma.ts'; import SqliteWorker from '@/workers/sqlite.ts'; interface DittoDB { @@ -89,12 +89,6 @@ await Promise.all([ setPragma(db, 'mmap_size', Conf.sqlite.mmapSize), ]); -// Log out PRAGMA values for debugging. -['journal_mode', 'synchronous', 'temp_store', 'mmap_size'].forEach(async (pragma) => { - const value = await getPragma(db, pragma); - console.log(`PRAGMA ${pragma} = ${value};`); -}); - const migrator = new Migrator({ db, provider: new FileMigrationProvider({ @@ -106,7 +100,7 @@ const migrator = new Migrator({ /** Migrate the database to the latest version. */ async function migrate() { - console.log('Running migrations...'); + console.info('Running migrations...'); const results = await migrator.migrateToLatest(); if (results.error) { @@ -114,11 +108,11 @@ async function migrate() { Deno.exit(1); } else { if (!results.results?.length) { - console.log('Everything up-to-date.'); + console.info('Everything up-to-date.'); } else { - console.log('Migrations finished!'); + console.info('Migrations finished!'); for (const { migrationName, status } of results.results!) { - console.log(` - ${migrationName}: ${status}`); + console.info(` - ${migrationName}: ${status}`); } } } diff --git a/src/db/events.ts b/src/db/events.ts index 9de88280..91368fe8 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,5 +1,5 @@ import { db, type DittoDB } from '@/db.ts'; -import { type Event, type SelectQueryBuilder } from '@/deps.ts'; +import { Debug, type Event, type SelectQueryBuilder } from '@/deps.ts'; import { isParameterizedReplaceableKind } from '@/kinds.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { EventData } from '@/types.ts'; @@ -7,6 +7,8 @@ import { isNostrId, isURL } from '@/utils.ts'; import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; +const debug = Debug('ditto:db:events'); + /** Function to decide whether or not to index a tag. */ type TagCondition = ({ event, count, value }: { event: Event; @@ -28,6 +30,8 @@ const tagConditions: Record = { /** Insert an event (and its tags) into the database. */ function insertEvent(event: Event, data: EventData): Promise { + debug('insertEvent', JSON.stringify(event)); + return db.transaction().execute(async (trx) => { /** Insert the event into the database. */ async function addEvent() { @@ -224,6 +228,7 @@ async function getFilters( opts: GetFiltersOpts = {}, ): Promise[]> { if (!filters.length) return Promise.resolve([]); + debug('REQ', JSON.stringify(filters)); let query = getFiltersQuery(filters); if (typeof opts.limit === 'number') { @@ -276,6 +281,7 @@ async function getFilters( /** Delete events based on filters from the database. */ function deleteFilters(filters: DittoFilter[]) { if (!filters.length) return Promise.resolve([]); + debug('deleteFilters', JSON.stringify(filters)); return db.transaction().execute(async (trx) => { const query = getFiltersQuery(filters).clearSelect().select('id'); @@ -293,6 +299,7 @@ function deleteFilters(filters: DittoFilter[]) { /** Get number of events that would be returned by filters. */ async function countFilters(filters: DittoFilter[]): Promise { if (!filters.length) return Promise.resolve(0); + debug('countFilters', JSON.stringify(filters)); const query = getFiltersQuery(filters); const [{ count }] = await query diff --git a/src/db/memorelay.test.ts b/src/db/memorelay.test.ts new file mode 100644 index 00000000..b0125cfc --- /dev/null +++ b/src/db/memorelay.test.ts @@ -0,0 +1,18 @@ +import { assertEquals } from '@/deps-test.ts'; + +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { memorelay } from './memorelay.ts'; + +Deno.test('memorelay', async () => { + assertEquals(memorelay.hasEvent(event1), false); + assertEquals(memorelay.hasEventById(event1.id), false); + + memorelay.insertEvent(event1); + + assertEquals(memorelay.hasEvent(event1), true); + assertEquals(memorelay.hasEventById(event1.id), true); + + const result = await memorelay.getFilters([{ ids: [event1.id] }]); + assertEquals(result[0], event1); +}); diff --git a/src/db/memorelay.ts b/src/db/memorelay.ts new file mode 100644 index 00000000..4f6022e8 --- /dev/null +++ b/src/db/memorelay.ts @@ -0,0 +1,69 @@ +import { Debug, type Event, type Filter, LRUCache } from '@/deps.ts'; +import { getFilterId, type GetFiltersOpts, getMicroFilters, isMicrofilter } from '@/filter.ts'; + +const debug = Debug('ditto:memorelay'); + +const events = new LRUCache({ + max: 3000, + maxEntrySize: 5000, + sizeCalculation: (event) => JSON.stringify(event).length, +}); + +/** Get events from memory. */ +function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { + if (opts.signal?.aborted) return Promise.resolve([]); + if (!filters.length) return Promise.resolve([]); + debug('REQ', JSON.stringify(filters)); + + const results: Event[] = []; + + for (const filter of filters) { + if (isMicrofilter(filter)) { + const event = events.get(getFilterId(filter)); + if (event) { + results.push(event as Event); + } + } + } + + return Promise.resolve(results); +} + +/** Insert an event into memory. */ +function insertEvent(event: Event): void { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = events.get(filterId); + if (!existing || event.created_at > existing.created_at) { + events.set(filterId, event); + } + } +} + +/** Check if an event is in memory. */ +function hasEvent(event: Event): boolean { + for (const microfilter of getMicroFilters(event)) { + const filterId = getFilterId(microfilter); + const existing = events.get(filterId); + if (existing) { + return true; + } + } + return false; +} + +/** Check if an event is in memory by ID. */ +function hasEventById(eventId: string): boolean { + const filterId = getFilterId({ ids: [eventId] }); + return events.has(filterId); +} + +/** In-memory data store for events using microfilters. */ +const memorelay = { + getFilters, + insertEvent, + hasEvent, + hasEventById, +}; + +export { memorelay }; diff --git a/src/db/users.ts b/src/db/users.ts index 0e8d9a08..7d56e1ce 100644 --- a/src/db/users.ts +++ b/src/db/users.ts @@ -1,7 +1,9 @@ -import { type Insertable } from '@/deps.ts'; +import { Debug, type Insertable } from '@/deps.ts'; import { db, type UserRow } from '../db.ts'; +const debug = Debug('ditto:users'); + interface User { pubkey: string; username: string; @@ -11,6 +13,7 @@ interface User { /** Adds a user to the database. */ function insertUser(user: Insertable) { + debug('insertUser', JSON.stringify(user)); return db.insertInto('users').values(user).execute(); } diff --git a/src/deps.ts b/src/deps.ts index 85cbd7b2..3e763a73 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -9,7 +9,7 @@ export { } from 'https://deno.land/x/hono@v3.10.1/mod.ts'; export { cors, logger, serveStatic } from 'https://deno.land/x/hono@v3.10.1/middleware.ts'; export { z } from 'https://deno.land/x/zod@v3.21.4/mod.ts'; -export { Author, RelayPoolWorker } from 'https://dev.jspm.io/nostr-relaypool@0.6.30'; +export { RelayPoolWorker } from 'https://dev.jspm.io/nostr-relaypool@0.6.30'; export { type Event, type EventTemplate, @@ -81,5 +81,9 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1 export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js'; export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0'; export * as Comlink from 'npm:comlink@^4.4.1'; +export { EventEmitter } from 'npm:tseep@^1.1.3'; +export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0'; +// @deno-types="npm:@types/debug@^4.1.12" +export { default as Debug } from 'npm:debug@^4.3.4'; export type * as TypeFest from 'npm:type-fest@^4.3.0'; diff --git a/src/filter.test.ts b/src/filter.test.ts new file mode 100644 index 00000000..efc00d7c --- /dev/null +++ b/src/filter.test.ts @@ -0,0 +1,37 @@ +import { type Event } from '@/deps.ts'; +import { assertEquals } from '@/deps-test.ts'; + +import event0 from '~/fixtures/events/event-0.json' assert { type: 'json' }; +import event1 from '~/fixtures/events/event-1.json' assert { type: 'json' }; + +import { eventToMicroFilter, getFilterId, getMicroFilters, isMicrofilter } from './filter.ts'; + +Deno.test('getMicroFilters', () => { + const event = event0 as Event<0>; + const microfilters = getMicroFilters(event); + assertEquals(microfilters.length, 2); + assertEquals(microfilters[0], { authors: [event.pubkey], kinds: [0] }); + assertEquals(microfilters[1], { ids: [event.id] }); +}); + +Deno.test('eventToMicroFilter', () => { + assertEquals(eventToMicroFilter(event0), { authors: [event0.pubkey], kinds: [0] }); + assertEquals(eventToMicroFilter(event1), { ids: [event1.id] }); +}); + +Deno.test('isMicrofilter', () => { + assertEquals(isMicrofilter({ ids: [event0.id] }), true); + assertEquals(isMicrofilter({ authors: [event0.pubkey], kinds: [0] }), true); + assertEquals(isMicrofilter({ ids: [event0.id], authors: [event0.pubkey], kinds: [0] }), false); +}); + +Deno.test('getFilterId', () => { + assertEquals( + getFilterId({ ids: [event0.id] }), + '{"ids":["63d38c9b483d2d98a46382eadefd272e0e4bdb106a5b6eddb400c4e76f693d35"]}', + ); + assertEquals( + getFilterId({ authors: [event0.pubkey], kinds: [0] }), + '{"authors":["79c2cae114ea28a981e7559b4fe7854a473521a8d22a66bbab9fa248eb820ff6"],"kinds":[0]}', + ); +}); diff --git a/src/filter.ts b/src/filter.ts index 76a0fcde..926e3602 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,7 +1,7 @@ import { Conf } from '@/config.ts'; -import { type Event, type Filter, matchFilters } from '@/deps.ts'; - -import type { EventData } from '@/types.ts'; +import { type Event, type Filter, matchFilters, stringifyStable, z } from '@/deps.ts'; +import { nostrIdSchema } from '@/schemas/nostr.ts'; +import { type EventData } from '@/types.ts'; /** Additional properties that may be added by Ditto to events. */ type Relation = 'author' | 'author_stats' | 'event_stats'; @@ -14,12 +14,21 @@ interface DittoFilter extends Filter { relations?: Relation[]; } +/** Microfilter to get one specific event by ID. */ +type IdMicrofilter = { ids: [Event['id']] }; +/** Microfilter to get an author. */ +type AuthorMicrofilter = { kinds: [0]; authors: [Event['pubkey']] }; +/** Filter to get one specific event. */ +type MicroFilter = IdMicrofilter | AuthorMicrofilter; + /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { - /** How long to wait (in milliseconds) until aborting the request. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event limit for the whole subscription. */ limit?: number; + /** Relays to use, if applicable. */ + relays?: WebSocket['url'][]; } function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { @@ -44,4 +53,55 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData return false; } -export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation }; +/** Get deterministic ID for a microfilter. */ +function getFilterId(filter: MicroFilter): string { + if ('ids' in filter) { + return stringifyStable({ ids: [filter.ids[0]] }); + } else { + return stringifyStable({ + kinds: [filter.kinds[0]], + authors: [filter.authors[0]], + }); + } +} + +/** Get a microfilter from a Nostr event. */ +function eventToMicroFilter(event: Event): MicroFilter { + const [microfilter] = getMicroFilters(event); + return microfilter; +} + +/** Get all the microfilters for an event, in order of priority. */ +function getMicroFilters(event: Event): MicroFilter[] { + const microfilters: MicroFilter[] = []; + if (event.kind === 0) { + microfilters.push({ kinds: [0], authors: [event.pubkey] }); + } + microfilters.push({ ids: [event.id] }); + return microfilters; +} + +/** Microfilter schema. */ +const microFilterSchema = z.union([ + z.object({ ids: z.tuple([nostrIdSchema]) }).strict(), + z.object({ kinds: z.tuple([z.literal(0)]), authors: z.tuple([nostrIdSchema]) }).strict(), +]); + +/** Checks whether the filter is a microfilter. */ +function isMicrofilter(filter: Filter): filter is MicroFilter { + return microFilterSchema.safeParse(filter).success; +} + +export { + type AuthorMicrofilter, + type DittoFilter, + eventToMicroFilter, + getFilterId, + type GetFiltersOpts, + getMicroFilters, + type IdMicrofilter, + isMicrofilter, + matchDittoFilters, + type MicroFilter, + type Relation, +}; diff --git a/src/firehose.ts b/src/firehose.ts index ea6be62a..b95d33f7 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,9 +1,11 @@ -import { type Event } from '@/deps.ts'; +import { Debug, type Event } from '@/deps.ts'; import { activeRelays, pool } from '@/pool.ts'; import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; +const debug = Debug('ditto:firehose'); + // This file watches events on all known relays and performs // side-effects based on them, such as trending hashtag tracking // and storing events for notifications and the home feed. @@ -17,7 +19,7 @@ pool.subscribe( /** Handle events through the firehose pipeline. */ function handleEvent(event: Event): Promise { - console.info(`firehose: Event<${event.kind}> ${event.id}`); + debug(`Event<${event.kind}> ${event.id}`); return pipeline .handleEvent(event) diff --git a/src/middleware/cache.ts b/src/middleware/cache.ts index 932632b4..87de611a 100644 --- a/src/middleware/cache.ts +++ b/src/middleware/cache.ts @@ -1,6 +1,7 @@ +import { Debug, type MiddlewareHandler } from '@/deps.ts'; import ExpiringCache from '@/utils/expiring-cache.ts'; -import type { MiddlewareHandler } from '@/deps.ts'; +const debug = Debug('ditto:middleware:cache'); export const cache = (options: { cacheName: string; @@ -11,14 +12,14 @@ export const cache = (options: { const cache = new ExpiringCache(await caches.open(options.cacheName)); const response = await cache.match(key); if (!response) { - console.debug('Building cache for page', c.req.url); + debug('Building cache for page', c.req.url); await next(); const response = c.res.clone(); if (response.status < 500) { await cache.putExpiring(key, response, options.expires ?? 0); } } else { - console.debug('Serving page from cache', c.req.url); + debug('Serving page from cache', c.req.url); return response; } }; diff --git a/src/note.ts b/src/note.ts index 93689e94..a19a7936 100644 --- a/src/note.ts +++ b/src/note.ts @@ -43,7 +43,7 @@ interface ParsedNoteContent { function parseNoteContent(content: string): ParsedNoteContent { // Parsing twice is ineffecient, but I don't know how to do only once. const html = linkifyStr(content, linkifyOpts); - const links = linkify.find(content).filter(isValidLink); + const links = linkify.find(content).filter(isLinkURL); const firstUrl = links.find(isNonMediaLink)?.href; return { @@ -77,15 +77,9 @@ function isNonMediaLink({ href }: Link): boolean { return /^https?:\/\//.test(href) && !getUrlMimeType(href); } -/** Ensures the URL can be parsed. Why linkifyjs doesn't already guarantee this, idk... */ -function isValidLink(link: Link): boolean { - try { - new URL(link.href); - return true; - } catch (_e) { - console.error(`Invalid link: ${link.href}`); - return false; - } +/** Ensures the Link is a URL so it can be parsed. */ +function isLinkURL(link: Link): boolean { + return link.type === 'url'; } /** `npm:mime` treats `.com` as a file extension, so parse the full URL to get its path first. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index 4f0c51ab..577aaae7 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,21 +1,24 @@ import { Conf } from '@/config.ts'; import * as eventsDB from '@/db/events.ts'; +import { memorelay } from '@/db/memorelay.ts'; import { addRelays } from '@/db/relays.ts'; import { deleteAttachedMedia } from '@/db/unattached-media.ts'; import { findUser } from '@/db/users.ts'; -import { type Event, LRUCache } from '@/deps.ts'; +import { Debug, type Event } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; +import { reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; +import { type EventData } from '@/types.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import { TrendsWorker } from '@/workers/trends.ts'; import { verifySignatureWorker } from '@/workers/verify.ts'; -import type { EventData } from '@/types.ts'; +const debug = Debug('ditto:pipeline'); /** * Common pipeline function to process (and maybe store) events. @@ -23,28 +26,29 @@ import type { EventData } from '@/types.ts'; */ async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; + const wanted = reqmeister.isWanted(event); if (encounterEvent(event)) return; + debug(`Event<${event.kind}> ${event.id}`); const data = await getEventData(event); await Promise.all([ - storeEvent(event, data), + storeEvent(event, data, { force: wanted }), processDeletions(event), trackRelays(event), trackHashtags(event), + fetchRelatedEvents(event, data), processMedia(event, data), streamOut(event, data), broadcast(event, data), ]); } -/** Tracks encountered events to skip duplicates, improving idempotency and performance. */ -const encounters = new LRUCache({ max: 1000 }); - /** Encounter the event, and return whether it has already been encountered. */ -function encounterEvent(event: Event) { - const result = encounters.get(event.id); - encounters.set(event.id, true); - return result; +function encounterEvent(event: Event): boolean { + const preexisting = memorelay.hasEvent(event); + memorelay.insertEvent(event); + reqmeister.encounter(event); + return preexisting; } /** Preload data that will be useful to several tasks. */ @@ -56,22 +60,27 @@ async function getEventData({ pubkey }: Event): Promise { /** Check if the pubkey is the `DITTO_NSEC` pubkey. */ const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; -/** Maybe store the event, if eligible. */ -async function storeEvent(event: Event, data: EventData): Promise { - if (isEphemeralKind(event.kind)) return; +interface StoreEventOpts { + force?: boolean; +} - if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { +/** Maybe store the event, if eligible. */ +async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise { + if (isEphemeralKind(event.kind)) return; + const { force = false } = opts; + + if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const [deletion] = await mixer.getFilters( [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], - { limit: 1, timeout: Time.seconds(1) }, + { limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) }, ); if (deletion) { return Promise.reject(new RelayError('blocked', 'event was deleted')); } else { await Promise.all([ - eventsDB.insertEvent(event, data).catch(console.warn), - updateStats(event).catch(console.warn), + eventsDB.insertEvent(event, data).catch(debug), + updateStats(event).catch(debug), ]); } } else { @@ -105,7 +114,7 @@ async function trackHashtags(event: Event): Promise { if (!tags.length) return; try { - console.info('tracking tags:', tags); + debug('tracking tags:', JSON.stringify(tags)); await TrendsWorker.addTagUsages(event.pubkey, tags, date); } catch (_e) { // do nothing @@ -128,6 +137,18 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Queue related events to fetch. */ +function fetchRelatedEvents(event: Event, data: EventData) { + if (!data.user) { + reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); + } + for (const [name, id, relay] of event.tags) { + if (name === 'e' && !memorelay.hasEventById(id)) { + reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {}); + } + } +} + /** Delete unattached media entries that are attached to the event. */ function processMedia({ tags, pubkey }: Event, { user }: EventData) { if (user) { diff --git a/src/pool.ts b/src/pool.ts index da186160..b92f4ebe 100644 --- a/src/pool.ts +++ b/src/pool.ts @@ -1,5 +1,7 @@ import { getActiveRelays } from '@/db/relays.ts'; -import { type Event, RelayPoolWorker } from '@/deps.ts'; +import { Debug, type Event, RelayPoolWorker } from '@/deps.ts'; + +const debug = Debug('ditto:pool'); const activeRelays = await getActiveRelays(); @@ -17,6 +19,7 @@ const pool = new RelayPoolWorker(worker, activeRelays, { /** Publish an event to the given relays, or the entire pool. */ function publish(event: Event, relays: string[] = activeRelays) { + debug('publish', event); return pool.publish(event, relays); } diff --git a/src/queries.ts b/src/queries.ts index edd5f4aa..5f03b6e2 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,11 +1,13 @@ import * as eventsDB from '@/db/events.ts'; import { type Event, findReplyTag } from '@/deps.ts'; -import { type DittoFilter, type Relation } from '@/filter.ts'; +import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; +import { reqmeister } from '@/reqmeister.ts'; +import { memorelay } from '@/db/memorelay.ts'; interface GetEventOpts { - /** Timeout in milliseconds. */ - timeout?: number; + /** Signal to abort the request. */ + signal?: AbortSignal; /** Event kind. */ kind?: K; /** Relations to include on the event. */ @@ -17,36 +19,73 @@ const getEvent = async ( id: string, opts: GetEventOpts = {}, ): Promise | undefined> => { - const { kind, relations, timeout = 1000 } = opts; + const { kind, relations, signal = AbortSignal.timeout(1000) } = opts; + const microfilter: IdMicrofilter = { ids: [id] }; + + const [memoryEvent] = await memorelay.getFilters([microfilter], opts) as eventsDB.DittoEvent[]; + + if (memoryEvent && !relations) { + return memoryEvent; + } + const filter: DittoFilter = { ids: [id], relations, limit: 1 }; if (kind) { filter.kinds = [kind]; } - const [event] = await mixer.getFilters([filter], { limit: 1, timeout }); - return event; + + const dbEvent = await eventsDB.getFilters([filter], { limit: 1, signal }) + .then(([event]) => event); + + // TODO: make this DRY-er. + + if (dbEvent && !dbEvent.author) { + const [author] = await memorelay.getFilters([{ kinds: [0], authors: [dbEvent.pubkey] }], opts); + dbEvent.author = author; + } + + if (dbEvent) return dbEvent; + + if (memoryEvent && !memoryEvent.author) { + const [author] = await memorelay.getFilters([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts); + memoryEvent.author = author; + } + + if (memoryEvent) return memoryEvent; + + return await reqmeister.req(microfilter, opts).catch(() => undefined) as Event | undefined; }; /** Get a Nostr `set_medatadata` event for a user's pubkey. */ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { - const { relations, timeout = 1000 } = opts; + const { relations, signal = AbortSignal.timeout(1000) } = opts; + const microfilter: AuthorMicrofilter = { kinds: [0], authors: [pubkey] }; - const [event] = await mixer.getFilters( + const [memoryEvent] = await memorelay.getFilters([microfilter], opts); + + if (memoryEvent && !relations) { + return memoryEvent; + } + + const dbEvent = await eventsDB.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], - { limit: 1, timeout }, - ); + { limit: 1, signal }, + ).then(([event]) => event); - return event; + if (dbEvent) return dbEvent; + if (memoryEvent) return memoryEvent; + + return reqmeister.req(microfilter, opts).catch(() => undefined); }; /** Get users the given pubkey follows. */ -const getFollows = async (pubkey: string, timeout = 1000): Promise | undefined> => { - const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, timeout }); +const getFollows = async (pubkey: string, signal = AbortSignal.timeout(1000)): Promise | undefined> => { + const [event] = await mixer.getFilters([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal }); return event; }; /** Get pubkeys the user follows. */ -async function getFollowedPubkeys(pubkey: string): Promise { - const event = await getFollows(pubkey); +async function getFollowedPubkeys(pubkey: string, signal?: AbortSignal): Promise { + const event = await getFollows(pubkey, signal); if (!event) return []; return event.tags @@ -78,10 +117,10 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise return result.reverse(); } -function getDescendants(eventId: string): Promise[]> { +function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise[]> { return mixer.getFilters( [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }], - { limit: 200, timeout: 2000 }, + { limit: 200, signal }, ); } diff --git a/src/reqmeister.ts b/src/reqmeister.ts new file mode 100644 index 00000000..cff8cb98 --- /dev/null +++ b/src/reqmeister.ts @@ -0,0 +1,113 @@ +import * as client from '@/client.ts'; +import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts'; +import { AuthorMicrofilter, eventToMicroFilter, getFilterId, IdMicrofilter, type MicroFilter } from '@/filter.ts'; +import { Time } from '@/utils/time.ts'; + +const debug = Debug('ditto:reqmeister'); + +interface ReqmeisterOpts { + delay?: number; + timeout?: number; +} + +interface ReqmeisterReqOpts { + relays?: WebSocket['url'][]; + signal?: AbortSignal; +} + +type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; + +/** Batches requests to Nostr relays using microfilters. */ +class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { + #opts: ReqmeisterOpts; + #queue: ReqmeisterQueueItem[] = []; + #promise!: Promise; + #resolve!: () => void; + + constructor(opts: ReqmeisterOpts = {}) { + super(); + this.#opts = opts; + this.#tick(); + this.#perform(); + } + + #tick() { + this.#resolve?.(); + this.#promise = new Promise((resolve) => { + this.#resolve = resolve; + }); + } + + async #perform() { + const { delay, timeout = Time.seconds(1) } = this.#opts; + await new Promise((resolve) => setTimeout(resolve, delay)); + + const queue = this.#queue; + this.#queue = []; + + const wantedEvents = new Set(); + const wantedAuthors = new Set(); + + // TODO: batch by relays. + for (const [_filterId, filter, _relays] of queue) { + if ('ids' in filter) { + filter.ids.forEach((id) => wantedEvents.add(id)); + } else { + wantedAuthors.add(filter.authors[0]); + } + } + + const filters: Filter[] = []; + + if (wantedEvents.size) filters.push({ ids: [...wantedEvents] }); + if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] }); + + if (filters.length) { + debug('REQ', JSON.stringify(filters)); + const events = await client.getFilters(filters, { signal: AbortSignal.timeout(timeout) }); + + for (const event of events) { + this.encounter(event); + } + } + + this.#tick(); + this.#perform(); + } + + req(filter: IdMicrofilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: AuthorMicrofilter, opts?: ReqmeisterReqOpts): Promise>; + req(filter: MicroFilter, opts?: ReqmeisterReqOpts): Promise; + req(filter: MicroFilter, opts: ReqmeisterReqOpts = {}): Promise { + const { relays = [], signal } = opts; + if (signal?.aborted) return Promise.reject(new DOMException('Aborted', 'AbortError')); + + const filterId = getFilterId(filter); + + this.#queue.push([filterId, filter, relays]); + + return new Promise((resolve, reject) => { + this.once(filterId, resolve); + this.#promise.finally(() => setTimeout(reject, 0)); + signal?.addEventListener('abort', () => reject(new DOMException('Aborted', 'AbortError')), { once: true }); + }); + } + + encounter(event: Event): void { + const filterId = getFilterId(eventToMicroFilter(event)); + this.#queue = this.#queue.filter(([id]) => id !== filterId); + this.emit(filterId, event); + } + + isWanted(event: Event): boolean { + const filterId = getFilterId(eventToMicroFilter(event)); + return this.#queue.some(([id]) => id === filterId); + } +} + +const reqmeister = new Reqmeister({ + delay: Time.seconds(1), + timeout: Time.seconds(1), +}); + +export { reqmeister }; diff --git a/src/sign.ts b/src/sign.ts index 0662668d..de7149fe 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -1,13 +1,15 @@ import { type AppContext } from '@/app.ts'; import { Conf } from '@/config.ts'; import { decryptAdmin, encryptAdmin } from '@/crypto.ts'; -import { type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts'; +import { Debug, type Event, type EventTemplate, finishEvent, HTTPException } from '@/deps.ts'; import { connectResponseSchema } from '@/schemas/nostr.ts'; import { jsonSchema } from '@/schema.ts'; import { Sub } from '@/subs.ts'; import { eventMatchesTemplate, Time } from '@/utils.ts'; import { createAdminEvent } from '@/utils/web.ts'; +const debug = Debug('ditto:sign'); + interface SignEventOpts { /** Target proof-of-work difficulty for the signed event. */ pow?: number; @@ -28,10 +30,12 @@ async function signEvent( const header = c.req.header('x-nostr-sign'); if (seckey) { + debug(`Signing Event<${event.kind}> with secret key`); return finishEvent(event, seckey); } if (header) { + debug(`Signing Event<${event.kind}> with NIP-46`); return await signNostrConnect(event, c, opts); } diff --git a/src/stats.ts b/src/stats.ts index 32208df8..b4620218 100644 --- a/src/stats.ts +++ b/src/stats.ts @@ -1,6 +1,6 @@ import { type AuthorStatsRow, db, type DittoDB, type EventStatsRow } from '@/db.ts'; import * as eventsDB from '@/db/events.ts'; -import { type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; +import { Debug, type Event, findReplyTag, type InsertQueryBuilder } from '@/deps.ts'; type AuthorStat = keyof Omit; type EventStat = keyof Omit; @@ -9,6 +9,8 @@ type AuthorStatDiff = ['author_stats', pubkey: string, stat: AuthorStat, diff: n type EventStatDiff = ['event_stats', eventId: string, stat: EventStat, diff: number]; type StatDiff = AuthorStatDiff | EventStatDiff; +const debug = Debug('ditto:stats'); + /** Store stats for the event in LMDB. */ async function updateStats(event: Event) { let prev: Event | undefined; @@ -26,6 +28,10 @@ async function updateStats(event: Event) { const pubkeyDiffs = statDiffs.filter(([table]) => table === 'author_stats') as AuthorStatDiff[]; const eventDiffs = statDiffs.filter(([table]) => table === 'event_stats') as EventStatDiff[]; + if (statDiffs.length) { + debug(JSON.stringify({ id: event.id, pubkey: event.pubkey, kind: event.kind, tags: event.tags, statDiffs })); + } + if (pubkeyDiffs.length) queries.push(authorStatsQuery(pubkeyDiffs)); if (eventDiffs.length) queries.push(eventStatsQuery(eventDiffs)); diff --git a/src/subs.ts b/src/subs.ts index f9d66061..c716e018 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,9 +1,11 @@ -import { type Event } from '@/deps.ts'; +import { Debug, type Event } from '@/deps.ts'; import { Subscription } from '@/subscription.ts'; import type { DittoFilter } from '@/filter.ts'; import type { EventData } from '@/types.ts'; +const debug = Debug('ditto:subs'); + /** * Manages Ditto event subscriptions. * Subscriptions can be added, removed, and matched against events. @@ -21,6 +23,7 @@ class SubscriptionStore { * ``` */ sub(socket: unknown, id: string, filters: DittoFilter[]): Subscription { + debug('sub', id, JSON.stringify(filters)); let subs = this.#store.get(socket); if (!subs) { @@ -38,12 +41,14 @@ class SubscriptionStore { /** Remove a subscription from the store. */ unsub(socket: unknown, id: string): void { + debug('unsub', id); this.#store.get(socket)?.get(id)?.close(); this.#store.get(socket)?.delete(id); } /** Remove an entire socket. */ close(socket: unknown): void { + debug('close', socket); const subs = this.#store.get(socket); if (subs) { diff --git a/src/utils/nip05.ts b/src/utils/nip05.ts index b655175f..4fa38084 100644 --- a/src/utils/nip05.ts +++ b/src/utils/nip05.ts @@ -1,18 +1,20 @@ -import { TTLCache, z } from '@/deps.ts'; +import { Debug, TTLCache, z } from '@/deps.ts'; import { Time } from '@/utils/time.ts'; import { fetchWorker } from '@/workers/fetch.ts'; +const debug = Debug('ditto:nip05'); + const nip05Cache = new TTLCache>({ ttl: Time.hours(1), max: 5000 }); const NIP05_REGEX = /^(?:([\w.+-]+)@)?([\w.-]+)$/; interface LookupOpts { - timeout?: number; + signal?: AbortSignal; } /** Get pubkey from NIP-05. */ async function lookup(value: string, opts: LookupOpts = {}): Promise { - const { timeout = 2000 } = opts; + const { signal = AbortSignal.timeout(2000) } = opts; const match = value.match(NIP05_REGEX); if (!match) return null; @@ -21,7 +23,7 @@ async function lookup(value: string, opts: LookupOpts = {}): Promise { const cached = nip05Cache.get(value); if (cached !== undefined) return cached; - console.log(`Looking up NIP-05 for ${value}`); + debug(`Lookup ${value}`); const result = lookup(value); nip05Cache.set(value, result); diff --git a/src/utils/unfurl.ts b/src/utils/unfurl.ts index 45bdfc9e..9f03c0f2 100644 --- a/src/utils/unfurl.ts +++ b/src/utils/unfurl.ts @@ -1,7 +1,9 @@ -import { TTLCache, unfurl } from '@/deps.ts'; +import { Debug, TTLCache, unfurl } from '@/deps.ts'; import { Time } from '@/utils/time.ts'; import { fetchWorker } from '@/workers/fetch.ts'; +const debug = Debug('ditto:unfurl'); + interface PreviewCard { url: string; title: string; @@ -20,7 +22,7 @@ interface PreviewCard { } async function unfurlCard(url: string, signal: AbortSignal): Promise { - console.log(`Unfurling ${url}...`); + debug(`Unfurling ${url}...`); try { const result = await unfurl(url, { fetch: (url) => fetchWorker(url, { signal }), @@ -60,12 +62,12 @@ const previewCardCache = new TTLCache>({ }); /** Unfurl card from cache if available, otherwise fetch it. */ -function unfurlCardCached(url: string, timeout = Time.seconds(1)): Promise { +function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise { const cached = previewCardCache.get(url); if (cached !== undefined) { return cached; } else { - const card = unfurlCard(url, AbortSignal.timeout(timeout)); + const card = unfurlCard(url, signal); previewCardCache.set(url, card); return card; } diff --git a/src/workers/fetch.worker.ts b/src/workers/fetch.worker.ts index 2988a2e7..8a2c0b11 100644 --- a/src/workers/fetch.worker.ts +++ b/src/workers/fetch.worker.ts @@ -1,13 +1,16 @@ -import { Comlink } from '@/deps.ts'; +import { Comlink, Debug } from '@/deps.ts'; import './handlers/abortsignal.ts'; +const debug = Debug('ditto:fetch.worker'); + export const FetchWorker = { async fetch( url: string, init: Omit, signal: AbortSignal | null | undefined, ): Promise<[BodyInit, ResponseInit]> { + debug(init.method, url); const response = await fetch(url, { ...init, signal }); return [ await response.arrayBuffer(), diff --git a/src/workers/sqlite.worker.ts b/src/workers/sqlite.worker.ts index 6f422132..564149ac 100644 --- a/src/workers/sqlite.worker.ts +++ b/src/workers/sqlite.worker.ts @@ -1,9 +1,10 @@ /// -import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts'; +import { Comlink, type CompiledQuery, Debug, DenoSqlite3, type QueryResult, Sentry } from '@/deps.ts'; import '@/sentry.ts'; let db: DenoSqlite3 | undefined; +const debug = Debug('ditto:sqlite.worker'); export const SqliteWorker = { open(path: string): void { @@ -11,6 +12,7 @@ export const SqliteWorker = { }, executeQuery({ sql, parameters }: CompiledQuery): QueryResult { if (!db) throw new Error('Database not open'); + debug(sql); const result: QueryResult = Sentry.startSpan({ name: sql, op: 'db.query' }, () => { return {