mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
relay: stricter rate limits
This commit is contained in:
parent
68a0ef6648
commit
43a47770f4
1 changed files with 46 additions and 16 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
import { Stickynotes } from '@soapbox/stickynotes';
|
import { Stickynotes } from '@soapbox/stickynotes';
|
||||||
import TTLCache from '@isaacs/ttlcache';
|
|
||||||
import {
|
import {
|
||||||
|
NKinds,
|
||||||
NostrClientCLOSE,
|
NostrClientCLOSE,
|
||||||
NostrClientCOUNT,
|
NostrClientCOUNT,
|
||||||
NostrClientEVENT,
|
NostrClientEVENT,
|
||||||
|
|
@ -19,14 +19,27 @@ import { RelayError } from '@/RelayError.ts';
|
||||||
import { Storages } from '@/storages.ts';
|
import { Storages } from '@/storages.ts';
|
||||||
import { Time } from '@/utils/time.ts';
|
import { Time } from '@/utils/time.ts';
|
||||||
import { purifyEvent } from '@/utils/purify.ts';
|
import { purifyEvent } from '@/utils/purify.ts';
|
||||||
|
import { MemoryRateLimiter } from '@/utils/ratelimiter/MemoryRateLimiter.ts';
|
||||||
|
import { MultiRateLimiter } from '@/utils/ratelimiter/MultiRateLimiter.ts';
|
||||||
|
import { RateLimiter } from '@/utils/ratelimiter/types.ts';
|
||||||
|
|
||||||
/** Limit of initial events returned for a subscription. */
|
/** Limit of initial events returned for a subscription. */
|
||||||
const FILTER_LIMIT = 100;
|
const FILTER_LIMIT = 100;
|
||||||
|
|
||||||
const LIMITER_WINDOW = Time.minutes(1);
|
const limiters = {
|
||||||
const LIMITER_LIMIT = 300;
|
msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }),
|
||||||
|
req: new MultiRateLimiter([
|
||||||
const limiter = new TTLCache<string, number>();
|
new MemoryRateLimiter({ limit: 15, window: Time.seconds(5) }),
|
||||||
|
new MemoryRateLimiter({ limit: 300, window: Time.minutes(5) }),
|
||||||
|
new MemoryRateLimiter({ limit: 1000, window: Time.hours(1) }),
|
||||||
|
]),
|
||||||
|
event: new MultiRateLimiter([
|
||||||
|
new MemoryRateLimiter({ limit: 10, window: Time.seconds(10) }),
|
||||||
|
new MemoryRateLimiter({ limit: 100, window: Time.hours(1) }),
|
||||||
|
new MemoryRateLimiter({ limit: 500, window: Time.days(1) }),
|
||||||
|
]),
|
||||||
|
ephemeral: new MemoryRateLimiter({ limit: 30, window: Time.seconds(10) }),
|
||||||
|
};
|
||||||
|
|
||||||
/** Connections for metrics purposes. */
|
/** Connections for metrics purposes. */
|
||||||
const connections = new Set<WebSocket>();
|
const connections = new Set<WebSocket>();
|
||||||
|
|
@ -43,15 +56,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.onmessage = (e) => {
|
socket.onmessage = (e) => {
|
||||||
if (ip) {
|
assertRateLimit(limiters.msg);
|
||||||
const count = limiter.get(ip) ?? 0;
|
|
||||||
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
|
|
||||||
|
|
||||||
if (count > LIMITER_LIMIT) {
|
|
||||||
socket.close(1008, 'Rate limit exceeded');
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof e.data !== 'string') {
|
if (typeof e.data !== 'string') {
|
||||||
socket.close(1003, 'Invalid message');
|
socket.close(1003, 'Invalid message');
|
||||||
|
|
@ -77,6 +82,18 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function assertRateLimit(limiter: Pick<RateLimiter, 'client'>): void {
|
||||||
|
if (ip) {
|
||||||
|
const client = limiter.client(ip);
|
||||||
|
try {
|
||||||
|
client.hit();
|
||||||
|
} catch (error) {
|
||||||
|
socket.close(1008, 'Rate limit exceeded');
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Handle client message. */
|
/** Handle client message. */
|
||||||
function handleMsg(msg: NostrClientMsg) {
|
function handleMsg(msg: NostrClientMsg) {
|
||||||
switch (msg[0]) {
|
switch (msg[0]) {
|
||||||
|
|
@ -97,6 +114,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
|
||||||
|
|
||||||
/** Handle REQ. Start a subscription. */
|
/** Handle REQ. Start a subscription. */
|
||||||
async function handleReq([_, subId, ...filters]: NostrClientREQ): Promise<void> {
|
async function handleReq([_, subId, ...filters]: NostrClientREQ): Promise<void> {
|
||||||
|
assertRateLimit(limiters.req);
|
||||||
|
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
controllers.get(subId)?.abort();
|
controllers.get(subId)?.abort();
|
||||||
controllers.set(subId, controller);
|
controllers.set(subId, controller);
|
||||||
|
|
@ -136,6 +155,13 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
|
||||||
/** Handle EVENT. Store the event. */
|
/** Handle EVENT. Store the event. */
|
||||||
async function handleEvent([_, event]: NostrClientEVENT): Promise<void> {
|
async function handleEvent([_, event]: NostrClientEVENT): Promise<void> {
|
||||||
relayEventsCounter.inc({ kind: event.kind.toString() });
|
relayEventsCounter.inc({ kind: event.kind.toString() });
|
||||||
|
|
||||||
|
if (NKinds.ephemeral(event.kind)) {
|
||||||
|
assertRateLimit(limiters.ephemeral);
|
||||||
|
} else {
|
||||||
|
assertRateLimit(limiters.event);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// This will store it (if eligible) and run other side-effects.
|
// This will store it (if eligible) and run other side-effects.
|
||||||
await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) });
|
await pipeline.handleEvent(purifyEvent(event), { source: 'relay', signal: AbortSignal.timeout(1000) });
|
||||||
|
|
@ -161,6 +187,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
|
||||||
|
|
||||||
/** Handle COUNT. Return the number of events matching the filters. */
|
/** Handle COUNT. Return the number of events matching the filters. */
|
||||||
async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise<void> {
|
async function handleCount([_, subId, ...filters]: NostrClientCOUNT): Promise<void> {
|
||||||
|
assertRateLimit(limiters.req);
|
||||||
const store = await Storages.db();
|
const store = await Storages.db();
|
||||||
const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay });
|
const { count } = await store.count(filters, { timeout: Conf.db.timeouts.relay });
|
||||||
send(['COUNT', subId, { count, approximate: false }]);
|
send(['COUNT', subId, { count, approximate: false }]);
|
||||||
|
|
@ -188,8 +215,11 @@ const relayController: AppController = (c, next) => {
|
||||||
|
|
||||||
const ip = c.req.header('x-real-ip');
|
const ip = c.req.header('x-real-ip');
|
||||||
if (ip) {
|
if (ip) {
|
||||||
const count = limiter.get(ip) ?? 0;
|
const remaining = Object
|
||||||
if (count > LIMITER_LIMIT) {
|
.values(limiters)
|
||||||
|
.reduce((acc, limiter) => Math.min(acc, limiter.client(ip).remaining), Infinity);
|
||||||
|
|
||||||
|
if (remaining < 0) {
|
||||||
return c.json({ error: 'Rate limit exceeded' }, 429);
|
return c.json({ error: 'Rate limit exceeded' }, 429);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue