From 44dfd15502218f171c79a4f5d3698e68cc47a554 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 20 Jun 2024 16:15:31 -0500 Subject: [PATCH 1/3] streamingController: rate-limit with ttl-cache --- src/controllers/api/streaming.ts | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 552ea3bd..b653ab2c 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -9,9 +9,10 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; import { Storages } from '@/storages.ts'; -import { bech32ToPubkey } from '@/utils.ts'; +import { bech32ToPubkey, Time } from '@/utils.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts'; +import TTLCache from '@isaacs/ttlcache'; const debug = Debug('ditto:streaming'); @@ -36,6 +37,11 @@ const streamSchema = z.enum([ type Stream = z.infer; +const LIMITER_WINDOW = Time.minutes(5); +const LIMITER_LIMIT = 100; + +const limiter = new TTLCache(); + const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); @@ -56,6 +62,7 @@ const streamingController: AppController = async (c) => { const store = await Storages.db(); const pubsub = await Storages.pubsub(); + const ip = c.req.header('x-real-ip'); const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; function send(name: string, payload: object) { @@ -119,6 +126,22 @@ const streamingController: AppController = async (c) => { } }; + socket.onmessage = (e) => { + if (ip) { + const count = limiter.get(ip) ?? 0; + limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); + + if (count > LIMITER_LIMIT) { + socket.close(1008, 'Rate limit exceeded'); + return; + } + } + + if (typeof e.data !== 'string') { + socket.close(1003, 'Invalid message'); + } + }; + socket.onclose = () => { controller.abort(); }; From 53e7e856c1e3ba0dccac1d9d664f363cfeb84dea Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 20 Jun 2024 16:20:18 -0500 Subject: [PATCH 2/3] streamingController: bail early if limited --- src/controllers/api/streaming.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index b653ab2c..e3ead8c0 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -57,12 +57,19 @@ const streamingController: AppController = async (c) => { return c.json({ error: 'Invalid access token' }, 401); } + const ip = c.req.header('x-real-ip'); + if (ip) { + const count = limiter.get(ip) ?? 0; + if (count > LIMITER_LIMIT) { + return c.json({ error: 'Rate limit exceeded' }, 429); + } + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const store = await Storages.db(); const pubsub = await Storages.pubsub(); - const ip = c.req.header('x-real-ip'); const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; function send(name: string, payload: object) { @@ -139,6 +146,7 @@ const streamingController: AppController = async (c) => { if (typeof e.data !== 'string') { socket.close(1003, 'Invalid message'); + return; } }; From 5c6479b3fea4d3dcae7fa159c1b40fa5c5a6b2ab Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 11 Jul 2024 17:10:05 -0500 Subject: [PATCH 3/3] Rate-limit messages to the relay --- src/controllers/nostr/relay.ts | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 4d8ab2cb..f124360e 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,3 +1,4 @@ +import TTLCache from '@isaacs/ttlcache'; import { NostrClientCLOSE, NostrClientCOUNT, @@ -14,12 +15,18 @@ import { relayConnectionsGauge, relayEventCounter, relayMessageCounter } from '@ import * as pipeline from '@/pipeline.ts'; import { RelayError } from '@/RelayError.ts'; import { Storages } from '@/storages.ts'; +import { Time } from '@/utils/time.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; +const LIMITER_WINDOW = Time.minutes(1); +const LIMITER_LIMIT = 300; + +const limiter = new TTLCache(); + /** Set up the Websocket connection. */ -function connectStream(socket: WebSocket) { +function connectStream(socket: WebSocket, ip: string | undefined) { const controllers = new Map(); socket.onopen = () => { @@ -27,6 +34,21 @@ function connectStream(socket: WebSocket) { }; socket.onmessage = (e) => { + if (ip) { + const count = limiter.get(ip) ?? 0; + limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); + + if (count > LIMITER_LIMIT) { + socket.close(1008, 'Rate limit exceeded'); + return; + } + } + + if (typeof e.data !== 'string') { + socket.close(1003, 'Invalid message'); + return; + } + const result = n.json().pipe(n.clientMsg()).safeParse(e.data); if (result.success) { relayMessageCounter.inc({ verb: result.data[0] }); @@ -152,8 +174,16 @@ const relayController: AppController = (c, next) => { return c.text('Please use a Nostr client to connect.', 400); } + const ip = c.req.header('x-real-ip'); + if (ip) { + const count = limiter.get(ip) ?? 0; + if (count > LIMITER_LIMIT) { + return c.json({ error: 'Rate limit exceeded' }, 429); + } + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 }); - connectStream(socket); + connectStream(socket, ip); return response; };