From 4a32fe2c372f7e5592e21473c1238d0ba195ab60 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 21 Dec 2023 14:56:21 -0600 Subject: [PATCH] Add basic Reqmeister module --- src/client.ts | 4 +- src/filter.ts | 2 + src/pipeline.ts | 42 +++++++++++++---- src/reqmeister.ts | 113 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 10 deletions(-) create mode 100644 src/reqmeister.ts diff --git a/src/client.ts b/src/client.ts index c3d7cd9a..1d7cd87a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,6 @@ import { type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; -import { allRelays, pool } from '@/pool.ts'; +import { activeRelays, pool } from '@/pool.ts'; import type { GetFiltersOpts } from '@/filter.ts'; @@ -13,7 +13,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts const unsub = pool.subscribe( filters, - allRelays, + opts.relays ?? activeRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); diff --git a/src/filter.ts b/src/filter.ts index 76a0fcde..35a5098a 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -20,6 +20,8 @@ interface GetFiltersOpts { timeout?: number; /** Event limit for the whole subscription. */ limit?: number; + /** Relays to use, if applicable. */ + relays?: WebSocket['url'][]; } function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { diff --git a/src/pipeline.ts b/src/pipeline.ts index 4f0c51ab..5998396d 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -8,6 +8,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; +import { Reqmeister } from '@/reqmeister.ts'; import { updateStats } from '@/stats.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; @@ -17,20 +18,27 @@ import { verifySignatureWorker } from '@/workers/verify.ts'; import type { EventData } from '@/types.ts'; +const reqmeister = new Reqmeister({ + delay: Time.seconds(1), + timeout: Time.seconds(1), +}); + /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; + const wanted = reqmeister.isWanted(event); if (encounterEvent(event)) return; const data = await getEventData(event); await Promise.all([ - storeEvent(event, data), + storeEvent(event, data, { force: wanted }), processDeletions(event), trackRelays(event), trackHashtags(event), + trackRelatedEvents(event, data), processMedia(event, data), streamOut(event, data), broadcast(event, data), @@ -38,13 +46,14 @@ async function handleEvent(event: Event): Promise { } /** Tracks encountered events to skip duplicates, improving idempotency and performance. */ -const encounters = new LRUCache({ max: 1000 }); +const encounters = new LRUCache({ max: 1000 }); /** Encounter the event, and return whether it has already been encountered. */ -function encounterEvent(event: Event) { +function encounterEvent(event: Event): boolean { const result = encounters.get(event.id); encounters.set(event.id, true); - return result; + reqmeister.encounter(event); + return !!result; } /** Preload data that will be useful to several tasks. */ @@ -56,11 +65,16 @@ async function getEventData({ pubkey }: Event): Promise { /** Check if the pubkey is the `DITTO_NSEC` pubkey. */ const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; -/** Maybe store the event, if eligible. */ -async function storeEvent(event: Event, data: EventData): Promise { - if (isEphemeralKind(event.kind)) return; +interface StoreEventOpts { + force?: boolean; +} - if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { +/** Maybe store the event, if eligible. */ +async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise { + if (isEphemeralKind(event.kind)) return; + const { force = false } = opts; + + if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { const [deletion] = await mixer.getFilters( [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], { limit: 1, timeout: Time.seconds(1) }, @@ -128,6 +142,18 @@ function trackRelays(event: Event) { return addRelays([...relays]); } +/** Track related events to fetch. */ +function trackRelatedEvents(event: Event, data: EventData) { + if (!data.user) { + reqmeister.wantAuthor(event.pubkey); + } + for (const [name, id, relay] of event.tags) { + if (name === 'e' && !encounters.has(id)) { + reqmeister.wantEvent(id, [relay]); + } + } +} + /** Delete unattached media entries that are attached to the event. */ function processMedia({ tags, pubkey }: Event, { user }: EventData) { if (user) { diff --git a/src/reqmeister.ts b/src/reqmeister.ts new file mode 100644 index 00000000..5fe6e17a --- /dev/null +++ b/src/reqmeister.ts @@ -0,0 +1,113 @@ +import * as client from '@/client.ts'; +import { Event, Filter } from '@/deps.ts'; + +interface ReqmeisterOpts { + delay?: number; + timeout?: number; +} + +class Reqmeister { + #opts: ReqmeisterOpts; + + #wantedEvents = new Map>(); + #wantedAuthors = new Map>(); + + constructor(opts: ReqmeisterOpts = {}) { + this.#opts = opts; + this.#perform(); + } + + async #perform() { + const { delay, timeout } = this.#opts; + if (delay) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + + const relaysWantedEvents = new Map>(); + const relaysWantedAuthors = new Map>(); + + const allRelays = new Set( + ...relaysWantedEvents.keys(), + ...relaysWantedAuthors.keys(), + ); + + for (const [eventId, relays] of this.#wantedEvents) { + for (const relay of relays) { + const relaysSet = relaysWantedEvents.get(relay); + if (relaysSet) { + relaysSet.add(eventId); + } else { + relaysWantedEvents.set(relay, new Set([eventId])); + } + } + } + + for (const [author, relays] of this.#wantedAuthors) { + for (const relay of relays) { + const relaysSet = relaysWantedAuthors.get(relay); + if (relaysSet) { + relaysSet.add(author); + } else { + relaysWantedAuthors.set(relay, new Set([author])); + } + } + } + + const promises: ReturnType[] = []; + + for (const relay of allRelays) { + const wantedEvents = relaysWantedEvents.get(relay); + const wantedAuthors = relaysWantedAuthors.get(relay); + + const filters: Filter[] = []; + + if (wantedEvents) filters.push({ ids: [...wantedEvents] }); + if (wantedAuthors) filters.push({ authors: [...wantedAuthors] }); + + console.log('reqmeister:', [relay, filters]); + promises.push( + client.getFilters(filters, { relays: [relay], timeout }), + ); + } + + await Promise.all(promises); + this.#perform(); + } + + wantEvent(eventId: Event['id'], relays: WebSocket['url'][] = []) { + const relaysSet = this.#wantedEvents.get(eventId); + if (relaysSet) { + for (const relay of relays) { + relaysSet.add(relay); + } + } else { + this.#wantedEvents.set(eventId, new Set(relays)); + } + } + + wantAuthor(author: Event['pubkey'], relays: WebSocket['url'][] = []) { + const relaysSet = this.#wantedAuthors.get(author); + if (relaysSet) { + for (const relay of relays) { + relaysSet.add(relay); + } + } else { + this.#wantedAuthors.set(author, new Set(relays)); + } + } + + encounter(event: Event): void { + this.#wantedEvents.delete(event.id); + if (event.kind === 0) { + this.#wantedAuthors.delete(event.pubkey); + } + } + + isWanted(event: Event): boolean { + if (this.#wantedEvents.has(event.id)) return true; + if (event.kind === 0 && this.#wantedAuthors.has(event.pubkey)) return true; + return false; + } +} + +export { Reqmeister };