webpush: add metrics and pipeline error handling

This commit is contained in:
Alex Gleason 2024-10-14 18:06:22 -05:00
parent 462f4ad786
commit 95d970d8d0
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
2 changed files with 20 additions and 7 deletions

View file

@ -141,3 +141,9 @@ export const relayPoolSubscriptionsSizeGauge = new Gauge({
name: 'ditto_relay_pool_subscriptions_size', name: 'ditto_relay_pool_subscriptions_size',
help: 'Number of active subscriptions to the relay pool', help: 'Number of active subscriptions to the relay pool',
}); });
export const webPushNotificationsCounter = new Counter({
name: 'ditto_web_push_notifications_total',
help: 'Total number of Web Push notifications sent',
labelNames: ['type'],
});

View file

@ -9,7 +9,7 @@ import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts'; import { DittoTables } from '@/db/DittoTables.ts';
import { DittoPush } from '@/DittoPush.ts'; import { DittoPush } from '@/DittoPush.ts';
import { DittoEvent } from '@/interfaces/DittoEvent.ts'; import { DittoEvent } from '@/interfaces/DittoEvent.ts';
import { pipelineEventsCounter, policyEventsCounter } from '@/metrics.ts'; import { pipelineEventsCounter, policyEventsCounter, webPushNotificationsCounter } from '@/metrics.ts';
import { RelayError } from '@/RelayError.ts'; import { RelayError } from '@/RelayError.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
@ -67,15 +67,21 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
try { try {
await storeEvent(purifyEvent(event), signal); await storeEvent(purifyEvent(event), signal);
await Promise.all([ } finally {
// This needs to run in steps, and should not block the API from responding.
Promise.all([
handleZaps(kysely, event), handleZaps(kysely, event),
parseMetadata(event, signal), parseMetadata(event, signal),
setLanguage(event), setLanguage(event),
]); generateSetEvents(event),
} finally { ])
await generateSetEvents(event); .then(() =>
await streamOut(event); Promise.all([
await webPush(event); streamOut(event),
webPush(event),
])
)
.catch(console.warn);
} }
} }
@ -280,6 +286,7 @@ async function webPush(event: NostrEvent): Promise<void> {
}; };
await DittoPush.push(subscription, message); await DittoPush.push(subscription, message);
webPushNotificationsCounter.inc({ type: notification.type });
} }
} }