From fc7228e183174ab83e9813417c1fa1eb6c022238 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 21 Sep 2024 21:40:52 -0500 Subject: [PATCH 1/2] Streaming metrics --- src/controllers/api/streaming.ts | 8 ++++++-- src/metrics.ts | 5 +++++ src/storages.ts | 3 ++- src/storages/InternalRelay.ts | 9 +++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index aafa9915..31e8a398 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -61,6 +61,8 @@ const LIMITER_LIMIT = 100; const limiter = new TTLCache(); +const connections = new Set(); + const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); @@ -126,7 +128,8 @@ const streamingController: AppController = async (c) => { } socket.onopen = async () => { - streamingConnectionsGauge.inc(); + connections.add(socket); + streamingConnectionsGauge.set(connections.size); if (!stream) return; const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); @@ -186,7 +189,8 @@ const streamingController: AppController = async (c) => { }; socket.onclose = () => { - streamingConnectionsGauge.dec(); + connections.delete(socket); + streamingConnectionsGauge.set(connections.size); controller.abort(); }; diff --git a/src/metrics.ts b/src/metrics.ts index ccb4d382..8d1a6531 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -104,3 +104,8 @@ export const cachedLinkPreviewSizeGauge = new Gauge({ name: 'ditto_cached_link_previews_size', help: 'Number of link previews in cache', }); + +export const internalSubscriptionsSizeGauge = new Gauge({ + name: 'ditto_internal_subscriptions_size', + help: "Number of active subscriptions to Ditto's internal relay", +}); diff --git a/src/storages.ts b/src/storages.ts index 073b6135..8c7f2d28 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,6 +2,7 @@ import { Conf } from '@/config.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDB } from '@/db/DittoDB.ts'; +import { internalSubscriptionsSizeGauge } from '@/metrics.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { SearchStore } from '@/storages/search-store.ts'; @@ -61,7 +62,7 @@ export class Storages { /** Internal pubsub relay between controllers and the pipeline. */ public static async pubsub(): Promise { if (!this._pubsub) { - this._pubsub = Promise.resolve(new InternalRelay()); + this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge })); } return this._pubsub; } diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index 93a480e1..71a13b31 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -10,10 +10,15 @@ import { } from '@nostrify/nostrify'; import { Machina } from '@nostrify/nostrify/utils'; import { matchFilter } from 'nostr-tools'; +import { Gauge } from 'prom-client'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { purifyEvent } from '@/utils/purify.ts'; +interface InternalRelayOpts { + gauge?: Gauge; +} + /** * PubSub event store for streaming events within the application. * The pipeline should push events to it, then anything in the application can subscribe to it. @@ -21,6 +26,8 @@ import { purifyEvent } from '@/utils/purify.ts'; export class InternalRelay implements NRelay { private subs = new Map }>(); + constructor(private opts: InternalRelayOpts = {}) {} + async *req( filters: NostrFilter[], opts?: { signal?: AbortSignal }, @@ -31,6 +38,7 @@ export class InternalRelay implements NRelay { yield ['EOSE', id]; this.subs.set(id, { filters, machina }); + this.opts.gauge?.set(this.subs.size); try { for await (const event of machina) { @@ -38,6 +46,7 @@ export class InternalRelay implements NRelay { } } finally { this.subs.delete(id); + this.opts.gauge?.set(this.subs.size); } } From 195cf9f44e9000c41fbd4d0e81360879eff93639 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Sat, 21 Sep 2024 21:44:24 -0500 Subject: [PATCH 2/2] metrics: add messages sent and received by streaming API --- src/controllers/api/streaming.ts | 9 ++++++++- src/metrics.ts | 10 ++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 31e8a398..4bbe6177 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -4,7 +4,11 @@ import { z } from 'zod'; import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; -import { streamingConnectionsGauge } from '@/metrics.ts'; +import { + streamingClientMessagesCounter, + streamingConnectionsGauge, + streamingServerMessagesCounter, +} from '@/metrics.ts'; import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; @@ -96,6 +100,7 @@ const streamingController: AppController = async (c) => { function send(e: StreamingEvent) { if (socket.readyState === WebSocket.OPEN) { debug('send', e.event, e.payload); + streamingServerMessagesCounter.inc(); socket.send(JSON.stringify(e)); } } @@ -172,6 +177,8 @@ const streamingController: AppController = async (c) => { }; socket.onmessage = (e) => { + streamingClientMessagesCounter.inc(); + if (ip) { const count = limiter.get(ip) ?? 0; limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); diff --git a/src/metrics.ts b/src/metrics.ts index 8d1a6531..1e20747b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -17,6 +17,16 @@ export const streamingConnectionsGauge = new Gauge({ help: 'Number of active connections to the streaming API', }); +export const streamingServerMessagesCounter = new Counter({ + name: 'ditto_streaming_server_messages_total', + help: 'Total number of messages sent from the streaming API', +}); + +export const streamingClientMessagesCounter = new Counter({ + name: 'ditto_streaming_client_messages_total', + help: 'Total number of messages received by the streaming API', +}); + export const fetchCounter = new Counter({ name: 'ditto_fetch_total', help: 'Total number of fetch requests',