diff --git a/src/common.ts b/src/common.ts new file mode 100644 index 00000000..14614597 --- /dev/null +++ b/src/common.ts @@ -0,0 +1,9 @@ +import { Reqmeister } from '@/reqmeister.ts'; +import { Time } from '@/utils/time.ts'; + +const reqmeister = new Reqmeister({ + delay: Time.seconds(1), + timeout: Time.seconds(1), +}); + +export { reqmeister }; diff --git a/src/filter.ts b/src/filter.ts index 35a5098a..7fb02974 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -3,6 +3,8 @@ import { type Event, type Filter, matchFilters } from '@/deps.ts'; import type { EventData } from '@/types.ts'; +import stringifyStable from 'npm:fast-stable-stringify'; + /** Additional properties that may be added by Ditto to events. */ type Relation = 'author' | 'author_stats' | 'event_stats'; @@ -14,6 +16,9 @@ interface DittoFilter extends Filter { relations?: Relation[]; } +/** Filter to get one specific event. */ +type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] }; + /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { /** How long to wait (in milliseconds) until aborting the request. */ @@ -46,4 +51,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData return false; } -export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation }; +/** Get deterministic ID for a microfilter. */ +function getFilterId(filter: MicroFilter): string { + if ('ids' in filter) { + return stringifyStable({ ids: [filter.ids] }); + } else { + return stringifyStable({ + kinds: [filter.kinds[0]], + authors: [filter.authors[0]], + }); + } +} + +/** Get a microfilter from a Nostr event. */ +function eventToMicroFilter(event: Event): MicroFilter { + if (event.kind === 0) { + return { kinds: [0], authors: [event.pubkey] }; + } else { + return { ids: [event.id] }; + } +} + +export { + type DittoFilter, + eventToMicroFilter, + getFilterId, + type GetFiltersOpts, + matchDittoFilters, + type MicroFilter, + type Relation, +}; diff --git a/src/pipeline.ts b/src/pipeline.ts index 5998396d..0c7520be 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,3 +1,4 @@ +import { reqmeister } from '@/common.ts'; import { Conf } from '@/config.ts'; import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; @@ -8,7 +9,6 @@ import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; -import { Reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; @@ -18,11 +18,6 @@ import { verifySignatureWorker } from '@/workers/verify.ts'; import type { EventData } from '@/types.ts'; -const reqmeister = new Reqmeister({ - delay: Time.seconds(1), - timeout: Time.seconds(1), -}); - /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. @@ -145,11 +140,11 @@ function trackRelays(event: Event) { /** Track related events to fetch. */ function trackRelatedEvents(event: Event, data: EventData) { if (!data.user) { - reqmeister.wantAuthor(event.pubkey); + reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {}); } for (const [name, id, relay] of event.tags) { if (name === 'e' && !encounters.has(id)) { - reqmeister.wantEvent(id, [relay]); + reqmeister.req({ ids: [id] }, [relay]).catch(() => {}); } } } diff --git a/src/queries.ts b/src/queries.ts index edd5f4aa..6a9557d9 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts'; import { type Event, findReplyTag } from '@/deps.ts'; import { type DittoFilter, type Relation } from '@/filter.ts'; import * as mixer from '@/mixer.ts'; +import { reqmeister } from '@/common.ts'; interface GetEventOpts { /** Timeout in milliseconds. */ @@ -30,10 +31,10 @@ const getEvent = async ( const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => { const { relations, timeout = 1000 } = opts; - const [event] = await mixer.getFilters( + const event = await eventsDB.getFilters( [{ authors: [pubkey], relations, kinds: [0], limit: 1 }], { limit: 1, timeout }, - ); + ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {}); return event; }; diff --git a/src/reqmeister.ts b/src/reqmeister.ts index 5fe6e17a..f7f469e1 100644 --- a/src/reqmeister.ts +++ b/src/reqmeister.ts @@ -1,112 +1,88 @@ import * as client from '@/client.ts'; import { Event, Filter } from '@/deps.ts'; +import { EventEmitter } from 'npm:tseep'; +import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts'; + interface ReqmeisterOpts { delay?: number; timeout?: number; } -class Reqmeister { - #opts: ReqmeisterOpts; +type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]]; - #wantedEvents = new Map>(); - #wantedAuthors = new Map>(); +class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> { + #opts: ReqmeisterOpts; + #queue: ReqmeisterQueueItem[] = []; + #promise!: Promise; + #resolve!: () => void; constructor(opts: ReqmeisterOpts = {}) { + super(); this.#opts = opts; + this.#cycle(); this.#perform(); } + #cycle() { + this.#resolve?.(); + this.#promise = new Promise((resolve) => { + this.#resolve = resolve; + }); + } + async #perform() { - const { delay, timeout } = this.#opts; - if (delay) { - await new Promise((resolve) => setTimeout(resolve, delay)); - } + const { delay } = this.#opts; + await new Promise((resolve) => setTimeout(resolve, delay)); - const relaysWantedEvents = new Map>(); - const relaysWantedAuthors = new Map>(); + const queue = this.#queue; + this.#queue = []; - const allRelays = new Set( - ...relaysWantedEvents.keys(), - ...relaysWantedAuthors.keys(), - ); + const wantedEvents = new Set(); + const wantedAuthors = new Set(); - for (const [eventId, relays] of this.#wantedEvents) { - for (const relay of relays) { - const relaysSet = relaysWantedEvents.get(relay); - if (relaysSet) { - relaysSet.add(eventId); - } else { - relaysWantedEvents.set(relay, new Set([eventId])); - } + // TODO: batch by relays. + for (const [_filterId, filter, _relays] of queue) { + if ('ids' in filter) { + filter.ids.forEach((id) => wantedEvents.add(id)); + } else { + wantedAuthors.add(filter.authors[0]); } } - for (const [author, relays] of this.#wantedAuthors) { - for (const relay of relays) { - const relaysSet = relaysWantedAuthors.get(relay); - if (relaysSet) { - relaysSet.add(author); - } else { - relaysWantedAuthors.set(relay, new Set([author])); - } - } + const filters: Filter[] = []; + + if (wantedEvents.size) filters.push({ ids: [...wantedEvents] }); + if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] }); + + const events = await client.getFilters(filters, { timeout: this.#opts.timeout }); + + for (const event of events) { + this.encounter(event); } - const promises: ReturnType[] = []; - - for (const relay of allRelays) { - const wantedEvents = relaysWantedEvents.get(relay); - const wantedAuthors = relaysWantedAuthors.get(relay); - - const filters: Filter[] = []; - - if (wantedEvents) filters.push({ ids: [...wantedEvents] }); - if (wantedAuthors) filters.push({ authors: [...wantedAuthors] }); - - console.log('reqmeister:', [relay, filters]); - promises.push( - client.getFilters(filters, { relays: [relay], timeout }), - ); - } - - await Promise.all(promises); + this.#cycle(); this.#perform(); } - wantEvent(eventId: Event['id'], relays: WebSocket['url'][] = []) { - const relaysSet = this.#wantedEvents.get(eventId); - if (relaysSet) { - for (const relay of relays) { - relaysSet.add(relay); - } - } else { - this.#wantedEvents.set(eventId, new Set(relays)); - } - } - - wantAuthor(author: Event['pubkey'], relays: WebSocket['url'][] = []) { - const relaysSet = this.#wantedAuthors.get(author); - if (relaysSet) { - for (const relay of relays) { - relaysSet.add(relay); - } - } else { - this.#wantedAuthors.set(author, new Set(relays)); - } + req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise { + const filterId = getFilterId(filter); + this.#queue.push([filterId, filter, relays]); + return new Promise((resolve, reject) => { + this.once(filterId, resolve); + this.#promise.finally(reject); + }); } encounter(event: Event): void { - this.#wantedEvents.delete(event.id); - if (event.kind === 0) { - this.#wantedAuthors.delete(event.pubkey); - } + const filterId = getFilterId(eventToMicroFilter(event)); + this.#queue = this.#queue.filter(([id]) => id !== filterId); + this.emit(filterId, event); } isWanted(event: Event): boolean { - if (this.#wantedEvents.has(event.id)) return true; - if (event.kind === 0 && this.#wantedAuthors.has(event.pubkey)) return true; - return false; + const filterId = getFilterId(eventToMicroFilter(event)); + return this.#queue.some(([id]) => id === filterId); } }