diff --git a/deno.json b/deno.json index 5233fa9e..66b12132 100644 --- a/deno.json +++ b/deno.json @@ -6,21 +6,22 @@ "dev": "deno run --allow-read --allow-write=data --allow-env --allow-net --unstable --watch src/server.ts", "debug": "deno run --allow-read --allow-write=data --allow-env --allow-net --unstable --inspect src/server.ts", "test": "DB_PATH=\":memory:\" deno test --allow-read --allow-write=data --allow-env --unstable src", - "check": "deno check --unstable src/server.ts" + "check": "deno check --unstable src/server.ts", + "relays:sync": "deno run -A --unstable scripts/relays.ts sync", }, "imports": { "@/": "./src/", "~/": "./" }, "lint": { - "include": ["src/"], + "include": ["src/", "scripts/"], "rules": { "tags": ["recommended"], "exclude": ["no-explicit-any"] } }, "fmt": { - "include": ["src/"], + "include": ["src/", "scripts/"], "useTabs": false, "lineWidth": 120, "indentWidth": 2, diff --git a/scripts/relays.ts b/scripts/relays.ts new file mode 100644 index 00000000..ab4e7aba --- /dev/null +++ b/scripts/relays.ts @@ -0,0 +1,23 @@ +import { addRelays } from '@/db/relays.ts'; +import { filteredArray } from '@/schema.ts'; +import { relaySchema } from '~/src/utils.ts'; + +switch (Deno.args[0]) { + case 'sync': + await sync(Deno.args.slice(1)); + break; + default: + console.log('Usage: deno run -A scripts/relays.ts sync '); +} + +async function sync([url]: string[]) { + if (!url) { + console.error('Error: please provide a URL'); + Deno.exit(1); + } + const response = await fetch(url); + const data = await response.json(); + const values = filteredArray(relaySchema).parse(data) as `wss://${string}`[]; + await addRelays(values); + console.log(`Done: added ${values.length} relays.`); +} diff --git a/src/app.ts b/src/app.ts index 00a14601..6968b0d8 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,6 +1,6 @@ import { type Context, cors, type Handler, Hono, type HonoEnv, logger, type MiddlewareHandler } from '@/deps.ts'; import { type Event } from '@/event.ts'; -import '@/loopback.ts'; +import '@/firehose.ts'; import { actorController } from './controllers/activitypub/actor.ts'; import { diff --git a/src/config.ts b/src/config.ts index 461d986b..f13a4079 100644 --- a/src/config.ts +++ b/src/config.ts @@ -35,12 +35,9 @@ const Conf = { ['sign', 'verify'], ); }, - get relay() { - const value = Deno.env.get('DITTO_RELAY'); - if (!value) { - throw new Error('Missing DITTO_RELAY'); - } - return value; + get relay(): `wss://${string}` | `ws://${string}` { + const { protocol, host } = Conf.url; + return `${protocol === 'https:' ? 'wss:' : 'ws:'}//${host}/relay`; }, get localDomain() { return Deno.env.get('LOCAL_DOMAIN') || 'http://localhost:8000'; diff --git a/src/db.ts b/src/db.ts index d03bf2f1..474652c4 100644 --- a/src/db.ts +++ b/src/db.ts @@ -8,6 +8,7 @@ interface DittoDB { events: EventRow; tags: TagRow; users: UserRow; + relays: RelayRow; } interface EventRow { @@ -34,6 +35,12 @@ interface UserRow { inserted_at: Date; } +interface RelayRow { + url: string; + domain: string; + active: boolean; +} + const db = new Kysely({ dialect: new DenoSqliteDialect({ database: new Sqlite(Conf.dbPath), diff --git a/src/db/migrations/001_add_relays.ts b/src/db/migrations/001_add_relays.ts new file mode 100644 index 00000000..1415f5f7 --- /dev/null +++ b/src/db/migrations/001_add_relays.ts @@ -0,0 +1,14 @@ +import { Kysely } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('relays') + .addColumn('url', 'text', (col) => col.primaryKey()) + .addColumn('domain', 'text', (col) => col.notNull()) + .addColumn('active', 'boolean', (col) => col.notNull()) + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('relays').execute(); +} diff --git a/src/db/relays.ts b/src/db/relays.ts new file mode 100644 index 00000000..d6f99c38 --- /dev/null +++ b/src/db/relays.ts @@ -0,0 +1,31 @@ +import { tldts } from '@/deps.ts'; +import { db } from '@/db.ts'; + +/** Inserts relays into the database, skipping duplicates. */ +function addRelays(relays: `wss://${string}`[]) { + if (!relays.length) return Promise.resolve(); + + const values = relays.map((url) => ({ + url, + domain: tldts.getDomain(url)!, + active: true, + })); + + return db.insertInto('relays') + .values(values) + .onConflict((oc) => oc.column('url').doNothing()) + .execute(); +} + +/** Get a list of all known active relay URLs. */ +async function getActiveRelays(): Promise { + const rows = await db + .selectFrom('relays') + .select('relays.url') + .where('relays.active', '=', true) + .execute(); + + return rows.map((row) => row.url); +} + +export { addRelays, getActiveRelays }; diff --git a/src/deps.ts b/src/deps.ts index 5d075796..2c7d67d5 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -59,4 +59,5 @@ export { type NullableInsertKeys, sql, } from 'npm:kysely@^0.25.0'; -export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/76748303a45fac64a889cd2b9265c6c9b8ef2e8b/mod.ts'; +export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/v1.0.0/mod.ts'; +export { default as tldts } from 'npm:tldts@^6.0.14'; diff --git a/src/firehose.ts b/src/firehose.ts new file mode 100644 index 00000000..689ebc3f --- /dev/null +++ b/src/firehose.ts @@ -0,0 +1,69 @@ +import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; +import { addRelays, getActiveRelays } from '@/db/relays.ts'; +import { findUser } from '@/db/users.ts'; +import { RelayPool } from '@/deps.ts'; +import { trends } from '@/trends.ts'; +import { isRelay, nostrDate, nostrNow } from '@/utils.ts'; + +import type { SignedEvent } from '@/event.ts'; + +const relays = await getActiveRelays(); +const pool = new RelayPool(relays); + +// This file watches events on all known relays and performs +// side-effects based on them, such as trending hashtag tracking +// and storing events for notifications and the home feed. +pool.subscribe( + [{ kinds: [0, 1, 3, 5, 6, 7, 10002], since: nostrNow() }], + relays, + handleEvent, + undefined, + undefined, +); + +/** Handle events through the firehose pipeline. */ +async function handleEvent(event: SignedEvent): Promise { + console.info(`firehose: Event<${event.kind}> ${event.id}`); + + trackHashtags(event); + trackRelays(event); + + if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { + insertEvent(event).catch(console.warn); + } +} + +/** Track whenever a hashtag is used, for processing trending tags. */ +function trackHashtags(event: SignedEvent): void { + const date = nostrDate(event.created_at); + + const tags = event.tags + .filter((tag) => tag[0] === 't') + .map((tag) => tag[1]) + .slice(0, 5); + + if (!tags.length) return; + + try { + console.info('tracking tags:', tags); + trends.addTagUsages(event.pubkey, tags, date); + } catch (_e) { + // do nothing + } +} + +/** Tracks known relays in the database. */ +function trackRelays(event: SignedEvent) { + const relays = new Set<`wss://${string}`>(); + + event.tags.forEach((tag) => { + if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { + relays.add(tag[2]); + } + if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { + relays.add(tag[1]); + } + }); + + return addRelays([...relays]); +} diff --git a/src/loopback.ts b/src/loopback.ts deleted file mode 100644 index e07c73ea..00000000 --- a/src/loopback.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { Conf } from '@/config.ts'; -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; -import { findUser } from '@/db/users.ts'; -import { RelayPool } from '@/deps.ts'; -import { trends } from '@/trends.ts'; -import { nostrDate, nostrNow } from '@/utils.ts'; - -import type { SignedEvent } from '@/event.ts'; - -const relay = new RelayPool([Conf.relay]); - -// This file watches all events on your Ditto relay and triggers -// side-effects based on them. This can be used for things like -// notifications, trending hashtag tracking, etc. -relay.subscribe( - [{ kinds: [1], since: nostrNow() }], - [Conf.relay], - handleEvent, - undefined, - undefined, -); - -/** Handle events through the loopback pipeline. */ -async function handleEvent(event: SignedEvent): Promise { - console.info('loopback event:', event.id); - - trackHashtags(event); - - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); - } -} - -/** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: SignedEvent): void { - const date = nostrDate(event.created_at); - - const tags = event.tags - .filter((tag) => tag[0] === 't') - .map((tag) => tag[1]) - .slice(0, 5); - - if (!tags.length) return; - - try { - console.info('tracking tags:', tags); - trends.addTagUsages(event.pubkey, tags, date); - } catch (_e) { - // do nothing - } -} diff --git a/src/schema.ts b/src/schema.ts index 361a310a..e56c1aeb 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -20,24 +20,6 @@ const jsonSchema = z.string().transform((value, ctx) => { } }); -/** Alias for `safeParse`, but instead of returning a success object it returns the value (or undefined on fail). */ -function parseValue(schema: z.ZodType, value: unknown): T | undefined { - const result = schema.safeParse(value); - return result.success ? result.data : undefined; -} - -const parseRelay = (relay: string | URL) => parseValue(relaySchema, relay); - -const relaySchema = z.custom((relay) => { - if (typeof relay !== 'string') return false; - try { - const { protocol } = new URL(relay); - return protocol === 'wss:' || protocol === 'ws:'; - } catch (_e) { - return false; - } -}); - const emojiTagSchema = z.tuple([z.literal('emoji'), z.string(), z.string().url()]); /** https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem */ @@ -54,4 +36,10 @@ const decode64Schema = z.string().transform((value, ctx) => { const hashtagSchema = z.string().regex(/^\w{1,30}$/); -export { decode64Schema, emojiTagSchema, filteredArray, hashtagSchema, jsonSchema, parseRelay, relaySchema }; +/** + * Limits the length before trying to parse the URL. + * https://stackoverflow.com/a/417184/8811886 + */ +const safeUrlSchema = z.string().max(2048).url(); + +export { decode64Schema, emojiTagSchema, filteredArray, hashtagSchema, jsonSchema, safeUrlSchema }; diff --git a/src/schemas/nostr.ts b/src/schemas/nostr.ts index e5bdb813..cbc68164 100644 --- a/src/schemas/nostr.ts +++ b/src/schemas/nostr.ts @@ -1,14 +1,16 @@ import { verifySignature, z } from '@/deps.ts'; -import { jsonSchema } from '../schema.ts'; +import { jsonSchema, safeUrlSchema } from '../schema.ts'; /** Schema to validate Nostr hex IDs such as event IDs and pubkeys. */ const nostrIdSchema = z.string().regex(/^[0-9a-f]{64}$/); +/** Nostr kinds are positive integers. */ +const kindSchema = z.number().int().positive(); /** Nostr event schema. */ const eventSchema = z.object({ id: nostrIdSchema, - kind: z.number(), + kind: kindSchema, tags: z.array(z.array(z.string())), content: z.string(), created_at: z.number(), @@ -21,7 +23,7 @@ const signedEventSchema = eventSchema.refine(verifySignature); /** Nostr relay filter schema. */ const filterSchema = z.object({ - kinds: z.number().int().positive().array().optional(), + kinds: kindSchema.array().optional(), ids: nostrIdSchema.array().optional(), authors: nostrIdSchema.array().optional(), since: z.number().int().positive().optional(), @@ -67,6 +69,17 @@ const metaContentSchema = z.object({ /** Parses kind 0 content from a JSON string. */ const jsonMetaContentSchema = jsonSchema.pipe(metaContentSchema).catch({}); +/** NIP-11 Relay Information Document. */ +const relayInfoDocSchema = z.object({ + name: z.string().transform((val) => val.slice(0, 30)).optional().catch(undefined), + description: z.string().transform((val) => val.slice(0, 3000)).optional().catch(undefined), + pubkey: nostrIdSchema.optional().catch(undefined), + contact: safeUrlSchema.optional().catch(undefined), + supported_nips: z.number().int().positive().array().optional().catch(undefined), + software: safeUrlSchema.optional().catch(undefined), + icon: safeUrlSchema.optional().catch(undefined), +}); + export { type ClientCLOSE, type ClientEVENT, @@ -77,5 +90,6 @@ export { jsonMetaContentSchema, metaContentSchema, nostrIdSchema, + relayInfoDocSchema, signedEventSchema, }; diff --git a/src/utils.ts b/src/utils.ts index ed0a640f..8716d634 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -142,6 +142,12 @@ function activityJson(c: Context, object: T) { return response; } +/** Schema to parse a relay URL. */ +const relaySchema = z.string().max(255).startsWith('wss://').url(); + +/** Check whether the value is a valid relay URL. */ +const isRelay = (relay: string): relay is `wss://${string}` => relaySchema.safeParse(relay).success; + export { activityJson, bech32ToPubkey, @@ -149,6 +155,7 @@ export { eventAge, eventDateComparator, findTag, + isRelay, lookupAccount, type Nip05, nostrDate, @@ -157,6 +164,7 @@ export { paginationSchema, parseBody, parseNip05, + relaySchema, sha256, };