Merge branch 'streaming-metrics' into 'main'

Streaming metrics

See merge request soapbox-pub/ditto!508
This commit is contained in:
Alex Gleason 2024-09-22 03:17:07 +00:00
commit 1be8fcbcdf
4 changed files with 40 additions and 4 deletions

View file

@ -4,7 +4,11 @@ import { z } from 'zod';
import { type AppController } from '@/app.ts'; import { type AppController } from '@/app.ts';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { streamingConnectionsGauge } from '@/metrics.ts'; import {
streamingClientMessagesCounter,
streamingConnectionsGauge,
streamingServerMessagesCounter,
} from '@/metrics.ts';
import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { MuteListPolicy } from '@/policies/MuteListPolicy.ts';
import { getFeedPubkeys } from '@/queries.ts'; import { getFeedPubkeys } from '@/queries.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
@ -61,6 +65,8 @@ const LIMITER_LIMIT = 100;
const limiter = new TTLCache<string, number>(); const limiter = new TTLCache<string, number>();
const connections = new Set<WebSocket>();
const streamingController: AppController = async (c) => { const streamingController: AppController = async (c) => {
const upgrade = c.req.header('upgrade'); const upgrade = c.req.header('upgrade');
const token = c.req.header('sec-websocket-protocol'); const token = c.req.header('sec-websocket-protocol');
@ -94,6 +100,7 @@ const streamingController: AppController = async (c) => {
function send(e: StreamingEvent) { function send(e: StreamingEvent) {
if (socket.readyState === WebSocket.OPEN) { if (socket.readyState === WebSocket.OPEN) {
debug('send', e.event, e.payload); debug('send', e.event, e.payload);
streamingServerMessagesCounter.inc();
socket.send(JSON.stringify(e)); socket.send(JSON.stringify(e));
} }
} }
@ -126,7 +133,8 @@ const streamingController: AppController = async (c) => {
} }
socket.onopen = async () => { socket.onopen = async () => {
streamingConnectionsGauge.inc(); connections.add(socket);
streamingConnectionsGauge.set(connections.size);
if (!stream) return; if (!stream) return;
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
@ -169,6 +177,8 @@ const streamingController: AppController = async (c) => {
}; };
socket.onmessage = (e) => { socket.onmessage = (e) => {
streamingClientMessagesCounter.inc();
if (ip) { if (ip) {
const count = limiter.get(ip) ?? 0; const count = limiter.get(ip) ?? 0;
limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW });
@ -186,7 +196,8 @@ const streamingController: AppController = async (c) => {
}; };
socket.onclose = () => { socket.onclose = () => {
streamingConnectionsGauge.dec(); connections.delete(socket);
streamingConnectionsGauge.set(connections.size);
controller.abort(); controller.abort();
}; };

View file

@ -17,6 +17,16 @@ export const streamingConnectionsGauge = new Gauge({
help: 'Number of active connections to the streaming API', help: 'Number of active connections to the streaming API',
}); });
export const streamingServerMessagesCounter = new Counter({
name: 'ditto_streaming_server_messages_total',
help: 'Total number of messages sent from the streaming API',
});
export const streamingClientMessagesCounter = new Counter({
name: 'ditto_streaming_client_messages_total',
help: 'Total number of messages received by the streaming API',
});
export const fetchCounter = new Counter({ export const fetchCounter = new Counter({
name: 'ditto_fetch_total', name: 'ditto_fetch_total',
help: 'Total number of fetch requests', help: 'Total number of fetch requests',
@ -104,3 +114,8 @@ export const cachedLinkPreviewSizeGauge = new Gauge({
name: 'ditto_cached_link_previews_size', name: 'ditto_cached_link_previews_size',
help: 'Number of link previews in cache', help: 'Number of link previews in cache',
}); });
export const internalSubscriptionsSizeGauge = new Gauge({
name: 'ditto_internal_subscriptions_size',
help: "Number of active subscriptions to Ditto's internal relay",
});

View file

@ -2,6 +2,7 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts';
import { DittoDB } from '@/db/DittoDB.ts'; import { DittoDB } from '@/db/DittoDB.ts';
import { internalSubscriptionsSizeGauge } from '@/metrics.ts';
import { AdminStore } from '@/storages/AdminStore.ts'; import { AdminStore } from '@/storages/AdminStore.ts';
import { EventsDB } from '@/storages/EventsDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts';
import { SearchStore } from '@/storages/search-store.ts'; import { SearchStore } from '@/storages/search-store.ts';
@ -61,7 +62,7 @@ export class Storages {
/** Internal pubsub relay between controllers and the pipeline. */ /** Internal pubsub relay between controllers and the pipeline. */
public static async pubsub(): Promise<InternalRelay> { public static async pubsub(): Promise<InternalRelay> {
if (!this._pubsub) { if (!this._pubsub) {
this._pubsub = Promise.resolve(new InternalRelay()); this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge }));
} }
return this._pubsub; return this._pubsub;
} }

View file

@ -10,10 +10,15 @@ import {
} from '@nostrify/nostrify'; } from '@nostrify/nostrify';
import { Machina } from '@nostrify/nostrify/utils'; import { Machina } from '@nostrify/nostrify/utils';
import { matchFilter } from 'nostr-tools'; import { matchFilter } from 'nostr-tools';
import { Gauge } from 'prom-client';
import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { purifyEvent } from '@/utils/purify.ts'; import { purifyEvent } from '@/utils/purify.ts';
interface InternalRelayOpts {
gauge?: Gauge;
}
/** /**
* PubSub event store for streaming events within the application. * PubSub event store for streaming events within the application.
* The pipeline should push events to it, then anything in the application can subscribe to it. * The pipeline should push events to it, then anything in the application can subscribe to it.
@ -21,6 +26,8 @@ import { purifyEvent } from '@/utils/purify.ts';
export class InternalRelay implements NRelay { export class InternalRelay implements NRelay {
private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>(); private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
constructor(private opts: InternalRelayOpts = {}) {}
async *req( async *req(
filters: NostrFilter[], filters: NostrFilter[],
opts?: { signal?: AbortSignal }, opts?: { signal?: AbortSignal },
@ -31,6 +38,7 @@ export class InternalRelay implements NRelay {
yield ['EOSE', id]; yield ['EOSE', id];
this.subs.set(id, { filters, machina }); this.subs.set(id, { filters, machina });
this.opts.gauge?.set(this.subs.size);
try { try {
for await (const event of machina) { for await (const event of machina) {
@ -38,6 +46,7 @@ export class InternalRelay implements NRelay {
} }
} finally { } finally {
this.subs.delete(id); this.subs.delete(id);
this.opts.gauge?.set(this.subs.size);
} }
} }