diff --git a/packages/ditto/controllers/api/ditto.ts b/packages/ditto/controllers/api/ditto.ts index 4d470f24..ff1b958f 100644 --- a/packages/ditto/controllers/api/ditto.ts +++ b/packages/ditto/controllers/api/ditto.ts @@ -10,7 +10,7 @@ import { getInstanceMetadata } from '@/utils/instance.ts'; import { deleteTag } from '@/utils/tags.ts'; import { DittoZapSplits, getZapSplits } from '@/utils/zap-split.ts'; import { screenshotsSchema } from '@/schemas/nostr.ts'; -import { booleanParamSchema, percentageSchema, wsUrlSchema } from '@/schema.ts'; +import { booleanParamSchema, percentageSchema } from '@/schema.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { renderNameRequest } from '@/views/ditto.ts'; import { accountFromPubkey } from '@/views/mastodon/accounts.ts'; @@ -20,6 +20,16 @@ import { updateListAdminEvent } from '@/utils/api.ts'; const markerSchema = z.enum(['read', 'write']); +/** WebSocket URL. */ +const wsUrlSchema = z.string().refine((val): val is `wss://${string}` | `ws://${string}` => { + try { + const { protocol } = new URL(val); + return protocol === 'wss:' || protocol === 'ws:'; + } catch { + return false; + } +}, 'Invalid WebSocket URL'); + const relaySchema = z.object({ url: wsUrlSchema, marker: markerSchema.optional(), @@ -62,7 +72,7 @@ function renderRelays(event: NostrEvent): RelayEntity[] { return event.tags.reduce((acc, [name, url, marker]) => { if (name === 'r') { const relay: RelayEntity = { - url, + url: url as `wss://${string}`, marker: markerSchema.safeParse(marker).success ? marker as 'read' | 'write' : undefined, }; acc.push(relay); diff --git a/packages/ditto/schema.ts b/packages/ditto/schema.ts index 56c9b998..c67aa5f6 100644 --- a/packages/ditto/schema.ts +++ b/packages/ditto/schema.ts @@ -22,16 +22,6 @@ const hashtagSchema = z.string().regex(/^\w{1,30}$/); */ const safeUrlSchema = z.string().max(2048).url(); -/** WebSocket URL. */ -const wsUrlSchema = z.string().refine((val) => { - try { - const { protocol } = new URL(val); - return protocol === 'wss:' || protocol === 'ws:'; - } catch { - return false; - } -}, 'Invalid WebSocket URL'); - /** https://github.com/colinhacks/zod/issues/1630#issuecomment-1365983831 */ const booleanParamSchema = z.enum(['true', 'false']).transform((value) => value === 'true'); @@ -93,5 +83,4 @@ export { safeUrlSchema, sizesSchema, walletSchema, - wsUrlSchema, }; diff --git a/packages/ditto/storages.ts b/packages/ditto/storages.ts index d5d0f029..aae165f2 100644 --- a/packages/ditto/storages.ts +++ b/packages/ditto/storages.ts @@ -1,13 +1,11 @@ // deno-lint-ignore-file require-await import { type DittoDB, DittoPolyPg } from '@ditto/db'; import { NPool, NRelay1 } from '@nostrify/nostrify'; -import { logi } from '@soapbox/logi'; import { Conf } from '@/config.ts'; -import { wsUrlSchema } from '@/schema.ts'; import { DittoPgStore } from '@/storages/DittoPgStore.ts'; -import { getRelays } from '@/utils/outbox.ts'; import { seedZapSplits } from '@/utils/zap-split.ts'; +import { DittoPool } from '@/storages/DittoPool.ts'; export class Storages { private static _db: Promise | undefined; @@ -55,53 +53,8 @@ export class Storages { public static async client(): Promise> { if (!this._client) { this._client = (async () => { - const db = await this.db(); - - const [relayList] = await db.query([ - { kinds: [10002], authors: [await Conf.signer.getPublicKey()], limit: 1 }, - ]); - - const tags = relayList?.tags ?? []; - - const activeRelays = tags.reduce((acc, [name, url, marker]) => { - const valid = wsUrlSchema.safeParse(url).success; - - if (valid && name === 'r' && (!marker || marker === 'write')) { - acc.push(url); - } - return acc; - }, []); - - logi({ - level: 'info', - ns: 'ditto.pool', - msg: `connecting to ${activeRelays.length} relays`, - relays: activeRelays, - }); - - return new NPool({ - open(url) { - return new NRelay1(url, { - // Skip event verification (it's done in the pipeline). - verifyEvent: () => true, - log(log) { - logi(log); - }, - }); - }, - reqRouter: async (filters) => { - return new Map(activeRelays.map((relay) => { - return [relay, filters]; - })); - }, - eventRouter: async (event) => { - const relaySet = await getRelays(await Storages.db(), event.pubkey); - relaySet.delete(Conf.relay); - - const relays = [...relaySet].slice(0, 4); - return relays; - }, - }); + const relay = await this.db(); + return new DittoPool({ conf: Conf, relay }); })(); } return this._client; diff --git a/packages/ditto/storages/DittoPool.ts b/packages/ditto/storages/DittoPool.ts new file mode 100644 index 00000000..53545128 --- /dev/null +++ b/packages/ditto/storages/DittoPool.ts @@ -0,0 +1,91 @@ +// deno-lint-ignore-file require-await +import { DittoConf } from '@ditto/conf'; +import { NostrEvent, NostrFilter, NPool, type NRelay, NRelay1 } from '@nostrify/nostrify'; +import { logi } from '@soapbox/logi'; + +interface DittoPoolOpts { + conf: DittoConf; + relay: NRelay; + maxEventRelays?: number; +} + +export class DittoPool extends NPool { + private _opts: DittoPoolOpts; + + constructor(opts: DittoPoolOpts) { + super({ + open(url) { + return new NRelay1(url, { + // Skip event verification (it's done in the pipeline). + verifyEvent: () => true, + log: logi, + }); + }, + reqRouter: (filters) => { + return this.reqRouter(filters); + }, + eventRouter: async (event) => { + return this.eventRouter(event); + }, + }); + + this._opts = opts; + } + + private async reqRouter(filters: NostrFilter[]): Promise> { + const routes = new Map(); + + for (const relayUrl of await this.getRelayUrls({ marker: 'read' })) { + routes.set(relayUrl, filters); + } + + return routes; + } + + private async eventRouter(event: NostrEvent): Promise { + const { conf, maxEventRelays = 4 } = this._opts; + const { pubkey } = event; + + const relaySet = await this.getRelayUrls({ pubkey, marker: 'write' }); + relaySet.delete(conf.relay); + + return [...relaySet].slice(0, maxEventRelays); + } + + private async getRelayUrls(opts: { pubkey?: string; marker?: 'read' | 'write' } = {}): Promise> { + const { conf, relay } = this._opts; + + const relays = new Set<`wss://${string}`>(); + const authors = new Set([await conf.signer.getPublicKey()]); + + if (opts.pubkey) { + authors.add(opts.pubkey); + } + + const events = await relay.query([ + { kinds: [10002], authors: [...authors] }, + ]); + + // Ensure user's own relay list is counted first. + if (opts.pubkey) { + events.sort((a) => a.pubkey === opts.pubkey ? -1 : 1); + } + + for (const event of events) { + for (const [name, relayUrl, marker] of event.tags) { + if (name === 'r' && (!marker || !opts.marker || marker === opts.marker)) { + try { + const url = new URL(relayUrl); + if (url.protocol === 'wss:') { + relays.add(url.toString() as `wss://${string}`); + } + } catch { + // fallthrough + } + } + } + } + + return relays; + } +} diff --git a/packages/ditto/utils/outbox.test.ts b/packages/ditto/utils/outbox.test.ts deleted file mode 100644 index 62dac2d0..00000000 --- a/packages/ditto/utils/outbox.test.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { MockRelay } from '@nostrify/nostrify/test'; -import { eventFixture } from '@/test.ts'; -import { getRelays } from '@/utils/outbox.ts'; -import { assertEquals } from '@std/assert'; - -Deno.test('Get write relays - kind 10002', async () => { - const db = new MockRelay(); - - const relayListMetadata = await eventFixture('kind-10002-alex'); - - await db.event(relayListMetadata); - - const relays = await getRelays(db, relayListMetadata.pubkey); - - assertEquals(relays.size, 6); -}); - -Deno.test('Get write relays with invalid URL - kind 10002', async () => { - const db = new MockRelay(); - - const relayListMetadata = await eventFixture('kind-10002-alex'); - relayListMetadata.tags[0] = ['r', 'yolo']; - - await db.event(relayListMetadata); - - const relays = await getRelays(db, relayListMetadata.pubkey); - - assertEquals(relays.size, 5); -}); diff --git a/packages/ditto/utils/outbox.ts b/packages/ditto/utils/outbox.ts deleted file mode 100644 index 074518bc..00000000 --- a/packages/ditto/utils/outbox.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { NStore } from '@nostrify/nostrify'; - -import { Conf } from '@/config.ts'; - -export async function getRelays(store: NStore, pubkey: string): Promise> { - const relays = new Set<`wss://${string}`>(); - - const events = await store.query([ - { kinds: [10002], authors: [pubkey, await Conf.signer.getPublicKey()], limit: 2 }, - ]); - - for (const event of events) { - for (const [name, relay, marker] of event.tags) { - if (name === 'r' && (marker === 'write' || !marker)) { - try { - const url = new URL(relay); - if (url.protocol === 'wss:') { - relays.add(url.toString() as `wss://${string}`); - } - } catch (_e) { - // fall through - } - } - } - } - - return relays; -}