Merge branch 'main' into zap-notification-streaming

This commit is contained in:
P. Reis 2024-09-24 18:12:28 -03:00
commit b81ae3f043
36 changed files with 1008 additions and 216 deletions

View file

@ -1,4 +1,4 @@
image: denoland/deno:1.46.3 image: denoland/deno:2.0.0-rc.3
default: default:
interruptible: true interruptible: true

View file

@ -32,8 +32,9 @@
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/db": "jsr:@nostrify/db@^0.32.2", "@nostrify/db": "jsr:@nostrify/db@^0.35.0",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.1", "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.36.0",
"@nostrify/policies": "jsr:@nostrify/policies@^0.35.0",
"@scure/base": "npm:@scure/base@^1.1.6", "@scure/base": "npm:@scure/base@^1.1.6",
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
"@soapbox/kysely-pglite": "jsr:@soapbox/kysely-pglite@^0.0.1", "@soapbox/kysely-pglite": "jsr:@soapbox/kysely-pglite@^0.0.1",

127
deno.lock generated
View file

@ -13,19 +13,27 @@
"jsr:@gleasonator/policy@0.5.0": "jsr:@gleasonator/policy@0.5.0", "jsr:@gleasonator/policy@0.5.0": "jsr:@gleasonator/policy@0.5.0",
"jsr:@gleasonator/policy@0.5.1": "jsr:@gleasonator/policy@0.5.1", "jsr:@gleasonator/policy@0.5.1": "jsr:@gleasonator/policy@0.5.1",
"jsr:@gleasonator/policy@0.5.2": "jsr:@gleasonator/policy@0.5.2", "jsr:@gleasonator/policy@0.5.2": "jsr:@gleasonator/policy@0.5.2",
"jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.11", "jsr:@gleasonator/policy@0.6.0": "jsr:@gleasonator/policy@0.6.0",
"jsr:@gleasonator/policy@0.6.1": "jsr:@gleasonator/policy@0.6.1",
"jsr:@gleasonator/policy@0.6.3": "jsr:@gleasonator/policy@0.6.3",
"jsr:@gleasonator/policy@0.6.4": "jsr:@gleasonator/policy@0.6.4",
"jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.6.2",
"jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1", "jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1",
"jsr:@nostrify/db@^0.32.2": "jsr:@nostrify/db@0.32.2", "jsr:@nostrify/db@^0.35.0": "jsr:@nostrify/db@0.35.0",
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.30.0": "jsr:@nostrify/nostrify@0.30.1",
"jsr:@nostrify/nostrify@^0.30.1": "jsr:@nostrify/nostrify@0.30.1",
"jsr:@nostrify/nostrify@^0.31.0": "jsr:@nostrify/nostrify@0.31.0", "jsr:@nostrify/nostrify@^0.31.0": "jsr:@nostrify/nostrify@0.31.0",
"jsr:@nostrify/nostrify@^0.32.0": "jsr:@nostrify/nostrify@0.32.0",
"jsr:@nostrify/nostrify@^0.35.0": "jsr:@nostrify/nostrify@0.35.0",
"jsr:@nostrify/nostrify@^0.36.0": "jsr:@nostrify/nostrify@0.36.0",
"jsr:@nostrify/policies@^0.33.0": "jsr:@nostrify/policies@0.33.0", "jsr:@nostrify/policies@^0.33.0": "jsr:@nostrify/policies@0.33.0",
"jsr:@nostrify/policies@^0.33.1": "jsr:@nostrify/policies@0.33.1", "jsr:@nostrify/policies@^0.33.1": "jsr:@nostrify/policies@0.33.1",
"jsr:@nostrify/policies@^0.34.0": "jsr:@nostrify/policies@0.34.0",
"jsr:@nostrify/policies@^0.35.0": "jsr:@nostrify/policies@0.35.0",
"jsr:@nostrify/types@^0.30.0": "jsr:@nostrify/types@0.30.1", "jsr:@nostrify/types@^0.30.0": "jsr:@nostrify/types@0.30.1",
"jsr:@nostrify/types@^0.30.1": "jsr:@nostrify/types@0.30.1", "jsr:@nostrify/types@^0.30.1": "jsr:@nostrify/types@0.30.1",
"jsr:@nostrify/types@^0.35.0": "jsr:@nostrify/types@0.35.0",
"jsr:@soapbox/kysely-pglite@^0.0.1": "jsr:@soapbox/kysely-pglite@0.0.1", "jsr:@soapbox/kysely-pglite@^0.0.1": "jsr:@soapbox/kysely-pglite@0.0.1",
"jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0", "jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0",
"jsr:@std/assert@^0.213.1": "jsr:@std/assert@0.213.1", "jsr:@std/assert@^0.213.1": "jsr:@std/assert@0.213.1",
@ -49,7 +57,7 @@
"jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3", "jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3",
"jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.3", "jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.3",
"jsr:@std/io@^0.223.0": "jsr:@std/io@0.223.0", "jsr:@std/io@^0.223.0": "jsr:@std/io@0.223.0",
"jsr:@std/io@^0.224": "jsr:@std/io@0.224.7", "jsr:@std/io@^0.224": "jsr:@std/io@0.224.8",
"jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0", "jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0",
"jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1", "jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1",
"jsr:@std/path@0.213.1": "jsr:@std/path@0.213.1", "jsr:@std/path@0.213.1": "jsr:@std/path@0.213.1",
@ -171,6 +179,34 @@
"jsr:@nostrify/policies@^0.33.1" "jsr:@nostrify/policies@^0.33.1"
] ]
}, },
"@gleasonator/policy@0.6.0": {
"integrity": "77f52bb245255a61070a4970c50e2ea8e82345c1de2fef12b9d8887a20b46e6d",
"dependencies": [
"jsr:@nostrify/nostrify@^0.32.0",
"jsr:@nostrify/policies@^0.34.0"
]
},
"@gleasonator/policy@0.6.1": {
"integrity": "ba763d69332a736678b068b4063709874bc64010dfc3f974818218a41deb2291",
"dependencies": [
"jsr:@nostrify/nostrify@^0.32.0",
"jsr:@nostrify/policies@^0.34.0"
]
},
"@gleasonator/policy@0.6.3": {
"integrity": "7126c52edd3de21488714e66ec71f31ba9b14f8afc761ab73ac7c3ecc936625c",
"dependencies": [
"jsr:@nostrify/nostrify@^0.32.0",
"jsr:@nostrify/policies@^0.34.0"
]
},
"@gleasonator/policy@0.6.4": {
"integrity": "fd91c94546edd1de1faa80cb3248699b2f010ef1bdd89818dbc4a03e7606e0bb",
"dependencies": [
"jsr:@nostrify/nostrify@^0.32.0",
"jsr:@nostrify/policies@^0.34.0"
]
},
"@hono/hono@4.4.6": { "@hono/hono@4.4.6": {
"integrity": "aa557ca9930787ee86b9ca1730691f1ce1c379174c2cb244d5934db2b6314453" "integrity": "aa557ca9930787ee86b9ca1730691f1ce1c379174c2cb244d5934db2b6314453"
}, },
@ -195,14 +231,17 @@
"@hono/hono@4.5.9": { "@hono/hono@4.5.9": {
"integrity": "47f561e67aedbd6d1e21e3a1ae26c1b80ffdb62a51c161d502e75bee17ca40af" "integrity": "47f561e67aedbd6d1e21e3a1ae26c1b80ffdb62a51c161d502e75bee17ca40af"
}, },
"@hono/hono@4.6.2": {
"integrity": "35fcf3be4687825080b01bed7bbe2ac66f8d8b8939f0bad459661bf3b46d916f"
},
"@lambdalisue/async@2.1.1": { "@lambdalisue/async@2.1.1": {
"integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4" "integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4"
}, },
"@nostrify/db@0.32.2": { "@nostrify/db@0.35.0": {
"integrity": "265fb41e9d5810b99f1003ce56c89e4b468e6d0c04e7b9d9e3126c4efd49c1c2", "integrity": "637191c41812544e361b7997dc44ea098f8bd7efebb28f37a8a7142a0ecada8d",
"dependencies": [ "dependencies": [
"jsr:@nostrify/nostrify@^0.31.0", "jsr:@nostrify/nostrify@^0.35.0",
"jsr:@nostrify/types@^0.30.1", "jsr:@nostrify/types@^0.35.0",
"npm:kysely@^0.27.3", "npm:kysely@^0.27.3",
"npm:nostr-tools@^2.7.0" "npm:nostr-tools@^2.7.0"
] ]
@ -251,13 +290,11 @@
"npm:zod@^3.23.8" "npm:zod@^3.23.8"
] ]
}, },
"@nostrify/nostrify@0.30.1": { "@nostrify/nostrify@0.31.0": {
"integrity": "fcc923707e87a9fbecc82dbb18756d1d3d134cd0763f4b1254c4bce709e811eb", "integrity": "1c1b686bb9ca3ad8d19807e3b96ef3793a65d70fd0f433fe6ef8b3fdb9f45557",
"dependencies": [ "dependencies": [
"jsr:@nostrify/types@^0.30.0", "jsr:@nostrify/types@^0.30.1",
"jsr:@std/crypto@^0.224.0",
"jsr:@std/encoding@^0.224.1", "jsr:@std/encoding@^0.224.1",
"npm:@scure/base@^1.1.6",
"npm:@scure/bip32@^1.4.0", "npm:@scure/bip32@^1.4.0",
"npm:@scure/bip39@^1.3.0", "npm:@scure/bip39@^1.3.0",
"npm:lru-cache@^10.2.0", "npm:lru-cache@^10.2.0",
@ -266,8 +303,8 @@
"npm:zod@^3.23.8" "npm:zod@^3.23.8"
] ]
}, },
"@nostrify/nostrify@0.31.0": { "@nostrify/nostrify@0.32.0": {
"integrity": "1c1b686bb9ca3ad8d19807e3b96ef3793a65d70fd0f433fe6ef8b3fdb9f45557", "integrity": "2d3b7a9cce275c150355f8e566c11f14044afd0b889afcb48e883da9467bdaa9",
"dependencies": [ "dependencies": [
"jsr:@nostrify/types@^0.30.1", "jsr:@nostrify/types@^0.30.1",
"jsr:@std/encoding@^0.224.1", "jsr:@std/encoding@^0.224.1",
@ -279,6 +316,34 @@
"npm:zod@^3.23.8" "npm:zod@^3.23.8"
] ]
}, },
"@nostrify/nostrify@0.35.0": {
"integrity": "9bfef4883838b8b4cb2e2b28a60b72de95391ca5b789bc7206a2baea054dea55",
"dependencies": [
"jsr:@nostrify/types@^0.35.0",
"jsr:@std/encoding@^0.224.1",
"npm:@scure/bip32@^1.4.0",
"npm:@scure/bip39@^1.3.0",
"npm:lru-cache@^10.2.0",
"npm:nostr-tools@^2.7.0",
"npm:websocket-ts@^2.1.5",
"npm:zod@^3.23.8"
]
},
"@nostrify/nostrify@0.36.0": {
"integrity": "f00dbff1f02a2c496c5e85eeeb7a84101b7dd874d87456449dc71b6d037e40fc",
"dependencies": [
"jsr:@nostrify/types@^0.35.0",
"jsr:@std/crypto@^0.224.0",
"jsr:@std/encoding@^0.224.1",
"npm:@scure/base@^1.1.6",
"npm:@scure/bip32@^1.4.0",
"npm:@scure/bip39@^1.3.0",
"npm:lru-cache@^10.2.0",
"npm:nostr-tools@^2.7.0",
"npm:websocket-ts@^2.1.5",
"npm:zod@^3.23.8"
]
},
"@nostrify/policies@0.33.0": { "@nostrify/policies@0.33.0": {
"integrity": "c946b06d0527298b4d7c9819d142a10f522ba09eee76c37525aa4acfc5d87aee", "integrity": "c946b06d0527298b4d7c9819d142a10f522ba09eee76c37525aa4acfc5d87aee",
"dependencies": [ "dependencies": [
@ -293,12 +358,31 @@
"npm:nostr-tools@^2.7.0" "npm:nostr-tools@^2.7.0"
] ]
}, },
"@nostrify/policies@0.34.0": {
"integrity": "27eb8fb36106a29e982ec7fc6bbb91bd6989f8ce11113a3ef6c528b4c2deceee",
"dependencies": [
"jsr:@nostrify/nostrify@^0.32.0",
"jsr:@nostrify/types@^0.30.1",
"npm:nostr-tools@^2.7.0"
]
},
"@nostrify/policies@0.35.0": {
"integrity": "b828fac9f253e460a9587c05588b7dae6a0a32c5a9c9083e449219887b9e8e20",
"dependencies": [
"jsr:@nostrify/nostrify@^0.35.0",
"jsr:@nostrify/types@^0.35.0",
"npm:nostr-tools@^2.7.0"
]
},
"@nostrify/types@0.30.0": { "@nostrify/types@0.30.0": {
"integrity": "1f38fa849cff930bd709edbf94ef9ac02f46afb8b851f86c8736517b354616da" "integrity": "1f38fa849cff930bd709edbf94ef9ac02f46afb8b851f86c8736517b354616da"
}, },
"@nostrify/types@0.30.1": { "@nostrify/types@0.30.1": {
"integrity": "245da176f6893a43250697db51ad32bfa29bf9b1cdc1ca218043d9abf6de5ae5" "integrity": "245da176f6893a43250697db51ad32bfa29bf9b1cdc1ca218043d9abf6de5ae5"
}, },
"@nostrify/types@0.35.0": {
"integrity": "b8d515563d467072694557d5626fa1600f74e83197eef45dd86a9a99c64f7fe6"
},
"@soapbox/kysely-pglite@0.0.1": { "@soapbox/kysely-pglite@0.0.1": {
"integrity": "7a4221aa780aad6fba9747c45c59dfb1c62017ba8cad9db5607f6e5822c058d5", "integrity": "7a4221aa780aad6fba9747c45c59dfb1c62017ba8cad9db5607f6e5822c058d5",
"dependencies": [ "dependencies": [
@ -428,6 +512,12 @@
"jsr:@std/bytes@^1.0.2" "jsr:@std/bytes@^1.0.2"
] ]
}, },
"@std/io@0.224.8": {
"integrity": "f525d05d51fd873de6352b9afcf35cab9ab5dc448bf3c20e0c8b521ded9be392",
"dependencies": [
"jsr:@std/bytes@^1.0.2"
]
},
"@std/json@0.223.0": { "@std/json@0.223.0": {
"integrity": "9a4a255931dd0397924c6b10bb6a72fe3e28ddd876b981ada2e3b8dd0764163f", "integrity": "9a4a255931dd0397924c6b10bb6a72fe3e28ddd876b981ada2e3b8dd0764163f",
"dependencies": [ "dependencies": [
@ -2009,8 +2099,9 @@
"jsr:@bradenmacdonald/s3-lite-client@^0.7.4", "jsr:@bradenmacdonald/s3-lite-client@^0.7.4",
"jsr:@hono/hono@^4.4.6", "jsr:@hono/hono@^4.4.6",
"jsr:@lambdalisue/async@^2.1.1", "jsr:@lambdalisue/async@^2.1.1",
"jsr:@nostrify/db@^0.32.2", "jsr:@nostrify/db@^0.35.0",
"jsr:@nostrify/nostrify@^0.30.1", "jsr:@nostrify/nostrify@^0.36.0",
"jsr:@nostrify/policies@^0.35.0",
"jsr:@soapbox/kysely-pglite@^0.0.1", "jsr:@soapbox/kysely-pglite@^0.0.1",
"jsr:@soapbox/stickynotes@^0.4.0", "jsr:@soapbox/stickynotes@^0.4.0",
"jsr:@std/assert@^0.225.1", "jsr:@std/assert@^0.225.1",

File diff suppressed because it is too large Load diff

View file

@ -102,7 +102,7 @@ async function exportEvents(args: ExportFilter) {
let filter: NostrFilter = {}; let filter: NostrFilter = {};
try { try {
filter = buildFilter(args); filter = buildFilter(args);
} catch (e) { } catch (e: any) {
die(1, e.message || e.toString()); die(1, e.message || e.toString());
} }

View file

@ -209,6 +209,12 @@ class Conf {
static get firehoseConcurrency(): number { static get firehoseConcurrency(): number {
return Math.ceil(Number(Deno.env.get('FIREHOSE_CONCURRENCY') ?? (Conf.pg.poolSize * 0.25))); return Math.ceil(Number(Deno.env.get('FIREHOSE_CONCURRENCY') ?? (Conf.pg.poolSize * 0.25)));
} }
/** Nostr event kinds of events to listen for on the firehose. */
static get firehoseKinds(): number[] {
return (Deno.env.get('FIREHOSE_KINDS') ?? '0, 1, 3, 5, 6, 7, 9735, 10002')
.split(/[, ]+/g)
.map(Number);
}
/** Whether to enable Ditto cron jobs. */ /** Whether to enable Ditto cron jobs. */
static get cronEnabled(): boolean { static get cronEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;
@ -241,6 +247,30 @@ class Conf {
static get zapSplitsEnabled(): boolean { static get zapSplitsEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('ZAP_SPLITS_ENABLED')) ?? false; return optionalBooleanSchema.parse(Deno.env.get('ZAP_SPLITS_ENABLED')) ?? false;
} }
/** Cache settings. */
static caches = {
/** NIP-05 cache settings. */
get nip05(): { max: number; ttl: number } {
return {
max: Number(Deno.env.get('DITTO_CACHE_NIP05_MAX') || 3000),
ttl: Number(Deno.env.get('DITTO_CACHE_NIP05_TTL') || 1 * 60 * 60 * 1000),
};
},
/** Favicon cache settings. */
get favicon(): { max: number; ttl: number } {
return {
max: Number(Deno.env.get('DITTO_CACHE_FAVICON_MAX') || 500),
ttl: Number(Deno.env.get('DITTO_CACHE_FAVICON_TTL') || 1 * 60 * 60 * 1000),
};
},
/** Link preview cache settings. */
get linkPreview(): { max: number; ttl: number } {
return {
max: Number(Deno.env.get('DITTO_CACHE_LINK_PREVIEW_MAX') || 1000),
ttl: Number(Deno.env.get('DITTO_CACHE_LINK_PREVIEW_TTL') || 12 * 60 * 60 * 1000),
};
},
};
} }
const optionalBooleanSchema = z const optionalBooleanSchema = z

View file

@ -5,7 +5,11 @@ import { z } from 'zod';
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { streamingConnectionsGauge } from '@/metrics.ts'; import {
streamingClientMessagesCounter,
streamingConnectionsGauge,
streamingServerMessagesCounter,
} from '@/metrics.ts';
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
@ -61,6 +65,8 @@ const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>(); const limiter = new TTLCache<string, number>();
const connections = new Set<WebSocket>();
const streamingController: AppController = async (c) => { const streamingController: AppController = async (c) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol'); const token = c.req.header('sec-websocket-protocol');
@ -94,6 +100,7 @@ const streamingController: AppController = async (c) => {
function send(e: StreamingEvent) { function send(e: StreamingEvent) {
if (socket.readyState === WebSocket.OPEN) { if (socket.readyState === WebSocket.OPEN) {
debug('send', e.event, e.payload); debug('send', e.event, e.payload);
streamingServerMessagesCounter.inc();
socket.send(JSON.stringify(e)); socket.send(JSON.stringify(e));
} }
} }
@ -126,7 +133,8 @@ const streamingController: AppController = async (c) => {
} }
socket.onopen = async () => { socket.onopen = async () => {
streamingConnectionsGauge.inc(); connections.add(socket);
streamingConnectionsGauge.set(connections.size);
if (!stream) return; if (!stream) return;
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
@ -169,6 +177,8 @@ const streamingController: AppController = async (c) => {
}; };
socket.onmessage = (e) => { socket.onmessage = (e) => {
streamingClientMessagesCounter.inc();
if (ip) { if (ip) {
const count = limiter.get(ip) ?? 0; const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
@ -186,7 +196,8 @@ const streamingController: AppController = async (c) => {
}; };
socket.onclose = () => { socket.onclose = () => {
streamingConnectionsGauge.dec(); connections.delete(socket);
streamingConnectionsGauge.set(connections.size);
controller.abort(); controller.abort();
}; };

View file

@ -1,17 +1,32 @@
import { register } from 'prom-client'; import { register } from 'prom-client';
import { AppController } from '@/app.ts'; import { AppController } from '@/app.ts';
import { dbAvailableConnectionsGauge, dbPoolSizeGauge } from '@/metrics.ts'; import {
dbAvailableConnectionsGauge,
dbPoolSizeGauge,
relayPoolRelaysSizeGauge,
relayPoolSubscriptionsSizeGauge,
} from '@/metrics.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
/** Prometheus/OpenMetrics controller. */ /** Prometheus/OpenMetrics controller. */
export const metricsController: AppController = async (c) => { export const metricsController: AppController = async (c) => {
const db = await Storages.database(); const db = await Storages.database();
const pool = await Storages.client();
// Update some metrics at request time. // Update some metrics at request time.
dbPoolSizeGauge.set(db.poolSize); dbPoolSizeGauge.set(db.poolSize);
dbAvailableConnectionsGauge.set(db.availableConnections); dbAvailableConnectionsGauge.set(db.availableConnections);
relayPoolRelaysSizeGauge.reset();
relayPoolSubscriptionsSizeGauge.reset();
for (const relay of pool.relays.values()) {
relayPoolRelaysSizeGauge.inc({ ready_state: relay.socket.readyState });
relayPoolSubscriptionsSizeGauge.inc(relay.subscriptions.length);
}
// Serve the metrics.
const metrics = await register.metrics(); const metrics = await register.metrics();
const headers: HeadersInit = { const headers: HeadersInit = {

View file

@ -1,3 +1,4 @@
import { Stickynotes } from '@soapbox/stickynotes';
import TTLCache from '@isaacs/ttlcache'; import TTLCache from '@isaacs/ttlcache';
import { import {
NostrClientCLOSE, NostrClientCLOSE,
@ -26,14 +27,18 @@ const LIMITER_LIMIT = 300;
const limiter = new TTLCache<string, number>(); const limiter = new TTLCache<string, number>();
/** Connections for metrics purposes. */
const connections = new Set<WebSocket>();
const console = new Stickynotes('ditto:relay');
/** Set up the Websocket connection. */ /** Set up the Websocket connection. */
function connectStream(socket: WebSocket, ip: string | undefined) { function connectStream(socket: WebSocket, ip: string | undefined) {
let opened = false;
const controllers = new Map<string, AbortController>(); const controllers = new Map<string, AbortController>();
socket.onopen = () => { socket.onopen = () => {
opened = true; connections.add(socket);
relayConnectionsGauge.inc(); relayConnectionsGauge.set(connections.size);
}; };
socket.onmessage = (e) => { socket.onmessage = (e) => {
@ -63,9 +68,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
}; };
socket.onclose = () => { socket.onclose = () => {
if (opened) { connections.delete(socket);
relayConnectionsGauge.dec(); relayConnectionsGauge.set(connections.size);
}
for (const controller of controllers.values()) { for (const controller of controllers.values()) {
controller.abort(); controller.abort();
@ -103,7 +107,7 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: Conf.db.timeouts.relay })) { for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: Conf.db.timeouts.relay })) {
send(['EVENT', subId, event]); send(['EVENT', subId, event]);
} }
} catch (e) { } catch (e: any) {
if (e instanceof RelayError) { if (e instanceof RelayError) {
send(['CLOSED', subId, e.message]); send(['CLOSED', subId, e.message]);
} else if (e.message.includes('timeout')) { } else if (e.message.includes('timeout')) {

View file

@ -9,7 +9,6 @@ export interface DittoTables extends NPostgresSchema {
event_stats: EventStatsRow; event_stats: EventStatsRow;
pubkey_domains: PubkeyDomainRow; pubkey_domains: PubkeyDomainRow;
event_zaps: EventZapRow; event_zaps: EventZapRow;
author_search: AuthorSearch;
} }
type NostrEventsRow = NPostgresSchema['nostr_events'] & { type NostrEventsRow = NPostgresSchema['nostr_events'] & {
@ -21,6 +20,7 @@ interface AuthorStatsRow {
followers_count: number; followers_count: number;
following_count: number; following_count: number;
notes_count: number; notes_count: number;
search: string;
} }
interface EventStatsRow { interface EventStatsRow {
@ -55,8 +55,3 @@ interface EventZapRow {
amount_millisats: number; amount_millisats: number;
comment: string; comment: string;
} }
interface AuthorSearch {
pubkey: string;
search: string;
}

View file

@ -0,0 +1,32 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.alterTable('author_stats')
.addColumn('search', 'text', (col) => col.notNull().defaultTo(''))
.execute();
await sql`CREATE INDEX author_stats_search_idx ON author_stats USING GIN (search gin_trgm_ops)`.execute(db);
await db.insertInto('author_stats')
.columns(['pubkey', 'search'])
.expression(
db.selectFrom('author_search')
.select(['pubkey', 'search']),
)
.onConflict((oc) =>
oc.column('pubkey')
.doUpdateSet((eb) => ({
search: eb.ref('excluded.search'),
}))
)
.execute();
await db.schema.dropIndex('author_search_search_idx').ifExists().execute();
await db.schema.dropTable('author_search').execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropIndex('author_stats_search_idx').ifExists().execute();
await db.schema.alterTable('author_stats').dropColumn('search').execute();
}

View file

@ -0,0 +1,17 @@
import { Kysely } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.schema
.createIndex('author_stats_followers_count_idx')
.ifNotExists()
.on('author_stats')
.column('followers_count desc')
.execute();
// This index should have never been added, because pubkey is the primary key.
await db.schema.dropIndex('idx_author_stats_pubkey').ifExists().execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.dropIndex('author_stats_followers_count_idx').ifExists().execute();
}

View file

@ -0,0 +1,14 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await db.deleteFrom('event_stats').where(sql<number>`length(event_id)`, '>', 64).execute();
await db.deleteFrom('author_stats').where(sql<number>`length(pubkey)`, '>', 64).execute();
await db.schema.alterTable('event_stats').alterColumn('event_id', (col) => col.setDataType('char(64)')).execute();
await db.schema.alterTable('author_stats').alterColumn('pubkey', (col) => col.setDataType('char(64)')).execute();
}
export async function down(db: Kysely<any>): Promise<void> {
await db.schema.alterTable('event_stats').alterColumn('event_id', (col) => col.setDataType('text')).execute();
await db.schema.alterTable('author_stats').alterColumn('pubkey', (col) => col.setDataType('text')).execute();
}

View file

@ -19,7 +19,7 @@ const sem = new Semaphore(Conf.firehoseConcurrency);
export async function startFirehose(): Promise<void> { export async function startFirehose(): Promise<void> {
const store = await Storages.client(); const store = await Storages.client();
for await (const msg of store.req([{ kinds: [0, 1, 3, 5, 6, 7, 9735, 10002], limit: 0, since: nostrNow() }])) { for await (const msg of store.req([{ kinds: Conf.firehoseKinds, limit: 0, since: nostrNow() }])) {
if (msg[0] === 'EVENT') { if (msg[0] === 'EVENT') {
const event = msg[2]; const event = msg[2];
console.debug(`NostrEvent<${event.kind}> ${event.id}`); console.debug(`NostrEvent<${event.kind}> ${event.id}`);

View file

@ -17,10 +17,20 @@ export const streamingConnectionsGauge = new Gauge({
help: 'Number of active connections to the streaming API', help: 'Number of active connections to the streaming API',
}); });
export const fetchCounter = new Counter({ export const streamingServerMessagesCounter = new Counter({
name: 'ditto_fetch_total', name: 'ditto_streaming_server_messages_total',
help: 'Total number of messages sent from the streaming API',
});
export const streamingClientMessagesCounter = new Counter({
name: 'ditto_streaming_client_messages_total',
help: 'Total number of messages received by the streaming API',
});
export const fetchResponsesCounter = new Counter({
name: 'ditto_fetch_responses_total',
help: 'Total number of fetch requests', help: 'Total number of fetch requests',
labelNames: ['method'], labelNames: ['method', 'status'],
}); });
export const firehoseEventsCounter = new Counter({ export const firehoseEventsCounter = new Counter({
@ -84,3 +94,39 @@ export const dbQueryDurationHistogram = new Histogram({
name: 'ditto_db_query_duration_ms', name: 'ditto_db_query_duration_ms',
help: 'Duration of database queries', help: 'Duration of database queries',
}); });
export const cachedFaviconsSizeGauge = new Gauge({
name: 'ditto_cached_favicons_size',
help: 'Number of domain favicons in cache',
});
export const cachedLnurlsSizeGauge = new Gauge({
name: 'ditto_cached_lnurls_size',
help: 'Number of LNURL details in cache',
});
export const cachedNip05sSizeGauge = new Gauge({
name: 'ditto_cached_nip05s_size',
help: 'Number of NIP-05 results in cache',
});
export const cachedLinkPreviewSizeGauge = new Gauge({
name: 'ditto_cached_link_previews_size',
help: 'Number of link previews in cache',
});
export const internalSubscriptionsSizeGauge = new Gauge({
name: 'ditto_internal_subscriptions_size',
help: "Number of active subscriptions to Ditto's internal relay",
});
export const relayPoolRelaysSizeGauge = new Gauge({
name: 'ditto_relay_pool_relays_size',
help: 'Number of relays in the relay pool',
labelNames: ['ready_state'],
});
export const relayPoolSubscriptionsSizeGauge = new Gauge({
name: 'ditto_relay_pool_subscriptions_size',
help: 'Number of active subscriptions to the relay pool',
});

View file

@ -1,5 +1,5 @@
import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify'; import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug'; import { Stickynotes } from '@soapbox/stickynotes';
import ISO6391 from 'iso-639-1'; import ISO6391 from 'iso-639-1';
import { Kysely, sql } from 'kysely'; import { Kysely, sql } from 'kysely';
import lande from 'lande'; import lande from 'lande';
@ -23,7 +23,7 @@ import { purifyEvent } from '@/utils/purify.ts';
import { updateStats } from '@/utils/stats.ts'; import { updateStats } from '@/utils/stats.ts';
import { getTagSet } from '@/utils/tags.ts'; import { getTagSet } from '@/utils/tags.ts';
const debug = Debug('ditto:pipeline'); const console = new Stickynotes('ditto:pipeline');
/** /**
* Common pipeline function to process (and maybe store) events. * Common pipeline function to process (and maybe store) events.
@ -41,15 +41,15 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
if (encounterEvent(event)) return; if (encounterEvent(event)) return;
if (await existsInDB(event)) return; if (await existsInDB(event)) return;
debug(`NostrEvent<${event.kind}> ${event.id}`); console.info(`NostrEvent<${event.kind}> ${event.id}`);
pipelineEventsCounter.inc({ kind: event.kind }); pipelineEventsCounter.inc({ kind: event.kind });
if (isProtectedEvent(event)) { if (isProtectedEvent(event)) {
throw new RelayError('invalid', 'protected event'); throw new RelayError('invalid', 'protected event');
} }
if (event.kind !== 24133) { if (event.kind !== 24133 && event.pubkey !== Conf.pubkey) {
await policyFilter(event); await policyFilter(event, signal);
} }
await hydrateEvent(event, signal); await hydrateEvent(event, signal);
@ -62,29 +62,32 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
const kysely = await Storages.kysely(); const kysely = await Storages.kysely();
try {
await storeEvent(purifyEvent(event), signal); await storeEvent(purifyEvent(event), signal);
await Promise.all([ await Promise.all([
handleZaps(kysely, event), handleZaps(kysely, event),
parseMetadata(event, signal), parseMetadata(event, signal),
setLanguage(event), setLanguage(event),
generateSetEvents(event),
streamOut(event),
]); ]);
} finally {
await generateSetEvents(event);
await streamOut(event);
}
} }
async function policyFilter(event: NostrEvent): Promise<void> { async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<void> {
const debug = Debug('ditto:policy'); const console = new Stickynotes('ditto:policy');
try { try {
const result = await policyWorker.call(event); const result = await policyWorker.call(event, signal);
policyEventsCounter.inc({ ok: String(result[2]) }); policyEventsCounter.inc({ ok: String(result[2]) });
debug(JSON.stringify(result)); console.log(JSON.stringify(result));
RelayError.assert(result); RelayError.assert(result);
} catch (e) { } catch (e) {
if (e instanceof RelayError) { if (e instanceof RelayError) {
throw e; throw e;
} else { } else {
console.error('POLICY ERROR:', e); console.error(e);
throw new RelayError('blocked', 'policy error'); throw new RelayError('blocked', 'policy error');
} }
} }
@ -133,7 +136,7 @@ async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<unde
const store = await Storages.db(); const store = await Storages.db();
await store.transaction(async (store, kysely) => { await store.transaction(async (store, kysely) => {
await updateStats({ event, store, kysely }); await updateStats({ event, store, kysely }).catch((e) => console.error(e));
await store.event(event, { signal }); await store.event(event, { signal });
}); });
} }
@ -157,8 +160,8 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
const search = result?.pubkey === event.pubkey ? [name, nip05].filter(Boolean).join(' ').trim() : name ?? ''; const search = result?.pubkey === event.pubkey ? [name, nip05].filter(Boolean).join(' ').trim() : name ?? '';
if (search) { if (search) {
await kysely.insertInto('author_search') await kysely.insertInto('author_stats')
.values({ pubkey: event.pubkey, search }) .values({ pubkey: event.pubkey, search, followers_count: 0, following_count: 0, notes_count: 0 })
.onConflict((oc) => oc.column('pubkey').doUpdateSet({ search })) .onConflict((oc) => oc.column('pubkey').doUpdateSet({ search }))
.execute(); .execute();
} }

View file

@ -30,7 +30,7 @@ export class ConnectSigner implements NostrSigner {
const signer = await this.signer; const signer = await this.signer;
try { try {
return await signer.signEvent(event); return await signer.signEvent(event);
} catch (e) { } catch (e: any) {
if (e.name === 'AbortError') { if (e.name === 'AbortError') {
throw new HTTPException(408, { message: 'The event was not signed quickly enough' }); throw new HTTPException(408, { message: 'The event was not signed quickly enough' });
} else { } else {
@ -44,7 +44,7 @@ export class ConnectSigner implements NostrSigner {
const signer = await this.signer; const signer = await this.signer;
try { try {
return await signer.nip04.encrypt(pubkey, plaintext); return await signer.nip04.encrypt(pubkey, plaintext);
} catch (e) { } catch (e: any) {
if (e.name === 'AbortError') { if (e.name === 'AbortError') {
throw new HTTPException(408, { throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough', message: 'Text was not encrypted quickly enough',
@ -59,7 +59,7 @@ export class ConnectSigner implements NostrSigner {
const signer = await this.signer; const signer = await this.signer;
try { try {
return await signer.nip04.decrypt(pubkey, ciphertext); return await signer.nip04.decrypt(pubkey, ciphertext);
} catch (e) { } catch (e: any) {
if (e.name === 'AbortError') { if (e.name === 'AbortError') {
throw new HTTPException(408, { throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough', message: 'Text was not decrypted quickly enough',
@ -76,7 +76,7 @@ export class ConnectSigner implements NostrSigner {
const signer = await this.signer; const signer = await this.signer;
try { try {
return await signer.nip44.encrypt(pubkey, plaintext); return await signer.nip44.encrypt(pubkey, plaintext);
} catch (e) { } catch (e: any) {
if (e.name === 'AbortError') { if (e.name === 'AbortError') {
throw new HTTPException(408, { throw new HTTPException(408, {
message: 'Text was not encrypted quickly enough', message: 'Text was not encrypted quickly enough',
@ -91,7 +91,7 @@ export class ConnectSigner implements NostrSigner {
const signer = await this.signer; const signer = await this.signer;
try { try {
return await signer.nip44.decrypt(pubkey, ciphertext); return await signer.nip44.decrypt(pubkey, ciphertext);
} catch (e) { } catch (e: any) {
if (e.name === 'AbortError') { if (e.name === 'AbortError') {
throw new HTTPException(408, { throw new HTTPException(408, {
message: 'Text was not decrypted quickly enough', message: 'Text was not decrypted quickly enough',

View file

@ -2,6 +2,7 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts';
import { DittoDB } from '@/db/DittoDB.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { internalSubscriptionsSizeGauge } from '@/metrics.ts';
import { AdminStore } from '@/storages/AdminStore.ts'; import { AdminStore } from '@/storages/AdminStore.ts';
import { EventsDB } from '@/storages/EventsDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts';
import { SearchStore } from '@/storages/search-store.ts'; import { SearchStore } from '@/storages/search-store.ts';
@ -14,7 +15,7 @@ export class Storages {
private static _db: Promise<EventsDB> | undefined; private static _db: Promise<EventsDB> | undefined;
private static _database: Promise<DittoDatabase> | undefined; private static _database: Promise<DittoDatabase> | undefined;
private static _admin: Promise<AdminStore> | undefined; private static _admin: Promise<AdminStore> | undefined;
private static _client: Promise<NPool> | undefined; private static _client: Promise<NPool<NRelay1>> | undefined;
private static _pubsub: Promise<InternalRelay> | undefined; private static _pubsub: Promise<InternalRelay> | undefined;
private static _search: Promise<SearchStore> | undefined; private static _search: Promise<SearchStore> | undefined;
@ -61,13 +62,13 @@ export class Storages {
/** Internal pubsub relay between controllers and the pipeline. */ /** Internal pubsub relay between controllers and the pipeline. */
public static async pubsub(): Promise<InternalRelay> { public static async pubsub(): Promise<InternalRelay> {
if (!this._pubsub) { if (!this._pubsub) {
this._pubsub = Promise.resolve(new InternalRelay()); this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge }));
} }
return this._pubsub; return this._pubsub;
} }
/** Relay pool storage. */ /** Relay pool storage. */
public static async client(): Promise<NPool> { public static async client(): Promise<NPool<NRelay1>> {
if (!this._client) { if (!this._client) {
this._client = (async () => { this._client = (async () => {
const db = await this.db(); const db = await this.db();

View file

@ -59,7 +59,7 @@ class EventsDB extends NPostgres {
} }
/** Insert an event (and its tags) into the database. */ /** Insert an event (and its tags) into the database. */
async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> { override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
event = purifyEvent(event); event = purifyEvent(event);
this.console.debug('EVENT', JSON.stringify(event)); this.console.debug('EVENT', JSON.stringify(event));
dbEventsCounter.inc({ kind: event.kind }); dbEventsCounter.inc({ kind: event.kind });
@ -72,7 +72,7 @@ class EventsDB extends NPostgres {
try { try {
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
} catch (e) { } catch (e: any) {
if (e.message === 'Cannot add a deleted event') { if (e.message === 'Cannot add a deleted event') {
throw new RelayError('blocked', 'event deleted by user'); throw new RelayError('blocked', 'event deleted by user');
} else if (e.message === 'Cannot replace an event with an older event') { } else if (e.message === 'Cannot replace an event with an older event') {
@ -144,7 +144,7 @@ class EventsDB extends NPostgres {
} }
} }
protected getFilterQuery(trx: Kysely<NPostgresSchema>, filter: NostrFilter) { protected override getFilterQuery(trx: Kysely<NPostgresSchema>, filter: NostrFilter) {
if (filter.search) { if (filter.search) {
const tokens = NIP50.parseInput(filter.search); const tokens = NIP50.parseInput(filter.search);
@ -172,7 +172,7 @@ class EventsDB extends NPostgres {
} }
/** Get events for filters from the database. */ /** Get events for filters from the database. */
async query( override async query(
filters: NostrFilter[], filters: NostrFilter[],
opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {},
): Promise<NostrEvent[]> { ): Promise<NostrEvent[]> {
@ -200,13 +200,13 @@ class EventsDB extends NPostgres {
} }
/** Delete events based on filters from the database. */ /** Delete events based on filters from the database. */
async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> { override async remove(filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
this.console.debug('DELETE', JSON.stringify(filters)); this.console.debug('DELETE', JSON.stringify(filters));
return super.remove(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); return super.remove(filters, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
} }
/** Get number of events that would be returned by filters. */ /** Get number of events that would be returned by filters. */
async count( override async count(
filters: NostrFilter[], filters: NostrFilter[],
opts: { signal?: AbortSignal; timeout?: number } = {}, opts: { signal?: AbortSignal; timeout?: number } = {},
): Promise<{ count: number; approximate: any }> { ): Promise<{ count: number; approximate: any }> {
@ -218,7 +218,7 @@ class EventsDB extends NPostgres {
} }
/** Return only the tags that should be indexed. */ /** Return only the tags that should be indexed. */
static indexTags(event: NostrEvent): string[][] { static override indexTags(event: NostrEvent): string[][] {
const tagCounts: Record<string, number> = {}; const tagCounts: Record<string, number> = {};
function getCount(name: string) { function getCount(name: string) {
@ -325,7 +325,7 @@ class EventsDB extends NPostgres {
return filters; return filters;
} }
async transaction(callback: (store: NPostgres, kysely: Kysely<any>) => Promise<void>): Promise<void> { override async transaction(callback: (store: NPostgres, kysely: Kysely<any>) => Promise<void>): Promise<void> {
return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>)); return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>));
} }
} }

View file

@ -10,10 +10,15 @@ import {
} from '@nostrify/nostrify'; } from '@nostrify/nostrify';
import { Machina } from '@nostrify/nostrify/utils'; import { Machina } from '@nostrify/nostrify/utils';
import { matchFilter } from 'nostr-tools'; import { matchFilter } from 'nostr-tools';
import { Gauge } from 'prom-client';
import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { purifyEvent } from '@/utils/purify.ts'; import { purifyEvent } from '@/utils/purify.ts';
interface InternalRelayOpts {
gauge?: Gauge;
}
/** /**
* PubSub event store for streaming events within the application. * PubSub event store for streaming events within the application.
* The pipeline should push events to it, then anything in the application can subscribe to it. * The pipeline should push events to it, then anything in the application can subscribe to it.
@ -21,6 +26,8 @@ import { purifyEvent } from '@/utils/purify.ts';
export class InternalRelay implements NRelay { export class InternalRelay implements NRelay {
private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>(); private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
constructor(private opts: InternalRelayOpts = {}) {}
async *req( async *req(
filters: NostrFilter[], filters: NostrFilter[],
opts?: { signal?: AbortSignal }, opts?: { signal?: AbortSignal },
@ -31,6 +38,7 @@ export class InternalRelay implements NRelay {
yield ['EOSE', id]; yield ['EOSE', id];
this.subs.set(id, { filters, machina }); this.subs.set(id, { filters, machina });
this.opts.gauge?.set(this.subs.size);
try { try {
for await (const event of machina) { for await (const event of machina) {
@ -38,6 +46,7 @@ export class InternalRelay implements NRelay {
} }
} finally { } finally {
this.subs.delete(id); this.subs.delete(id);
this.opts.gauge?.set(this.subs.size);
} }
} }
@ -70,4 +79,8 @@ export class InternalRelay implements NRelay {
async query(): Promise<NostrEvent[]> { async query(): Promise<NostrEvent[]> {
return []; return [];
} }
async close(): Promise<void> {
return Promise.resolve();
}
} }

View file

@ -385,6 +385,7 @@ async function gatherAuthorStats(
followers_count: Math.max(0, row.followers_count), followers_count: Math.max(0, row.followers_count),
following_count: Math.max(0, row.following_count), following_count: Math.max(0, row.following_count),
notes_count: Math.max(0, row.notes_count), notes_count: Math.max(0, row.notes_count),
search: row.search,
})); }));
} }

View file

@ -106,7 +106,7 @@ export async function updateTrendingTags(
await handleEvent(label, signal); await handleEvent(label, signal);
console.info(`Trending ${l} updated.`); console.info(`Trending ${l} updated.`);
} catch (e) { } catch (e: any) {
console.error(`Error updating trending ${l}: ${e.message}`); console.error(`Error updating trending ${l}: ${e.message}`);
} }
} }

View file

@ -0,0 +1,21 @@
import { SimpleLRU } from '@/utils/SimpleLRU.ts';
import { assertEquals, assertRejects } from '@std/assert';
Deno.test("SimpleLRU doesn't repeat failed calls", async () => {
let calls = 0;
const cache = new SimpleLRU(
// deno-lint-ignore require-await
async () => {
calls++;
throw new Error('gg');
},
{ max: 100 },
);
await assertRejects(() => cache.fetch('foo'));
assertEquals(calls, 1);
await assertRejects(() => cache.fetch('foo'));
assertEquals(calls, 1);
});

View file

@ -1,6 +1,7 @@
// deno-lint-ignore-file ban-types // deno-lint-ignore-file ban-types
import { LRUCache } from 'lru-cache'; import { LRUCache } from 'lru-cache';
import { type Gauge } from 'prom-client';
type FetchFn<K extends {}, V extends {}, O extends {}> = (key: K, opts: O) => Promise<V>; type FetchFn<K extends {}, V extends {}, O extends {}> = (key: K, opts: O) => Promise<V>;
@ -8,6 +9,10 @@ interface FetchFnOpts {
signal?: AbortSignal | null; signal?: AbortSignal | null;
} }
type SimpleLRUOpts<K extends {}, V extends {}> = LRUCache.Options<K, V, void> & {
gauge?: Gauge;
};
export class SimpleLRU< export class SimpleLRU<
K extends {}, K extends {},
V extends {}, V extends {},
@ -15,18 +20,28 @@ export class SimpleLRU<
> { > {
protected cache: LRUCache<K, V, void>; protected cache: LRUCache<K, V, void>;
constructor(fetchFn: FetchFn<K, V, { signal: AbortSignal }>, opts: LRUCache.Options<K, V, void>) { constructor(fetchFn: FetchFn<K, V, { signal: AbortSignal }>, private opts: SimpleLRUOpts<K, V>) {
this.cache = new LRUCache({ this.cache = new LRUCache({
fetchMethod: (key, _staleValue, { signal }) => fetchFn(key, { signal: signal as unknown as AbortSignal }), async fetchMethod(key, _staleValue, { signal }) {
try {
return await fetchFn(key, { signal: signal as unknown as AbortSignal });
} catch {
return null as unknown as V;
}
},
...opts, ...opts,
}); });
} }
async fetch(key: K, opts?: O): Promise<V> { async fetch(key: K, opts?: O): Promise<V> {
const result = await this.cache.fetch(key, opts); const result = await this.cache.fetch(key, opts);
if (result === undefined) {
this.opts.gauge?.set(this.cache.size);
if (result === undefined || result === null) {
throw new Error('SimpleLRU: fetch failed'); throw new Error('SimpleLRU: fetch failed');
} }
return result; return result;
} }

View file

@ -2,8 +2,9 @@ import { DOMParser } from '@b-fuze/deno-dom';
import Debug from '@soapbox/stickynotes/debug'; import Debug from '@soapbox/stickynotes/debug';
import tldts from 'tldts'; import tldts from 'tldts';
import { Conf } from '@/config.ts';
import { cachedFaviconsSizeGauge } from '@/metrics.ts';
import { SimpleLRU } from '@/utils/SimpleLRU.ts'; import { SimpleLRU } from '@/utils/SimpleLRU.ts';
import { Time } from '@/utils/time.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
const debug = Debug('ditto:favicon'); const debug = Debug('ditto:favicon');
@ -37,7 +38,7 @@ const faviconCache = new SimpleLRU<string, URL>(
throw new Error(`Favicon not found: ${key}`); throw new Error(`Favicon not found: ${key}`);
}, },
{ max: 500, ttl: Time.hours(1) }, { ...Conf.caches.favicon, gauge: cachedFaviconsSizeGauge },
); );
export { faviconCache }; export { faviconCache };

View file

@ -1,6 +1,7 @@
import { LNURL, LNURLDetails } from '@nostrify/nostrify/ln'; import { LNURL, LNURLDetails } from '@nostrify/nostrify/ln';
import Debug from '@soapbox/stickynotes/debug'; import Debug from '@soapbox/stickynotes/debug';
import { cachedLnurlsSizeGauge } from '@/metrics.ts';
import { SimpleLRU } from '@/utils/SimpleLRU.ts'; import { SimpleLRU } from '@/utils/SimpleLRU.ts';
import { Time } from '@/utils/time.ts'; import { Time } from '@/utils/time.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
@ -20,7 +21,7 @@ const lnurlCache = new SimpleLRU<string, LNURLDetails>(
throw e; throw e;
} }
}, },
{ max: 1000, ttl: Time.minutes(30) }, { max: 1000, ttl: Time.minutes(30), gauge: cachedLnurlsSizeGauge },
); );
/** Get an LNURL from a lud06 or lud16. */ /** Get an LNURL from a lud06 or lud16. */

View file

@ -4,9 +4,9 @@ import Debug from '@soapbox/stickynotes/debug';
import tldts from 'tldts'; import tldts from 'tldts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { cachedNip05sSizeGauge } from '@/metrics.ts';
import { Storages } from '@/storages.ts'; import { Storages } from '@/storages.ts';
import { SimpleLRU } from '@/utils/SimpleLRU.ts'; import { SimpleLRU } from '@/utils/SimpleLRU.ts';
import { Time } from '@/utils/time.ts';
import { Nip05, parseNip05 } from '@/utils.ts'; import { Nip05, parseNip05 } from '@/utils.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
@ -43,7 +43,7 @@ const nip05Cache = new SimpleLRU<string, nip19.ProfilePointer>(
throw e; throw e;
} }
}, },
{ max: 500, ttl: Time.hours(1) }, { ...Conf.caches.nip05, gauge: cachedNip05sSizeGauge },
); );
async function localNip05Lookup(store: NStore, localpart: string): Promise<nip19.ProfilePointer | undefined> { async function localNip05Lookup(store: NStore, localpart: string): Promise<nip19.ProfilePointer | undefined> {

View file

@ -36,6 +36,27 @@ Deno.test('parseNoteContent parses mentions with apostrophes', () => {
); );
}); });
Deno.test('parseNoteContent parses mentions with commas', () => {
const { html } = parseNoteContent(
`Sim. Hi nostr:npub1q3sle0kvfsehgsuexttt3ugjd8xdklxfwwkh559wxckmzddywnws6cd26p and nostr:npub1gujeqakgt7fyp6zjggxhyy7ft623qtcaay5lkc8n8gkry4cvnrzqd3f67z, any chance to have Cobrafuma as PWA?`,
[{
id: '0461fcbecc4c3374439932d6b8f11269ccdb7cc973ad7a50ae362db135a474dd',
username: 'alex',
acct: 'alex@gleasonator.dev',
url: 'https://gleasonator.dev/@alex',
}, {
id: '47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
username: 'patrick',
acct: 'patrick@patrickdosreis.com',
url: 'https://gleasonator.dev/@patrick@patrickdosreis.com',
}],
);
assertEquals(
html,
'Sim. Hi <span class="h-card"><a class="u-url mention" href="https://gleasonator.dev/@alex" rel="ugc">@<span>alex@gleasonator.dev</span></a></span> and <span class="h-card"><a class="u-url mention" href="https://gleasonator.dev/@patrick@patrickdosreis.com" rel="ugc">@<span>patrick@patrickdosreis.com</span></a></span>, any chance to have Cobrafuma as PWA?',
);
});
Deno.test("parseNoteContent doesn't parse invalid nostr URIs", () => { Deno.test("parseNoteContent doesn't parse invalid nostr URIs", () => {
const { html } = parseNoteContent('nip19 has URIs like nostr:npub and nostr:nevent, etc.', []); const { html } = parseNoteContent('nip19 has URIs like nostr:npub and nostr:nevent, etc.', []);
assertEquals(html, 'nip19 has URIs like nostr:npub and nostr:nevent, etc.'); assertEquals(html, 'nip19 has URIs like nostr:npub and nostr:nevent, etc.');

View file

@ -6,14 +6,17 @@ import { getPubkeysBySearch } from '@/utils/search.ts';
Deno.test('fuzzy search works', async () => { Deno.test('fuzzy search works', async () => {
await using db = await createTestDB(); await using db = await createTestDB();
await db.kysely.insertInto('author_search').values({ await db.kysely.insertInto('author_stats').values({
pubkey: '47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4', pubkey: '47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
search: 'patrickReiis patrickdosreis.com', search: 'patrickReiis patrickdosreis.com',
notes_count: 0,
followers_count: 0,
following_count: 0,
}).execute(); }).execute();
assertEquals(await getPubkeysBySearch(db.kysely, { q: 'pat rick', limit: 1, followedPubkeys: new Set() }), new Set()); assertEquals(await getPubkeysBySearch(db.kysely, { q: 'pat rick', limit: 1, followedPubkeys: new Set() }), new Set());
assertEquals( assertEquals(
await getPubkeysBySearch(db.kysely, { q: 'patrick dos reis', limit: 1, followedPubkeys: new Set() }), await getPubkeysBySearch(db.kysely, { q: 'patrick dosreis', limit: 1, followedPubkeys: new Set() }),
new Set([ new Set([
'47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4', '47259076c85f9240e852420d7213c95e95102f1de929fb60f33a2c32570c98c4',
]), ]),

View file

@ -10,13 +10,14 @@ export async function getPubkeysBySearch(
const { q, limit, followedPubkeys } = opts; const { q, limit, followedPubkeys } = opts;
let query = kysely let query = kysely
.selectFrom('author_search') .selectFrom('author_stats')
.select((eb) => [ .select((eb) => [
'pubkey', 'pubkey',
'search', 'search',
eb.fn('word_similarity', [sql`${q}`, 'search']).as('sml'), eb.fn('word_similarity', [sql`${q}`, 'search']).as('sml'),
]) ])
.where(() => sql`${q} % search`) .where(() => sql`${q} <% search`)
.orderBy(['followers_count desc'])
.orderBy(['sml desc', 'search']) .orderBy(['sml desc', 'search'])
.limit(limit); .limit(limit);

View file

@ -171,7 +171,16 @@ Deno.test('countAuthorStats counts author stats from the database', async () =>
await db.store.event(genEvent({ kind: 1, content: 'yolo' }, sk)); await db.store.event(genEvent({ kind: 1, content: 'yolo' }, sk));
await db.store.event(genEvent({ kind: 3, tags: [['p', pubkey]] })); await db.store.event(genEvent({ kind: 3, tags: [['p', pubkey]] }));
const stats = await countAuthorStats(db.store, pubkey); await db.kysely.insertInto('author_stats').values({
pubkey,
search: 'Yolo Lolo',
notes_count: 0,
followers_count: 0,
following_count: 0,
}).onConflict((oc) => oc.column('pubkey').doUpdateSet({ 'search': 'baka' }))
.execute();
const stats = await countAuthorStats({ store: db.store, pubkey, kysely: db.kysely });
assertEquals(stats!.notes_count, 2); assertEquals(stats!.notes_count, 2);
assertEquals(stats!.followers_count, 1); assertEquals(stats!.followers_count, 1);

View file

@ -194,6 +194,7 @@ export async function updateAuthorStats(
followers_count: 0, followers_count: 0,
following_count: 0, following_count: 0,
notes_count: 0, notes_count: 0,
search: '',
}; };
const prev = await kysely const prev = await kysely
@ -268,20 +269,27 @@ export async function updateEventStats(
/** Calculate author stats from the database. */ /** Calculate author stats from the database. */
export async function countAuthorStats( export async function countAuthorStats(
store: SetRequired<NStore, 'count'>, { pubkey, store }: RefreshAuthorStatsOpts,
pubkey: string,
): Promise<DittoTables['author_stats']> { ): Promise<DittoTables['author_stats']> {
const [{ count: followers_count }, { count: notes_count }, [followList]] = await Promise.all([ const [{ count: followers_count }, { count: notes_count }, [followList], [kind0]] = await Promise.all([
store.count([{ kinds: [3], '#p': [pubkey] }]), store.count([{ kinds: [3], '#p': [pubkey] }]),
store.count([{ kinds: [1], authors: [pubkey] }]), store.count([{ kinds: [1], authors: [pubkey] }]),
store.query([{ kinds: [3], authors: [pubkey], limit: 1 }]), store.query([{ kinds: [3], authors: [pubkey], limit: 1 }]),
store.query([{ kinds: [0], authors: [pubkey], limit: 1 }]),
]); ]);
let search: string = '';
const metadata = n.json().pipe(n.metadata()).catch({}).safeParse(kind0?.content);
if (metadata.success) {
const { name, nip05 } = metadata.data;
search = [name, nip05].filter(Boolean).join(' ').trim();
}
return { return {
pubkey, pubkey,
followers_count, followers_count,
following_count: getTagSet(followList?.tags ?? [], 'p').size, following_count: getTagSet(followList?.tags ?? [], 'p').size,
notes_count, notes_count,
search,
}; };
} }
@ -295,7 +303,7 @@ export interface RefreshAuthorStatsOpts {
export async function refreshAuthorStats( export async function refreshAuthorStats(
{ pubkey, kysely, store }: RefreshAuthorStatsOpts, { pubkey, kysely, store }: RefreshAuthorStatsOpts,
): Promise<DittoTables['author_stats']> { ): Promise<DittoTables['author_stats']> {
const stats = await countAuthorStats(store, pubkey); const stats = await countAuthorStats({ store, pubkey, kysely });
await kysely.insertInto('author_stats') await kysely.insertInto('author_stats')
.values(stats) .values(stats)

View file

@ -5,7 +5,7 @@ import { unfurl } from 'unfurl.js';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { PreviewCard } from '@/entities/PreviewCard.ts'; import { PreviewCard } from '@/entities/PreviewCard.ts';
import { Time } from '@/utils/time.ts'; import { cachedLinkPreviewSizeGauge } from '@/metrics.ts';
import { fetchWorker } from '@/workers/fetch.ts'; import { fetchWorker } from '@/workers/fetch.ts';
const debug = Debug('ditto:unfurl'); const debug = Debug('ditto:unfurl');
@ -54,10 +54,7 @@ async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard
} }
/** TTL cache for preview cards. */ /** TTL cache for preview cards. */
const previewCardCache = new TTLCache<string, Promise<PreviewCard | null>>({ const previewCardCache = new TTLCache<string, Promise<PreviewCard | null>>(Conf.caches.linkPreview);
ttl: Time.hours(12),
max: 500,
});
/** Unfurl card from cache if available, otherwise fetch it. */ /** Unfurl card from cache if available, otherwise fetch it. */
function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise<PreviewCard | null> { function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Promise<PreviewCard | null> {
@ -67,6 +64,7 @@ function unfurlCardCached(url: string, signal = AbortSignal.timeout(1000)): Prom
} else { } else {
const card = unfurlCard(url, signal); const card = unfurlCard(url, signal);
previewCardCache.set(url, card); previewCardCache.set(url, card);
cachedLinkPreviewSizeGauge.set(previewCardCache.size);
return card; return card;
} }
} }

View file

@ -3,7 +3,7 @@ import * as Comlink from 'comlink';
import { FetchWorker } from './fetch.worker.ts'; import { FetchWorker } from './fetch.worker.ts';
import './handlers/abortsignal.ts'; import './handlers/abortsignal.ts';
import { fetchCounter } from '@/metrics.ts'; import { fetchResponsesCounter } from '@/metrics.ts';
const worker = new Worker(new URL('./fetch.worker.ts', import.meta.url), { type: 'module' }); const worker = new Worker(new URL('./fetch.worker.ts', import.meta.url), { type: 'module' });
const client = Comlink.wrap<typeof FetchWorker>(worker); const client = Comlink.wrap<typeof FetchWorker>(worker);
@ -23,11 +23,18 @@ const ready = new Promise<void>((resolve) => {
*/ */
const fetchWorker: typeof fetch = async (...args) => { const fetchWorker: typeof fetch = async (...args) => {
await ready; await ready;
const [url, init] = serializeFetchArgs(args); const [url, init] = serializeFetchArgs(args);
const { body, signal, ...rest } = init; const { body, signal, ...rest } = init;
fetchCounter.inc({ method: init.method });
const result = await client.fetch(url, { ...rest, body: await prepareBodyForWorker(body) }, signal); const result = await client.fetch(url, { ...rest, body: await prepareBodyForWorker(body) }, signal);
return new Response(...result); const response = new Response(...result);
const { method } = init;
const { status } = response;
fetchResponsesCounter.inc({ method, status });
return response;
}; };
/** Take arguments to `fetch`, and turn them into something we can send over Comlink. */ /** Take arguments to `fetch`, and turn them into something we can send over Comlink. */

View file

@ -4,6 +4,8 @@ import * as Comlink from 'comlink';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import type { CustomPolicy } from '@/workers/policy.worker.ts'; import type { CustomPolicy } from '@/workers/policy.worker.ts';
import '@/workers/handlers/abortsignal.ts';
const console = new Stickynotes('ditto:policy'); const console = new Stickynotes('ditto:policy');
export const policyWorker = Comlink.wrap<CustomPolicy>( export const policyWorker = Comlink.wrap<CustomPolicy>(
@ -31,7 +33,7 @@ try {
adminPubkey: Conf.pubkey, adminPubkey: Conf.pubkey,
}); });
console.debug(`Using custom policy: ${Conf.policy}`); console.debug(`Using custom policy: ${Conf.policy}`);
} catch (e) { } catch (e: any) {
if (e.message.includes('Module not found')) { if (e.message.includes('Module not found')) {
console.debug('Custom policy not found <https://docs.soapbox.pub/ditto/policies/>'); console.debug('Custom policy not found <https://docs.soapbox.pub/ditto/policies/>');
} else { } else {

View file

@ -1,11 +1,13 @@
import 'deno-safe-fetch/load'; import 'deno-safe-fetch/load';
import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify'; import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify';
import { NoOpPolicy, ReadOnlyPolicy } from '@nostrify/nostrify/policies'; import { NoOpPolicy, ReadOnlyPolicy } from '@nostrify/policies';
import * as Comlink from 'comlink'; import * as Comlink from 'comlink';
import { DittoDB } from '@/db/DittoDB.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { EventsDB } from '@/storages/EventsDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts';
import '@/workers/handlers/abortsignal.ts';
// @ts-ignore Don't try to access the env from this worker. // @ts-ignore Don't try to access the env from this worker.
Deno.env = new Map<string, string>(); Deno.env = new Map<string, string>();
@ -25,8 +27,8 @@ export class CustomPolicy implements NPolicy {
private policy: NPolicy = new ReadOnlyPolicy(); private policy: NPolicy = new ReadOnlyPolicy();
// deno-lint-ignore require-await // deno-lint-ignore require-await
async call(event: NostrEvent): Promise<NostrRelayOK> { async call(event: NostrEvent, signal?: AbortSignal): Promise<NostrRelayOK> {
return this.policy.call(event); return this.policy.call(event, signal);
} }
async init({ path, cwd, databaseUrl, adminPubkey }: PolicyInit): Promise<void> { async init({ path, cwd, databaseUrl, adminPubkey }: PolicyInit): Promise<void> {
@ -45,7 +47,7 @@ export class CustomPolicy implements NPolicy {
try { try {
const Policy = (await import(path)).default; const Policy = (await import(path)).default;
this.policy = new Policy({ store }); this.policy = new Policy({ store });
} catch (e) { } catch (e: any) {
if (e.message.includes('Module not found')) { if (e.message.includes('Module not found')) {
this.policy = new NoOpPolicy(); this.policy = new NoOpPolicy();
} }