diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 4d8ab2cb..f124360e 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,3 +1,4 @@ +import TTLCache from '@isaacs/ttlcache'; import { NostrClientCLOSE, NostrClientCOUNT, @@ -14,12 +15,18 @@ import { relayConnectionsGauge, relayEventCounter, relayMessageCounter } from '@ import * as pipeline from '@/pipeline.ts'; import { RelayError } from '@/RelayError.ts'; import { Storages } from '@/storages.ts'; +import { Time } from '@/utils/time.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; +const LIMITER_WINDOW = Time.minutes(1); +const LIMITER_LIMIT = 300; + +const limiter = new TTLCache(); + /** Set up the Websocket connection. */ -function connectStream(socket: WebSocket) { +function connectStream(socket: WebSocket, ip: string | undefined) { const controllers = new Map(); socket.onopen = () => { @@ -27,6 +34,21 @@ function connectStream(socket: WebSocket) { }; socket.onmessage = (e) => { + if (ip) { + const count = limiter.get(ip) ?? 0; + limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); + + if (count > LIMITER_LIMIT) { + socket.close(1008, 'Rate limit exceeded'); + return; + } + } + + if (typeof e.data !== 'string') { + socket.close(1003, 'Invalid message'); + return; + } + const result = n.json().pipe(n.clientMsg()).safeParse(e.data); if (result.success) { relayMessageCounter.inc({ verb: result.data[0] }); @@ -152,8 +174,16 @@ const relayController: AppController = (c, next) => { return c.text('Please use a Nostr client to connect.', 400); } + const ip = c.req.header('x-real-ip'); + if (ip) { + const count = limiter.get(ip) ?? 0; + if (count > LIMITER_LIMIT) { + return c.json({ error: 'Rate limit exceeded' }, 429); + } + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 }); - connectStream(socket); + connectStream(socket, ip); return response; };