From 52893bab357a04d59ffe153f8c3bb0eb4ddd6d7a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 19 Jun 2024 19:34:06 -0500 Subject: [PATCH] Rewrite streaming controller to use hono websocket helper --- src/app.ts | 4 +- src/controllers/api/streaming.ts | 127 +++++++++++++++++-------------- src/utils/websocket.ts | 49 ++++++++++++ 3 files changed, 121 insertions(+), 59 deletions(-) create mode 100644 src/utils/websocket.ts diff --git a/src/app.ts b/src/app.ts index 0c21ecf0..58881b20 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,4 +1,4 @@ -import { Context, Env as HonoEnv, Handler, Hono, Input as HonoInput, MiddlewareHandler } from '@hono/hono'; +import { Context, Env as HonoEnv, Handler, Hono, MiddlewareHandler } from '@hono/hono'; import { cors } from '@hono/hono/cors'; import { serveStatic } from '@hono/hono/deno'; import { logger } from '@hono/hono/logger'; @@ -134,7 +134,7 @@ interface AppEnv extends HonoEnv { type AppContext = Context; type AppMiddleware = MiddlewareHandler; -type AppController = Handler>; +type AppController = Handler; const app = new Hono({ strict: false }); diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 552ea3bd..2d997099 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -2,13 +2,14 @@ import { NostrEvent, NostrFilter } from '@nostrify/nostrify'; import Debug from '@soapbox/stickynotes/debug'; import { z } from 'zod'; -import { type AppController } from '@/app.ts'; +import { AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; +import { upgradeWebSocket } from '@/utils/websocket.ts'; import { bech32ToPubkey } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; @@ -51,79 +52,91 @@ const streamingController: AppController = async (c) => { return c.json({ error: 'Invalid access token' }, 401); } - const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); - const store = await Storages.db(); const pubsub = await Storages.pubsub(); const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; - function send(name: string, payload: object) { - if (socket.readyState === WebSocket.OPEN) { - debug('send', name, JSON.stringify(payload)); - socket.send(JSON.stringify({ - event: name, - payload: JSON.stringify(payload), - stream: [stream], - })); + return upgradeWebSocket((c) => { + let socket: WebSocket; + + function send(name: string, payload: object): void { + if (socket.readyState === WebSocket.OPEN) { + debug('send', name, JSON.stringify(payload)); + socket.send(JSON.stringify({ + event: name, + payload: JSON.stringify(payload), + stream: [stream], + })); + } } - } - async function sub(type: string, filters: NostrFilter[], render: (event: NostrEvent) => Promise) { - try { - for await (const msg of pubsub.req(filters, { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - const event = msg[2]; + async function sub( + type: string, + filters: NostrFilter[], + render: (event: NostrEvent) => Promise, + ): Promise { + try { + for await (const msg of pubsub.req(filters, { signal: controller.signal })) { + if (msg[0] === 'EVENT') { + const event = msg[2]; - if (policy) { - const [, , ok] = await policy.call(event); - if (!ok) { - continue; + if (policy) { + const [, , ok] = await policy.call(event); + if (!ok) { + continue; + } + } + + await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) }); + + const result = await render(event); + + if (result) { + send(type, result); } } - - await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) }); - - const result = await render(event); - - if (result) { - send(type, result); - } } + } catch (e) { + debug('streaming error:', e); } - } catch (e) { - debug('streaming error:', e); } - } - socket.onopen = async () => { - if (!stream) return; - const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); + return { + async onOpen(_event, ws) { + socket = ws.raw as WebSocket; - if (topicFilter) { - sub('update', [topicFilter], async (event) => { - if (event.kind === 1) { - return await renderStatus(event, { viewerPubkey: pubkey }); + if (!stream) return; + const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); + + if (topicFilter) { + sub('update', [topicFilter], async (event) => { + if (event.kind === 1) { + return await renderStatus(event, { viewerPubkey: pubkey }); + } + if (event.kind === 6) { + return await renderReblog(event, { viewerPubkey: pubkey }); + } + }); } - if (event.kind === 6) { - return await renderReblog(event, { viewerPubkey: pubkey }); + + if (['user', 'user:notification'].includes(stream) && pubkey) { + sub('notification', [{ '#p': [pubkey] }], async (event) => { + return await renderNotification(event, { viewerPubkey: pubkey }); + }); + return; } - }); - } - - if (['user', 'user:notification'].includes(stream) && pubkey) { - sub('notification', [{ '#p': [pubkey] }], async (event) => { - return await renderNotification(event, { viewerPubkey: pubkey }); - }); - return; - } - }; - - socket.onclose = () => { - controller.abort(); - }; - - return response; + }, + onMessage(event, ws) { + if (typeof event.data !== 'string') { + ws.close(1003, 'Unsupported data type'); + } + }, + onClose(_event) { + controller.abort(); + }, + }; + }, { protocol: token, idleTimeout: 30 }); }; async function topicToFilter( diff --git a/src/utils/websocket.ts b/src/utils/websocket.ts new file mode 100644 index 00000000..0a685b9b --- /dev/null +++ b/src/utils/websocket.ts @@ -0,0 +1,49 @@ +import { Context, MiddlewareHandler } from '@hono/hono'; +import { WSContext, WSEvents, WSReadyState } from '@hono/hono/ws'; + +type UpgradeWebSocket = ( + createEvents: (c: Context) => WSEvents | Promise, + options?: Deno.UpgradeWebSocketOptions, +) => MiddlewareHandler< + any, + string, + { + outputFormat: 'ws'; + } +>; + +/** + * A modified version of Hono's WebSocket Helper, just to support Deno's `options` object. + * + * See: https://github.com/honojs/hono/issues/2997 + * + * (If that issue gets fixed, we can remove this code.) + */ +export const upgradeWebSocket: UpgradeWebSocket = (createEvents, options) => async (c, next) => { + if (c.req.header('upgrade') !== 'websocket') { + return await next(); + } + + const events = await createEvents(c); + const { response, socket } = Deno.upgradeWebSocket(c.req.raw, options); + + const wsContext: WSContext = { + binaryType: 'arraybuffer', + close: (code, reason) => socket.close(code, reason), + get protocol() { + return socket.protocol; + }, + raw: socket, + get readyState() { + return socket.readyState as WSReadyState; + }, + url: socket.url ? new URL(socket.url) : null, + send: (source) => socket.send(source), + }; + socket.onopen = (evt) => events.onOpen?.(evt, wsContext); + socket.onmessage = (evt) => events.onMessage?.(evt, wsContext); + socket.onclose = (evt) => events.onClose?.(evt, wsContext); + socket.onerror = (evt) => events.onError?.(evt, wsContext); + + return response; +};