Merge remote-tracking branch 'origin/main' into use-postgres-js

This commit is contained in:
Alex Gleason 2024-07-11 17:16:11 -05:00
commit 9584638d5c
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
9 changed files with 143 additions and 23 deletions

View file

@ -1,4 +1,4 @@
image: denoland/deno:1.44.2 image: denoland/deno:1.45.0
default: default:
interruptible: true interruptible: true

View file

@ -1 +1 @@
deno 1.44.2 deno 1.45.0

View file

@ -97,8 +97,8 @@ const createStatusController: AppController = async (c) => {
const root = ancestor.tags.find((tag) => tag[0] === 'e' && tag[3] === 'root')?.[1] ?? ancestor.id; const root = ancestor.tags.find((tag) => tag[0] === 'e' && tag[3] === 'root')?.[1] ?? ancestor.id;
tags.push(['e', root, 'root']); tags.push(['e', root, Conf.relay, 'root']);
tags.push(['e', data.in_reply_to_id, 'reply']); tags.push(['e', data.in_reply_to_id, Conf.relay, 'reply']);
} }
if (data.quote_id) { if (data.quote_id) {
@ -202,7 +202,7 @@ const deleteStatusController: AppController = async (c) => {
if (event.pubkey === pubkey) { if (event.pubkey === pubkey) {
await createEvent({ await createEvent({
kind: 5, kind: 5,
tags: [['e', id]], tags: [['e', id, Conf.relay]],
}, c); }, c);
const author = await getAuthor(event.pubkey); const author = await getAuthor(event.pubkey);
@ -260,8 +260,8 @@ const favouriteController: AppController = async (c) => {
kind: 7, kind: 7,
content: '+', content: '+',
tags: [ tags: [
['e', target.id], ['e', target.id, Conf.relay],
['p', target.pubkey], ['p', target.pubkey, Conf.relay],
], ],
}, c); }, c);
@ -302,7 +302,10 @@ const reblogStatusController: AppController = async (c) => {
const reblogEvent = await createEvent({ const reblogEvent = await createEvent({
kind: 6, kind: 6,
tags: [['e', event.id], ['p', event.pubkey]], tags: [
['e', event.id, Conf.relay],
['p', event.pubkey, Conf.relay],
],
}, c); }, c);
await hydrateEvents({ await hydrateEvents({
@ -337,7 +340,7 @@ const unreblogStatusController: AppController = async (c) => {
await createEvent({ await createEvent({
kind: 5, kind: 5,
tags: [['e', repostEvent.id]], tags: [['e', repostEvent.id, Conf.relay]],
}, c); }, c);
return c.json(await renderStatus(event, { viewerPubkey: pubkey })); return c.json(await renderStatus(event, { viewerPubkey: pubkey }));
@ -389,7 +392,7 @@ const bookmarkController: AppController = async (c) => {
if (event) { if (event) {
await updateListEvent( await updateListEvent(
{ kinds: [10003], authors: [pubkey], limit: 1 }, { kinds: [10003], authors: [pubkey], limit: 1 },
(tags) => addTag(tags, ['e', eventId]), (tags) => addTag(tags, ['e', eventId, Conf.relay]),
c, c,
); );
@ -416,7 +419,7 @@ const unbookmarkController: AppController = async (c) => {
if (event) { if (event) {
await updateListEvent( await updateListEvent(
{ kinds: [10003], authors: [pubkey], limit: 1 }, { kinds: [10003], authors: [pubkey], limit: 1 },
(tags) => deleteTag(tags, ['e', eventId]), (tags) => deleteTag(tags, ['e', eventId, Conf.relay]),
c, c,
); );
@ -443,7 +446,7 @@ const pinController: AppController = async (c) => {
if (event) { if (event) {
await updateListEvent( await updateListEvent(
{ kinds: [10001], authors: [pubkey], limit: 1 }, { kinds: [10001], authors: [pubkey], limit: 1 },
(tags) => addTag(tags, ['e', eventId]), (tags) => addTag(tags, ['e', eventId, Conf.relay]),
c, c,
); );
@ -472,7 +475,7 @@ const unpinController: AppController = async (c) => {
if (event) { if (event) {
await updateListEvent( await updateListEvent(
{ kinds: [10001], authors: [pubkey], limit: 1 }, { kinds: [10001], authors: [pubkey], limit: 1 },
(tags) => deleteTag(tags, ['e', eventId]), (tags) => deleteTag(tags, ['e', eventId, Conf.relay]),
c, c,
); );
@ -516,7 +519,7 @@ const zapController: AppController = async (c) => {
lnurl = getLnurl(meta); lnurl = getLnurl(meta);
if (target && lnurl) { if (target && lnurl) {
tags.push( tags.push(
['e', target.id], ['e', target.id, Conf.relay],
['p', target.pubkey], ['p', target.pubkey],
['amount', amount.toString()], ['amount', amount.toString()],
['relays', Conf.relay], ['relays', Conf.relay],

View file

@ -10,9 +10,10 @@ import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { bech32ToPubkey } from '@/utils.ts'; import { bech32ToPubkey, Time } from '@/utils.ts';
import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts'; import { renderReblog, renderStatus } from '@/views/mastodon/statuses.ts';
import { renderNotification } from '@/views/mastodon/notifications.ts'; import { renderNotification } from '@/views/mastodon/notifications.ts';
import TTLCache from '@isaacs/ttlcache';
const debug = Debug('ditto:streaming'); const debug = Debug('ditto:streaming');
@ -37,6 +38,11 @@ const streamSchema = z.enum([
type Stream = z.infer<typeof streamSchema>; type Stream = z.infer<typeof streamSchema>;
const LIMITER_WINDOW = Time.minutes(5);
const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>();
const streamingController: AppController = async (c) => { const streamingController: AppController = async (c) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol'); const token = c.req.header('sec-websocket-protocol');
@ -52,6 +58,14 @@ const streamingController: AppController = async (c) => {
return c.json({ error: 'Invalid access token' }, 401); return c.json({ error: 'Invalid access token' }, 401);
} }
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
const store = await Storages.db(); const store = await Storages.db();
@ -122,6 +136,23 @@ const streamingController: AppController = async (c) => {
} }
}; };
socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
};
socket.onclose = () => { socket.onclose = () => {
streamingConnectionsGauge.dec(); streamingConnectionsGauge.dec();
controller.abort(); controller.abort();

View file

@ -1,6 +1,11 @@
import { ErrorHandler } from '@hono/hono'; import { ErrorHandler } from '@hono/hono';
import { HTTPException } from '@hono/hono/http-exception';
export const errorHandler: ErrorHandler = (err, c) => { export const errorHandler: ErrorHandler = (err, c) => {
if (err instanceof HTTPException) {
return c.json({ error: err.message }, err.status);
}
console.error(err); console.error(err);
if (err.message === 'canceling statement due to statement timeout') { if (err.message === 'canceling statement due to statement timeout') {

View file

@ -1,3 +1,4 @@
import TTLCache from '@isaacs/ttlcache';
import { import {
NostrClientCLOSE, NostrClientCLOSE,
NostrClientCOUNT, NostrClientCOUNT,
@ -14,12 +15,18 @@ import { relayConnectionsGauge, relayEventCounter, relayMessageCounter } from '@
import * as pipeline from '@/pipeline.ts'; import * as pipeline from '@/pipeline.ts';
import { RelayError } from '@/RelayError.ts'; import { RelayError } from '@/RelayError.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { Time } from '@/utils/time.ts';
/** Limit of initial events returned for a subscription. */ /** Limit of initial events returned for a subscription. */
const FILTER_LIMIT = 100; const FILTER_LIMIT = 100;
const LIMITER_WINDOW = Time.minutes(1);
const LIMITER_LIMIT = 300;
const limiter = new TTLCache<string, number>();
/** Set up the Websocket connection. */ /** Set up the Websocket connection. */
function connectStream(socket: WebSocket) { function connectStream(socket: WebSocket, ip: string | undefined) {
const controllers = new Map<string, AbortController>(); const controllers = new Map<string, AbortController>();
socket.onopen = () => { socket.onopen = () => {
@ -27,6 +34,21 @@ function connectStream(socket: WebSocket) {
}; };
socket.onmessage = (e) => { socket.onmessage = (e) => {
if (ip) {
const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
if (count > LIMITER_LIMIT) {
socket.close(1008, 'Rate limit exceeded');
return;
}
}
if (typeof e.data !== 'string') {
socket.close(1003, 'Invalid message');
return;
}
const result = n.json().pipe(n.clientMsg()).safeParse(e.data); const result = n.json().pipe(n.clientMsg()).safeParse(e.data);
if (result.success) { if (result.success) {
relayMessageCounter.inc({ verb: result.data[0] }); relayMessageCounter.inc({ verb: result.data[0] });
@ -152,8 +174,16 @@ const relayController: AppController = (c, next) => {
return c.text('Please use a Nostr client to connect.', 400); return c.text('Please use a Nostr client to connect.', 400);
} }
const ip = c.req.header('x-real-ip');
if (ip) {
const count = limiter.get(ip) ?? 0;
if (count > LIMITER_LIMIT) {
return c.json({ error: 'Rate limit exceeded' }, 429);
}
}
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 }); const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { idleTimeout: 30 });
connectStream(socket); connectStream(socket, ip);
return response; return response;
}; };

View file

@ -1,4 +1,5 @@
// deno-lint-ignore-file require-await // deno-lint-ignore-file require-await
import { HTTPException } from '@hono/hono/http-exception';
import { NConnectSigner, NostrEvent, NostrSigner } from '@nostrify/nostrify'; import { NConnectSigner, NostrEvent, NostrSigner } from '@nostrify/nostrify';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
@ -27,30 +28,78 @@ export class ConnectSigner implements NostrSigner {
async signEvent(event: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>): Promise<NostrEvent> { async signEvent(event: Omit<NostrEvent, 'id' | 'pubkey' | 'sig'>): Promise<NostrEvent> {
const signer = await this.signer; const signer = await this.signer;
return signer.signEvent(event); try {
return await signer.signEvent(event);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, { message: 'The event was not signed quickly enough' });
} else {
throw e;
}
}
} }
readonly nip04 = { readonly nip04 = {
encrypt: async (pubkey: string, plaintext: string): Promise<string> => { encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
const signer = await this.signer; const signer = await this.signer;
return signer.nip04.encrypt(pubkey, plaintext); try {
return await signer.nip04.encrypt(pubkey, plaintext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough',
});
} else {
throw e;
}
}
}, },
decrypt: async (pubkey: string, ciphertext: string): Promise<string> => { decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
const signer = await this.signer; const signer = await this.signer;
return signer.nip04.decrypt(pubkey, ciphertext); try {
return await signer.nip04.decrypt(pubkey, ciphertext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough',
});
} else {
throw e;
}
}
}, },
}; };
readonly nip44 = { readonly nip44 = {
encrypt: async (pubkey: string, plaintext: string): Promise<string> => { encrypt: async (pubkey: string, plaintext: string): Promise<string> => {
const signer = await this.signer; const signer = await this.signer;
return signer.nip44.encrypt(pubkey, plaintext); try {
return await signer.nip44.encrypt(pubkey, plaintext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough',
});
} else {
throw e;
}
}
}, },
decrypt: async (pubkey: string, ciphertext: string): Promise<string> => { decrypt: async (pubkey: string, ciphertext: string): Promise<string> => {
const signer = await this.signer; const signer = await this.signer;
return signer.nip44.decrypt(pubkey, ciphertext); try {
return await signer.nip44.decrypt(pubkey, ciphertext);
} catch (e) {
if (e.name === 'AbortError') {
throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough',
});
} else {
throw e;
}
}
}, },
}; };

View file

@ -7,7 +7,7 @@ export class ReadOnlySigner implements NostrSigner {
async signEvent(): Promise<NostrEvent> { async signEvent(): Promise<NostrEvent> {
throw new HTTPException(401, { throw new HTTPException(401, {
message: 'Log out and back in', message: 'Log in with Nostr Connect to sign events',
}); });
} }

View file

@ -250,6 +250,8 @@ class EventsDB implements NStore {
/** Converts filters to more performant, simpler filters that are better for SQLite. */ /** Converts filters to more performant, simpler filters that are better for SQLite. */
async expandFilters(filters: NostrFilter[]): Promise<NostrFilter[]> { async expandFilters(filters: NostrFilter[]): Promise<NostrFilter[]> {
filters = structuredClone(filters);
for (const filter of filters) { for (const filter of filters) {
if (filter.search) { if (filter.search) {
const tokens = NIP50.parseInput(filter.search); const tokens = NIP50.parseInput(filter.search);