diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 552ea3bd..b653ab2c 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -9,9 +9,10 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; -import { bech32ToPubkey } from '@/utils.ts'; +import { bech32ToPubkey, Time } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; +import TTLCache from '@isaacs/ttlcache'; const debug = Debug('ditto:streaming'); @@ -36,6 +37,11 @@ const streamSchema = z.enum([ type Stream = z.infer; +const LIMITER_WINDOW = Time.minutes(5); +const LIMITER_LIMIT = 100; + +const limiter = new TTLCache(); + const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); @@ -56,6 +62,7 @@ const streamingController: AppController = async (c) => { const store = await Storages.db(); const pubsub = await Storages.pubsub(); + const ip = c.req.header('x-real-ip'); const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; function send(name: string, payload: object) { @@ -119,6 +126,22 @@ const streamingController: AppController = async (c) => { } }; + socket.onmessage = (e) => { + if (ip) { + const count = limiter.get(ip) ?? 0; + limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); + + if (count > LIMITER_LIMIT) { + socket.close(1008, 'Rate limit exceeded'); + return; + } + } + + if (typeof e.data !== 'string') { + socket.close(1003, 'Invalid message'); + } + }; + socket.onclose = () => { controller.abort(); };