diff --git a/src/controllers/api/streaming.ts b/src/controllers/api/streaming.ts index 9a5b5deb..047aa573 100644 --- a/src/controllers/api/streaming.ts +++ b/src/controllers/api/streaming.ts @@ -38,6 +38,25 @@ const streamSchema = z.enum([ type Stream = z.infer; +/** https://docs.joinmastodon.org/methods/streaming/#events-11 */ +interface StreamingEvent { + /** https://docs.joinmastodon.org/methods/streaming/#events */ + event: + | 'update' + | 'delete' + | 'notification' + | 'filters_changed' + | 'conversation' + | 'announcement' + | 'announcement.reaction' + | 'announcement.delete' + | 'status.update' + | 'encrypted_message' + | 'notifications_merged'; + payload: string; + stream: Stream[]; +} + const LIMITER_WINDOW = Time.minutes(5); const LIMITER_LIMIT = 100; @@ -73,18 +92,14 @@ const streamingController: AppController = async (c) => { const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; - function send(name: string, payload: object) { + function send(e: StreamingEvent) { if (socket.readyState === WebSocket.OPEN) { - debug('send', name, JSON.stringify(payload)); - socket.send(JSON.stringify({ - event: name, - payload: JSON.stringify(payload), - stream: [stream], - })); + debug('send', e.event, e.payload); + socket.send(JSON.stringify(e)); } } - async function sub(type: string, filters: NostrFilter[], render: (event: NostrEvent) => Promise) { + async function sub(filters: NostrFilter[], render: (event: NostrEvent) => Promise) { try { for await (const msg of pubsub.req(filters, { signal: controller.signal })) { if (msg[0] === 'EVENT') { @@ -102,7 +117,7 @@ const streamingController: AppController = async (c) => { const result = await render(event); if (result) { - send(type, result); + send(result); } } } @@ -118,19 +133,37 @@ const streamingController: AppController = async (c) => { const topicFilter = await topicToFilter(stream, c.req.query(), pubkey); if (topicFilter) { - sub('update', [topicFilter], async (event) => { + sub([topicFilter], async (event) => { + let payload: object | undefined; + if (event.kind === 1) { - return await renderStatus(event, { viewerPubkey: pubkey }); + payload = await renderStatus(event, { viewerPubkey: pubkey }); } if (event.kind === 6) { - return await renderReblog(event, { viewerPubkey: pubkey }); + payload = await renderReblog(event, { viewerPubkey: pubkey }); + } + + if (payload) { + return { + event: 'update', + payload: JSON.stringify(payload), + stream: [stream], + }; } }); } if (['user', 'user:notification'].includes(stream) && pubkey) { - sub('notification', [{ '#p': [pubkey] }], async (event) => { - return await renderNotification(event, { viewerPubkey: pubkey }); + sub([{ '#p': [pubkey] }], async (event) => { + if (event.pubkey === pubkey) return; // skip own events + const payload = await renderNotification(event, { viewerPubkey: pubkey }); + if (payload) { + return { + event: 'notification', + payload: JSON.stringify(payload), + stream: [stream], + }; + } }); return; }