diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 74bd8a56..93ffb199 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,6 +1,6 @@ import { Stickynotes } from '@soapbox/stickynotes'; -import TTLCache from '@isaacs/ttlcache'; import { + NKinds, NostrClientCLOSE, NostrClientCOUNT, NostrClientEVENT, @@ -19,14 +19,27 @@ import { RelayError } from '@/RelayError.ts'; import { Storages } from '@/storages.ts'; import { Time } from '@/utils/time.ts'; import { purifyEvent } from '@/utils/purify.ts'; +import { MemoryRateLimiter } from '@/utils/ratelimiter/MemoryRateLimiter.ts'; +import { MultiRateLimiter } from '@/utils/ratelimiter/MultiRateLimiter.ts'; +import { RateLimiter } from '@/utils/ratelimiter/types.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; -const LIMITER_WINDOW = Time.minutes(1); -const LIMITER_LIMIT = 300; - -const limiter = new TTLCache(); +const limiters = { + msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }), + req: new MultiRateLimiter([ + new MemoryRateLimiter({ limit: 15, window: Time.seconds(5) }), + new MemoryRateLimiter({ limit: 300, window: Time.minutes(5) }), + new MemoryRateLimiter({ limit: 1000, window: Time.hours(1) }), + ]), + event: new MultiRateLimiter([ + new MemoryRateLimiter({ limit: 10, window: Time.seconds(10) }), + new MemoryRateLimiter({ limit: 100, window: Time.hours(1) }), + new MemoryRateLimiter({ limit: 500, window: Time.days(1) }), + ]), + ephemeral: new MemoryRateLimiter({ limit: 30, window: Time.seconds(10) }), +}; /** Connections for metrics purposes. */ const connections = new Set(); @@ -43,15 +56,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { }; socket.onmessage = (e) => { - if (ip) { - const count = limiter.get(ip) ?? 0; - limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); - - if (count > LIMITER_LIMIT) { - socket.close(1008, 'Rate limit exceeded'); - return; - } - } + assertRateLimit(limiters.msg); if (typeof e.data !== 'string') { socket.close(1003, 'Invalid message'); @@ -77,6 +82,18 @@ function connectStream(socket: WebSocket, ip: string | undefined) { } }; + function assertRateLimit(limiter: Pick): void { + if (ip) { + const client = limiter.client(ip); + try { + client.hit(); + } catch (error) { + socket.close(1008, 'Rate limit exceeded'); + throw error; + } + } + } + /** Handle client message. */ function handleMsg(msg: NostrClientMsg) { switch (msg[0]) { @@ -97,6 +114,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle REQ. Start a subscription. */ async function handleReq([_, subId, ...filters]: NostrClientREQ): Promise { + assertRateLimit(limiters.req); + const controller = new AbortController(); controllers.get(subId)?.abort(); controllers.set(subId, controller); @@ -136,6 +155,13 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle EVENT. Store the event. */ async function handleEvent([_, event]: NostrClientEVENT): Promise { relayEventsCounter.inc({ kind: event.kind.toString() }); + + if (NKinds.ephemeral(event.kind)) { + assertRateLimit(limiters.ephemeral); + } else { + assertRateLimit(limiters.event); + } + try { // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) }); @@ -161,6 +187,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle COUNT. Return the number of events matching the filters. */ async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise { + assertRateLimit(limiters.req); const store = await Storages.db(); const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay }); send(['COUNT', subId, { count, approximate: false }]); @@ -188,8 +215,11 @@ const relayController: AppController = (c, next) => { const ip = c.req.header('x-real-ip'); if (ip) { - const count = limiter.get(ip) ?? 0; - if (count > LIMITER_LIMIT) { + const remaining = Object + .values(limiters) + .reduce((acc, limiter) => Math.min(acc, limiter.client(ip).remaining), Infinity); + + if (remaining < 0) { return c.json({ error: 'Rate limit exceeded' }, 429); } }