From baace5ea2d63c5e4fe75cabd73c05bdbe05cc7a2 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 13:35:20 -0500 Subject: [PATCH 1/7] Refactor streaming to use async iterators --- src/controllers/nostr/relay.ts | 10 +++--- src/pipeline.ts | 4 +-- src/subs.ts | 63 ++++++++++++++++++---------------- src/subscription.ts | 46 +++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 src/subscription.ts diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index e7b954ee..92607f85 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -63,11 +63,9 @@ function connectStream(socket: WebSocket) { send(['EOSE', subId]); - Sub.sub({ - id: subId, - filters: prepared, - socket, - }); + for await (const event of Sub.sub(socket, subId, prepared)) { + send(['EVENT', subId, event]); + } } /** Handle EVENT. Store the event. */ @@ -87,7 +85,7 @@ function connectStream(socket: WebSocket) { /** Handle CLOSE. Close the subscription. */ function handleClose([_, subId]: ClientCLOSE): void { - Sub.unsub({ id: subId, socket }); + Sub.unsub(socket, subId); } /** Send a message back to the client. */ diff --git a/src/pipeline.ts b/src/pipeline.ts index a6eb15fa..e1a9c3ab 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -93,8 +93,8 @@ const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - T function streamOut(event: Event, data: EventData) { if (!isFresh(event)) return; - for (const { socket, id } of Sub.matches(event, data)) { - socket.send(JSON.stringify(['EVENT', id, event])); + for (const sub of Sub.matches(event, data)) { + sub.stream(event); } } diff --git a/src/subs.ts b/src/subs.ts index 82041c66..f5506780 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,52 +1,56 @@ import { type Event } from '@/deps.ts'; -import { matchDittoFilters } from './filter.ts'; +import { Subscription } from '@/subscription.ts'; import type { DittoFilter, EventData } from '@/types.ts'; -/** Nostr subscription to receive realtime events. */ -interface Subscription { - /** User-defined NIP-01 subscription ID. */ - id: string; - /** Event filters for the subscription. */ - filters: DittoFilter[]; - /** WebSocket to deliver results to. */ - socket: WebSocket; -} - /** * Manages Ditto event subscriptions. - * * Subscriptions can be added, removed, and matched against events. - * - * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); - * } - * ``` */ class SubscriptionStore { #store = new Map>(); - /** Add a subscription to the store. */ - sub(data: Subscription): void { - let subs = this.#store.get(data.socket); + /** + * Add a subscription to the store, and then iterate over it. + * + * ```ts + * for (const event of Sub.sub(socket, subId, filters)) { + * console.log(event); + * } + * ``` + */ + sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription { + let subs = this.#store.get(socket); if (!subs) { subs = new Map(); - this.#store.set(data.socket, subs); + this.#store.set(socket, subs); } - subs.set(data.id, data); + const sub = new Subscription(filters); + + this.unsub(socket, id); + subs.set(id, sub); + + return sub; } /** Remove a subscription from the store. */ - unsub(sub: Pick): void { - this.#store.get(sub.socket)?.delete(sub.id); + unsub(socket: WebSocket, id: string): void { + this.#store.get(socket)?.get(id)?.close(); + this.#store.get(socket)?.delete(id); } /** Remove an entire socket. */ close(socket: WebSocket): void { + const subs = this.#store.get(socket); + + if (subs) { + for (const sub of subs.values()) { + sub.close(); + } + } + this.#store.delete(socket); } @@ -54,16 +58,15 @@ class SubscriptionStore { * Loop through matching subscriptions to stream out. * * ```ts - * for (const sub of Sub.matches(event)) { - * // Send event to sub.socket - * sub.socket.send(JSON.stringify(event)); + * for (const sub of Sub.matches(event, data)) { + * sub.stream(event); * } * ``` */ *matches(event: Event, data: EventData): Iterable { for (const subs of this.#store.values()) { for (const sub of subs.values()) { - if (matchDittoFilters(sub.filters, event, data)) { + if (sub.matches(event, data)) { yield sub; } } diff --git a/src/subscription.ts b/src/subscription.ts new file mode 100644 index 00000000..24a78f0c --- /dev/null +++ b/src/subscription.ts @@ -0,0 +1,46 @@ +import { type Event } from '@/deps.ts'; +import { matchDittoFilters } from '@/filter.ts'; + +import type { DittoFilter, EventData } from '@/types.ts'; + +class Subscription implements AsyncIterable> { + filters: DittoFilter[]; + #next?: (event: Event) => void; + #closed = false; + + constructor(filters: DittoFilter[]) { + this.filters = filters; + } + + stream(event: Event): void { + if (this.#next) { + this.#next(event); + this.#next = undefined; + } + } + + matches(event: Event, data: EventData): boolean { + return matchDittoFilters(this.filters, event, data); + } + + close() { + this.#closed = true; + this.#next?.(undefined!); + } + + async *[Symbol.asyncIterator]() { + while (true) { + const event = await new Promise>((resolve) => { + this.#next = resolve; + }); + + if (this.#closed) { + return; + } + + yield event; + } + } +} + +export { Subscription }; From c18d7b952fc0c089515cd55ab2effd824b0c74ab Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 13:38:21 -0500 Subject: [PATCH 2/7] Move filter types into filter.ts --- src/client.ts | 2 +- src/db/events.ts | 2 +- src/filter.ts | 19 ++++++++++++++++--- src/mixer.ts | 2 +- src/subs.ts | 3 ++- src/subscription.ts | 4 ++-- src/types.ts | 17 +---------------- 7 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/client.ts b/src/client.ts index 0c55d646..dd0efcc3 100644 --- a/src/client.ts +++ b/src/client.ts @@ -3,7 +3,7 @@ import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/de import * as pipeline from '@/pipeline.ts'; import { Time } from '@/utils.ts'; -import type { GetFiltersOpts } from '@/types.ts'; +import type { GetFiltersOpts } from '@/filter.ts'; type Pool = InstanceType; diff --git a/src/db/events.ts b/src/db/events.ts index 77d0204a..48b9666c 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,7 +1,7 @@ import { db, type TagRow } from '@/db.ts'; import { type Event, type Insertable } from '@/deps.ts'; -import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; type TagCondition = ({ event, count }: { event: Event; count: number }) => boolean; diff --git a/src/filter.ts b/src/filter.ts index 9cde0d1c..605d4cb6 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,6 +1,19 @@ -import { type Event, matchFilters } from '@/deps.ts'; +import { type Event, type Filter, matchFilters } from '@/deps.ts'; -import type { DittoFilter, EventData } from '@/types.ts'; +import type { EventData } from '@/types.ts'; + +/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ +interface DittoFilter extends Filter { + local?: boolean; +} + +/** Additional options to apply to the whole subscription. */ +interface GetFiltersOpts { + /** How long to wait (in milliseconds) until aborting the request. */ + timeout?: number; + /** Event limit for the whole subscription. */ + limit?: number; +} function matchDittoFilter(filter: DittoFilter, event: Event, data: EventData): boolean { if (filter.local && !data.user) { @@ -24,4 +37,4 @@ function matchDittoFilters(filters: DittoFilter[], event: Event, data: EventData return false; } -export { matchDittoFilters }; +export { type DittoFilter, type GetFiltersOpts, matchDittoFilters }; diff --git a/src/mixer.ts b/src/mixer.ts index c2e6b3a6..4c160b3e 100644 --- a/src/mixer.ts +++ b/src/mixer.ts @@ -4,7 +4,7 @@ import * as client from '@/client.ts'; import * as eventsDB from '@/db/events.ts'; import { eventDateComparator } from '@/utils.ts'; -import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/filter.ts'; /** Get filters from the database and pool, and mix the best results together. */ async function getFilters( diff --git a/src/subs.ts b/src/subs.ts index f5506780..5c175c3c 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -1,7 +1,8 @@ import { type Event } from '@/deps.ts'; import { Subscription } from '@/subscription.ts'; -import type { DittoFilter, EventData } from '@/types.ts'; +import type { DittoFilter } from '@/filter.ts'; +import type { EventData } from '@/types.ts'; /** * Manages Ditto event subscriptions. diff --git a/src/subscription.ts b/src/subscription.ts index 24a78f0c..227c2f41 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,7 +1,7 @@ import { type Event } from '@/deps.ts'; -import { matchDittoFilters } from '@/filter.ts'; +import { type DittoFilter, matchDittoFilters } from '@/filter.ts'; -import type { DittoFilter, EventData } from '@/types.ts'; +import type { EventData } from '@/types.ts'; class Subscription implements AsyncIterable> { filters: DittoFilter[]; diff --git a/src/types.ts b/src/types.ts index beb184bd..11f933d7 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,21 +1,6 @@ import { UserRow } from '@/db.ts'; -import { type Filter } from '@/deps.ts'; - -/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ -interface DittoFilter extends Filter { - local?: boolean; -} - -/** Additional options to apply to the whole subscription. */ -interface GetFiltersOpts { - /** How long to wait (in milliseconds) until aborting the request. */ - timeout?: number; - /** Event limit for the whole subscription. */ - limit?: number; -} - interface EventData { user: UserRow | undefined; } -export type { DittoFilter, EventData, GetFiltersOpts }; +export type { EventData }; From d1117f55135017bb3619fee6f3d7019fb71d4d58 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 13:42:58 -0500 Subject: [PATCH 3/7] relay: improve variable names in handleReq --- src/controllers/nostr/relay.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 92607f85..3e03f490 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -54,16 +54,16 @@ function connectStream(socket: WebSocket) { } /** Handle REQ. Start a subscription. */ - async function handleReq([_, subId, ...filters]: ClientREQ): Promise { - const prepared = prepareFilters(filters); + async function handleReq([_, subId, ...rest]: ClientREQ): Promise { + const filters = prepareFilters(rest); - for (const event of await eventsDB.getFilters(prepared)) { + for (const event of await eventsDB.getFilters(filters)) { send(['EVENT', subId, event]); } send(['EOSE', subId]); - for await (const event of Sub.sub(socket, subId, prepared)) { + for await (const event of Sub.sub(socket, subId, filters)) { send(['EVENT', subId, event]); } } From f7cd67c57242e3cbd3e9c0d7234fb5c01a60b902 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 14:59:37 -0500 Subject: [PATCH 4/7] Make MastoAPI streaming work for public feeds --- src/controllers/api/streaming.ts | 39 +++++++++++++++++++++++++++++--- src/subs.ts | 4 ++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 412e0240..76a23c16 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,6 +1,10 @@ import { AppController } from '@/app.ts'; +import { type Event } from '@/deps.ts'; +import { type DittoFilter } from '@/filter.ts'; import { TOKEN_REGEX } from '@/middleware/auth19.ts'; import { streamSchema, ws } from '@/stream.ts'; +import { Sub } from '@/subs.ts'; +import { toStatus } from '@/transformers/nostr-to-mastoapi.ts'; import { bech32ToPubkey } from '@/utils.ts'; const streamingController: AppController = (c) => { @@ -29,10 +33,30 @@ const streamingController: AppController = (c) => { pubkey: bech32ToPubkey(match[1]), }; - socket.addEventListener('open', () => { + function send(name: string, payload: object) { + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify({ + event: name, + payload: JSON.stringify(payload), + })); + } + } + + socket.addEventListener('open', async () => { console.log('websocket: connection opened'); - if (stream) { - ws.subscribe(conn, { stream }); + if (!stream) return; + + ws.subscribe(conn, { stream }); + + const filter = topicToFilter(stream); + + if (filter) { + for await (const event of Sub.sub(socket, '1', [filter])) { + const status = await toStatus(event); + if (status) { + send('update', status); + } + } } }); @@ -46,4 +70,13 @@ const streamingController: AppController = (c) => { return response; }; +function topicToFilter(topic: string): DittoFilter<1> | undefined { + switch (topic) { + case 'public': + return { kinds: [1] }; + case 'public:local': + return { kinds: [1], local: true }; + } +} + export { streamingController }; diff --git a/src/subs.ts b/src/subs.ts index 5c175c3c..eebfac1b 100644 --- a/src/subs.ts +++ b/src/subs.ts @@ -20,7 +20,7 @@ class SubscriptionStore { * } * ``` */ - sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription { + sub(socket: WebSocket, id: string, filters: DittoFilter[]): Subscription { let subs = this.#store.get(socket); if (!subs) { @@ -31,7 +31,7 @@ class SubscriptionStore { const sub = new Subscription(filters); this.unsub(socket, id); - subs.set(id, sub); + subs.set(id, sub as unknown as Subscription); return sub; } From b60e84d29bb7f348c797fccd5fdcd4eab29dc828 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 15:00:48 -0500 Subject: [PATCH 5/7] relay: only send if socket is open --- src/controllers/api/streaming.ts | 1 - src/controllers/nostr/relay.ts | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 76a23c16..071c1fa5 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -1,5 +1,4 @@ import { AppController } from '@/app.ts'; -import { type Event } from '@/deps.ts'; import { type DittoFilter } from '@/filter.ts'; import { TOKEN_REGEX } from '@/middleware/auth19.ts'; import { streamSchema, ws } from '@/stream.ts'; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 3e03f490..ca659cf9 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -90,7 +90,9 @@ function connectStream(socket: WebSocket) { /** Send a message back to the client. */ function send(msg: RelayMsg): void { - return socket.send(JSON.stringify(msg)); + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(msg)); + } } } From a93d77c85171b10135e84aa379d920a430a90672 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 16:17:45 -0500 Subject: [PATCH 6/7] streaming: add `stream` param to streaming events --- src/controllers/api/streaming.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 071c1fa5..bc91eeff 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -37,6 +37,7 @@ const streamingController: AppController = (c) => { socket.send(JSON.stringify({ event: name, payload: JSON.stringify(payload), + stream: [stream], })); } } From 9da03aa053ccb85986359fb3fe4ce008984632e3 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Fri, 25 Aug 2023 17:17:26 -0500 Subject: [PATCH 7/7] streaming: socket.addEventListener('open', ...) --> socket.onopen, etc --- src/controllers/api/streaming.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index bc91eeff..8b655f97 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -42,8 +42,7 @@ const streamingController: AppController = (c) => { } } - socket.addEventListener('open', async () => { - console.log('websocket: connection opened'); + socket.onopen = async () => { if (!stream) return; ws.subscribe(conn, { stream }); @@ -58,14 +57,11 @@ const streamingController: AppController = (c) => { } } } - }); + }; - socket.addEventListener('message', (e) => console.log('websocket message: ', e.data)); - - socket.addEventListener('close', () => { - console.log('websocket: connection closed'); + socket.onclose = () => { ws.unsubscribeAll(socket); - }); + }; return response; };