Merge branch 'ws-limit' into 'main'

Rate-limit websocket endpoints

See merge request soapbox-pub/ditto!415
This commit is contained in:
Alex Gleason 2024-07-11 22:11:56 +00:00
commit d0d385682d
2 changed files with 64 additions and 3 deletions

View file

@ -10,9 +10,10 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey, Time } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts';
import TTLCache from '@isaacs/ttlcache';
const debug = Debug('ditto:streaming'); const debug = Debug('ditto:streaming');
@ -37,6 +38,11 @@ const streamSchema = z.enum([
type Stream = z.infer<typeof streamSchema>; type Stream = z.infer<typeof streamSchema>;
const LIMITER_WINDOW = Time.minutes(5);
const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>();
const streamingController: AppController = async (c) => { const streamingController: AppController = async (c) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol'); const token = c.req.header('sec-websocket-protocol');
@ -52,6 +58,14 @@ const streamingController: AppController = async (c) => {
return c.json({ error: 'Invalid access token' }, 401); return c.json({ error: 'Invalid access token' }, 401);
} }
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const store = await Storages.db(); const store = await Storages.db();
@ -122,6 +136,23 @@ const streamingController: AppController = async (c) => {
} }
}; };
socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
};
socket.onclose = () => { socket.onclose = () => {
streamingConnectionsGauge.dec(); streamingConnectionsGauge.dec();
controller.abort(); controller.abort();

View file

@ -1,3 +1,4 @@
import TTLCache from '@isaacs/ttlcache';
import { import {
NostrClientCLOSE, NostrClientCLOSE,
NostrClientCOUNT, NostrClientCOUNT,
@ -14,12 +15,18 @@ import { relayConnectionsGauge, relayEventCounter, relayMessageCounter } from '@
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { RelayError } from '@/RelayError.ts'; import { RelayError } from '@/RelayError.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { Time } from '@/utils/time.ts';
/** Limit of initial events returned for a subscription. */ /** Limit of initial events returned for a subscription. */
const FILTER_LIMIT = 100; const FILTER_LIMIT = 100;
const LIMITER_WINDOW = Time.minutes(1);
const LIMITER_LIMIT = 300;
const limiter = new TTLCache<string, number>();
/** Set up the Websocket connection. */ /** Set up the Websocket connection. */
function connectStream(socket: WebSocket) { function connectStream(socket: WebSocket, ip: string | undefined) {
const controllers = new Map<string, AbortController>(); const controllers = new Map<string, AbortController>();
socket.onopen = () => { socket.onopen = () => {
@ -27,6 +34,21 @@ function connectStream(socket: WebSocket) {
}; };
socket.onmessage = (e) => { socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
const result = n.json().pipe(n.clientMsg()).safeParse(e.data); const result = n.json().pipe(n.clientMsg()).safeParse(e.data);
if (result.success) { if (result.success) {
relayMessageCounter.inc({ verb: result.data[0] }); relayMessageCounter.inc({ verb: result.data[0] });
@ -152,8 +174,16 @@ const relayController: AppController = (c, next) => {
return c.text('Please use a Nostr client to connect.', 400); return c.text('Please use a Nostr client to connect.', 400);
} }
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 }); const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 });
connectStream(socket); connectStream(socket, ip);
return response; return response;
}; };