diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts new file mode 100644 index 00000000..ac280451 --- /dev/null +++ b/src/storages/InternalRelay.ts @@ -0,0 +1,67 @@ +// deno-lint-ignore-file require-await +import { + Machina, + NIP50, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NRelay, +} from '@nostrify/nostrify'; + +import { matchFilter } from '@/deps.ts'; +import { DittoEvent } from '@/interfaces/DittoEvent.ts'; + +/** + * PubSub event store for streaming events within the application. + * The pipeline should push events to it, then anything in the application can subscribe to it. + */ +export class InternalRelay implements NRelay { + private subs = new Map }>(); + + async *req(filters: NostrFilter[]): AsyncGenerator { + const id = crypto.randomUUID(); + const machina = new Machina(); + + yield ['EOSE', id]; + + this.subs.set(id, { filters, machina }); + + try { + for await (const event of machina) { + yield ['EVENT', id, event]; + } + } finally { + this.subs.delete(id); + } + } + + async event(event: DittoEvent): Promise { + for (const { filters, machina } of this.subs.values()) { + for (const filter of filters) { + if (matchFilter(filter, event)) { + if (filter.search) { + const tokens = NIP50.parseInput(filter.search); + + const domain = (tokens.find((t) => + typeof t === 'object' && t.key === 'domain' + ) as { key: 'domain'; value: string } | undefined)?.value; + + if (domain === event.author_domain) { + return machina.push(event); + } + } else { + return machina.push(event); + } + } + } + } + + return Promise.resolve(); + } + + async query(): Promise { + return []; + } +}