diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index aafa9915..4bbe6177 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -4,7 +4,11 @@ import { z } from 'zod'; import { type AppController } from '@/app.ts'; import { Conf } from '@/config.ts'; -import { streamingConnectionsGauge } from '@/metrics.ts'; +import { + streamingClientMessagesCounter, + streamingConnectionsGauge, + streamingServerMessagesCounter, +} from '@/metrics.ts'; import { MuteListPolicy } from '@/policies/MuteListPolicy.ts'; import { getFeedPubkeys } from '@/queries.ts'; import { hydrateEvents } from '@/storages/hydrate.ts'; @@ -61,6 +65,8 @@ const LIMITER_LIMIT = 100; const limiter = new TTLCache(); +const connections = new Set(); + const streamingController: AppController = async (c) => { const upgrade = c.req.header('upgrade'); const token = c.req.header('sec-websocket-protocol'); @@ -94,6 +100,7 @@ const streamingController: AppController = async (c) => { function send(e: StreamingEvent) { if (socket.readyState === WebSocket.OPEN) { debug('send', e.event, e.payload); + streamingServerMessagesCounter.inc(); socket.send(JSON.stringify(e)); } } @@ -126,7 +133,8 @@ const streamingController: AppController = async (c) => { } socket.onopen = async () => { - streamingConnectionsGauge.inc(); + connections.add(socket); + streamingConnectionsGauge.set(connections.size); if (!stream) return; const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); @@ -169,6 +177,8 @@ const streamingController: AppController = async (c) => { }; socket.onmessage = (e) => { + streamingClientMessagesCounter.inc(); + if (ip) { const count = limiter.get(ip) ?? 0; limiter.set(ip, count + 1, { ttl: LIMITER_WINDOW }); @@ -186,7 +196,8 @@ const streamingController: AppController = async (c) => { }; socket.onclose = () => { - streamingConnectionsGauge.dec(); + connections.delete(socket); + streamingConnectionsGauge.set(connections.size); controller.abort(); }; diff --git a/src/metrics.ts b/src/metrics.ts index ccb4d382..1e20747b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -17,6 +17,16 @@ export const streamingConnectionsGauge = new Gauge({ help: 'Number of active connections to the streaming API', }); +export const streamingServerMessagesCounter = new Counter({ + name: 'ditto_streaming_server_messages_total', + help: 'Total number of messages sent from the streaming API', +}); + +export const streamingClientMessagesCounter = new Counter({ + name: 'ditto_streaming_client_messages_total', + help: 'Total number of messages received by the streaming API', +}); + export const fetchCounter = new Counter({ name: 'ditto_fetch_total', help: 'Total number of fetch requests', @@ -104,3 +114,8 @@ export const cachedLinkPreviewSizeGauge = new Gauge({ name: 'ditto_cached_link_previews_size', help: 'Number of link previews in cache', }); + +export const internalSubscriptionsSizeGauge = new Gauge({ + name: 'ditto_internal_subscriptions_size', + help: "Number of active subscriptions to Ditto's internal relay", +}); diff --git a/src/storages.ts b/src/storages.ts index 073b6135..8c7f2d28 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -2,6 +2,7 @@ import { Conf } from '@/config.ts'; import { DittoDatabase } from '@/db/DittoDatabase.ts'; import { DittoDB } from '@/db/DittoDB.ts'; +import { internalSubscriptionsSizeGauge } from '@/metrics.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; import { SearchStore } from '@/storages/search-store.ts'; @@ -61,7 +62,7 @@ export class Storages { /** Internal pubsub relay between controllers and the pipeline. */ public static async pubsub(): Promise { if (!this._pubsub) { - this._pubsub = Promise.resolve(new InternalRelay()); + this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge })); } return this._pubsub; } diff --git a/src/storages/InternalRelay.ts b/src/storages/InternalRelay.ts index 93a480e1..71a13b31 100644 --- a/src/storages/InternalRelay.ts +++ b/src/storages/InternalRelay.ts @@ -10,10 +10,15 @@ import { } from '@nostrify/nostrify'; import { Machina } from '@nostrify/nostrify/utils'; import { matchFilter } from 'nostr-tools'; +import { Gauge } from 'prom-client'; import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { purifyEvent } from '@/utils/purify.ts'; +interface InternalRelayOpts { + gauge?: Gauge; +} + /** * PubSub event store for streaming events within the application. * The pipeline should push events to it, then anything in the application can subscribe to it. @@ -21,6 +26,8 @@ import { purifyEvent } from '@/utils/purify.ts'; export class InternalRelay implements NRelay { private subs = new Map }>(); + constructor(private opts: InternalRelayOpts = {}) {} + async *req( filters: NostrFilter[], opts?: { signal?: AbortSignal }, @@ -31,6 +38,7 @@ export class InternalRelay implements NRelay { yield ['EOSE', id]; this.subs.set(id, { filters, machina }); + this.opts.gauge?.set(this.subs.size); try { for await (const event of machina) { @@ -38,6 +46,7 @@ export class InternalRelay implements NRelay { } } finally { this.subs.delete(id); + this.opts.gauge?.set(this.subs.size); } }