diff --git a/packages/ditto/storages/DittoPool.ts b/packages/ditto/storages/DittoPool.ts index 53545128..eba3c632 100644 --- a/packages/ditto/storages/DittoPool.ts +++ b/packages/ditto/storages/DittoPool.ts @@ -6,6 +6,7 @@ import { logi } from '@soapbox/logi'; interface DittoPoolOpts { conf: DittoConf; relay: NRelay; + maxReqRelays?: number; maxEventRelays?: number; } @@ -33,17 +34,65 @@ export class DittoPool extends NPool { } private async reqRouter(filters: NostrFilter[]): Promise> { - const routes = new Map(); + const { conf, relay, maxReqRelays = 5 } = this._opts; - for (const relayUrl of await this.getRelayUrls({ marker: 'read' })) { - routes.set(relayUrl, filters); + const routes = new Map(); + const authors = new Set(); + + for (const filter of filters) { + if (filter.authors) { + for (const author of filter.authors) { + authors.add(author); + } + } + } + + if (!authors.size) { + return routes; + } + + const pubkey = await conf.signer.getPublicKey(); + const map = new Map(); + + for (const event of await relay.query([{ kinds: [10002], authors: [pubkey, ...authors] }])) { + map.set(event.pubkey, event); + } + + for (const filter of filters) { + if (filter.authors) { + const relayAuthors = new Map<`wss://${string}`, Set>(); + + for (const author of filter.authors) { + const event = map.get(author) ?? map.get(pubkey); + if (event) { + for (const relayUrl of [...this.getEventRelayUrls(event, 'write')].slice(0, maxReqRelays)) { + const value = relayAuthors.get(relayUrl); + relayAuthors.set(relayUrl, value ? new Set([...value, author]) : new Set([author])); + } + } + } + + for (const [relayUrl, authors] of relayAuthors) { + const value = routes.get(relayUrl); + const _filter = { ...filter, authors: [...authors] }; + routes.set(relayUrl, value ? [...value, _filter] : [_filter]); + } + } else { + const event = map.get(pubkey); + if (event) { + for (const relayUrl of [...this.getEventRelayUrls(event, 'read')].slice(0, maxReqRelays)) { + const value = routes.get(relayUrl); + routes.set(relayUrl, value ? [...value, filter] : [filter]); + } + } + } } return routes; } private async eventRouter(event: NostrEvent): Promise { - const { conf, maxEventRelays = 4 } = this._opts; + const { conf, maxEventRelays = 10 } = this._opts; const { pubkey } = event; const relaySet = await this.getRelayUrls({ pubkey, marker: 'write' }); @@ -72,16 +121,26 @@ export class DittoPool extends NPool { } for (const event of events) { - for (const [name, relayUrl, marker] of event.tags) { - if (name === 'r' && (!marker || !opts.marker || marker === opts.marker)) { - try { - const url = new URL(relayUrl); - if (url.protocol === 'wss:') { - relays.add(url.toString() as `wss://${string}`); - } - } catch { - // fallthrough + for (const relayUrl of this.getEventRelayUrls(event, opts.marker)) { + relays.add(relayUrl); + } + } + + return relays; + } + + private getEventRelayUrls(event: NostrEvent, marker?: 'read' | 'write'): Set<`wss://${string}`> { + const relays = new Set<`wss://${string}`>(); + + for (const [name, relayUrl, _marker] of event.tags) { + if (name === 'r' && (!marker || !_marker || marker === _marker)) { + try { + const url = new URL(relayUrl); + if (url.protocol === 'wss:') { + relays.add(url.toString() as `wss://${string}`); } + } catch { + // fallthrough } } }