Rewrite streaming controller to use hono websocket helper

This commit is contained in:
Alex Gleason 2024-06-19 19:34:06 -05:00
parent 6bdd29922a
commit 52893bab35
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
3 changed files with 121 additions and 59 deletions

View file

@ -1,4 +1,4 @@
import { Context, Env as HonoEnv, Handler, Hono, Input as HonoInput, MiddlewareHandler } from '@hono/hono';
import { Context, Env as HonoEnv, Handler, Hono, MiddlewareHandler } from '@hono/hono';
import { cors } from '@hono/hono/cors';
import { serveStatic } from '@hono/hono/deno';
import { logger } from '@hono/hono/logger';
@ -134,7 +134,7 @@ interface AppEnv extends HonoEnv {
type AppContext = Context<AppEnv>;
type AppMiddleware = MiddlewareHandler<AppEnv>;
type AppController = Handler<AppEnv, any, HonoInput, Response | Promise<Response>>;
type AppController = Handler<AppEnv, any>;
const app = new Hono<AppEnv>({ strict: false });

View file

@ -2,13 +2,14 @@ import { NostrEvent, NostrFilter } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug';
import { z } from 'zod';
import { type AppController } from '@/app.ts';
import { AppController } from '@/app.ts';
import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts';
import { upgradeWebSocket } from '@/utils/websocket.ts';
import { bech32ToPubkey } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts';
@ -51,79 +52,91 @@ const streamingController: AppController = async (c) => {
return c.json({ error: 'Invalid access token' }, 401);
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const store = await Storages.db();
const pubsub = await Storages.pubsub();
const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined;
function send(name: string, payload: object) {
if (socket.readyState === WebSocket.OPEN) {
debug('send', name, JSON.stringify(payload));
socket.send(JSON.stringify({
event: name,
payload: JSON.stringify(payload),
stream: [stream],
}));
return upgradeWebSocket((c) => {
let socket: WebSocket;
function send(name: string, payload: object): void {
if (socket.readyState === WebSocket.OPEN) {
debug('send', name, JSON.stringify(payload));
socket.send(JSON.stringify({
event: name,
payload: JSON.stringify(payload),
stream: [stream],
}));
}
}
}
async function sub(type: string, filters: NostrFilter[], render: (event: NostrEvent) => Promise<unknown>) {
try {
for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
if (msg[0] === 'EVENT') {
const event = msg[2];
async function sub(
type: string,
filters: NostrFilter[],
render: (event: NostrEvent) => Promise<unknown>,
): Promise<void> {
try {
for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
if (msg[0] === 'EVENT') {
const event = msg[2];
if (policy) {
const [, , ok] = await policy.call(event);
if (!ok) {
continue;
if (policy) {
const [, , ok] = await policy.call(event);
if (!ok) {
continue;
}
}
await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) });
const result = await render(event);
if (result) {
send(type, result);
}
}
await hydrateEvents({ events: [event], store, signal: AbortSignal.timeout(1000) });
const result = await render(event);
if (result) {
send(type, result);
}
}
} catch (e) {
debug('streaming error:', e);
}
} catch (e) {
debug('streaming error:', e);
}
}
socket.onopen = async () => {
if (!stream) return;
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
return {
async onOpen(_event, ws) {
socket = ws.raw as WebSocket;
if (topicFilter) {
sub('update', [topicFilter], async (event) => {
if (event.kind === 1) {
return await renderStatus(event, { viewerPubkey: pubkey });
if (!stream) return;
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
if (topicFilter) {
sub('update', [topicFilter], async (event) => {
if (event.kind === 1) {
return await renderStatus(event, { viewerPubkey: pubkey });
}
if (event.kind === 6) {
return await renderReblog(event, { viewerPubkey: pubkey });
}
});
}
if (event.kind === 6) {
return await renderReblog(event, { viewerPubkey: pubkey });
if (['user', 'user:notification'].includes(stream) && pubkey) {
sub('notification', [{ '#p': [pubkey] }], async (event) => {
return await renderNotification(event, { viewerPubkey: pubkey });
});
return;
}
});
}
if (['user', 'user:notification'].includes(stream) && pubkey) {
sub('notification', [{ '#p': [pubkey] }], async (event) => {
return await renderNotification(event, { viewerPubkey: pubkey });
});
return;
}
};
socket.onclose = () => {
controller.abort();
};
return response;
},
onMessage(event, ws) {
if (typeof event.data !== 'string') {
ws.close(1003, 'Unsupported data type');
}
},
onClose(_event) {
controller.abort();
},
};
}, { protocol: token, idleTimeout: 30 });
};
async function topicToFilter(

49
src/utils/websocket.ts Normal file
View file

@ -0,0 +1,49 @@
import { Context, MiddlewareHandler } from '@hono/hono';
import { WSContext, WSEvents, WSReadyState } from '@hono/hono/ws';
type UpgradeWebSocket = (
createEvents: (c: Context) => WSEvents | Promise<WSEvents>,
options?: Deno.UpgradeWebSocketOptions,
) => MiddlewareHandler<
any,
string,
{
outputFormat: 'ws';
}
>;
/**
* A modified version of Hono's WebSocket Helper, just to support Deno's `options` object.
*
* See: https://github.com/honojs/hono/issues/2997
*
* (If that issue gets fixed, we can remove this code.)
*/
export const upgradeWebSocket: UpgradeWebSocket = (createEvents, options) => async (c, next) => {
if (c.req.header('upgrade') !== 'websocket') {
return await next();
}
const events = await createEvents(c);
const { response, socket } = Deno.upgradeWebSocket(c.req.raw, options);
const wsContext: WSContext = {
binaryType: 'arraybuffer',
close: (code, reason) => socket.close(code, reason),
get protocol() {
return socket.protocol;
},
raw: socket,
get readyState() {
return socket.readyState as WSReadyState;
},
url: socket.url ? new URL(socket.url) : null,
send: (source) => socket.send(source),
};
socket.onopen = (evt) => events.onOpen?.(evt, wsContext);
socket.onmessage = (evt) => events.onMessage?.(evt, wsContext);
socket.onclose = (evt) => events.onClose?.(evt, wsContext);
socket.onerror = (evt) => events.onError?.(evt, wsContext);
return response;
};