From 546c0608b7e902332917c93c40d66f3eede7ab5b Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 18 Jun 2024 21:14:02 -0500 Subject: [PATCH] Limit the number of active WebSocket connections by IP --- src/controllers/api/streaming.ts | 24 ++++++++++++++++++++++ src/controllers/nostr/relay.ts | 34 ++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 552ea3bd..6ac7017a 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -36,6 +36,12 @@ const streamSchema = z.enum([ type Stream = z.infer; +/** Maximum number of connections per IP. */ +const MAX_CONNECTIONS = 5; + +/** Map of IP addresses to connection counts. */ +const connections = new Map(); + const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); @@ -51,6 +57,13 @@ const streamingController: AppController = async (c) => { return c.json({ error: 'Invalid access token' }, 401); } + const ip = c.req.header('x-real-ip'); + const count = ip ? connections.get(ip) ?? 0 : 0; + + if (count >= MAX_CONNECTIONS) { + return c.text('Too many connections.', 429); + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const store = await Storages.db(); @@ -97,7 +110,9 @@ const streamingController: AppController = async (c) => { } socket.onopen = async () => { + if (ip) connections.set(ip, count + 1); if (!stream) return; + const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); if (topicFilter) { @@ -121,6 +136,15 @@ const streamingController: AppController = async (c) => { socket.onclose = () => { controller.abort(); + + if (ip) { + const next = count - 1; + if (next <= 0) { + connections.delete(ip); + } else { + connections.set(ip, next); + } + } }; return response; diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 5d08e02c..c0b0330a 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -17,9 +17,22 @@ import { Storages } from '@/storages.ts'; /** Limit of initial events returned for a subscription. */ const FILTER_LIMIT = 100; +/** Maximum number of connections per IP. */ +const MAX_CONNECTIONS = 3; + +/** Map of IP addresses to connection counts. */ +const connections = new Map(); + /** Set up the Websocket connection. */ -function connectStream(socket: WebSocket) { +function connectStream(socket: WebSocket, ip?: string) { const controllers = new Map(); + const count = ip ? connections.get(ip) ?? 0 : 0; + + socket.onopen = () => { + if (ip) { + connections.set(ip, count + 1); + } + }; socket.onmessage = (e) => { const result = n.json().pipe(n.clientMsg()).safeParse(e.data); @@ -34,6 +47,15 @@ function connectStream(socket: WebSocket) { for (const controller of controllers.values()) { controller.abort(); } + + if (ip) { + const next = count - 1; + if (next <= 0) { + connections.delete(ip); + } else { + connections.set(ip, next); + } + } }; /** Handle client message. */ @@ -138,8 +160,16 @@ const relayController: AppController = (c, next) => { return c.text('Please use a Nostr client to connect.', 400); } + const ip = c.req.header('x-real-ip'); + if (ip) { + const count = connections.get(ip) ?? 0; + if (count >= MAX_CONNECTIONS) { + return c.text('Too many connections.', 429); + } + } + const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 }); - connectStream(socket); + connectStream(socket, ip); return response; };