diff --git a/deno.json b/deno.json index 5d7e9cf8..f08048f5 100644 --- a/deno.json +++ b/deno.json @@ -6,6 +6,7 @@ "dev": "deno run -A --watch src/server.ts", "hook": "deno run --allow-read --allow-run --allow-write https://deno.land/x/deno_hooks@0.1.1/mod.ts", "db:migrate": "deno run -A scripts/db-migrate.ts", + "nostr:pull": "deno run -A scripts/nostr-pull.ts", "debug": "deno run -A --inspect src/server.ts", "test": "DATABASE_URL=\"sqlite://:memory:\" deno test -A --junit-path=./deno-test.xml", "check": "deno check src/server.ts", @@ -26,7 +27,7 @@ "@hono/hono": "jsr:@hono/hono@^4.4.6", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.23.3", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.25.0", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", diff --git a/deno.lock b/deno.lock index c10b3fe8..5018b6ce 100644 --- a/deno.lock +++ b/deno.lock @@ -12,7 +12,7 @@ "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", - "jsr:@nostrify/nostrify@^0.23.3": "jsr:@nostrify/nostrify@0.23.3", + "jsr:@nostrify/nostrify@^0.25.0": "jsr:@nostrify/nostrify@0.25.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0", "jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0", "jsr:@std/assert@^0.217.0": "jsr:@std/assert@0.217.0", @@ -138,8 +138,8 @@ "npm:zod@^3.23.8" ] }, - "@nostrify/nostrify@0.23.3": { - "integrity": "868b10dd094801e28f4982ef9815f0d43f2a807b6f8ad291c78ecb3eb291605a", + "@nostrify/nostrify@0.25.0": { + "integrity": "98f26f44e95ac87fc91b3f3809d38432e1a7f6aebf10380b2554b6f9526313c6", "dependencies": [ "jsr:@std/encoding@^0.224.1", "npm:@scure/base@^1.1.6", @@ -1408,6 +1408,10 @@ "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/b4725e74ad6ca359ba0e370b55dbb8bb845a8a83/mod.ts": "662438fd3909984bb8cbaf3fd44d2121e949d11301baf21d6c3f057ccf9887de", "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/b4725e74ad6ca359ba0e370b55dbb8bb845a8a83/src/PostgreSQLDriver.ts": "ea5a523bceeed420858b744beeb95d48976cb2b0d3f519a68b65a8229036cf6a", "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/b4725e74ad6ca359ba0e370b55dbb8bb845a8a83/src/PostgreSQLDriverDatabaseConnection.ts": "11e2fc10a3abb3d0729613c4b7cdb9cb73b597fd77353311bb6707c73a635fc5", + "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/c6869b9e12d74af78a846ad503d84493f5db9df4/deps.ts": "b3dbecae69c30a5f161323b8c8ebd91d9af1eceb98fafab3091c7281a4b64fed", + "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/c6869b9e12d74af78a846ad503d84493f5db9df4/mod.ts": "662438fd3909984bb8cbaf3fd44d2121e949d11301baf21d6c3f057ccf9887de", + "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/c6869b9e12d74af78a846ad503d84493f5db9df4/src/PostgreSQLDriver.ts": "0f5d1bc2b24d4e0052e38ee289fb2f5e8e1470544f61aa2afe65e1059bf35dfb", + "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/c6869b9e12d74af78a846ad503d84493f5db9df4/src/PostgreSQLDriverDatabaseConnection.ts": "e5d4e0fc9737c3ec253e679a51f5b43d2bb9a3386c147b7b1d14f4f5a5f734f1", "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/f2948b86190a10faa293588775e162b3a8b52e70/deps.ts": "b3dbecae69c30a5f161323b8c8ebd91d9af1eceb98fafab3091c7281a4b64fed", "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/f2948b86190a10faa293588775e162b3a8b52e70/mod.ts": "662438fd3909984bb8cbaf3fd44d2121e949d11301baf21d6c3f057ccf9887de", "https://gitlab.com/soapbox-pub/kysely-deno-postgres/-/raw/f2948b86190a10faa293588775e162b3a8b52e70/src/PostgreSQLDriver.ts": "ac1a39e86fd676973bce215e19db1f26b82408b8f2bb09a3601802974ea7cec6", @@ -1433,7 +1437,7 @@ "jsr:@bradenmacdonald/s3-lite-client@^0.7.4", "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", - "jsr:@nostrify/nostrify@^0.23.3", + "jsr:@nostrify/nostrify@^0.25.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", "jsr:@std/assert@^0.225.1", diff --git a/scripts/nostr-pull.ts b/scripts/nostr-pull.ts new file mode 100644 index 00000000..1a236f8f --- /dev/null +++ b/scripts/nostr-pull.ts @@ -0,0 +1,157 @@ +/** + * Script to import a user/list of users into Ditto given their npub/pubkey + * by looking them up on a list of relays. + */ + +import { NostrEvent, NRelay1, NSchema } from '@nostrify/nostrify'; +import { nip19 } from 'nostr-tools'; + +import { DittoDB } from '@/db/DittoDB.ts'; +import { EventsDB } from '@/storages/EventsDB.ts'; + +const kysely = await DittoDB.getInstance(); +const eventsDB = new EventsDB(kysely); + +interface ImportEventsOpts { + profilesOnly: boolean; +} + +type DoEvent = (event: NostrEvent) => void | Promise; +const importUsers = async ( + authors: string[], + relays: string[], + opts?: Partial, + doEvent: DoEvent = async (event: NostrEvent) => await eventsDB.event(event), +) => { + // Kind 0s + follow lists. + const profiles: Record> = {}; + // Kind 1s. + const notes = new Set(); + const { profilesOnly = false } = opts || {}; + + await Promise.all(relays.map(async (relay) => { + if (!relay.startsWith('wss://')) console.error(`Invalid relay url ${relay}`); + const conn = new NRelay1(relay); + const matched = await conn.query([{ kinds: [0, 3], authors, limit: 1000 }]); + + if (!profilesOnly) { + matched.push( + ...await conn.query( + authors.map((author) => ({ kinds: [1], authors: [author], limit: 200 })), + ), + ); + } + + await conn.close(); + await Promise.all( + matched.map(async (event) => { + const { kind, pubkey } = event; + if (kind === 1 && !notes.has(event.id)) { + // add the event to eventsDB only if it has not been found already. + notes.add(event.id); + await doEvent(event); + return; + } + + profiles[pubkey] ??= {}; + const existing = profiles[pubkey][kind]; + if (existing?.created_at > event.created_at) return; + else profiles[pubkey][kind] = event; + }), + ); + })); + + for (const user in profiles) { + const profile = profiles[user]; + for (const kind in profile) { + await doEvent(profile[kind]); + } + + let name = user; + // kind 0, not first idx + const event = profile[0]; + if (event) { + // if event exists, print name + const parsed = JSON.parse(event.content); + name = parsed.nip05 || parsed.name || name; + } + if (NSchema.id().safeParse(name).success) { + // if no kind 0 found and this is a pubkey, encode as npub + name = nip19.npubEncode(name); + } + console.info(`Imported user ${name}${profilesOnly ? "'s profile" : ''}.`); + } +}; + +if (import.meta.main) { + if (!Deno.args.length) { + showHelp(); + Deno.exit(1); + } + const pubkeys: string[] = []; + const relays: string[] = []; + + const opts: Partial = {}; + + let optionsEnd = false; + let relaySectionBegun = false; + for (const arg of Deno.args) { + if (arg.startsWith('-')) { + if (optionsEnd) { + console.error('Option encountered after end of options section.'); + showUsage(); + Deno.exit(1); + } + switch (arg) { + case '-p': + case '--profile-only': + console.info('Only importing profiles.'); + opts.profilesOnly = true; + break; + } + } else if (arg.startsWith('npub1')) { + optionsEnd = true; + + if (relaySectionBegun) { + console.error('npub specified in relay section'); + Deno.exit(1); + } + const decoded = nip19.decode(arg as `npub1${string}`).data; + if (!NSchema.id().safeParse(decoded).success) { + console.error(`invalid pubkey ${arg}, skipping...`); + continue; + } + pubkeys.push(decoded); + } else if (NSchema.id().safeParse(arg).success) { + pubkeys.push(arg); + } else { + relaySectionBegun = true; + if (!arg.startsWith('wss://')) { + console.error(`invalid relay url ${arg}, skipping...`); + } + relays.push(arg); + } + } + + await importUsers(pubkeys, relays, opts); + Deno.exit(0); +} + +function showHelp() { + console.info('ditto - db:import'); + console.info("Import users' posts and kind 0s from a given set of relays.\n"); + showUsage(); + console.info(` +OPTIONS: + +-p, --profile-only + Only import profiles and not posts. Default: off. +`); +} + +function showUsage() { + console.info( + 'Usage: deno task db:import [options] npub1xxxxxx[ npub1yyyyyyy]...' + + ' wss://first.relay[ second.relay]...', + ); +} diff --git a/src/app.ts b/src/app.ts index 6a2205b4..1cb3746b 100644 --- a/src/app.ts +++ b/src/app.ts @@ -108,6 +108,7 @@ import { trendingStatusesController, trendingTagsController, } from '@/controllers/api/trends.ts'; +import { errorHandler } from '@/controllers/error.ts'; import { metricsController } from '@/controllers/metrics.ts'; import { indexController } from '@/controllers/site.ts'; import { nodeInfoController, nodeInfoSchemaController } from '@/controllers/well-known/nodeinfo.ts'; @@ -151,18 +152,17 @@ if (Conf.cronEnabled) { app.use('*', rateLimitMiddleware(300, Time.minutes(5))); -app.use('/api/*', logger(debug)); -app.use('/.well-known/*', logger(debug)); -app.use('/users/*', logger(debug)); -app.use('/nodeinfo/*', logger(debug)); -app.use('/oauth/*', logger(debug)); +app.use('/api/*', metricsMiddleware, logger(debug)); +app.use('/.well-known/*', metricsMiddleware, logger(debug)); +app.use('/users/*', metricsMiddleware, logger(debug)); +app.use('/nodeinfo/*', metricsMiddleware, logger(debug)); +app.use('/oauth/*', metricsMiddleware, logger(debug)); -app.get('/api/v1/streaming', streamingController); -app.get('/relay', relayController); +app.get('/api/v1/streaming', metricsMiddleware, streamingController); +app.get('/relay', metricsMiddleware, relayController); app.use( '*', - metricsMiddleware, cspMiddleware(), cors({ origin: '*', exposeHeaders: ['link'] }), signerMiddleware, @@ -340,6 +340,8 @@ app.get('/', frontendController, indexController); // Fallback app.get('*', publicFiles, staticFiles, frontendController); +app.onError(errorHandler); + export default app; export type { AppContext, AppController, AppMiddleware }; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 9e77ccc1..ad4802ca 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -209,7 +209,9 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - const events = await store.query([filter], { signal }) + const opts = { signal, limit, timeout: 10_000 }; + + const events = await store.query([filter], opts) .then((events) => hydrateEvents({ events, store, signal })) .then((events) => { if (exclude_replies) { diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts index d92ccf4a..fab7c816 100644 --- a/src/controllers/api/notifications.ts +++ b/src/controllers/api/notifications.ts @@ -78,7 +78,7 @@ async function renderNotifications( const store = c.get('store'); const pubkey = await c.get('signer')?.getPublicKey()!; const { signal } = c.req.raw; - const opts = { signal, limit: params.limit }; + const opts = { signal, limit: params.limit, timeout: 15_000 }; const events = await store .query(filters, opts) diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 28e0778a..30550bbb 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -8,14 +8,21 @@ import { z } from 'zod'; import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; -import { getAmount } from '@/utils/bolt11.ts'; import { getAncestors, getAuthor, getDescendants, getEvent } from '@/queries.ts'; import { getUnattachedMediaByIds } from '@/db/unattached-media.ts'; import { renderEventAccounts } from '@/views.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { Storages } from '@/storages.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { createEvent, paginated, paginationSchema, parseBody, updateListEvent } from '@/utils/api.ts'; +import { + createEvent, + listPaginationSchema, + paginated, + paginatedList, + paginationSchema, + parseBody, + updateListEvent, +} from '@/utils/api.ts'; import { getInvoice, getLnurl } from '@/utils/lnurl.ts'; import { lookupPubkey } from '@/utils/lookup.ts'; import { addTag, deleteTag } from '@/utils/tags.ts'; @@ -545,33 +552,26 @@ const zapController: AppController = async (c) => { const zappedByController: AppController = async (c) => { const id = c.req.param('id'); + const params = listPaginationSchema.parse(c.req.query()); const store = await Storages.db(); - const amountSchema = z.coerce.number().int().nonnegative().catch(0); + const db = await DittoDB.getInstance(); - const events = (await store.query([{ kinds: [9735], '#e': [id], limit: 100 }])).map((event) => { - const zapRequestString = event.tags.find(([name]) => name === 'description')?.[1]; - if (!zapRequestString) return; - try { - const zapRequest = n.json().pipe(n.event()).parse(zapRequestString); - const amount = zapRequest?.tags.find(([name]: any) => name === 'amount')?.[1]; - if (!amount) { - const amount = getAmount(event?.tags.find(([name]) => name === 'bolt11')?.[1]); - if (!amount) return; - zapRequest.tags.push(['amount', amount]); - } - return zapRequest; - } catch { - return; - } - }).filter(Boolean) as DittoEvent[]; + const zaps = await db.selectFrom('event_zaps') + .selectAll() + .where('target_event_id', '=', id) + .orderBy('amount_millisats', 'desc') + .limit(params.limit) + .offset(params.offset).execute(); - await hydrateEvents({ events, store }); + const authors = await store.query([{ kinds: [0], authors: zaps.map((zap) => zap.sender_pubkey) }]); const results = (await Promise.all( - events.map(async (event) => { - const amount = amountSchema.parse(event.tags.find(([name]) => name === 'amount')?.[1]); - const comment = event?.content ?? ''; - const account = event?.author ? await renderAccount(event.author) : await accountFromPubkey(event.pubkey); + zaps.map(async (zap) => { + const amount = zap.amount_millisats; + const comment = zap.comment; + + const sender = authors.find((author) => author.pubkey === zap.sender_pubkey); + const account = sender ? await renderAccount(sender) : await accountFromPubkey(zap.sender_pubkey); return { comment, @@ -581,7 +581,7 @@ const zappedByController: AppController = async (c) => { }), )).filter(Boolean); - return c.json(results); + return paginatedList(c, params, results); }; export { diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts index 5d7862b5..62f1cd2f 100644 --- a/src/controllers/api/timelines.ts +++ b/src/controllers/api/timelines.ts @@ -60,9 +60,10 @@ const suggestedTimelineController: AppController = async (c) => { async function renderStatuses(c: AppContext, filters: NostrFilter[]) { const { signal } = c.req.raw; const store = c.get('store'); + const opts = { signal, timeout: 10_000 }; const events = await store - .query(filters, { signal }) + .query(filters, opts) .then((events) => hydrateEvents({ events, store, signal })); if (!events.length) { diff --git a/src/controllers/error.ts b/src/controllers/error.ts new file mode 100644 index 00000000..fa5e4d32 --- /dev/null +++ b/src/controllers/error.ts @@ -0,0 +1,11 @@ +import { ErrorHandler } from '@hono/hono'; + +export const errorHandler: ErrorHandler = (err, c) => { + console.error(err); + + if (err.message === 'canceling statement due to statement timeout') { + return c.json({ error: 'The server was unable to respond in a timely manner' }, 500); + } + + return c.json({ error: 'Something went wrong' }, 500); +}; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 4e624e9b..4d8ab2cb 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -73,11 +73,15 @@ function connectStream(socket: WebSocket) { const pubsub = await Storages.pubsub(); try { - for (const event of await store.query(filters, { limit: FILTER_LIMIT })) { + for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: 1000 })) { send(['EVENT', subId, event]); } } catch (e) { - send(['CLOSED', subId, e.message]); + if (e instanceof RelayError) { + send(['CLOSED', subId, e.message]); + } else { + send(['CLOSED', subId, 'error: something went wrong']); + } controllers.delete(subId); return; } @@ -124,7 +128,7 @@ function connectStream(socket: WebSocket) { /** Handle COUNT. Return the number of events matching the filters. */ async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise { const store = await Storages.db(); - const { count } = await store.count(filters); + const { count } = await store.count(filters, { timeout: 100 }); send(['COUNT', subId, { count, approximate: false }]); } diff --git a/src/db/DittoTables.ts b/src/db/DittoTables.ts index aed8c8c2..863ca61e 100644 --- a/src/db/DittoTables.ts +++ b/src/db/DittoTables.ts @@ -7,6 +7,7 @@ export interface DittoTables { author_stats: AuthorStatsRow; event_stats: EventStatsRow; pubkey_domains: PubkeyDomainRow; + event_zaps: EventZapRow; } interface AuthorStatsRow { @@ -69,3 +70,11 @@ interface PubkeyDomainRow { domain: string; last_updated_at: number; } + +interface EventZapRow { + receipt_id: string; + target_event_id: string; + sender_pubkey: string; + amount_millisats: number; + comment: string; +} diff --git a/src/db/migrations/027_add_zap_events.ts b/src/db/migrations/027_add_zap_events.ts new file mode 100644 index 00000000..2fcc101c --- /dev/null +++ b/src/db/migrations/027_add_zap_events.ts @@ -0,0 +1,32 @@ +import { Kysely } from 'kysely'; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('event_zaps') + .addColumn('receipt_id', 'text', (col) => col.primaryKey()) + .addColumn('target_event_id', 'text', (col) => col.notNull()) + .addColumn('sender_pubkey', 'text', (col) => col.notNull()) + .addColumn('amount_millisats', 'integer', (col) => col.notNull()) + .addColumn('comment', 'text', (col) => col.notNull()) + .execute(); + + await db.schema + .createIndex('idx_event_zaps_amount_millisats') + .on('event_zaps') + .column('amount_millisats') + .ifNotExists() + .execute(); + + await db.schema + .createIndex('idx_event_zaps_target_event_id') + .on('event_zaps') + .column('target_event_id') + .ifNotExists() + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropIndex('idx_event_zaps_amount_millisats').ifExists().execute(); + await db.schema.dropIndex('idx_event_zaps_target_event_id').ifExists().execute(); + await db.schema.dropTable('event_zaps').execute(); +} diff --git a/src/firehose.ts b/src/firehose.ts index 3e6c8fc0..8b61c784 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -4,7 +4,7 @@ import { firehoseEventCounter } from '@/metrics.ts'; import { Storages } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; -import * as pipeline from './pipeline.ts'; +import * as pipeline from '@/pipeline.ts'; const console = new Stickynotes('ditto:firehose'); diff --git a/src/metrics.ts b/src/metrics.ts index 96d91599..67da3f49 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -6,6 +6,12 @@ export const httpRequestCounter = new Counter({ labelNames: ['method'], }); +export const httpResponseCounter = new Counter({ + name: 'http_responses_total', + help: 'Total number of HTTP responses', + labelNames: ['status', 'path'], +}); + export const streamingConnectionsGauge = new Gauge({ name: 'streaming_connections', help: 'Number of active connections to the streaming API', diff --git a/src/middleware/metricsMiddleware.ts b/src/middleware/metricsMiddleware.ts index 1a491186..d7ac43d5 100644 --- a/src/middleware/metricsMiddleware.ts +++ b/src/middleware/metricsMiddleware.ts @@ -1,10 +1,14 @@ import { MiddlewareHandler } from '@hono/hono'; -import { httpRequestCounter } from '@/metrics.ts'; +import { httpRequestCounter, httpResponseCounter } from '@/metrics.ts'; export const metricsMiddleware: MiddlewareHandler = async (c, next) => { const { method } = c.req; httpRequestCounter.inc({ method }); await next(); + + const { status } = c.res; + const path = c.req.matchedRoutes.find((r) => r.method !== 'ALL')?.path ?? c.req.routePath; + httpResponseCounter.inc({ status, path }); }; diff --git a/src/pipeline.test.ts b/src/pipeline.test.ts new file mode 100644 index 00000000..64cb523b --- /dev/null +++ b/src/pipeline.test.ts @@ -0,0 +1,125 @@ +import { assertEquals } from '@std/assert'; +import { generateSecretKey } from 'nostr-tools'; + +import { genEvent, getTestDB } from '@/test.ts'; +import { handleZaps } from '@/pipeline.ts'; + +Deno.test('store one zap receipt in nostr_events; convert it into event_zaps table format and store it', async () => { + await using db = await getTestDB(); + const kysely = db.kysely; + + const sk = generateSecretKey(); + + const event = genEvent({ + 'id': '67b48a14fb66c60c8f9070bdeb37afdfcc3d08ad01989460448e4081eddda446', + 'pubkey': '9630f464cca6a5147aa8a35f0bcdd3ce485324e732fd39e09233b1d848238f31', + 'created_at': 1674164545, + 'kind': 9735, + 'tags': [ + ['p', '32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245'], + ['P', '97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322'], + ['e', '3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8'], + [ + 'bolt11', + 'lnbc10u1p3unwfusp5t9r3yymhpfqculx78u027lxspgxcr2n2987mx2j55nnfs95nxnzqpp5jmrh92pfld78spqs78v9euf2385t83uvpwk9ldrlvf6ch7tpascqhp5zvkrmemgth3tufcvflmzjzfvjt023nazlhljz2n9hattj4f8jq8qxqyjw5qcqpjrzjqtc4fc44feggv7065fqe5m4ytjarg3repr5j9el35xhmtfexc42yczarjuqqfzqqqqqqqqlgqqqqqqgq9q9qxpqysgq079nkq507a5tw7xgttmj4u990j7wfggtrasah5gd4ywfr2pjcn29383tphp4t48gquelz9z78p4cq7ml3nrrphw5w6eckhjwmhezhnqpy6gyf0', + ], + [ + 'description', + '{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["e","3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}', + ], + ['preimage', '5d006d2cf1e73c7148e7519a4c68adc81642ce0e25a432b2434c99f97344c15f'], + ], + 'content': '', + }, sk); + + await db.store.event(event); + + await handleZaps(kysely, event); + await handleZaps(kysely, event); + + const zapReceipts = await kysely.selectFrom('nostr_events').selectAll().execute(); + const customEventZaps = await kysely.selectFrom('event_zaps').selectAll().execute(); + + assertEquals(zapReceipts.length, 1); // basic check + assertEquals(customEventZaps.length, 1); // basic check + + const expected = { + receipt_id: event.id, + target_event_id: '3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8', + sender_pubkey: '97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322', + amount_millisats: 1000000, + comment: '', + }; + + assertEquals(customEventZaps[0], expected); +}); + +// The function tests below only handle the edge cases and don't assert anything +// If no error happens = ok + +Deno.test('zap receipt does not have a "description" tag', async () => { + await using db = await getTestDB(); + const kysely = db.kysely; + + const sk = generateSecretKey(); + + const event = genEvent({ kind: 9735 }, sk); + + await handleZaps(kysely, event); + + // no error happened = ok +}); + +Deno.test('zap receipt does not have a zap request stringified value in the "description" tag', async () => { + await using db = await getTestDB(); + const kysely = db.kysely; + + const sk = generateSecretKey(); + + const event = genEvent({ kind: 9735, tags: [['description', 'yolo']] }, sk); + + await handleZaps(kysely, event); + + // no error happened = ok +}); + +Deno.test('zap receipt does not have a "bolt11" tag', async () => { + await using db = await getTestDB(); + const kysely = db.kysely; + + const sk = generateSecretKey(); + + const event = genEvent({ + kind: 9735, + tags: [[ + 'description', + '{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["e","3624762a1274dd9636e0c552b53086d70bc88c165bc4dc0f9e836a1eaf86c3b8"],["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}', + ]], + }, sk); + + await handleZaps(kysely, event); + + // no error happened = ok +}); + +Deno.test('zap request inside zap receipt does not have an "e" tag', async () => { + await using db = await getTestDB(); + const kysely = db.kysely; + + const sk = generateSecretKey(); + + const event = genEvent({ + kind: 9735, + tags: [[ + 'bolt11', + 'lnbc10u1p3unwfusp5t9r3yymhpfqculx78u027lxspgxcr2n2987mx2j55nnfs95nxnzqpp5jmrh92pfld78spqs78v9euf2385t83uvpwk9ldrlvf6ch7tpascqhp5zvkrmemgth3tufcvflmzjzfvjt023nazlhljz2n9hattj4f8jq8qxqyjw5qcqpjrzjqtc4fc44feggv7065fqe5m4ytjarg3repr5j9el35xhmtfexc42yczarjuqqfzqqqqqqqqlgqqqqqqgq9q9qxpqysgq079nkq507a5tw7xgttmj4u990j7wfggtrasah5gd4ywfr2pjcn29383tphp4t48gquelz9z78p4cq7ml3nrrphw5w6eckhjwmhezhnqpy6gyf0', + ], [ + 'description', + '{"pubkey":"97c70a44366a6535c145b333f973ea86dfdc2d7a99da618c40c64705ad98e322","content":"","id":"d9cc14d50fcb8c27539aacf776882942c1a11ea4472f8cdec1dea82fab66279d","created_at":1674164539,"sig":"77127f636577e9029276be060332ea565deaf89ff215a494ccff16ae3f757065e2bc59b2e8c113dd407917a010b3abd36c8d7ad84c0e3ab7dab3a0b0caa9835d","kind":9734,"tags":[["p","32e1827635450ebb3c5a7d12c1f8e7b2b514439ac10a67eef3d9fd9c5c68e245"],["relays","wss://relay.damus.io","wss://nostr-relay.wlvs.space","wss://nostr.fmt.wiz.biz","wss://relay.nostr.bg","wss://nostr.oxtr.dev","wss://nostr.v0l.io","wss://brb.io","wss://nostr.bitcoiner.social","ws://monad.jb55.com:8080","wss://relay.snort.social"]]}', + ]], + }, sk); + + await handleZaps(kysely, event); + + // no error happened = ok +}); diff --git a/src/pipeline.ts b/src/pipeline.ts index 695a027e..17998aa3 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,7 +1,8 @@ import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; -import { sql } from 'kysely'; +import { Kysely, sql } from 'kysely'; import { LRUCache } from 'lru-cache'; +import { z } from 'zod'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; @@ -18,6 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts'; import { nip05Cache } from '@/utils/nip05.ts'; import { updateStats } from '@/utils/stats.ts'; import { getTagSet } from '@/utils/tags.ts'; +import { DittoTables } from '@/db/DittoTables.ts'; +import { getAmount } from '@/utils/bolt11.ts'; const debug = Debug('ditto:pipeline'); @@ -51,8 +54,11 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise { +async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise { if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); const kysely = await DittoDB.getInstance(); @@ -220,4 +226,33 @@ async function generateSetEvents(event: NostrEvent): Promise { } } -export { handleEvent }; +/** Stores the event in the 'event_zaps' table */ +async function handleZaps(kysely: Kysely, event: NostrEvent) { + if (event.kind !== 9735) return; + + const zapRequestString = event?.tags?.find(([name]) => name === 'description')?.[1]; + if (!zapRequestString) return; + const zapRequest = n.json().pipe(n.event()).optional().catch(undefined).parse(zapRequestString); + if (!zapRequest) return; + + const amountSchema = z.coerce.number().int().nonnegative().catch(0); + const amount_millisats = amountSchema.parse(getAmount(event?.tags.find(([name]) => name === 'bolt11')?.[1])); + if (!amount_millisats || amount_millisats < 1) return; + + const zappedEventId = zapRequest.tags.find(([name]) => name === 'e')?.[1]; + if (!zappedEventId) return; + + try { + await kysely.insertInto('event_zaps').values({ + receipt_id: event.id, + target_event_id: zappedEventId, + sender_pubkey: zapRequest.pubkey, + amount_millisats, + comment: zapRequest.content, + }).execute(); + } catch { + // receipt_id is unique, do nothing + } +} + +export { handleEvent, handleZaps }; diff --git a/src/storages/EventsDB.test.ts b/src/storages/EventsDB.test.ts index 16b429d4..32838fa6 100644 --- a/src/storages/EventsDB.test.ts +++ b/src/storages/EventsDB.test.ts @@ -191,3 +191,33 @@ Deno.test('inserting replaceable events', async () => { await eventsDB.event(newerEvent); assertEquals(await eventsDB.query([{ kinds: [0] }]), [newerEvent]); }); + +Deno.test("throws a RelayError when querying an event with a large 'since'", async () => { + const { eventsDB } = await createDB(); + + await assertRejects( + () => eventsDB.query([{ since: 33333333333333 }]), + RelayError, + 'since filter too far into the future', + ); +}); + +Deno.test("throws a RelayError when querying an event with a large 'until'", async () => { + const { eventsDB } = await createDB(); + + await assertRejects( + () => eventsDB.query([{ until: 66666666666666 }]), + RelayError, + 'until filter too far into the future', + ); +}); + +Deno.test("throws a RelayError when querying an event with a large 'kind'", async () => { + const { eventsDB } = await createDB(); + + await assertRejects( + () => eventsDB.query([{ kinds: [99999999999999] }]), + RelayError, + 'kind filter too far into the future', + ); +}); diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index bd350173..f640cc45 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -45,13 +45,14 @@ class EventsDB implements NStore { constructor(private kysely: Kysely) { this.store = new NDatabase(kysely, { fts: Conf.db.dialect, + timeoutStrategy: Conf.db.dialect === 'postgres' ? 'setStatementTimeout' : undefined, indexTags: EventsDB.indexTags, searchText: EventsDB.searchText, }); } /** Insert an event (and its tags) into the database. */ - async event(event: NostrEvent, _opts?: { signal?: AbortSignal }): Promise { + async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { event = purifyEvent(event); this.console.debug('EVENT', JSON.stringify(event)); dbEventCounter.inc({ kind: event.kind }); @@ -63,7 +64,7 @@ class EventsDB implements NStore { await this.deleteEventsAdmin(event); try { - await this.store.event(event); + await this.store.event(event, { ...opts, timeout: opts.timeout ?? 1000 }); } catch (e) { if (e.message === 'Cannot add a deleted event') { throw new RelayError('blocked', 'event deleted by user'); @@ -137,20 +138,23 @@ class EventsDB implements NStore { } /** Get events for filters from the database. */ - async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { + async query( + filters: NostrFilter[], + opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, + ): Promise { filters = await this.expandFilters(filters); dbQueryCounter.inc(); for (const filter of filters) { if (filter.since && filter.since >= 2_147_483_647) { - throw new Error('since filter too far into the future'); + throw new RelayError('invalid', 'since filter too far into the future'); } if (filter.until && filter.until >= 2_147_483_647) { - throw new Error('until filter too far into the future'); + throw new RelayError('invalid', 'until filter too far into the future'); } for (const kind of filter.kinds ?? []) { if (kind >= 2_147_483_647) { - throw new Error('kind filter too far into the future'); + throw new RelayError('invalid', 'kind filter too far into the future'); } } } @@ -160,28 +164,28 @@ class EventsDB implements NStore { this.console.debug('REQ', JSON.stringify(filters)); - return this.store.query(filters, opts); + return this.store.query(filters, { ...opts, timeout: opts.timeout ?? 1000 }); } /** Delete events based on filters from the database. */ - async remove(filters: NostrFilter[], _opts?: { signal?: AbortSignal }): Promise { + async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise { if (!filters.length) return Promise.resolve(); this.console.debug('DELETE', JSON.stringify(filters)); - return this.store.remove(filters); + return this.store.remove(filters, { ...opts, timeout: opts.timeout ?? 3000 }); } /** Get number of events that would be returned by filters. */ async count( filters: NostrFilter[], - opts: { signal?: AbortSignal } = {}, + opts: { signal?: AbortSignal; timeout?: number } = {}, ): Promise<{ count: number; approximate: boolean }> { if (opts.signal?.aborted) return Promise.reject(abortError()); if (!filters.length) return Promise.resolve({ count: 0, approximate: false }); this.console.debug('COUNT', JSON.stringify(filters)); - return this.store.count(filters); + return this.store.count(filters, { ...opts, timeout: opts.timeout ?? 500 }); } /** Return only the tags that should be indexed. */ @@ -275,6 +279,12 @@ class EventsDB implements NStore { filter.search = tokens.filter((t) => typeof t === 'string').join(' '); } + + if (filter.kinds) { + // Ephemeral events are not stored, so don't bother querying for them. + // If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results. + filter.kinds = filter.kinds.filter((kind) => !NKinds.ephemeral(kind)); + } } return filters; diff --git a/src/utils/scavenger.test.ts b/src/utils/scavenger.test.ts new file mode 100644 index 00000000..e69de29b