mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Consolidate EventsDB and InternalRelay
This commit is contained in:
parent
e2ce9d0dab
commit
a8b125162f
5 changed files with 170 additions and 190 deletions
|
|
@ -1,17 +0,0 @@
|
||||||
import { HTTPException } from '@hono/hono/http-exception';
|
|
||||||
|
|
||||||
import type { SetRequired } from 'type-fest';
|
|
||||||
import type { DittoEnv } from '../DittoEnv.ts';
|
|
||||||
import type { DittoMiddleware } from '../DittoMiddleware.ts';
|
|
||||||
|
|
||||||
type DittoVars = DittoEnv['Variables'];
|
|
||||||
|
|
||||||
export function requireVar<K extends keyof DittoVars>(key: K): DittoMiddleware<SetRequired<DittoVars, K>> {
|
|
||||||
return (c, next) => {
|
|
||||||
if (!c.var[key]) {
|
|
||||||
throw new HTTPException(500, { message: `Missing required variable: ${key}` });
|
|
||||||
}
|
|
||||||
|
|
||||||
return next();
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
@ -7,6 +7,26 @@ import { eventFixture, genEvent } from '@/test.ts';
|
||||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||||
import { createTestDB } from '@/test.ts';
|
import { createTestDB } from '@/test.ts';
|
||||||
|
|
||||||
|
Deno.test('InternalRelay', async () => {
|
||||||
|
await using db = await createTestDB({ pure: true });
|
||||||
|
const { store: relay } = db;
|
||||||
|
|
||||||
|
const event1 = await eventFixture('event-1');
|
||||||
|
|
||||||
|
const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0));
|
||||||
|
|
||||||
|
for await (const msg of relay.req([{}])) {
|
||||||
|
if (msg[0] === 'EVENT') {
|
||||||
|
assertEquals(relay.subs.size, 1);
|
||||||
|
assertEquals(msg[2], event1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
assertEquals(relay.subs.size, 0); // cleanup
|
||||||
|
});
|
||||||
|
|
||||||
Deno.test('count filters', async () => {
|
Deno.test('count filters', async () => {
|
||||||
await using db = await createTestDB({ pure: true });
|
await using db = await createTestDB({ pure: true });
|
||||||
const { store } = db;
|
const { store } = db;
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,28 @@
|
||||||
// deno-lint-ignore-file require-await
|
// deno-lint-ignore-file require-await
|
||||||
|
|
||||||
import { DittoTables } from '@ditto/db';
|
import { DittoEvent } from '@ditto/api';
|
||||||
|
import { DittoDatabase, DittoTables } from '@ditto/db';
|
||||||
|
import { detectLanguage } from '@ditto/lang';
|
||||||
import { NPostgres, NPostgresSchema } from '@nostrify/db';
|
import { NPostgres, NPostgresSchema } from '@nostrify/db';
|
||||||
import { dbEventsCounter } from '@ditto/metrics';
|
import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics';
|
||||||
import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
|
import {
|
||||||
|
NIP50,
|
||||||
|
NKinds,
|
||||||
|
NostrEvent,
|
||||||
|
NostrFilter,
|
||||||
|
NostrRelayCLOSED,
|
||||||
|
NostrRelayEOSE,
|
||||||
|
NostrRelayEVENT,
|
||||||
|
NSchema as n,
|
||||||
|
} from '@nostrify/nostrify';
|
||||||
|
import { Machina } from '@nostrify/nostrify/utils';
|
||||||
import { logi } from '@soapbox/logi';
|
import { logi } from '@soapbox/logi';
|
||||||
import { JsonValue } from '@std/json';
|
import { JsonValue } from '@std/json';
|
||||||
import { LanguageCode } from 'iso-639-1';
|
import { LanguageCode } from 'iso-639-1';
|
||||||
import { Kysely } from 'kysely';
|
import { Kysely } from 'kysely';
|
||||||
import linkify from 'linkifyjs';
|
import linkify from 'linkifyjs';
|
||||||
import { nip27 } from 'nostr-tools';
|
import { LRUCache } from 'lru-cache';
|
||||||
|
import { matchFilter, nip27 } from 'nostr-tools';
|
||||||
import tldts from 'tldts';
|
import tldts from 'tldts';
|
||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
|
@ -17,8 +30,6 @@ import { RelayError } from '@/RelayError.ts';
|
||||||
import { isNostrId } from '@/utils.ts';
|
import { isNostrId } from '@/utils.ts';
|
||||||
import { abortError } from '../../utils/abort.ts';
|
import { abortError } from '../../utils/abort.ts';
|
||||||
import { purifyEvent } from '../../utils/purify.ts';
|
import { purifyEvent } from '../../utils/purify.ts';
|
||||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
|
||||||
import { detectLanguage } from '../../utils/language.ts';
|
|
||||||
import { getMediaLinks } from '../../utils/note.ts';
|
import { getMediaLinks } from '../../utils/note.ts';
|
||||||
|
|
||||||
/** Function to decide whether or not to index a tag. */
|
/** Function to decide whether or not to index a tag. */
|
||||||
|
|
@ -37,30 +48,35 @@ interface TagConditionOpts {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Options for the EventsDB store. */
|
/** Options for the EventsDB store. */
|
||||||
interface EventsDBOpts {
|
interface DittoPgStoreOpts {
|
||||||
/** Kysely instance to use. */
|
/** Kysely instance to use. */
|
||||||
kysely: Kysely<DittoTables>;
|
db: DittoDatabase;
|
||||||
/** Pubkey of the admin account. */
|
/** Pubkey of the admin account. */
|
||||||
pubkey: string;
|
pubkey: string;
|
||||||
/** Timeout in milliseconds for database queries. */
|
/** Timeout in milliseconds for database queries. */
|
||||||
timeout: number;
|
timeout: number;
|
||||||
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
|
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
|
||||||
pure?: boolean;
|
pure?: boolean;
|
||||||
|
/** Chunk size for streaming events. Defaults to 100. */
|
||||||
|
chunkSize?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** SQL database storage adapter for Nostr events. */
|
/** SQL database storage adapter for Nostr events. */
|
||||||
class EventsDB extends NPostgres {
|
export class DittoPgStore extends NPostgres {
|
||||||
|
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
||||||
|
readonly encounters = new LRUCache<string, boolean>({ max: 1000 });
|
||||||
|
|
||||||
/** Conditions for when to index certain tags. */
|
/** Conditions for when to index certain tags. */
|
||||||
static tagConditions: Record<string, TagCondition> = {
|
static tagConditions: Record<string, TagCondition> = {
|
||||||
'a': ({ count }) => count < 15,
|
'a': ({ count }) => count < 15,
|
||||||
'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind),
|
'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind),
|
||||||
'e': EventsDB.eTagCondition,
|
'e': DittoPgStore.eTagCondition,
|
||||||
'k': ({ count, value }) => count === 0 && Number.isInteger(Number(value)),
|
'k': ({ count, value }) => count === 0 && Number.isInteger(Number(value)),
|
||||||
'L': ({ event, count }) => event.kind === 1985 || count === 0,
|
'L': ({ event, count }) => event.kind === 1985 || count === 0,
|
||||||
'l': ({ event, count }) => event.kind === 1985 || count === 0,
|
'l': ({ event, count }) => event.kind === 1985 || count === 0,
|
||||||
'n': ({ count, value }) => count < 50 && value.length < 50,
|
'n': ({ count, value }) => count < 50 && value.length < 50,
|
||||||
'P': ({ count, value }) => count === 0 && isNostrId(value),
|
'P': ({ count, value }) => count === 0 && isNostrId(value),
|
||||||
'p': EventsDB.pTagCondition,
|
'p': DittoPgStore.pTagCondition,
|
||||||
'proxy': ({ count, value }) => count === 0 && value.length < 256,
|
'proxy': ({ count, value }) => count === 0 && value.length < 256,
|
||||||
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
||||||
'r': ({ event, count }) => (event.kind === 1985 ? count < 20 : count < 3),
|
'r': ({ event, count }) => (event.kind === 1985 ? count < 20 : count < 3),
|
||||||
|
|
@ -72,65 +88,48 @@ class EventsDB extends NPostgres {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
static indexExtensions(event: NostrEvent): Record<string, string> {
|
constructor(private opts: DittoPgStoreOpts) {
|
||||||
const ext: Record<string, string> = {};
|
super(opts.db.kysely, {
|
||||||
|
indexTags: DittoPgStore.indexTags,
|
||||||
|
indexSearch: DittoPgStore.searchText,
|
||||||
|
indexExtensions: DittoPgStore.indexExtensions,
|
||||||
|
chunkSize: opts.chunkSize,
|
||||||
|
});
|
||||||
|
|
||||||
if (event.kind === 1) {
|
opts.db.listen('nostr_event', async (id) => {
|
||||||
ext.reply = event.tags.some(([name]) => name === 'e').toString();
|
if (this.encounters.has(id)) return;
|
||||||
} else if (event.kind === 1111) {
|
this.encounters.set(id, true);
|
||||||
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
|
|
||||||
} else if (event.kind === 6) {
|
|
||||||
ext.reply = 'false';
|
|
||||||
}
|
|
||||||
|
|
||||||
if ([1, 20, 30023].includes(event.kind)) {
|
const [event] = await this.query([{ ids: [id] }]);
|
||||||
const language = detectLanguage(event.content, 0.90);
|
|
||||||
|
|
||||||
if (language) {
|
if (event) {
|
||||||
ext.language = language;
|
this.streamOut(event);
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
const imeta: string[][][] = event.tags
|
|
||||||
.filter(([name]) => name === 'imeta')
|
|
||||||
.map(([_, ...entries]) =>
|
|
||||||
entries.map((entry) => {
|
|
||||||
const split = entry.split(' ');
|
|
||||||
return [split[0], split.splice(1).join(' ')];
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
// quirks mode
|
|
||||||
if (!imeta.length && event.kind === 1) {
|
|
||||||
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
|
|
||||||
imeta.push(...getMediaLinks(links));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (imeta.length) {
|
|
||||||
ext.media = 'true';
|
|
||||||
|
|
||||||
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
|
|
||||||
ext.video = 'true';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
|
|
||||||
|
|
||||||
return ext;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
constructor(private opts: EventsDBOpts) {
|
protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean {
|
||||||
super(opts.kysely, {
|
// TODO: support streaming by search.
|
||||||
indexTags: EventsDB.indexTags,
|
return matchFilter(filter, event) && filter.search === undefined;
|
||||||
indexSearch: EventsDB.searchText,
|
}
|
||||||
indexExtensions: EventsDB.indexExtensions,
|
|
||||||
});
|
streamOut(event: NostrEvent): void {
|
||||||
|
for (const { filters, machina } of this.subs.values()) {
|
||||||
|
for (const filter of filters) {
|
||||||
|
if (this.matchesFilter(event, filter)) {
|
||||||
|
machina.push(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Insert an event (and its tags) into the database. */
|
/** Insert an event (and its tags) into the database. */
|
||||||
override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
|
override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
|
||||||
event = purifyEvent(event);
|
event = purifyEvent(event);
|
||||||
|
|
||||||
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
|
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
|
||||||
|
|
||||||
|
this.encounters.set(event.id, true);
|
||||||
dbEventsCounter.inc({ kind: event.kind });
|
dbEventsCounter.inc({ kind: event.kind });
|
||||||
|
|
||||||
if (await this.isDeletedAdmin(event)) {
|
if (await this.isDeletedAdmin(event)) {
|
||||||
|
|
@ -141,6 +140,7 @@ class EventsDB extends NPostgres {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
|
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
|
||||||
|
this.streamOut(event);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof Error && e.message === 'Cannot add a deleted event') {
|
if (e instanceof Error && e.message === 'Cannot add a deleted event') {
|
||||||
throw new RelayError('blocked', 'event deleted by user');
|
throw new RelayError('blocked', 'event deleted by user');
|
||||||
|
|
@ -213,6 +213,47 @@ class EventsDB extends NPostgres {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override async *req(
|
||||||
|
filters: NostrFilter[],
|
||||||
|
opts?: { signal?: AbortSignal },
|
||||||
|
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||||
|
const subId = crypto.randomUUID();
|
||||||
|
filters = this.normalizeFilters(filters);
|
||||||
|
|
||||||
|
if (filters.length) {
|
||||||
|
const { db, chunkSize = 100 } = this.opts;
|
||||||
|
const rows = this.getEventsQuery(db.kysely as unknown as Kysely<NPostgresSchema>, filters).stream(chunkSize);
|
||||||
|
|
||||||
|
for await (const row of rows) {
|
||||||
|
const event = this.parseEventRow(row);
|
||||||
|
yield ['EVENT', subId, event];
|
||||||
|
|
||||||
|
if (opts?.signal?.aborted) {
|
||||||
|
yield ['CLOSED', subId, 'aborted'];
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
yield ['EOSE', subId];
|
||||||
|
|
||||||
|
const machina = new Machina<DittoEvent>(opts?.signal);
|
||||||
|
|
||||||
|
this.subs.set(subId, { filters, machina });
|
||||||
|
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||||
|
|
||||||
|
try {
|
||||||
|
for await (const event of machina) {
|
||||||
|
yield ['EVENT', subId, event];
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
yield ['CLOSED', subId, 'error: something went wrong'];
|
||||||
|
} finally {
|
||||||
|
this.subs.delete(subId);
|
||||||
|
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Get events for filters from the database. */
|
/** Get events for filters from the database. */
|
||||||
override async query(
|
override async query(
|
||||||
filters: NostrFilter[],
|
filters: NostrFilter[],
|
||||||
|
|
@ -323,7 +364,7 @@ class EventsDB extends NPostgres {
|
||||||
|
|
||||||
return event.tags.reduce<string[][]>((results, tag, index) => {
|
return event.tags.reduce<string[][]>((results, tag, index) => {
|
||||||
const [name, value] = tag;
|
const [name, value] = tag;
|
||||||
const condition = EventsDB.tagConditions[name] as TagCondition | undefined;
|
const condition = DittoPgStore.tagConditions[name] as TagCondition | undefined;
|
||||||
|
|
||||||
if (value && condition && value.length < 200 && checkCondition(name, value, condition, index)) {
|
if (value && condition && value.length < 200 && checkCondition(name, value, condition, index)) {
|
||||||
results.push(tag);
|
results.push(tag);
|
||||||
|
|
@ -334,16 +375,63 @@ class EventsDB extends NPostgres {
|
||||||
}, []);
|
}, []);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static indexExtensions(event: NostrEvent): Record<string, string> {
|
||||||
|
const ext: Record<string, string> = {};
|
||||||
|
|
||||||
|
if (event.kind === 1) {
|
||||||
|
ext.reply = event.tags.some(([name]) => name === 'e').toString();
|
||||||
|
} else if (event.kind === 1111) {
|
||||||
|
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
|
||||||
|
} else if (event.kind === 6) {
|
||||||
|
ext.reply = 'false';
|
||||||
|
}
|
||||||
|
|
||||||
|
if ([1, 20, 30023].includes(event.kind)) {
|
||||||
|
const language = detectLanguage(event.content, 0.90);
|
||||||
|
|
||||||
|
if (language) {
|
||||||
|
ext.language = language;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const imeta: string[][][] = event.tags
|
||||||
|
.filter(([name]) => name === 'imeta')
|
||||||
|
.map(([_, ...entries]) =>
|
||||||
|
entries.map((entry) => {
|
||||||
|
const split = entry.split(' ');
|
||||||
|
return [split[0], split.splice(1).join(' ')];
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
// quirks mode
|
||||||
|
if (!imeta.length && event.kind === 1) {
|
||||||
|
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
|
||||||
|
imeta.push(...getMediaLinks(links));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (imeta.length) {
|
||||||
|
ext.media = 'true';
|
||||||
|
|
||||||
|
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
|
||||||
|
ext.video = 'true';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
|
||||||
|
|
||||||
|
return ext;
|
||||||
|
}
|
||||||
|
|
||||||
/** Build a search index from the event. */
|
/** Build a search index from the event. */
|
||||||
static searchText(event: NostrEvent): string {
|
static searchText(event: NostrEvent): string {
|
||||||
switch (event.kind) {
|
switch (event.kind) {
|
||||||
case 0:
|
case 0:
|
||||||
return EventsDB.buildUserSearchContent(event);
|
return DittoPgStore.buildUserSearchContent(event);
|
||||||
case 1:
|
case 1:
|
||||||
case 20:
|
case 20:
|
||||||
return nip27.replaceAll(event.content, () => '');
|
return nip27.replaceAll(event.content, () => '');
|
||||||
case 30009:
|
case 30009:
|
||||||
return EventsDB.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt'));
|
return DittoPgStore.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt'));
|
||||||
case 30360:
|
case 30360:
|
||||||
return event.tags.find(([name]) => name === 'd')?.[1] || '';
|
return event.tags.find(([name]) => name === 'd')?.[1] || '';
|
||||||
default:
|
default:
|
||||||
|
|
@ -385,7 +473,7 @@ class EventsDB extends NPostgres {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (domains.size || hostnames.size) {
|
if (domains.size || hostnames.size) {
|
||||||
let query = this.opts.kysely
|
let query = this.opts.db.kysely
|
||||||
.selectFrom('author_stats')
|
.selectFrom('author_stats')
|
||||||
.select('pubkey')
|
.select('pubkey')
|
||||||
.where((eb) => {
|
.where((eb) => {
|
||||||
|
|
@ -433,5 +521,3 @@ class EventsDB extends NPostgres {
|
||||||
return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>));
|
return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export { EventsDB };
|
|
||||||
|
|
|
||||||
|
|
@ -1,23 +0,0 @@
|
||||||
import { assertEquals } from '@std/assert';
|
|
||||||
|
|
||||||
import { eventFixture } from '@/test.ts';
|
|
||||||
|
|
||||||
import { InternalRelay } from './InternalRelay.ts';
|
|
||||||
|
|
||||||
Deno.test('InternalRelay', async () => {
|
|
||||||
const relay = new InternalRelay();
|
|
||||||
const event1 = await eventFixture('event-1');
|
|
||||||
|
|
||||||
const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0));
|
|
||||||
|
|
||||||
for await (const msg of relay.req([{}])) {
|
|
||||||
if (msg[0] === 'EVENT') {
|
|
||||||
assertEquals(relay.subs.size, 1);
|
|
||||||
assertEquals(msg[2], event1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await promise;
|
|
||||||
assertEquals(relay.subs.size, 0); // cleanup
|
|
||||||
});
|
|
||||||
|
|
@ -1,86 +0,0 @@
|
||||||
// deno-lint-ignore-file require-await
|
|
||||||
import {
|
|
||||||
NIP50,
|
|
||||||
NostrEvent,
|
|
||||||
NostrFilter,
|
|
||||||
NostrRelayCLOSED,
|
|
||||||
NostrRelayEOSE,
|
|
||||||
NostrRelayEVENT,
|
|
||||||
NRelay,
|
|
||||||
} from '@nostrify/nostrify';
|
|
||||||
import { Machina } from '@nostrify/nostrify/utils';
|
|
||||||
import { matchFilter } from 'nostr-tools';
|
|
||||||
import { Gauge } from 'prom-client';
|
|
||||||
|
|
||||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
|
||||||
import { purifyEvent } from '../../utils/purify.ts';
|
|
||||||
|
|
||||||
interface InternalRelayOpts {
|
|
||||||
gauge?: Gauge;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* PubSub event store for streaming events within the application.
|
|
||||||
* The pipeline should push events to it, then anything in the application can subscribe to it.
|
|
||||||
*/
|
|
||||||
export class InternalRelay implements NRelay {
|
|
||||||
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
|
||||||
|
|
||||||
constructor(private opts: InternalRelayOpts = {}) {}
|
|
||||||
|
|
||||||
async *req(
|
|
||||||
filters: NostrFilter[],
|
|
||||||
opts?: { signal?: AbortSignal },
|
|
||||||
): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
|
||||||
const id = crypto.randomUUID();
|
|
||||||
const machina = new Machina<NostrEvent>(opts?.signal);
|
|
||||||
|
|
||||||
yield ['EOSE', id];
|
|
||||||
|
|
||||||
this.subs.set(id, { filters, machina });
|
|
||||||
this.opts.gauge?.set(this.subs.size);
|
|
||||||
|
|
||||||
try {
|
|
||||||
for await (const event of machina) {
|
|
||||||
yield ['EVENT', id, event];
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
this.subs.delete(id);
|
|
||||||
this.opts.gauge?.set(this.subs.size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async event(event: DittoEvent): Promise<void> {
|
|
||||||
for (const { filters, machina } of this.subs.values()) {
|
|
||||||
for (const filter of filters) {
|
|
||||||
if (matchFilter(filter, event)) {
|
|
||||||
if (filter.search) {
|
|
||||||
const tokens = NIP50.parseInput(filter.search);
|
|
||||||
|
|
||||||
const domain = (tokens.find((t) =>
|
|
||||||
typeof t === 'object' && t.key === 'domain'
|
|
||||||
) as { key: 'domain'; value: string } | undefined)?.value;
|
|
||||||
|
|
||||||
if (domain === event.author_stats?.nip05_hostname) {
|
|
||||||
machina.push(purifyEvent(event));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
machina.push(purifyEvent(event));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
|
|
||||||
async query(): Promise<NostrEvent[]> {
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
async close(): Promise<void> {
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Add table
Reference in a new issue