mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge branch 'threads' into 'main'
Reqmeister See merge request soapbox-pub/ditto!84
This commit is contained in:
commit
8f414a9aee
9 changed files with 476 additions and 29 deletions
|
|
@ -14,7 +14,7 @@ function getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts
|
||||||
|
|
||||||
const unsub = pool.subscribe(
|
const unsub = pool.subscribe(
|
||||||
filters,
|
filters,
|
||||||
activeRelays,
|
opts.relays ?? activeRelays,
|
||||||
(event: Event | null) => {
|
(event: Event | null) => {
|
||||||
if (event && matchFilters(filters, event)) {
|
if (event && matchFilters(filters, event)) {
|
||||||
pipeline.handleEvent(event).catch(() => {});
|
pipeline.handleEvent(event).catch(() => {});
|
||||||
|
|
|
||||||
9
src/common.ts
Normal file
9
src/common.ts
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
import { Reqmeister } from '@/reqmeister.ts';
|
||||||
|
import { Time } from '@/utils/time.ts';
|
||||||
|
|
||||||
|
const reqmeister = new Reqmeister({
|
||||||
|
delay: Time.seconds(1),
|
||||||
|
signal: AbortSignal.timeout(Time.seconds(1)),
|
||||||
|
});
|
||||||
|
|
||||||
|
export { reqmeister };
|
||||||
16
src/cron.ts
16
src/cron.ts
|
|
@ -1,22 +1,9 @@
|
||||||
import * as eventsDB from '@/db/events.ts';
|
|
||||||
import { deleteUnattachedMediaByUrl, getUnattachedMedia } from '@/db/unattached-media.ts';
|
import { deleteUnattachedMediaByUrl, getUnattachedMedia } from '@/db/unattached-media.ts';
|
||||||
import { cron } from '@/deps.ts';
|
import { cron } from '@/deps.ts';
|
||||||
import { Time } from '@/utils/time.ts';
|
import { Time } from '@/utils/time.ts';
|
||||||
import { configUploader as uploader } from '@/uploaders/config.ts';
|
import { configUploader as uploader } from '@/uploaders/config.ts';
|
||||||
import { cidFromUrl } from '@/utils/ipfs.ts';
|
import { cidFromUrl } from '@/utils/ipfs.ts';
|
||||||
|
|
||||||
/** Clean up old remote events. */
|
|
||||||
async function cleanupEvents() {
|
|
||||||
console.log('Cleaning up old remote events...');
|
|
||||||
|
|
||||||
const [result] = await eventsDB.deleteFilters([{
|
|
||||||
until: Math.floor((Date.now() - Time.days(7)) / 1000),
|
|
||||||
local: false,
|
|
||||||
}]);
|
|
||||||
|
|
||||||
console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Delete files that aren't attached to any events. */
|
/** Delete files that aren't attached to any events. */
|
||||||
async function cleanupMedia() {
|
async function cleanupMedia() {
|
||||||
console.log('Deleting orphaned media files...');
|
console.log('Deleting orphaned media files...');
|
||||||
|
|
@ -38,8 +25,5 @@ async function cleanupMedia() {
|
||||||
console.log(`Removed ${media?.length ?? 0} orphaned media files.`);
|
console.log(`Removed ${media?.length ?? 0} orphaned media files.`);
|
||||||
}
|
}
|
||||||
|
|
||||||
await cleanupEvents();
|
|
||||||
await cleanupMedia();
|
await cleanupMedia();
|
||||||
|
|
||||||
cron.every15Minute(cleanupEvents);
|
|
||||||
cron.every15Minute(cleanupMedia);
|
cron.every15Minute(cleanupMedia);
|
||||||
|
|
|
||||||
|
|
@ -81,5 +81,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
|
||||||
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
|
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
|
||||||
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
|
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
|
||||||
export * as Comlink from 'npm:comlink@^4.4.1';
|
export * as Comlink from 'npm:comlink@^4.4.1';
|
||||||
|
export { EventEmitter } from 'npm:tseep@^1.1.3';
|
||||||
|
export { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';
|
||||||
|
|
||||||
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { type Event, type Filter, matchFilters } from '@/deps.ts';
|
import { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';
|
||||||
|
|
||||||
import type { EventData } from '@/types.ts';
|
import type { EventData } from '@/types.ts';
|
||||||
|
|
||||||
|
|
@ -14,12 +14,17 @@ interface DittoFilter<K extends number = number> extends Filter<K> {
|
||||||
relations?: Relation[];
|
relations?: Relation[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Filter to get one specific event. */
|
||||||
|
type MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };
|
||||||
|
|
||||||
/** Additional options to apply to the whole subscription. */
|
/** Additional options to apply to the whole subscription. */
|
||||||
interface GetFiltersOpts {
|
interface GetFiltersOpts {
|
||||||
/** Signal to abort the request. */
|
/** Signal to abort the request. */
|
||||||
signal?: AbortSignal;
|
signal?: AbortSignal;
|
||||||
/** Event limit for the whole subscription. */
|
/** Event limit for the whole subscription. */
|
||||||
limit?: number;
|
limit?: number;
|
||||||
|
/** Relays to use, if applicable. */
|
||||||
|
relays?: WebSocket['url'][];
|
||||||
}
|
}
|
||||||
|
|
||||||
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {
|
||||||
|
|
@ -44,4 +49,33 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };
|
/** Get deterministic ID for a microfilter. */
|
||||||
|
function getFilterId(filter: MicroFilter): string {
|
||||||
|
if ('ids' in filter) {
|
||||||
|
return stringifyStable({ ids: [filter.ids] });
|
||||||
|
} else {
|
||||||
|
return stringifyStable({
|
||||||
|
kinds: [filter.kinds[0]],
|
||||||
|
authors: [filter.authors[0]],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Get a microfilter from a Nostr event. */
|
||||||
|
function eventToMicroFilter(event: Event): MicroFilter {
|
||||||
|
if (event.kind === 0) {
|
||||||
|
return { kinds: [0], authors: [event.pubkey] };
|
||||||
|
} else {
|
||||||
|
return { ids: [event.id] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export {
|
||||||
|
type DittoFilter,
|
||||||
|
eventToMicroFilter,
|
||||||
|
getFilterId,
|
||||||
|
type GetFiltersOpts,
|
||||||
|
matchDittoFilters,
|
||||||
|
type MicroFilter,
|
||||||
|
type Relation,
|
||||||
|
};
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { reqmeister } from '@/common.ts';
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import * as eventsDB from '@/db/events.ts';
|
import * as eventsDB from '@/db/events.ts';
|
||||||
import { addRelays } from '@/db/relays.ts';
|
import { addRelays } from '@/db/relays.ts';
|
||||||
|
|
@ -23,15 +24,17 @@ import type { EventData } from '@/types.ts';
|
||||||
*/
|
*/
|
||||||
async function handleEvent(event: Event): Promise<void> {
|
async function handleEvent(event: Event): Promise<void> {
|
||||||
if (!(await verifySignatureWorker(event))) return;
|
if (!(await verifySignatureWorker(event))) return;
|
||||||
|
const wanted = reqmeister.isWanted(event);
|
||||||
if (encounterEvent(event)) return;
|
if (encounterEvent(event)) return;
|
||||||
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
|
console.info(`pipeline: Event<${event.kind}> ${event.id}`);
|
||||||
const data = await getEventData(event);
|
const data = await getEventData(event);
|
||||||
|
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
storeEvent(event, data),
|
storeEvent(event, data, { force: wanted }),
|
||||||
processDeletions(event),
|
processDeletions(event),
|
||||||
trackRelays(event),
|
trackRelays(event),
|
||||||
trackHashtags(event),
|
trackHashtags(event),
|
||||||
|
fetchRelatedEvents(event, data),
|
||||||
processMedia(event, data),
|
processMedia(event, data),
|
||||||
streamOut(event, data),
|
streamOut(event, data),
|
||||||
broadcast(event, data),
|
broadcast(event, data),
|
||||||
|
|
@ -39,13 +42,14 @@ async function handleEvent(event: Event): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
|
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */
|
||||||
const encounters = new LRUCache<string, boolean>({ max: 1000 });
|
const encounters = new LRUCache<Event['id'], true>({ max: 1000 });
|
||||||
|
|
||||||
/** Encounter the event, and return whether it has already been encountered. */
|
/** Encounter the event, and return whether it has already been encountered. */
|
||||||
function encounterEvent(event: Event) {
|
function encounterEvent(event: Event): boolean {
|
||||||
const result = encounters.get(event.id);
|
const result = encounters.get(event.id);
|
||||||
encounters.set(event.id, true);
|
encounters.set(event.id, true);
|
||||||
return result;
|
reqmeister.encounter(event);
|
||||||
|
return !!result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Preload data that will be useful to several tasks. */
|
/** Preload data that will be useful to several tasks. */
|
||||||
|
|
@ -57,11 +61,16 @@ async function getEventData({ pubkey }: Event): Promise<EventData> {
|
||||||
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
|
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */
|
||||||
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
|
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;
|
||||||
|
|
||||||
/** Maybe store the event, if eligible. */
|
interface StoreEventOpts {
|
||||||
async function storeEvent(event: Event, data: EventData): Promise<void> {
|
force?: boolean;
|
||||||
if (isEphemeralKind(event.kind)) return;
|
}
|
||||||
|
|
||||||
if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
/** Maybe store the event, if eligible. */
|
||||||
|
async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {
|
||||||
|
if (isEphemeralKind(event.kind)) return;
|
||||||
|
const { force = false } = opts;
|
||||||
|
|
||||||
|
if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
|
||||||
const [deletion] = await mixer.getFilters(
|
const [deletion] = await mixer.getFilters(
|
||||||
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
|
||||||
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
|
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },
|
||||||
|
|
@ -129,6 +138,18 @@ function trackRelays(event: Event) {
|
||||||
return addRelays([...relays]);
|
return addRelays([...relays]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Queue related events to fetch. */
|
||||||
|
function fetchRelatedEvents(event: Event, data: EventData) {
|
||||||
|
if (!data.user) {
|
||||||
|
reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
|
||||||
|
}
|
||||||
|
for (const [name, id, relay] of event.tags) {
|
||||||
|
if (name === 'e' && !encounters.has(id)) {
|
||||||
|
reqmeister.req({ ids: [id] }, [relay]).catch(() => {});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Delete unattached media entries that are attached to the event. */
|
/** Delete unattached media entries that are attached to the event. */
|
||||||
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
function processMedia({ tags, pubkey }: Event, { user }: EventData) {
|
||||||
if (user) {
|
if (user) {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts';
|
||||||
import { type Event, findReplyTag } from '@/deps.ts';
|
import { type Event, findReplyTag } from '@/deps.ts';
|
||||||
import { type DittoFilter, type Relation } from '@/filter.ts';
|
import { type DittoFilter, type Relation } from '@/filter.ts';
|
||||||
import * as mixer from '@/mixer.ts';
|
import * as mixer from '@/mixer.ts';
|
||||||
|
import { reqmeister } from '@/common.ts';
|
||||||
|
|
||||||
interface GetEventOpts<K extends number> {
|
interface GetEventOpts<K extends number> {
|
||||||
/** Signal to abort the request. */
|
/** Signal to abort the request. */
|
||||||
|
|
@ -30,10 +31,10 @@ const getEvent = async <K extends number = number>(
|
||||||
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
|
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {
|
||||||
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
const { relations, signal = AbortSignal.timeout(1000) } = opts;
|
||||||
|
|
||||||
const [event] = await mixer.getFilters(
|
const event = await eventsDB.getFilters(
|
||||||
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],
|
||||||
{ limit: 1, signal },
|
{ limit: 1, signal },
|
||||||
);
|
).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});
|
||||||
|
|
||||||
return event;
|
return event;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
89
src/reqmeister.ts
Normal file
89
src/reqmeister.ts
Normal file
|
|
@ -0,0 +1,89 @@
|
||||||
|
import * as client from '@/client.ts';
|
||||||
|
import { type Event, EventEmitter, type Filter } from '@/deps.ts';
|
||||||
|
|
||||||
|
import { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';
|
||||||
|
|
||||||
|
interface ReqmeisterOpts {
|
||||||
|
delay?: number;
|
||||||
|
signal?: AbortSignal;
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
|
||||||
|
|
||||||
|
/** Batches requests to Nostr relays using microfilters. */
|
||||||
|
class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {
|
||||||
|
#opts: ReqmeisterOpts;
|
||||||
|
#queue: ReqmeisterQueueItem[] = [];
|
||||||
|
#promise!: Promise<void>;
|
||||||
|
#resolve!: () => void;
|
||||||
|
|
||||||
|
constructor(opts: ReqmeisterOpts = {}) {
|
||||||
|
super();
|
||||||
|
this.#opts = opts;
|
||||||
|
this.#cycle();
|
||||||
|
this.#perform();
|
||||||
|
}
|
||||||
|
|
||||||
|
#cycle() {
|
||||||
|
this.#resolve?.();
|
||||||
|
this.#promise = new Promise((resolve) => {
|
||||||
|
this.#resolve = resolve;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async #perform() {
|
||||||
|
const { delay } = this.#opts;
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
|
||||||
|
const queue = this.#queue;
|
||||||
|
this.#queue = [];
|
||||||
|
|
||||||
|
const wantedEvents = new Set<Event['id']>();
|
||||||
|
const wantedAuthors = new Set<Event['pubkey']>();
|
||||||
|
|
||||||
|
// TODO: batch by relays.
|
||||||
|
for (const [_filterId, filter, _relays] of queue) {
|
||||||
|
if ('ids' in filter) {
|
||||||
|
filter.ids.forEach((id) => wantedEvents.add(id));
|
||||||
|
} else {
|
||||||
|
wantedAuthors.add(filter.authors[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const filters: Filter[] = [];
|
||||||
|
|
||||||
|
if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });
|
||||||
|
if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
|
||||||
|
|
||||||
|
const events = await client.getFilters(filters, { signal: this.#opts.signal });
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
this.encounter(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#cycle();
|
||||||
|
this.#perform();
|
||||||
|
}
|
||||||
|
|
||||||
|
req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {
|
||||||
|
const filterId = getFilterId(filter);
|
||||||
|
this.#queue.push([filterId, filter, relays]);
|
||||||
|
return new Promise<Event>((resolve, reject) => {
|
||||||
|
this.once(filterId, resolve);
|
||||||
|
this.#promise.finally(reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
encounter(event: Event): void {
|
||||||
|
const filterId = getFilterId(eventToMicroFilter(event));
|
||||||
|
this.#queue = this.#queue.filter(([id]) => id !== filterId);
|
||||||
|
this.emit(filterId, event);
|
||||||
|
}
|
||||||
|
|
||||||
|
isWanted(event: Event): boolean {
|
||||||
|
const filterId = getFilterId(eventToMicroFilter(event));
|
||||||
|
return this.#queue.some(([id]) => id === filterId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export { Reqmeister };
|
||||||
307
tringifyStable from npm:fast-stable-stringify
Normal file
307
tringifyStable from npm:fast-stable-stringify
Normal file
|
|
@ -0,0 +1,307 @@
|
||||||
|
[1mdiff --git a/src/client.ts b/src/client.ts[m
|
||||||
|
[1mindex 970a077..3cf2e8a 100644[m
|
||||||
|
[1m--- a/src/client.ts[m
|
||||||
|
[1m+++ b/src/client.ts[m
|
||||||
|
[36m@@ -14,7 +14,7 @@[m [mfunction getFilters<K extends number>(filters: Filter<K>[], opts: GetFiltersOpts[m
|
||||||
|
[m
|
||||||
|
const unsub = pool.subscribe([m
|
||||||
|
filters,[m
|
||||||
|
[31m- activeRelays,[m
|
||||||
|
[32m+[m[32m opts.relays ?? activeRelays,[m
|
||||||
|
(event: Event | null) => {[m
|
||||||
|
if (event && matchFilters(filters, event)) {[m
|
||||||
|
pipeline.handleEvent(event).catch(() => {});[m
|
||||||
|
[1mdiff --git a/src/common.ts b/src/common.ts[m
|
||||||
|
[1mnew file mode 100644[m
|
||||||
|
[1mindex 0000000..0424b52[m
|
||||||
|
[1m--- /dev/null[m
|
||||||
|
[1m+++ b/src/common.ts[m
|
||||||
|
[36m@@ -0,0 +1,9 @@[m
|
||||||
|
[32m+[m[32mimport { Reqmeister } from '@/reqmeister.ts';[m
|
||||||
|
[32m+[m[32mimport { Time } from '@/utils/time.ts';[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mconst reqmeister = new Reqmeister({[m
|
||||||
|
[32m+[m[32m delay: Time.seconds(1),[m
|
||||||
|
[32m+[m[32m signal: AbortSignal.timeout(Time.seconds(1)),[m
|
||||||
|
[32m+[m[32m});[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mexport { reqmeister };[m
|
||||||
|
[1mdiff --git a/src/deps.ts b/src/deps.ts[m
|
||||||
|
[1mindex b9db9e2..4a9314b 100644[m
|
||||||
|
[1m--- a/src/deps.ts[m
|
||||||
|
[1m+++ b/src/deps.ts[m
|
||||||
|
[36m@@ -81,5 +81,7 @@[m [mexport { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1[m
|
||||||
|
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';[m
|
||||||
|
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';[m
|
||||||
|
export * as Comlink from 'npm:comlink@^4.4.1';[m
|
||||||
|
[32m+[m[32mexport { EventEmitter } from 'npm:tseep@^1.1.3';[m
|
||||||
|
[32m+[m[32mexport { default as stringifyStable } from 'npm:fast-stable-stringify@^1.0.0';[m
|
||||||
|
[m
|
||||||
|
export type * as TypeFest from 'npm:type-fest@^4.3.0';[m
|
||||||
|
[1mdiff --git a/src/filter.ts b/src/filter.ts[m
|
||||||
|
[1mindex fb43251..38fcff7 100644[m
|
||||||
|
[1m--- a/src/filter.ts[m
|
||||||
|
[1m+++ b/src/filter.ts[m
|
||||||
|
[36m@@ -1,5 +1,5 @@[m
|
||||||
|
import { Conf } from '@/config.ts';[m
|
||||||
|
[31m-import { type Event, type Filter, matchFilters } from '@/deps.ts';[m
|
||||||
|
[32m+[m[32mimport { type Event, type Filter, matchFilters, stringifyStable } from '@/deps.ts';[m
|
||||||
|
[m
|
||||||
|
import type { EventData } from '@/types.ts';[m
|
||||||
|
[m
|
||||||
|
[36m@@ -14,12 +14,17 @@[m [minterface DittoFilter<K extends number = number> extends Filter<K> {[m
|
||||||
|
relations?: Relation[];[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
[32m+[m[32m/** Filter to get one specific event. */[m
|
||||||
|
[32m+[m[32mtype MicroFilter = { ids: [Event['id']] } | { kinds: [0]; authors: [Event['pubkey']] };[m
|
||||||
|
[32m+[m
|
||||||
|
/** Additional options to apply to the whole subscription. */[m
|
||||||
|
interface GetFiltersOpts {[m
|
||||||
|
/** Signal to abort the request. */[m
|
||||||
|
signal?: AbortSignal;[m
|
||||||
|
/** Event limit for the whole subscription. */[m
|
||||||
|
limit?: number;[m
|
||||||
|
[32m+[m[32m /** Relays to use, if applicable. */[m
|
||||||
|
[32m+[m[32m relays?: WebSocket['url'][];[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean {[m
|
||||||
|
[36m@@ -44,4 +49,33 @@[m [mfunction matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData[m
|
||||||
|
return false;[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
[31m-export { type DittoFilter, type GetFiltersOpts, matchDittoFilters, type Relation };[m
|
||||||
|
[32m+[m[32m/** Get deterministic ID for a microfilter. */[m
|
||||||
|
[32m+[m[32mfunction getFilterId(filter: MicroFilter): string {[m
|
||||||
|
[32m+[m[32m if ('ids' in filter) {[m
|
||||||
|
[32m+[m[32m return stringifyStable({ ids: [filter.ids] });[m
|
||||||
|
[32m+[m[32m } else {[m
|
||||||
|
[32m+[m[32m return stringifyStable({[m
|
||||||
|
[32m+[m[32m kinds: [filter.kinds[0]],[m
|
||||||
|
[32m+[m[32m authors: [filter.authors[0]],[m
|
||||||
|
[32m+[m[32m });[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m/** Get a microfilter from a Nostr event. */[m
|
||||||
|
[32m+[m[32mfunction eventToMicroFilter(event: Event): MicroFilter {[m
|
||||||
|
[32m+[m[32m if (event.kind === 0) {[m
|
||||||
|
[32m+[m[32m return { kinds: [0], authors: [event.pubkey] };[m
|
||||||
|
[32m+[m[32m } else {[m
|
||||||
|
[32m+[m[32m return { ids: [event.id] };[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mexport {[m
|
||||||
|
[32m+[m[32m type DittoFilter,[m
|
||||||
|
[32m+[m[32m eventToMicroFilter,[m
|
||||||
|
[32m+[m[32m getFilterId,[m
|
||||||
|
[32m+[m[32m type GetFiltersOpts,[m
|
||||||
|
[32m+[m[32m matchDittoFilters,[m
|
||||||
|
[32m+[m[32m type MicroFilter,[m
|
||||||
|
[32m+[m[32m type Relation,[m
|
||||||
|
[32m+[m[32m};[m
|
||||||
|
[1mdiff --git a/src/pipeline.ts b/src/pipeline.ts[m
|
||||||
|
[1mindex adf8a84..923bf4e 100644[m
|
||||||
|
[1m--- a/src/pipeline.ts[m
|
||||||
|
[1m+++ b/src/pipeline.ts[m
|
||||||
|
[36m@@ -1,3 +1,4 @@[m
|
||||||
|
[32m+[m[32mimport { reqmeister } from '@/common.ts';[m
|
||||||
|
import { Conf } from '@/config.ts';[m
|
||||||
|
import * as eventsDB from '@/db/events.ts';[m
|
||||||
|
import { addRelays } from '@/db/relays.ts';[m
|
||||||
|
[36m@@ -23,15 +24,17 @@[m [mimport type { EventData } from '@/types.ts';[m
|
||||||
|
*/[m
|
||||||
|
async function handleEvent(event: Event): Promise<void> {[m
|
||||||
|
if (!(await verifySignatureWorker(event))) return;[m
|
||||||
|
[32m+[m[32m const wanted = reqmeister.isWanted(event);[m
|
||||||
|
if (encounterEvent(event)) return;[m
|
||||||
|
console.info(`pipeline: Event<${event.kind}> ${event.id}`);[m
|
||||||
|
const data = await getEventData(event);[m
|
||||||
|
[m
|
||||||
|
await Promise.all([[m
|
||||||
|
[31m- storeEvent(event, data),[m
|
||||||
|
[32m+[m[32m storeEvent(event, data, { force: wanted }),[m
|
||||||
|
processDeletions(event),[m
|
||||||
|
trackRelays(event),[m
|
||||||
|
trackHashtags(event),[m
|
||||||
|
[32m+[m[32m fetchRelatedEvents(event, data),[m
|
||||||
|
processMedia(event, data),[m
|
||||||
|
streamOut(event, data),[m
|
||||||
|
broadcast(event, data),[m
|
||||||
|
[36m@@ -39,13 +42,14 @@[m [masync function handleEvent(event: Event): Promise<void> {[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
/** Tracks encountered events to skip duplicates, improving idempotency and performance. */[m
|
||||||
|
[31m-const encounters = new LRUCache<string, boolean>({ max: 1000 });[m
|
||||||
|
[32m+[m[32mconst encounters = new LRUCache<Event['id'], true>({ max: 1000 });[m
|
||||||
|
[m
|
||||||
|
/** Encounter the event, and return whether it has already been encountered. */[m
|
||||||
|
[31m-function encounterEvent(event: Event) {[m
|
||||||
|
[32m+[m[32mfunction encounterEvent(event: Event): boolean {[m
|
||||||
|
const result = encounters.get(event.id);[m
|
||||||
|
encounters.set(event.id, true);[m
|
||||||
|
[31m- return result;[m
|
||||||
|
[32m+[m[32m reqmeister.encounter(event);[m
|
||||||
|
[32m+[m[32m return !!result;[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
/** Preload data that will be useful to several tasks. */[m
|
||||||
|
[36m@@ -57,11 +61,16 @@[m [masync function getEventData({ pubkey }: Event): Promise<EventData> {[m
|
||||||
|
/** Check if the pubkey is the `DITTO_NSEC` pubkey. */[m
|
||||||
|
const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey;[m
|
||||||
|
[m
|
||||||
|
[32m+[m[32minterface StoreEventOpts {[m
|
||||||
|
[32m+[m[32m force?: boolean;[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
/** Maybe store the event, if eligible. */[m
|
||||||
|
[31m-async function storeEvent(event: Event, data: EventData): Promise<void> {[m
|
||||||
|
[32m+[m[32masync function storeEvent(event: Event, data: EventData, opts: StoreEventOpts = {}): Promise<void> {[m
|
||||||
|
if (isEphemeralKind(event.kind)) return;[m
|
||||||
|
[32m+[m[32m const { force = false } = opts;[m
|
||||||
|
[m
|
||||||
|
[31m- if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {[m
|
||||||
|
[32m+[m[32m if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {[m
|
||||||
|
const [deletion] = await mixer.getFilters([m
|
||||||
|
[{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],[m
|
||||||
|
{ limit: 1, signal: AbortSignal.timeout(Time.seconds(1)) },[m
|
||||||
|
[36m@@ -129,6 +138,18 @@[m [mfunction trackRelays(event: Event) {[m
|
||||||
|
return addRelays([...relays]);[m
|
||||||
|
}[m
|
||||||
|
[m
|
||||||
|
[32m+[m[32m/** Queue related events to fetch. */[m
|
||||||
|
[32m+[m[32mfunction fetchRelatedEvents(event: Event, data: EventData) {[m
|
||||||
|
[32m+[m[32m if (!data.user) {[m
|
||||||
|
[32m+[m[32m reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m for (const [name, id, relay] of event.tags) {[m
|
||||||
|
[32m+[m[32m if (name === 'e' && !encounters.has(id)) {[m
|
||||||
|
[32m+[m[32m reqmeister.req({ ids: [id] }, [relay]).catch(() => {});[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
/** Delete unattached media entries that are attached to the event. */[m
|
||||||
|
function processMedia({ tags, pubkey }: Event, { user }: EventData) {[m
|
||||||
|
if (user) {[m
|
||||||
|
[1mdiff --git a/src/queries.ts b/src/queries.ts[m
|
||||||
|
[1mindex fc7365a..1ecff7b 100644[m
|
||||||
|
[1m--- a/src/queries.ts[m
|
||||||
|
[1m+++ b/src/queries.ts[m
|
||||||
|
[36m@@ -2,6 +2,7 @@[m [mimport * as eventsDB from '@/db/events.ts';[m
|
||||||
|
import { type Event, findReplyTag } from '@/deps.ts';[m
|
||||||
|
import { type DittoFilter, type Relation } from '@/filter.ts';[m
|
||||||
|
import * as mixer from '@/mixer.ts';[m
|
||||||
|
[32m+[m[32mimport { reqmeister } from '@/common.ts';[m
|
||||||
|
[m
|
||||||
|
interface GetEventOpts<K extends number> {[m
|
||||||
|
/** Signal to abort the request. */[m
|
||||||
|
[36m@@ -30,10 +31,10 @@[m [mconst getEvent = async <K extends number = number>([m
|
||||||
|
const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise<Event<0> | undefined> => {[m
|
||||||
|
const { relations, signal = AbortSignal.timeout(1000) } = opts;[m
|
||||||
|
[m
|
||||||
|
[31m- const [event] = await mixer.getFilters([m
|
||||||
|
[32m+[m[32m const event = await eventsDB.getFilters([m
|
||||||
|
[{ authors: [pubkey], relations, kinds: [0], limit: 1 }],[m
|
||||||
|
{ limit: 1, signal },[m
|
||||||
|
[31m- );[m
|
||||||
|
[32m+[m[32m ).then(([event]) => event) || await reqmeister.req({ kinds: [0], authors: [pubkey] }).catch(() => {});[m
|
||||||
|
[m
|
||||||
|
return event;[m
|
||||||
|
};[m
|
||||||
|
[1mdiff --git a/src/reqmeister.ts b/src/reqmeister.ts[m
|
||||||
|
[1mnew file mode 100644[m
|
||||||
|
[1mindex 0000000..960151f[m
|
||||||
|
[1m--- /dev/null[m
|
||||||
|
[1m+++ b/src/reqmeister.ts[m
|
||||||
|
[36m@@ -0,0 +1,88 @@[m
|
||||||
|
[32m+[m[32mimport * as client from '@/client.ts';[m
|
||||||
|
[32m+[m[32mimport { type Event, EventEmitter, type Filter } from '@/deps.ts';[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mimport { eventToMicroFilter, getFilterId, type MicroFilter } from '@/filter.ts';[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32minterface ReqmeisterOpts {[m
|
||||||
|
[32m+[m[32m delay?: number;[m
|
||||||
|
[32m+[m[32m signal?: AbortSignal;[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mtype ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mclass Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> {[m
|
||||||
|
[32m+[m[32m #opts: ReqmeisterOpts;[m
|
||||||
|
[32m+[m[32m #queue: ReqmeisterQueueItem[] = [];[m
|
||||||
|
[32m+[m[32m #promise!: Promise<void>;[m
|
||||||
|
[32m+[m[32m #resolve!: () => void;[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m constructor(opts: ReqmeisterOpts = {}) {[m
|
||||||
|
[32m+[m[32m super();[m
|
||||||
|
[32m+[m[32m this.#opts = opts;[m
|
||||||
|
[32m+[m[32m this.#cycle();[m
|
||||||
|
[32m+[m[32m this.#perform();[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m #cycle() {[m
|
||||||
|
[32m+[m[32m this.#resolve?.();[m
|
||||||
|
[32m+[m[32m this.#promise = new Promise((resolve) => {[m
|
||||||
|
[32m+[m[32m this.#resolve = resolve;[m
|
||||||
|
[32m+[m[32m });[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m async #perform() {[m
|
||||||
|
[32m+[m[32m const { delay } = this.#opts;[m
|
||||||
|
[32m+[m[32m await new Promise((resolve) => setTimeout(resolve, delay));[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m const queue = this.#queue;[m
|
||||||
|
[32m+[m[32m this.#queue = [];[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m const wantedEvents = new Set<Event['id']>();[m
|
||||||
|
[32m+[m[32m const wantedAuthors = new Set<Event['pubkey']>();[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m // TODO: batch by relays.[m
|
||||||
|
[32m+[m[32m for (const [_filterId, filter, _relays] of queue) {[m
|
||||||
|
[32m+[m[32m if ('ids' in filter) {[m
|
||||||
|
[32m+[m[32m filter.ids.forEach((id) => wantedEvents.add(id));[m
|
||||||
|
[32m+[m[32m } else {[m
|
||||||
|
[32m+[m[32m wantedAuthors.add(filter.authors[0]);[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m const filters: Filter[] = [];[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m if (wantedEvents.size) filters.push({ ids: [...wantedEvents] });[m
|
||||||
|
[32m+[m[32m if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m const events = await client.getFilters(filters, { signal: this.#opts.signal });[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m for (const event of events) {[m
|
||||||
|
[32m+[m[32m this.encounter(event);[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m this.#cycle();[m
|
||||||
|
[32m+[m[32m this.#perform();[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m req(filter: MicroFilter, relays: WebSocket['url'][] = []): Promise<Event> {[m
|
||||||
|
[32m+[m[32m const filterId = getFilterId(filter);[m
|
||||||
|
[32m+[m[32m this.#queue.push([filterId, filter, relays]);[m
|
||||||
|
[32m+[m[32m return new Promise<Event>((resolve, reject) => {[m
|
||||||
|
[32m+[m[32m this.once(filterId, resolve);[m
|
||||||
|
[32m+[m[32m this.#promise.finally(reject);[m
|
||||||
|
[32m+[m[32m });[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m encounter(event: Event): void {[m
|
||||||
|
[32m+[m[32m const filterId = getFilterId(eventToMicroFilter(event));[m
|
||||||
|
[32m+[m[32m this.#queue = this.#queue.filter(([id]) => id !== filterId);[m
|
||||||
|
[32m+[m[32m this.emit(filterId, event);[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32m isWanted(event: Event): boolean {[m
|
||||||
|
[32m+[m[32m const filterId = getFilterId(eventToMicroFilter(event));[m
|
||||||
|
[32m+[m[32m return this.#queue.some(([id]) => id === filterId);[m
|
||||||
|
[32m+[m[32m }[m
|
||||||
|
[32m+[m[32m}[m
|
||||||
|
[32m+[m
|
||||||
|
[32m+[m[32mexport { Reqmeister };[m
|
||||||
Loading…
Add table
Reference in a new issue