mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Add basic Reqmeister module
This commit is contained in:
parent
5ea9cc399f
commit
4a32fe2c37
4 changed files with 151 additions and 10 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
import { type Event, type Filter, matchFilters } from '@/deps.ts';
|
import { type Event, type Filter, matchFilters } from '@/deps.ts';
|
||||||
import * as pipeline from '@/pipeline.ts';
|
import * as pipeline from '@/pipeline.ts';
|
||||||
import { allRelays, pool } from '@/pool.ts';
|
import { activeRelays, pool } from '@/pool.ts';
|
||||||
|
|
||||||
import type { GetFiltersOpts } from '@/filter.ts';
|
import type { GetFiltersOpts } from '@/filter.ts';
|
||||||
|
|
||||||
|
|
@ -13,7 +13,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
|
||||||
|
|
||||||
const unsub = pool.subscribe(
|
const unsub = pool.subscribe(
|
||||||
filters,
|
filters,
|
||||||
allRelays,
|
opts.relays ?? activeRelays,
|
||||||
(event: Event | null) => {
|
(event: Event | null) => {
|
||||||
if (event && matchFilters(filters, event)) {
|
if (event && matchFilters(filters, event)) {
|
||||||
pipeline.handleEvent(event).catch(() => {});
|
pipeline.handleEvent(event).catch(() => {});
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,8 @@ interface GetFiltersOpts {
|
||||||
timeout?: number;
|
timeout?: number;
|
||||||
/** Event limit for the whole subscription. */
|
/** Event limit for the whole subscription. */
|
||||||
limit?: number;
|
limit?: number;
|
||||||
|
/** Relays to use, if applicable. */
|
||||||
|
relays?: WebSocket['url'][];
|
||||||
}
|
}
|
||||||
|
|
||||||
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import { isEphemeralKind } from '@/kinds.ts';
|
||||||
import * as mixer from '@/mixer.ts';
|
import * as mixer from '@/mixer.ts';
|
||||||
import { publish } from '@/pool.ts';
|
import { publish } from '@/pool.ts';
|
||||||
import { isLocallyFollowed } from '@/queries.ts';
|
import { isLocallyFollowed } from '@/queries.ts';
|
||||||
|
import { Reqmeister } from '@/reqmeister.ts';
|
||||||
import { updateStats } from '@/stats.ts';
|
import { updateStats } from '@/stats.ts';
|
||||||
import { Sub } from '@/subs.ts';
|
import { Sub } from '@/subs.ts';
|
||||||
import { getTagSet } from '@/tags.ts';
|
import { getTagSet } from '@/tags.ts';
|
||||||
|
|
@ -17,20 +18,27 @@ import { verifySignatureWorker } from '@/workers/verify.ts';
|
||||||
|
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
|
||||||
|
const reqmeister = new Reqmeister({
|
||||||
|
delay: Time.seconds(1),
|
||||||
|
timeout: Time.seconds(1),
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common pipeline function to process (and maybe store) events.
|
* Common pipeline function to process (and maybe store) events.
|
||||||
* It is idempotent, so it can be called multiple times for the same event.
|
* It is idempotent, so it can be called multiple times for the same event.
|
||||||
*/
|
*/
|
||||||
async function handleEvent(event: Event): Promise<void> {
|
async function handleEvent(event: Event): Promise<void> {
|
||||||
if (!(await verifySignatureWorker(event))) return;
|
if (!(await verifySignatureWorker(event))) return;
|
||||||
|
const wanted = reqmeister.isWanted(event);
|
||||||
if (encounterEvent(event)) return;
|
if (encounterEvent(event)) return;
|
||||||
const data = await getEventData(event);
|
const data = await getEventData(event);
|
||||||
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
storeEvent(event, data),
|
storeEvent(event, data, { force: wanted }),
|
||||||
processDeletions(event),
|
processDeletions(event),
|
||||||
trackRelays(event),
|
trackRelays(event),
|
||||||
trackHashtags(event),
|
trackHashtags(event),
|
||||||
|
trackRelatedEvents(event, data),
|
||||||
processMedia(event, data),
|
processMedia(event, data),
|
||||||
streamOut(event, data),
|
streamOut(event, data),
|
||||||
broadcast(event, data),
|
broadcast(event, data),
|
||||||
|
|
@ -38,13 +46,14 @@ async function handleEvent(event: Event): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
|
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
|
||||||
const encounters = new LRUCache<string, boolean>({ max: 1000 });
|
const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
function encounterEvent(event: Event) {
|
function encounterEvent(event: Event): boolean {
|
||||||
const result = encounters.get(event.id);
|
const result = encounters.get(event.id);
|
||||||
encounters.set(event.id, true);
|
encounters.set(event.id, true);
|
||||||
return result;
|
reqmeister.encounter(event);
|
||||||
|
return !!result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Preload data that will be useful to several tasks. */
|
/** Preload data that will be useful to several tasks. */
|
||||||
|
|
@ -56,11 +65,16 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
|
||||||
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
|
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
|
||||||
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
|
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
|
||||||
|
|
||||||
/** Maybe store the event, if eligible. */
|
interface StoreEventOpts {
|
||||||
async function storeEvent(event: Event, data: EventData): Promise<void> {
|
force?: boolean;
|
||||||
if (isEphemeralKind(event.kind)) return;
|
}
|
||||||
|
|
||||||
if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
/** Maybe store the event, if eligible. */
|
||||||
|
async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
|
||||||
|
if (isEphemeralKind(event.kind)) return;
|
||||||
|
const { force = false } = opts;
|
||||||
|
|
||||||
|
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||||
const [deletion] = await mixer.getFilters(
|
const [deletion] = await mixer.getFilters(
|
||||||
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
||||||
{ limit: 1, timeout: Time.seconds(1) },
|
{ limit: 1, timeout: Time.seconds(1) },
|
||||||
|
|
@ -128,6 +142,18 @@ function trackRelays(event: Event) {
|
||||||
return addRelays([...relays]);
|
return addRelays([...relays]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Track related events to fetch. */
|
||||||
|
function trackRelatedEvents(event: Event, data: EventData) {
|
||||||
|
if (!data.user) {
|
||||||
|
reqmeister.wantAuthor(event.pubkey);
|
||||||
|
}
|
||||||
|
for (const [name, id, relay] of event.tags) {
|
||||||
|
if (name === 'e' && !encounters.has(id)) {
|
||||||
|
reqmeister.wantEvent(id, [relay]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Delete unattached media entries that are attached to the event. */
|
/** Delete unattached media entries that are attached to the event. */
|
||||||
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
||||||
if (user) {
|
if (user) {
|
||||||
|
|
|
||||||
113
src/reqmeister.ts
Normal file
113
src/reqmeister.ts
Normal file
|
|
@ -0,0 +1,113 @@
|
||||||
|
import * as client from '@/client.ts';
|
||||||
|
import { Event, Filter } from '@/deps.ts';
|
||||||
|
|
||||||
|
interface ReqmeisterOpts {
|
||||||
|
delay?: number;
|
||||||
|
timeout?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
class Reqmeister {
|
||||||
|
#opts: ReqmeisterOpts;
|
||||||
|
|
||||||
|
#wantedEvents = new Map<Event['id'], Set<WebSocket['url']>>();
|
||||||
|
#wantedAuthors = new Map<Event['pubkey'], Set<WebSocket['url']>>();
|
||||||
|
|
||||||
|
constructor(opts: ReqmeisterOpts = {}) {
|
||||||
|
this.#opts = opts;
|
||||||
|
this.#perform();
|
||||||
|
}
|
||||||
|
|
||||||
|
async #perform() {
|
||||||
|
const { delay, timeout } = this.#opts;
|
||||||
|
if (delay) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
}
|
||||||
|
|
||||||
|
const relaysWantedEvents = new Map<WebSocket['url'], Set<Event['id']>>();
|
||||||
|
const relaysWantedAuthors = new Map<WebSocket['url'], Set<Event['pubkey']>>();
|
||||||
|
|
||||||
|
const allRelays = new Set<WebSocket['url']>(
|
||||||
|
...relaysWantedEvents.keys(),
|
||||||
|
...relaysWantedAuthors.keys(),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const [eventId, relays] of this.#wantedEvents) {
|
||||||
|
for (const relay of relays) {
|
||||||
|
const relaysSet = relaysWantedEvents.get(relay);
|
||||||
|
if (relaysSet) {
|
||||||
|
relaysSet.add(eventId);
|
||||||
|
} else {
|
||||||
|
relaysWantedEvents.set(relay, new Set([eventId]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [author, relays] of this.#wantedAuthors) {
|
||||||
|
for (const relay of relays) {
|
||||||
|
const relaysSet = relaysWantedAuthors.get(relay);
|
||||||
|
if (relaysSet) {
|
||||||
|
relaysSet.add(author);
|
||||||
|
} else {
|
||||||
|
relaysWantedAuthors.set(relay, new Set([author]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const promises: ReturnType<typeof client.getFilters>[] = [];
|
||||||
|
|
||||||
|
for (const relay of allRelays) {
|
||||||
|
const wantedEvents = relaysWantedEvents.get(relay);
|
||||||
|
const wantedAuthors = relaysWantedAuthors.get(relay);
|
||||||
|
|
||||||
|
const filters: Filter[] = [];
|
||||||
|
|
||||||
|
if (wantedEvents) filters.push({ ids: [...wantedEvents] });
|
||||||
|
if (wantedAuthors) filters.push({ authors: [...wantedAuthors] });
|
||||||
|
|
||||||
|
console.log('reqmeister:', [relay, filters]);
|
||||||
|
promises.push(
|
||||||
|
client.getFilters(filters, { relays: [relay], timeout }),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
await Promise.all(promises);
|
||||||
|
this.#perform();
|
||||||
|
}
|
||||||
|
|
||||||
|
wantEvent(eventId: Event['id'], relays: WebSocket['url'][] = []) {
|
||||||
|
const relaysSet = this.#wantedEvents.get(eventId);
|
||||||
|
if (relaysSet) {
|
||||||
|
for (const relay of relays) {
|
||||||
|
relaysSet.add(relay);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.#wantedEvents.set(eventId, new Set(relays));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wantAuthor(author: Event['pubkey'], relays: WebSocket['url'][] = []) {
|
||||||
|
const relaysSet = this.#wantedAuthors.get(author);
|
||||||
|
if (relaysSet) {
|
||||||
|
for (const relay of relays) {
|
||||||
|
relaysSet.add(relay);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.#wantedAuthors.set(author, new Set(relays));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
encounter(event: Event): void {
|
||||||
|
this.#wantedEvents.delete(event.id);
|
||||||
|
if (event.kind === 0) {
|
||||||
|
this.#wantedAuthors.delete(event.pubkey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isWanted(event: Event): boolean {
|
||||||
|
if (this.#wantedEvents.has(event.id)) return true;
|
||||||
|
if (event.kind === 0 && this.#wantedAuthors.has(event.pubkey)) return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export { Reqmeister };
|
||||||
Loading…
Add table
Reference in a new issue