mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Streaming metrics
This commit is contained in:
parent
323d97e5e0
commit
fc7228e183
4 changed files with 22 additions and 3 deletions
|
|
@ -61,6 +61,8 @@ const LIMITER_LIMIT = 100;
|
||||||
|
|
||||||
const limiter = new TTLCache<string, number>();
|
const limiter = new TTLCache<string, number>();
|
||||||
|
|
||||||
|
const connections = new Set<WebSocket>();
|
||||||
|
|
||||||
const streamingController: AppController = async (c) => {
|
const streamingController: AppController = async (c) => {
|
||||||
const upgrade = c.req.header('upgrade');
|
const upgrade = c.req.header('upgrade');
|
||||||
const token = c.req.header('sec-websocket-protocol');
|
const token = c.req.header('sec-websocket-protocol');
|
||||||
|
|
@ -126,7 +128,8 @@ const streamingController: AppController = async (c) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.onopen = async () => {
|
socket.onopen = async () => {
|
||||||
streamingConnectionsGauge.inc();
|
connections.add(socket);
|
||||||
|
streamingConnectionsGauge.set(connections.size);
|
||||||
|
|
||||||
if (!stream) return;
|
if (!stream) return;
|
||||||
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
|
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey);
|
||||||
|
|
@ -186,7 +189,8 @@ const streamingController: AppController = async (c) => {
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.onclose = () => {
|
socket.onclose = () => {
|
||||||
streamingConnectionsGauge.dec();
|
connections.delete(socket);
|
||||||
|
streamingConnectionsGauge.set(connections.size);
|
||||||
controller.abort();
|
controller.abort();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -104,3 +104,8 @@ export const cachedLinkPreviewSizeGauge = new Gauge({
|
||||||
name: 'ditto_cached_link_previews_size',
|
name: 'ditto_cached_link_previews_size',
|
||||||
help: 'Number of link previews in cache',
|
help: 'Number of link previews in cache',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const internalSubscriptionsSizeGauge = new Gauge({
|
||||||
|
name: 'ditto_internal_subscriptions_size',
|
||||||
|
help: "Number of active subscriptions to Ditto's internal relay",
|
||||||
|
});
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { DittoDatabase } from '@/db/DittoDatabase.ts';
|
import { DittoDatabase } from '@/db/DittoDatabase.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
|
import { internalSubscriptionsSizeGauge } from '@/metrics.ts';
|
||||||
import { AdminStore } from '@/storages/AdminStore.ts';
|
import { AdminStore } from '@/storages/AdminStore.ts';
|
||||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||||
import { SearchStore } from '@/storages/search-store.ts';
|
import { SearchStore } from '@/storages/search-store.ts';
|
||||||
|
|
@ -61,7 +62,7 @@ export class Storages {
|
||||||
/** Internal pubsub relay between controllers and the pipeline. */
|
/** Internal pubsub relay between controllers and the pipeline. */
|
||||||
public static async pubsub(): Promise<InternalRelay> {
|
public static async pubsub(): Promise<InternalRelay> {
|
||||||
if (!this._pubsub) {
|
if (!this._pubsub) {
|
||||||
this._pubsub = Promise.resolve(new InternalRelay());
|
this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge }));
|
||||||
}
|
}
|
||||||
return this._pubsub;
|
return this._pubsub;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,15 @@ import {
|
||||||
} from '@nostrify/nostrify';
|
} from '@nostrify/nostrify';
|
||||||
import { Machina } from '@nostrify/nostrify/utils';
|
import { Machina } from '@nostrify/nostrify/utils';
|
||||||
import { matchFilter } from 'nostr-tools';
|
import { matchFilter } from 'nostr-tools';
|
||||||
|
import { Gauge } from 'prom-client';
|
||||||
|
|
||||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||||
import { purifyEvent } from '@/utils/purify.ts';
|
import { purifyEvent } from '@/utils/purify.ts';
|
||||||
|
|
||||||
|
interface InternalRelayOpts {
|
||||||
|
gauge?: Gauge;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* PubSub event store for streaming events within the application.
|
* PubSub event store for streaming events within the application.
|
||||||
* The pipeline should push events to it, then anything in the application can subscribe to it.
|
* The pipeline should push events to it, then anything in the application can subscribe to it.
|
||||||
|
|
@ -21,6 +26,8 @@ import { purifyEvent } from '@/utils/purify.ts';
|
||||||
export class InternalRelay implements NRelay {
|
export class InternalRelay implements NRelay {
|
||||||
private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
private subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
||||||
|
|
||||||
|
constructor(private opts: InternalRelayOpts = {}) {}
|
||||||
|
|
||||||
async *req(
|
async *req(
|
||||||
filters: NostrFilter[],
|
filters: NostrFilter[],
|
||||||
opts?: { signal?: AbortSignal },
|
opts?: { signal?: AbortSignal },
|
||||||
|
|
@ -31,6 +38,7 @@ export class InternalRelay implements NRelay {
|
||||||
yield ['EOSE', id];
|
yield ['EOSE', id];
|
||||||
|
|
||||||
this.subs.set(id, { filters, machina });
|
this.subs.set(id, { filters, machina });
|
||||||
|
this.opts.gauge?.set(this.subs.size);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for await (const event of machina) {
|
for await (const event of machina) {
|
||||||
|
|
@ -38,6 +46,7 @@ export class InternalRelay implements NRelay {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.subs.delete(id);
|
this.subs.delete(id);
|
||||||
|
this.opts.gauge?.set(this.subs.size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue