ditto/packages/ditto/controllers/api/streaming.ts

254 lines
7.1 KiB
TypeScript

import { DittoTables } from '@ditto/db';
import { MuteListPolicy } from '@ditto/policies';
import {
streamingClientMessagesCounter,
streamingConnectionsGauge,
streamingServerMessagesCounter,
} from '@ditto/metrics';
import TTLCache from '@isaacs/ttlcache';
import { NostrEvent, NostrFilter, NStore } from '@nostrify/nostrify';
import { logi } from '@soapbox/logi';
import { Kysely } from 'kysely';
import { z } from 'zod';
import { type AppController } from '@/app.ts';
import { getFeedPubkeys } from '@/queries.ts';
import { AdminStore } from '@/storages/AdminStore.ts';
import { hydrateEvents } from '@/storages/hydrate.ts';
import { getTokenHash } from '../../../utils/auth.ts';
import { errorJson } from '../../../utils/log.ts';
import { bech32ToPubkey, Time } from '@/utils.ts';
import { StatusView } from '@/views/mastodon/StatusView.ts';
import { NotificationView } from '@/views/mastodon/NotificationView.ts';
import { HTTPException } from '@hono/hono/http-exception';
/**
* Streaming timelines/categories.
* https://docs.joinmastodon.org/methods/streaming/#streams
*/
const streamSchema = z.enum([
'public',
'public:media',
'public:local',
'public:local:media',
'public:remote',
'public:remote:media',
'hashtag',
'hashtag:local',
'user',
'user:notification',
'list',
'direct',
]);
type Stream = z.infer<typeof streamSchema>;
/** https://docs.joinmastodon.org/methods/streaming/#events-11 */
interface StreamingEvent {
/** https://docs.joinmastodon.org/methods/streaming/#events */
event:
| 'update'
| 'delete'
| 'notification'
| 'filters_changed'
| 'conversation'
| 'announcement'
| 'announcement.reaction'
| 'announcement.delete'
| 'status.update'
| 'encrypted_message'
| 'notifications_merged';
payload: string;
stream: Stream[];
}
const LIMITER_WINDOW = Time.minutes(5);
const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>();
const connections = new Set<WebSocket>();
const streamingController: AppController = async (c) => {
const { conf, kysely, store, pubsub } = c.var;
const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol');
const stream = streamSchema.optional().catch(undefined).parse(c.req.query('stream'));
const controller = new AbortController();
if (upgrade?.toLowerCase() !== 'websocket') {
return c.text('Please use websocket protocol', 400);
}
const pubkey = token ? await getTokenPubkey(kysely, token) : undefined;
if (token && !pubkey) {
return c.json({ error: 'Invalid access token' }, 401);
}
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const statusView = new StatusView(c.var);
const adminStore = new AdminStore(conf, store);
const notificationView = new NotificationView(c.var);
const policy = pubkey ? new MuteListPolicy(pubkey, adminStore) : undefined;
function send(e: StreamingEvent) {
if (socket.readyState === WebSocket.OPEN) {
streamingServerMessagesCounter.inc();
socket.send(JSON.stringify(e));
}
}
async function sub(filters: NostrFilter[], render: (event: NostrEvent) => Promise<StreamingEvent | undefined>) {
try {
for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
if (msg[0] === 'EVENT') {
const event = msg[2];
if (policy) {
const [, , ok] = await policy.call(event);
if (!ok) {
continue;
}
}
await hydrateEvents({ conf, kysely, store, signal: AbortSignal.timeout(1000) }, [event]);
const result = await render(event);
if (result) {
send(result);
}
}
}
} catch (e) {
logi({ level: 'error', ns: 'ditto.streaming', msg: 'Error in streaming', error: errorJson(e) });
}
}
socket.onopen = async () => {
connections.add(socket);
streamingConnectionsGauge.set(connections.size);
if (!stream) return;
const topicFilter = await topicToFilter(store, stream, c.req.query(), pubkey, conf.url.host);
if (topicFilter) {
sub([topicFilter], async (event) => {
let payload: object | undefined;
if ([1, 6, 20, 1111].includes(event.kind)) {
payload = await statusView.render(event);
}
if (payload) {
return {
event: 'update',
payload: JSON.stringify(payload),
stream: [stream],
};
}
});
}
if (['user', 'user:notification'].includes(stream) && pubkey) {
sub([{ '#p': [pubkey] }], async (event) => {
if (event.pubkey === pubkey) return; // skip own events
const payload = await notificationView.render(event);
if (payload) {
return {
event: 'notification',
payload: JSON.stringify(payload),
stream: [stream],
};
}
});
return;
}
};
socket.onmessage = (e) => {
streamingClientMessagesCounter.inc();
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
};
socket.onclose = () => {
connections.delete(socket);
streamingConnectionsGauge.set(connections.size);
controller.abort();
};
return response;
};
async function topicToFilter(
store: NStore,
topic: Stream,
query: Record<string, string>,
pubkey: string | undefined,
host: string,
): Promise<NostrFilter | undefined> {
switch (topic) {
case 'public':
return { kinds: [1, 6, 20] };
case 'public:local':
return { kinds: [1, 6, 20], search: `domain:${host}` };
case 'hashtag':
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag] };
break;
case 'hashtag:local':
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], search: `domain:${host}` };
break;
case 'user':
// HACK: this puts the user's entire contacts list into RAM,
// and then calls `matchFilters` over it. Refreshing the page
// is required after following a new user.
return pubkey ? { kinds: [1, 6, 20], authors: [...await getFeedPubkeys(store, pubkey)] } : undefined;
}
}
async function getTokenPubkey(kysely: Kysely<DittoTables>, token: string): Promise<string | undefined> {
if (token.startsWith('token1')) {
const tokenHash = await getTokenHash(token as `token1${string}`);
const row = await kysely
.selectFrom('auth_tokens')
.select('pubkey')
.where('token_hash', '=', tokenHash)
.limit(1)
.executeTakeFirst();
if (!row) {
throw new HTTPException(401, { message: 'Invalid access token' });
}
return row.pubkey;
} else {
return bech32ToPubkey(token);
}
}
export { streamingController };