From 12de164a4fac6ca6ad03e4ce2e28663ccc669461 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 Jan 2025 13:36:49 -0600 Subject: [PATCH 1/5] Add a custom RateLimiter implementation --- src/utils/ratelimiter/MemoryRateLimiter.ts | 77 ++++++++++++++++++++++ src/utils/ratelimiter/RateLimitError.ts | 10 +++ src/utils/ratelimiter/types.ts | 12 ++++ 3 files changed, 99 insertions(+) create mode 100644 src/utils/ratelimiter/MemoryRateLimiter.ts create mode 100644 src/utils/ratelimiter/RateLimitError.ts create mode 100644 src/utils/ratelimiter/types.ts diff --git a/src/utils/ratelimiter/MemoryRateLimiter.ts b/src/utils/ratelimiter/MemoryRateLimiter.ts new file mode 100644 index 00000000..b3f14d81 --- /dev/null +++ b/src/utils/ratelimiter/MemoryRateLimiter.ts @@ -0,0 +1,77 @@ +import { RateLimitError } from './RateLimitError.ts'; +import { RateLimiter, RateLimiterClient } from './types.ts'; + +interface MemoryRateLimiterOpts { + limit: number; + window: number; +} + +export class MemoryRateLimiter implements RateLimiter { + private iid: number; + + private previous = new Map(); + private current = new Map(); + + constructor(private opts: MemoryRateLimiterOpts) { + this.iid = setInterval(() => { + this.previous = this.current; + this.current = new Map(); + }, opts.window); + } + + get limit(): number { + return this.opts.limit; + } + + get window(): number { + return this.opts.window; + } + + client(key: string): RateLimiterClient { + const curr = this.current.get(key); + const prev = this.previous.get(key); + + if (curr) { + return curr; + } + + if (prev) { + this.current.set(key, prev); + this.previous.delete(key); + return prev; + } + + const next = new MemoryRateLimiterClient(this); + this.current.set(key, next); + return next; + } + + [Symbol.dispose](): void { + clearInterval(this.iid); + } +} + +class MemoryRateLimiterClient implements RateLimiterClient { + private _hits: number = 0; + readonly resetAt: Date; + + constructor(private limiter: MemoryRateLimiter) { + this.resetAt = new Date(Date.now() + limiter.window); + } + + get hits(): number { + return this._hits; + } + + get remaining(): number { + return this.limiter.limit - this.hits; + } + + hit(n: number = 1): void { + this._hits += n; + + if (this.remaining < 0) { + throw new RateLimitError(this.limiter, this); + } + } +} diff --git a/src/utils/ratelimiter/RateLimitError.ts b/src/utils/ratelimiter/RateLimitError.ts new file mode 100644 index 00000000..ce21af72 --- /dev/null +++ b/src/utils/ratelimiter/RateLimitError.ts @@ -0,0 +1,10 @@ +import { RateLimiter, RateLimiterClient } from './types.ts'; + +export class RateLimitError extends Error { + constructor( + readonly limiter: RateLimiter, + readonly client: RateLimiterClient, + ) { + super('Rate limit exceeded'); + } +} diff --git a/src/utils/ratelimiter/types.ts b/src/utils/ratelimiter/types.ts new file mode 100644 index 00000000..c1a6b2f0 --- /dev/null +++ b/src/utils/ratelimiter/types.ts @@ -0,0 +1,12 @@ +export interface RateLimiter extends Disposable { + readonly limit: number; + readonly window: number; + client(key: string): RateLimiterClient; +} + +export interface RateLimiterClient { + readonly hits: number; + readonly resetAt: Date; + readonly remaining: number; + hit(n?: number): void; +} From 68a0ef664819c9d2755ba3eaa369b3be182a3ce8 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 Jan 2025 15:20:52 -0600 Subject: [PATCH 2/5] Add ratelimiter tests --- .../ratelimiter/MemoryRateLimiter.test.ts | 31 +++++++++++++ src/utils/ratelimiter/MemoryRateLimiter.ts | 2 +- .../ratelimiter/MultiRateLimiter.test.ts | 39 ++++++++++++++++ src/utils/ratelimiter/MultiRateLimiter.ts | 45 +++++++++++++++++++ 4 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 src/utils/ratelimiter/MemoryRateLimiter.test.ts create mode 100644 src/utils/ratelimiter/MultiRateLimiter.test.ts create mode 100644 src/utils/ratelimiter/MultiRateLimiter.ts diff --git a/src/utils/ratelimiter/MemoryRateLimiter.test.ts b/src/utils/ratelimiter/MemoryRateLimiter.test.ts new file mode 100644 index 00000000..2da6b2d1 --- /dev/null +++ b/src/utils/ratelimiter/MemoryRateLimiter.test.ts @@ -0,0 +1,31 @@ +import { assertEquals, assertThrows } from '@std/assert'; + +import { MemoryRateLimiter } from './MemoryRateLimiter.ts'; +import { RateLimitError } from './RateLimitError.ts'; + +Deno.test('MemoryRateLimiter', async (t) => { + const limit = 5; + const window = 100; + + using limiter = new MemoryRateLimiter({ limit, window }); + + await t.step('can hit up to limit', () => { + for (let i = 0; i < limit; i++) { + const client = limiter.client('test'); + assertEquals(client.hits, i); + client.hit(); + } + }); + + await t.step('throws when hit if limit exceeded', () => { + assertThrows(() => limiter.client('test').hit(), RateLimitError); + }); + + await t.step('can hit after window resets', async () => { + await new Promise((resolve) => setTimeout(resolve, window + 1)); + + const client = limiter.client('test'); + assertEquals(client.hits, 0); + client.hit(); + }); +}); diff --git a/src/utils/ratelimiter/MemoryRateLimiter.ts b/src/utils/ratelimiter/MemoryRateLimiter.ts index b3f14d81..0eaa5540 100644 --- a/src/utils/ratelimiter/MemoryRateLimiter.ts +++ b/src/utils/ratelimiter/MemoryRateLimiter.ts @@ -35,7 +35,7 @@ export class MemoryRateLimiter implements RateLimiter { return curr; } - if (prev) { + if (prev && prev.resetAt > new Date()) { this.current.set(key, prev); this.previous.delete(key); return prev; diff --git a/src/utils/ratelimiter/MultiRateLimiter.test.ts b/src/utils/ratelimiter/MultiRateLimiter.test.ts new file mode 100644 index 00000000..3cfa4696 --- /dev/null +++ b/src/utils/ratelimiter/MultiRateLimiter.test.ts @@ -0,0 +1,39 @@ +import { assertEquals, assertThrows } from '@std/assert'; + +import { MemoryRateLimiter } from './MemoryRateLimiter.ts'; +import { MultiRateLimiter } from './MultiRateLimiter.ts'; + +Deno.test('MultiRateLimiter', async (t) => { + using limiter1 = new MemoryRateLimiter({ limit: 5, window: 100 }); + using limiter2 = new MemoryRateLimiter({ limit: 8, window: 200 }); + + const limiter = new MultiRateLimiter([limiter1, limiter2]); + + await t.step('can hit up to first limit', () => { + for (let i = 0; i < limiter1.limit; i++) { + const client = limiter.client('test'); + assertEquals(client.hits, i); + client.hit(); + } + }); + + await t.step('throws when hit if first limit exceeded', () => { + assertThrows(() => limiter.client('test').hit(), Error); + }); + + await t.step('can hit up to second limit after the first window resets', async () => { + await new Promise((resolve) => setTimeout(resolve, limiter1.window + 1)); + + const limit = limiter2.limit - limiter1.limit - 1; + + for (let i = 0; i < limit; i++) { + const client = limiter.client('test'); + assertEquals(client.hits, i); + client.hit(); + } + }); + + await t.step('throws when hit if second limit exceeded', () => { + assertThrows(() => limiter.client('test').hit(), Error); + }); +}); diff --git a/src/utils/ratelimiter/MultiRateLimiter.ts b/src/utils/ratelimiter/MultiRateLimiter.ts new file mode 100644 index 00000000..dc9b62a7 --- /dev/null +++ b/src/utils/ratelimiter/MultiRateLimiter.ts @@ -0,0 +1,45 @@ +import { RateLimiter, RateLimiterClient } from './types.ts'; + +export class MultiRateLimiter { + constructor(private limiters: RateLimiter[]) {} + + client(key: string): RateLimiterClient { + return new MultiRateLimiterClient(key, this.limiters); + } +} + +class MultiRateLimiterClient implements RateLimiterClient { + constructor(private key: string, private limiters: RateLimiter[]) { + if (!limiters.length) { + throw new Error('No limiters provided'); + } + } + + get hits(): number { + return this.limiters[0].client(this.key).hits; + } + + get resetAt(): Date { + return this.limiters[0].client(this.key).resetAt; + } + + get remaining(): number { + return this.limiters[0].client(this.key).remaining; + } + + hit(n?: number): void { + let error: unknown; + + for (const limiter of this.limiters) { + try { + limiter.client(this.key).hit(n); + } catch (e) { + error ??= e; + } + } + + if (error instanceof Error) { + throw error; + } + } +} From 43a47770f4d8cc44c7e8142c796aee3f5f6a78ca Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 Jan 2025 15:21:16 -0600 Subject: [PATCH 3/5] relay: stricter rate limits --- src/controllers/nostr/relay.ts | 62 +++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 74bd8a56..93ffb199 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,6 +1,6 @@ import { Stickynotes } from '@soapbox/stickynotes'; -import TTLCache from '@isaacs/ttlcache'; import { + NKinds, NostrClientCLOSE, NostrClientCOUNT, NostrClientEVENT, @@ -19,14 +19,27 @@ import { RelayError } from '@/RelayError.ts'; import { Storages } from '@/storages.ts'; import { Time } from '@/utils/time.ts'; import { purifyEvent } from '@/utils/purify.ts'; +import { MemoryRateLimiter } from '@/utils/ratelimiter/MemoryRateLimiter.ts'; +import { MultiRateLimiter } from '@/utils/ratelimiter/MultiRateLimiter.ts'; +import { RateLimiter } from '@/utils/ratelimiter/types.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; -const LIMITER_WINDOW = Time.minutes(1); -const LIMITER_LIMIT = 300; - -const limiter = new TTLCache(); +const limiters = { + msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }), + req: new MultiRateLimiter([ + new MemoryRateLimiter({ limit: 15, window: Time.seconds(5) }), + new MemoryRateLimiter({ limit: 300, window: Time.minutes(5) }), + new MemoryRateLimiter({ limit: 1000, window: Time.hours(1) }), + ]), + event: new MultiRateLimiter([ + new MemoryRateLimiter({ limit: 10, window: Time.seconds(10) }), + new MemoryRateLimiter({ limit: 100, window: Time.hours(1) }), + new MemoryRateLimiter({ limit: 500, window: Time.days(1) }), + ]), + ephemeral: new MemoryRateLimiter({ limit: 30, window: Time.seconds(10) }), +}; /** Connections for metrics purposes. */ const connections = new Set(); @@ -43,15 +56,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { }; socket.onmessage = (e) => { - if (ip) { - const count = limiter.get(ip) ?? 0; - limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); - - if (count > LIMITER_LIMIT) { - socket.close(1008, 'Rate limit exceeded'); - return; - } - } + assertRateLimit(limiters.msg); if (typeof e.data !== 'string') { socket.close(1003, 'Invalid message'); @@ -77,6 +82,18 @@ function connectStream(socket: WebSocket, ip: string | undefined) { } }; + function assertRateLimit(limiter: Pick): void { + if (ip) { + const client = limiter.client(ip); + try { + client.hit(); + } catch (error) { + socket.close(1008, 'Rate limit exceeded'); + throw error; + } + } + } + /** Handle client message. */ function handleMsg(msg: NostrClientMsg) { switch (msg[0]) { @@ -97,6 +114,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle REQ. Start a subscription. */ async function handleReq([_, subId, ...filters]: NostrClientREQ): Promise { + assertRateLimit(limiters.req); + const controller = new AbortController(); controllers.get(subId)?.abort(); controllers.set(subId, controller); @@ -136,6 +155,13 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle EVENT. Store the event. */ async function handleEvent([_, event]: NostrClientEVENT): Promise { relayEventsCounter.inc({ kind: event.kind.toString() }); + + if (NKinds.ephemeral(event.kind)) { + assertRateLimit(limiters.ephemeral); + } else { + assertRateLimit(limiters.event); + } + try { // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) }); @@ -161,6 +187,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle COUNT. Return the number of events matching the filters. */ async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise { + assertRateLimit(limiters.req); const store = await Storages.db(); const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay }); send(['COUNT', subId, { count, approximate: false }]); @@ -188,8 +215,11 @@ const relayController: AppController = (c, next) => { const ip = c.req.header('x-real-ip'); if (ip) { - const count = limiter.get(ip) ?? 0; - if (count > LIMITER_LIMIT) { + const remaining = Object + .values(limiters) + .reduce((acc, limiter) => Math.min(acc, limiter.client(ip).remaining), Infinity); + + if (remaining < 0) { return c.json({ error: 'Rate limit exceeded' }, 429); } } From fd312032a4188ddfd658355933169dafc8406f87 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 Jan 2025 15:31:49 -0600 Subject: [PATCH 4/5] MultiRateLimiter: ensure the active limiter is used for ratelimit values --- src/utils/ratelimiter/MultiRateLimiter.test.ts | 2 ++ src/utils/ratelimiter/MultiRateLimiter.ts | 14 ++++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/utils/ratelimiter/MultiRateLimiter.test.ts b/src/utils/ratelimiter/MultiRateLimiter.test.ts index 3cfa4696..9b1fd648 100644 --- a/src/utils/ratelimiter/MultiRateLimiter.test.ts +++ b/src/utils/ratelimiter/MultiRateLimiter.test.ts @@ -34,6 +34,8 @@ Deno.test('MultiRateLimiter', async (t) => { }); await t.step('throws when hit if second limit exceeded', () => { + assertEquals(limiter.client('test').limiter, limiter1); assertThrows(() => limiter.client('test').hit(), Error); + assertEquals(limiter.client('test').limiter, limiter2); }); }); diff --git a/src/utils/ratelimiter/MultiRateLimiter.ts b/src/utils/ratelimiter/MultiRateLimiter.ts index dc9b62a7..14b23142 100644 --- a/src/utils/ratelimiter/MultiRateLimiter.ts +++ b/src/utils/ratelimiter/MultiRateLimiter.ts @@ -3,7 +3,7 @@ import { RateLimiter, RateLimiterClient } from './types.ts'; export class MultiRateLimiter { constructor(private limiters: RateLimiter[]) {} - client(key: string): RateLimiterClient { + client(key: string): MultiRateLimiterClient { return new MultiRateLimiterClient(key, this.limiters); } } @@ -15,16 +15,22 @@ class MultiRateLimiterClient implements RateLimiterClient { } } + /** Returns the _active_ limiter, which is either the first exceeded or the first. */ + get limiter(): RateLimiter { + const exceeded = this.limiters.find((limiter) => limiter.client(this.key).remaining < 0); + return exceeded ?? this.limiters[0]; + } + get hits(): number { - return this.limiters[0].client(this.key).hits; + return this.limiter.client(this.key).hits; } get resetAt(): Date { - return this.limiters[0].client(this.key).resetAt; + return this.limiter.client(this.key).resetAt; } get remaining(): number { - return this.limiters[0].client(this.key).remaining; + return this.limiter.client(this.key).remaining; } hit(n?: number): void { From 7601cfa4309f0e55f16e9b3a5f53b28ba817d0fc Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 25 Jan 2025 15:37:09 -0600 Subject: [PATCH 5/5] Don't throw inside the websocket callbacks because that crashes the whole application --- src/controllers/nostr/relay.ts | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 93ffb199..6b1c2fbc 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -56,7 +56,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { }; socket.onmessage = (e) => { - assertRateLimit(limiters.msg); + if (rateLimited(limiters.msg)) return; if (typeof e.data !== 'string') { socket.close(1003, 'Invalid message'); @@ -82,16 +82,17 @@ function connectStream(socket: WebSocket, ip: string | undefined) { } }; - function assertRateLimit(limiter: Pick): void { + function rateLimited(limiter: Pick): boolean { if (ip) { const client = limiter.client(ip); try { client.hit(); - } catch (error) { + } catch { socket.close(1008, 'Rate limit exceeded'); - throw error; + return true; } } + return false; } /** Handle client message. */ @@ -114,7 +115,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle REQ. Start a subscription. */ async function handleReq([_, subId, ...filters]: NostrClientREQ): Promise { - assertRateLimit(limiters.req); + if (rateLimited(limiters.req)) return; const controller = new AbortController(); controllers.get(subId)?.abort(); @@ -156,11 +157,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) { async function handleEvent([_, event]: NostrClientEVENT): Promise { relayEventsCounter.inc({ kind: event.kind.toString() }); - if (NKinds.ephemeral(event.kind)) { - assertRateLimit(limiters.ephemeral); - } else { - assertRateLimit(limiters.event); - } + const limiter = NKinds.ephemeral(event.kind) ? limiters.ephemeral : limiters.event; + if (rateLimited(limiter)) return; try { // This will store it (if eligible) and run other side-effects. @@ -187,7 +185,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) { /** Handle COUNT. Return the number of events matching the filters. */ async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise { - assertRateLimit(limiters.req); + if (rateLimited(limiters.req)) return; const store = await Storages.db(); const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay }); send(['COUNT', subId, { count, approximate: false }]);