diff --git a/src/client.ts b/src/client.ts index 0c55d646..dd0efcc3 100644 --- a/src/client.ts +++ b/src/client.ts @@ -3,7 +3,7 @@ import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/de import * as pipeline from '@/pipeline.ts'; import { Time } from '@/utils.ts'; -import type { GetFiltersOpts } from '@/types.ts'; +import type { GetFiltersOpts } from '@/filter.ts'; type Pool = InstanceType; diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 412e0240..8b655f97 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,6 +1,9 @@ import { AppController } from '@/app.ts'; +import { type DittoFilter } from '@/filter.ts'; import { TOKEN_REGEX } from '@/middleware/auth19.ts'; import { streamSchema, ws } from '@/stream.ts'; +import { Sub } from '@/subs.ts'; +import { toStatus } from '@/transformers/nostr-to-mastoapi.ts'; import { bech32ToPubkey } from '@/utils.ts'; const streamingController: AppController = (c) => { @@ -29,21 +32,47 @@ const streamingController: AppController = (c) => { pubkey: bech32ToPubkey(match[1]), }; - socket.addEventListener('open', () => { - console.log('websocket: connection opened'); - if (stream) { - ws.subscribe(conn, { stream }); + function send(name: string, payload: object) { + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify({ + event: name, + payload: JSON.stringify(payload), + stream: [stream], + })); } - }); + } - socket.addEventListener('message', (e) => console.log('websocket message: ', e.data)); + socket.onopen = async () => { + if (!stream) return; - socket.addEventListener('close', () => { - console.log('websocket: connection closed'); + ws.subscribe(conn, { stream }); + + const filter = topicToFilter(stream); + + if (filter) { + for await (const event of Sub.sub(socket, '1', [filter])) { + const status = await toStatus(event); + if (status) { + send('update', status); + } + } + } + }; + + socket.onclose = () => { ws.unsubscribeAll(socket); - }); + }; return response; }; +function topicToFilter(topic: string): DittoFilter<1> | undefined { + switch (topic) { + case 'public': + return { kinds: [1] }; + case 'public:local': + return { kinds: [1], local: true }; + } +} + export { streamingController }; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index e7b954ee..ca659cf9 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -54,20 +54,18 @@ function connectStream(socket: WebSocket) { } /** Handle REQ. Start a subscription. */ - async function handleReq([_, subId, ...filters]: ClientREQ): Promise { - const prepared = prepareFilters(filters); + async function handleReq([_, subId, ...rest]: ClientREQ): Promise { + const filters = prepareFilters(rest); - for (const event of await eventsDB.getFilters(prepared)) { + for (const event of await eventsDB.getFilters(filters)) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - Sub.sub({ - id: subId, - filters: prepared, - socket, - }); + for await (const event of Sub.sub(socket, subId, filters)) { + send(['EVENT', subId, event]); + } } /** Handle EVENT. Store the event. */ @@ -87,12 +85,14 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub({ id: subId, socket }); + Sub.unsub(socket, subId); } /** Send a message back to the client. */ function send(msg: RelayMsg): void { - return socket.send(JSON.stringify(msg)); + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(msg)); + } } } diff --git a/src/db/events.ts b/src/db/events.ts index 77d0204a..48b9666c 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,7 +1,7 @@ import { db, type TagRow } from '@/db.ts'; import { type Event, type Insertable } from '@/deps.ts'; -import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; type TagCondition = ({ event, count }: { event: Event; count: number }) => boolean; diff --git a/src/filter.ts b/src/filter.ts index 9cde0d1c..605d4cb6 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,6 +1,19 @@ -import { type Event, matchFilters } from '@/deps.ts'; +import { type Event, type Filter, matchFilters } from '@/deps.ts'; -import type { DittoFilter, EventData } from '@/types.ts'; +import type { EventData } from '@/types.ts'; + +/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ +interface DittoFilter extends Filter { + local?: boolean; +} + +/** Additional options to apply to the whole subscription. */ +interface GetFiltersOpts { + /** How long to wait (in milliseconds) until aborting the request. */ + timeout?: number; + /** Event limit for the whole subscription. */ + limit?: number; +} function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { if (filter.local && !data.user) { @@ -24,4 +37,4 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData return false; } -export { matchDittoFilters }; +export { type DittoFilter, type GetFiltersOpts, matchDittoFilters }; diff --git a/src/mixer.ts b/src/mixer.ts index c2e6b3a6..4c160b3e 100644 --- a/src/mixer.ts +++ b/src/mixer.ts @@ -4,7 +4,7 @@ import * as client from '@/client.ts'; import * as eventsDB from '@/db/events.ts'; import { eventDateComparator } from '@/utils.ts'; -import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; /** Get filters from the database and pool, and mix the best results together. */ async function getFilters( diff --git a/src/pipeline.ts b/src/pipeline.ts index a6eb15fa..e1a9c3ab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -93,8 +93,8 @@ const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - T function streamOut(event: Event, data: EventData) { if (!isFresh(event)) return; - for (const { socket, id } of Sub.matches(event, data)) { - socket.send(JSON.stringify(['EVENT', id, event])); + for (const sub of Sub.matches(event, data)) { + sub.stream(event); } } diff --git a/src/subs.ts b/src/subs.ts index 82041c66..eebfac1b 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,52 +1,57 @@ import { type Event } from '@/deps.ts'; -import { matchDittoFilters } from './filter.ts'; +import { Subscription } from '@/subscription.ts'; -import type { DittoFilter, EventData } from '@/types.ts'; - -/** Nostr subscription to receive realtime events. */ -interface Subscription { - /** User-defined NIP-01 subscription ID. */ - id: string; - /** Event filters for the subscription. */ - filters: DittoFilter[]; - /** WebSocket to deliver results to. */ - socket: WebSocket; -} +import type { DittoFilter } from '@/filter.ts'; +import type { EventData } from '@/types.ts'; /** * Manages Ditto event subscriptions. - * * Subscriptions can be added, removed, and matched against events. - * - * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); - * } - * ``` */ class SubscriptionStore { #store = new Map>(); - /** Add a subscription to the store. */ - sub(data: Subscription): void { - let subs = this.#store.get(data.socket); + /** + * Add a subscription to the store, and then iterate over it. + * + * ```ts + * for (const event of Sub.sub(socket, subId, filters)) { + * console.log(event); + * } + * ``` + */ + sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription { + let subs = this.#store.get(socket); if (!subs) { subs = new Map(); - this.#store.set(data.socket, subs); + this.#store.set(socket, subs); } - subs.set(data.id, data); + const sub = new Subscription(filters); + + this.unsub(socket, id); + subs.set(id, sub as unknown as Subscription); + + return sub; } /** Remove a subscription from the store. */ - unsub(sub: Pick): void { - this.#store.get(sub.socket)?.delete(sub.id); + unsub(socket: WebSocket, id: string): void { + this.#store.get(socket)?.get(id)?.close(); + this.#store.get(socket)?.delete(id); } /** Remove an entire socket. */ close(socket: WebSocket): void { + const subs = this.#store.get(socket); + + if (subs) { + for (const sub of subs.values()) { + sub.close(); + } + } + this.#store.delete(socket); } @@ -54,16 +59,15 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); + * for (const sub of Sub.matches(event, data)) { + * sub.stream(event); * } * ``` */ *matches(event: Event, data: EventData): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchDittoFilters(sub.filters, event, data)) { + if (sub.matches(event, data)) { yield sub; } } diff --git a/src/subscription.ts b/src/subscription.ts new file mode 100644 index 00000000..227c2f41 --- /dev/null +++ b/src/subscription.ts @@ -0,0 +1,46 @@ +import { type Event } from '@/deps.ts'; +import { type DittoFilter, matchDittoFilters } from '@/filter.ts'; + +import type { EventData } from '@/types.ts'; + +class Subscription implements AsyncIterable> { + filters: DittoFilter[]; + #next?: (event: Event) => void; + #closed = false; + + constructor(filters: DittoFilter[]) { + this.filters = filters; + } + + stream(event: Event): void { + if (this.#next) { + this.#next(event); + this.#next = undefined; + } + } + + matches(event: Event, data: EventData): boolean { + return matchDittoFilters(this.filters, event, data); + } + + close() { + this.#closed = true; + this.#next?.(undefined!); + } + + async *[Symbol.asyncIterator]() { + while (true) { + const event = await new Promise>((resolve) => { + this.#next = resolve; + }); + + if (this.#closed) { + return; + } + + yield event; + } + } +} + +export { Subscription }; diff --git a/src/types.ts b/src/types.ts index beb184bd..11f933d7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,21 +1,6 @@ import { UserRow } from '@/db.ts'; -import { type Filter } from '@/deps.ts'; - -/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ -interface DittoFilter extends Filter { - local?: boolean; -} - -/** Additional options to apply to the whole subscription. */ -interface GetFiltersOpts { - /** How long to wait (in milliseconds) until aborting the request. */ - timeout?: number; - /** Event limit for the whole subscription. */ - limit?: number; -} - interface EventData { user: UserRow | undefined; } -export type { DittoFilter, EventData, GetFiltersOpts }; +export type { EventData };