From f33ad040841581951bb32decf456f84643023a35 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 25 Apr 2024 13:18:35 -0500 Subject: [PATCH] Remove subs.ts & subscription.ts, refactor around it --- src/controllers/api/streaming.ts | 35 +++++++------ src/controllers/nostr/relay.ts | 25 +++++++--- src/pipeline.ts | 15 +++--- src/storages/InternalRelay.ts | 12 +++-- src/subs.ts | 84 -------------------------------- src/subscription.ts | 49 ------------------- 6 files changed, 54 insertions(+), 166 deletions(-) delete mode 100644 src/subs.ts delete mode 100644 src/subscription.ts diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index f2e26386..8fa1e24c 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -5,11 +5,11 @@ import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { Debug } from '@/deps.ts'; import { getFeedPubkeys } from '@/queries.ts'; -import { Sub } from '@/subs.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { eventsDB } from '@/storages.ts'; +import { Storages } from '@/storages.ts'; const debug = Debug('ditto:streaming'); @@ -38,6 +38,7 @@ const streamingController: AppController = (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream')); + const controller = new AbortController(); if (upgrade?.toLowerCase() !== 'websocket') { return c.text('Please use websocket protocol', 400); @@ -63,33 +64,37 @@ const streamingController: AppController = (c) => { socket.onopen = async () => { if (!stream) return; + const filter = await topicToFilter(stream, c.req.query(), pubkey); + if (!filter) return; - if (filter) { - for await (const event of Sub.sub(socket, '1', [filter])) { - if (event.kind === 6) { - await hydrateEvents({ - events: [event], - storage: eventsDB, - signal: AbortSignal.timeout(1000), - }); + for await (const msg of Storages.pubsub.req([filter], { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const [event] = await hydrateEvents({ + events: [msg[2]], + storage: eventsDB, + signal: AbortSignal.timeout(1000), + }); - const status = await renderReblog(event, { viewerPubkey: c.get('pubkey') }); + if (event.kind === 1) { + const status = await renderStatus(event, { viewerPubkey: pubkey }); if (status) { send('update', status); } - continue; } - const status = await renderStatus(event, { viewerPubkey: pubkey }); - if (status) { - send('update', status); + + if (event.kind === 6) { + const status = await renderReblog(event, { viewerPubkey: pubkey }); + if (status) { + send('update', status); + } } } } }; socket.onclose = () => { - Sub.close(socket); + controller.abort(); }; return response; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index c0e905ba..6705a89f 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -11,8 +11,7 @@ import { clientMsgSchema, type ClientREQ, } from '@/schemas/nostr.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { Sub } from '@/subs.ts'; +import { Storages } from '@/storages.ts'; import type { AppController } from '@/app.ts'; @@ -29,6 +28,8 @@ type RelayMsg = /** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { + const controllers = new Map(); + socket.onmessage = (e) => { const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); if (result.success) { @@ -39,7 +40,9 @@ function connectStream(socket: WebSocket) { }; socket.onclose = () => { - Sub.close(socket); + for (const controller of controllers.values()) { + controller.abort(); + } }; /** Handle client message. */ @@ -64,14 +67,20 @@ function connectStream(socket: WebSocket) { async function handleReq([_, subId, ...rest]: ClientREQ): Promise { const filters = prepareFilters(rest); + const controller = new AbortController(); + controllers.get(subId)?.abort(); + controllers.set(subId, controller); + for (const event of await eventsDB.query(filters, { limit: FILTER_LIMIT })) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - for await (const event of Sub.sub(socket, subId, filters)) { - send(['EVENT', subId, purifyEvent(event)]); + for await (const msg of Storages.pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + send(['EVENT', subId, msg[2]]); + } } } @@ -93,7 +102,11 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub(socket, subId); + const controller = controllers.get(subId); + if (controller) { + controller.abort(); + controllers.delete(subId); + } } /** Handle COUNT. Return the number of events matching the filters. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index cbae5055..c3feeab2 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -9,8 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { updateStats } from '@/stats.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { cache, eventsDB, reqmeister } from '@/storages.ts'; -import { Sub } from '@/subs.ts'; +import { cache, eventsDB, reqmeister, Storages } from '@/storages.ts'; import { getTagSet } from '@/tags.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; import { fetchWorker } from '@/workers/fetch.ts'; @@ -269,14 +268,14 @@ async function payZap(event: DittoEvent, signal: AbortSignal) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = (event: NostrEvent): boolean => eventAge(event) < Time.seconds(10); +function isFresh(event: NostrEvent): boolean { + return eventAge(event) < Time.seconds(10); +} /** Distribute the event through active subscriptions. */ -function streamOut(event: NostrEvent) { - if (!isFresh(event)) return; - - for (const sub of Sub.matches(event)) { - sub.stream(event); +async function streamOut(event: NostrEvent): Promise { + if (isFresh(event)) { + await Storages.pubsub.event(event); } } diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index ac280451..70aba29e 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -12,6 +12,7 @@ import { import { matchFilter } from '@/deps.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; /** * PubSub event store for streaming events within the application. @@ -20,9 +21,12 @@ import { DittoEvent } from '@/interfaces/DittoEvent.ts'; export class InternalRelay implements NRelay { private subs = new Map }>(); - async *req(filters: NostrFilter[]): AsyncGenerator { + async *req( + filters: NostrFilter[], + opts: { signal?: AbortSignal }, + ): AsyncGenerator { const id = crypto.randomUUID(); - const machina = new Machina(); + const machina = new Machina(opts?.signal); yield ['EOSE', id]; @@ -49,10 +53,10 @@ export class InternalRelay implements NRelay { ) as { key: 'domain'; value: string } | undefined)?.value; if (domain === event.author_domain) { - return machina.push(event); + return machina.push(purifyEvent(event)); } } else { - return machina.push(event); + return machina.push(purifyEvent(event)); } } } diff --git a/src/subs.ts b/src/subs.ts deleted file mode 100644 index d3610c2d..00000000 --- a/src/subs.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { NostrFilter } from '@nostrify/nostrify'; -import { Debug } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { Subscription } from '@/subscription.ts'; - -const debug = Debug('ditto:subs'); - -/** - * Manages Ditto event subscriptions. - * Subscriptions can be added, removed, and matched against events. - */ -class SubscriptionStore { - #store = new Map>(); - - /** - * Add a subscription to the store, and then iterate over it. - * - * ```ts - * for (const event of Sub.sub(socket, subId, filters)) { - * console.log(event); - * } - * ``` - */ - sub(socket: unknown, id: string, filters: NostrFilter[]): Subscription { - debug('sub', id, JSON.stringify(filters)); - let subs = this.#store.get(socket); - - if (!subs) { - subs = new Map(); - this.#store.set(socket, subs); - } - - const sub = new Subscription(filters); - - this.unsub(socket, id); - subs.set(id, sub as unknown as Subscription); - - return sub; - } - - /** Remove a subscription from the store. */ - unsub(socket: unknown, id: string): void { - debug('unsub', id); - this.#store.get(socket)?.get(id)?.close(); - this.#store.get(socket)?.delete(id); - } - - /** Remove an entire socket. */ - close(socket: unknown): void { - debug('close', (socket as any)?.constructor?.name); - const subs = this.#store.get(socket); - - if (subs) { - for (const sub of subs.values()) { - sub.close(); - } - } - - this.#store.delete(socket); - } - - /** - * Loop through matching subscriptions to stream out. - * - * ```ts - * for (const sub of Sub.matches(event, data)) { - * sub.stream(event); - * } - * ``` - */ - *matches(event: DittoEvent): Iterable { - for (const subs of this.#store.values()) { - for (const sub of subs.values()) { - if (sub.matches(event)) { - yield sub; - } - } - } - } -} - -const Sub = new SubscriptionStore(); - -export { Sub }; diff --git a/src/subscription.ts b/src/subscription.ts deleted file mode 100644 index 840a0b4f..00000000 --- a/src/subscription.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { NIP50, NostrEvent, NostrFilter } from '@nostrify/nostrify'; -import { Machina, matchFilter } from '@/deps.ts'; -import { type DittoEvent } from '@/interfaces/DittoEvent.ts'; - -class Subscription implements AsyncIterable { - filters: NostrFilter[]; - #machina: Machina; - - constructor(filters: NostrFilter[]) { - this.filters = filters; - this.#machina = new Machina(); - } - - stream(event: NostrEvent): void { - this.#machina.push(event); - } - - matches(event: DittoEvent): boolean { - for (const filter of this.filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain) { - return domain === event.author_domain; - } - } - - return true; - } - } - - return false; - } - - close() { - this.#machina.close(); - } - - [Symbol.asyncIterator]() { - return this.#machina.stream(); - } -} - -export { Subscription };