mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Limit the number of active WebSocket connections by IP
This commit is contained in:
parent
6ee9eb63d3
commit
546c0608b7
2 changed files with 56 additions and 2 deletions
|
|
@ -36,6 +36,12 @@ const streamSchema = z.enum([
|
||||||
|
|
||||||
type Stream = z.infer<typeof streamSchema>;
|
type Stream = z.infer<typeof streamSchema>;
|
||||||
|
|
||||||
|
/** Maximum number of connections per IP. */
|
||||||
|
const MAX_CONNECTIONS = 5;
|
||||||
|
|
||||||
|
/** Map of IP addresses to connection counts. */
|
||||||
|
const connections = new Map<string, number>();
|
||||||
|
|
||||||
const streamingController: AppController = async (c) => {
|
const streamingController: AppController = async (c) => {
|
||||||
const upgrade = c.req.header('upgrade');
|
const upgrade = c.req.header('upgrade');
|
||||||
const token = c.req.header('sec-websocket-protocol');
|
const token = c.req.header('sec-websocket-protocol');
|
||||||
|
|
@ -51,6 +57,13 @@ const streamingController: AppController = async (c) => {
|
||||||
return c.json({ error: 'Invalid access token' }, 401);
|
return c.json({ error: 'Invalid access token' }, 401);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ip = c.req.header('x-real-ip');
|
||||||
|
const count = ip ? connections.get(ip) ?? 0 : 0;
|
||||||
|
|
||||||
|
if (count >= MAX_CONNECTIONS) {
|
||||||
|
return c.text('Too many connections.', 429);
|
||||||
|
}
|
||||||
|
|
||||||
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
|
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
|
||||||
|
|
||||||
const store = await Storages.db();
|
const store = await Storages.db();
|
||||||
|
|
@ -97,7 +110,9 @@ const streamingController: AppController = async (c) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.onopen = async () => {
|
socket.onopen = async () => {
|
||||||
|
if (ip) connections.set(ip, count + 1);
|
||||||
if (!stream) return;
|
if (!stream) return;
|
||||||
|
|
||||||
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
|
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
|
||||||
|
|
||||||
if (topicFilter) {
|
if (topicFilter) {
|
||||||
|
|
@ -121,6 +136,15 @@ const streamingController: AppController = async (c) => {
|
||||||
|
|
||||||
socket.onclose = () => {
|
socket.onclose = () => {
|
||||||
controller.abort();
|
controller.abort();
|
||||||
|
|
||||||
|
if (ip) {
|
||||||
|
const next = count - 1;
|
||||||
|
if (next <= 0) {
|
||||||
|
connections.delete(ip);
|
||||||
|
} else {
|
||||||
|
connections.set(ip, next);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,22 @@ import { Storages } from '@/storages.ts';
|
||||||
/** Limit of initial events returned for a subscription. */
|
/** Limit of initial events returned for a subscription. */
|
||||||
const FILTER_LIMIT = 100;
|
const FILTER_LIMIT = 100;
|
||||||
|
|
||||||
|
/** Maximum number of connections per IP. */
|
||||||
|
const MAX_CONNECTIONS = 3;
|
||||||
|
|
||||||
|
/** Map of IP addresses to connection counts. */
|
||||||
|
const connections = new Map<string, number>();
|
||||||
|
|
||||||
/** Set up the Websocket connection. */
|
/** Set up the Websocket connection. */
|
||||||
function connectStream(socket: WebSocket) {
|
function connectStream(socket: WebSocket, ip?: string) {
|
||||||
const controllers = new Map<string, AbortController>();
|
const controllers = new Map<string, AbortController>();
|
||||||
|
const count = ip ? connections.get(ip) ?? 0 : 0;
|
||||||
|
|
||||||
|
socket.onopen = () => {
|
||||||
|
if (ip) {
|
||||||
|
connections.set(ip, count + 1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
socket.onmessage = (e) => {
|
socket.onmessage = (e) => {
|
||||||
const result = n.json().pipe(n.clientMsg()).safeParse(e.data);
|
const result = n.json().pipe(n.clientMsg()).safeParse(e.data);
|
||||||
|
|
@ -34,6 +47,15 @@ function connectStream(socket: WebSocket) {
|
||||||
for (const controller of controllers.values()) {
|
for (const controller of controllers.values()) {
|
||||||
controller.abort();
|
controller.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ip) {
|
||||||
|
const next = count - 1;
|
||||||
|
if (next <= 0) {
|
||||||
|
connections.delete(ip);
|
||||||
|
} else {
|
||||||
|
connections.set(ip, next);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Handle client message. */
|
/** Handle client message. */
|
||||||
|
|
@ -138,8 +160,16 @@ const relayController: AppController = (c, next) => {
|
||||||
return c.text('Please use a Nostr client to connect.', 400);
|
return c.text('Please use a Nostr client to connect.', 400);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const ip = c.req.header('x-real-ip');
|
||||||
|
if (ip) {
|
||||||
|
const count = connections.get(ip) ?? 0;
|
||||||
|
if (count >= MAX_CONNECTIONS) {
|
||||||
|
return c.text('Too many connections.', 429);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 });
|
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 });
|
||||||
connectStream(socket);
|
connectStream(socket, ip);
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
};
|
};
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue