From 87c67c0a397d18c9089e733cf9577776744fffe0 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 16:47:22 -0500 Subject: [PATCH 1/9] Add pipeline module --- src/firehose.ts | 54 +++++---------------------------------------- src/pipeline.ts | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 48 deletions(-) create mode 100644 src/pipeline.ts diff --git a/src/firehose.ts b/src/firehose.ts index c717c075..8510a70a 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,9 +1,8 @@ -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; -import { addRelays, getActiveRelays } from '@/db/relays.ts'; -import { findUser } from '@/db/users.ts'; +import { getActiveRelays } from '@/db/relays.ts'; import { type Event, RelayPool } from '@/deps.ts'; -import { trends } from '@/trends.ts'; -import { isRelay, nostrDate, nostrNow } from '@/utils.ts'; +import { nostrNow } from '@/utils.ts'; + +import * as pipeline from './pipeline.ts'; const relays = await getActiveRelays(); const pool = new RelayPool(relays); @@ -20,48 +19,7 @@ pool.subscribe( ); /** Handle events through the firehose pipeline. */ -async function handleEvent(event: Event): Promise { +function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - - trackHashtags(event); - trackRelays(event); - - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); - } -} - -/** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: Event): void { - const date = nostrDate(event.created_at); - - const tags = event.tags - .filter((tag) => tag[0] === 't') - .map((tag) => tag[1]) - .slice(0, 5); - - if (!tags.length) return; - - try { - console.info('tracking tags:', tags); - trends.addTagUsages(event.pubkey, tags, date); - } catch (_e) { - // do nothing - } -} - -/** Tracks known relays in the database. */ -function trackRelays(event: Event) { - const relays = new Set<`wss://${string}`>(); - - event.tags.forEach((tag) => { - if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { - relays.add(tag[2]); - } - if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { - relays.add(tag[1]); - } - }); - - return addRelays([...relays]); + return pipeline.handleEvent(event); } diff --git a/src/pipeline.ts b/src/pipeline.ts new file mode 100644 index 00000000..b22128b3 --- /dev/null +++ b/src/pipeline.ts @@ -0,0 +1,58 @@ +import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; +import { addRelays } from '@/db/relays.ts'; +import { findUser } from '@/db/users.ts'; +import { type Event } from '@/deps.ts'; +import { trends } from '@/trends.ts'; +import { isRelay, nostrDate } from '@/utils.ts'; + +/** + * Common pipeline function to process (and maybe store) events. + * It is idempotent, so it can be called multiple times for the same event. + */ +async function handleEvent(event: Event): Promise { + console.info(`firehose: Event<${event.kind}> ${event.id}`); + + trackHashtags(event); + trackRelays(event); + + if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { + insertEvent(event).catch(console.warn); + } +} + +/** Track whenever a hashtag is used, for processing trending tags. */ +function trackHashtags(event: Event): void { + const date = nostrDate(event.created_at); + + const tags = event.tags + .filter((tag) => tag[0] === 't') + .map((tag) => tag[1]) + .slice(0, 5); + + if (!tags.length) return; + + try { + console.info('tracking tags:', tags); + trends.addTagUsages(event.pubkey, tags, date); + } catch (_e) { + // do nothing + } +} + +/** Tracks known relays in the database. */ +function trackRelays(event: Event) { + const relays = new Set<`wss://${string}`>(); + + event.tags.forEach((tag) => { + if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { + relays.add(tag[2]); + } + if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { + relays.add(tag[1]); + } + }); + + return addRelays([...relays]); +} + +export { handleEvent }; From 51881efde0ac3808aab9999d76a0b3d80be28c3a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 16:48:58 -0500 Subject: [PATCH 2/9] Push events from the client through the pipeline --- src/client.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/client.ts b/src/client.ts index abb5fdce..a99ce8ee 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,5 +1,6 @@ import { Conf } from '@/config.ts'; import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; +import * as pipeline from '@/pipeline.ts'; import { Time } from '@/utils.ts'; import type { GetFiltersOpts } from '@/types.ts'; @@ -37,6 +38,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts Conf.poolRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { + pipeline.handleEvent(event); results.push({ id: event.id, kind: event.kind, From 45abaf14a426f8fab1ecc916e8e04c7247a79680 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 18:07:25 -0500 Subject: [PATCH 3/9] pipeline: refactor, use pipeline from relay --- src/controllers/nostr/relay.ts | 8 ++++---- src/pipeline.ts | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 24316e7c..1537047d 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,5 @@ import * as eventsDB from '@/db/events.ts'; -import { findUser } from '@/db/users.ts'; +import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; import { type ClientCLOSE, @@ -53,10 +53,10 @@ function connectStream(socket: WebSocket) { } async function handleEvent([_, event]: ClientEVENT) { - if (await findUser({ pubkey: event.pubkey })) { - eventsDB.insertEvent(event); + try { + await pipeline.handleEvent(event); send(['OK', event.id, true, '']); - } else { + } catch (_e) { send(['OK', event.id, false, 'blocked: only registered users can post']); } } diff --git a/src/pipeline.ts b/src/pipeline.ts index b22128b3..95f0a82e 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -1,4 +1,4 @@ -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; +import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event } from '@/deps.ts'; @@ -12,16 +12,23 @@ import { isRelay, nostrDate } from '@/utils.ts'; async function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - trackHashtags(event); - trackRelays(event); + await Promise.all([ + trackHashtags(event), + storeEvent(event), + trackRelays(event), + ]); +} - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); +/** Maybe store the event, if eligible. */ +async function storeEvent(event: Event): Promise { + if (await findUser({ pubkey: event.pubkey }) || await eventsDB.isLocallyFollowed(event.pubkey)) { + await eventsDB.insertEvent(event).catch(console.warn); } } /** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: Event): void { +// deno-lint-ignore require-await +async function trackHashtags(event: Event): Promise { const date = nostrDate(event.created_at); const tags = event.tags From 2011ca6e1dd1b6eb4dd2b59b98f78c5f7e0bf4e4 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 18:08:05 -0500 Subject: [PATCH 4/9] pipeline: remove redundant console.log --- src/pipeline.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 95f0a82e..3e2306f5 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -10,8 +10,6 @@ import { isRelay, nostrDate } from '@/utils.ts'; * It is idempotent, so it can be called multiple times for the same event. */ async function handleEvent(event: Event): Promise { - console.info(`firehose: Event<${event.kind}> ${event.id}`); - await Promise.all([ trackHashtags(event), storeEvent(event), From dc49c305bd29c895762dee5215cd33ecfb263826 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 19:32:05 -0500 Subject: [PATCH 5/9] Move isLocallyFollowed to queries.ts --- src/db/events.ts | 14 +------------- src/pipeline.ts | 3 ++- src/queries.ts | 12 +++++++++--- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index cb04b235..7910a400 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -119,16 +119,4 @@ async function getFilters( )); } -/** Returns whether the pubkey is followed by a local user. */ -async function isLocallyFollowed(pubkey: string): Promise { - return Boolean( - await getFilterQuery({ - kinds: [3], - '#p': [pubkey], - limit: 1, - local: true, - }).executeTakeFirst(), - ); -} - -export { getFilters, insertEvent, isLocallyFollowed }; +export { getFilters, insertEvent }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 3e2306f5..00d3eb9b 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -2,6 +2,7 @@ import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event } from '@/deps.ts'; +import { isLocallyFollowed } from '@/queries.ts'; import { trends } from '@/trends.ts'; import { isRelay, nostrDate } from '@/utils.ts'; @@ -19,7 +20,7 @@ async function handleEvent(event: Event): Promise { /** Maybe store the event, if eligible. */ async function storeEvent(event: Event): Promise { - if (await findUser({ pubkey: event.pubkey }) || await eventsDB.isLocallyFollowed(event.pubkey)) { + if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { await eventsDB.insertEvent(event).catch(console.warn); } } diff --git a/src/queries.ts b/src/queries.ts index 7d03c308..0f4b3ec8 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,8 +1,8 @@ +import * as eventsDB from '@/db/events.ts'; import { type Event, type Filter, findReplyTag } from '@/deps.ts'; +import * as mixer from '@/mixer.ts'; import { type PaginationParams } from '@/utils.ts'; -import * as mixer from './mixer.ts'; - interface GetEventOpts { /** Timeout in milliseconds. */ timeout?: number; @@ -83,4 +83,10 @@ function getDescendants(eventId: string): Promise[]> { return mixer.getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 }); } -export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed }; +/** Returns whether the pubkey is followed by a local user. */ +async function isLocallyFollowed(pubkey: string): Promise { + const [event] = await eventsDB.getFilters([{ kinds: [3], '#p': [pubkey], local: true }], { limit: 1 }); + return Boolean(event); +} + +export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed, isLocallyFollowed }; From 0f70f5c76fafc121703a41adb86fb29ea5158126 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 20:24:16 -0500 Subject: [PATCH 6/9] Add a RelayError class, improve relay error handling --- src/controllers/nostr/relay.ts | 16 ++++++++++++++-- src/pipeline.ts | 13 +++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 1537047d..8befa566 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -15,12 +15,14 @@ import type { Event, Filter } from '@/deps.ts'; /** Limit of events returned per-filter. */ const FILTER_LIMIT = 100; +/** NIP-01 relay to client message. */ type RelayMsg = | ['EVENT', string, Event] | ['NOTICE', string] | ['EOSE', string] | ['OK', string, boolean, string]; +/** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { socket.onmessage = (e) => { const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); @@ -31,6 +33,7 @@ function connectStream(socket: WebSocket) { } }; + /** Handle client message. */ function handleMsg(msg: ClientMsg) { switch (msg[0]) { case 'REQ': @@ -45,6 +48,7 @@ function connectStream(socket: WebSocket) { } } + /** Handle REQ. Start a subscription. */ async function handleReq([_, sub, ...filters]: ClientREQ) { for (const event of await eventsDB.getFilters(prepareFilters(filters))) { send(['EVENT', sub, event]); @@ -52,20 +56,28 @@ function connectStream(socket: WebSocket) { send(['EOSE', sub]); } + /** Handle EVENT. Store the event. */ async function handleEvent([_, event]: ClientEVENT) { try { + // This will store it (if eligible) and run other side-effects. await pipeline.handleEvent(event); send(['OK', event.id, true, '']); - } catch (_e) { - send(['OK', event.id, false, 'blocked: only registered users can post']); + } catch (e) { + if (e instanceof pipeline.RelayError) { + send(['OK', event.id, false, e.message]); + } else { + send(['OK', event.id, false, 'error: something went wrong']); + } } } + /** Handle CLOSE. Close the subscription. */ function handleClose([_, _sub]: ClientCLOSE) { // TODO: ??? return; } + /** Send a message back to the client. */ function send(msg: RelayMsg) { return socket.send(JSON.stringify(msg)); } diff --git a/src/pipeline.ts b/src/pipeline.ts index 00d3eb9b..af72b794 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -12,9 +12,9 @@ import { isRelay, nostrDate } from '@/utils.ts'; */ async function handleEvent(event: Event): Promise { await Promise.all([ - trackHashtags(event), storeEvent(event), trackRelays(event), + trackHashtags(event), ]); } @@ -22,6 +22,8 @@ async function handleEvent(event: Event): Promise { async function storeEvent(event: Event): Promise { if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { await eventsDB.insertEvent(event).catch(console.warn); + } else { + return Promise.reject(new RelayError('blocked', 'only registered users can post')); } } @@ -61,4 +63,11 @@ function trackRelays(event: Event) { return addRelays([...relays]); } -export { handleEvent }; +/** NIP-20 command line result. */ +class RelayError extends Error { + constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { + super(`${prefix}: ${message}`); + } +} + +export { handleEvent, RelayError }; From 85345bc1576c6a1d82e6df3d69b107e57613648f Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 20:28:23 -0500 Subject: [PATCH 7/9] firehose: catch pipeline crash --- src/firehose.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/firehose.ts b/src/firehose.ts index 8510a70a..57cc1022 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -21,5 +21,8 @@ pool.subscribe( /** Handle events through the firehose pipeline. */ function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - return pipeline.handleEvent(event); + + return pipeline + .handleEvent(event) + .catch(() => {}); } From 0158a6979ea7337dd356f02dc3a6effd6118022c Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 20:45:50 -0500 Subject: [PATCH 8/9] Publish events through pipeline --- src/controllers/api/accounts.ts | 10 +++++++++- src/controllers/api/statuses.ts | 17 +++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 544447b2..96b3ac28 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -2,11 +2,13 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; import { publish } from '@/client.ts'; import * as mixer from '@/mixer.ts'; +import * as pipeline from '@/pipeline.ts'; import { getAuthor, getFollows } from '@/queries.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; import { buildLinkHeader, eventDateComparator, lookupAccount, nostrNow, paginationSchema, parseBody } from '@/utils.ts'; +import pipe from 'https://deno.land/x/ramda@v0.27.2/source/pipe.js'; const createAccountController: AppController = (c) => { return c.json({ error: 'Please log in with Nostr.' }, 405); @@ -167,7 +169,13 @@ const updateCredentialsController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } const account = await toAccount(event); return c.json(account); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 2f594ac7..2a78f22e 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -1,6 +1,7 @@ import { type AppController } from '@/app.ts'; import { publish } from '@/client.ts'; import { ISO6391, Kind, z } from '@/deps.ts'; +import * as pipeline from '@/pipeline.ts'; import { getAncestors, getDescendants, getEvent } from '@/queries.ts'; import { signEvent } from '@/sign.ts'; import { toStatus } from '@/transformers/nostr-to-mastoapi.ts'; @@ -77,7 +78,13 @@ const createStatusController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } return c.json(await toStatus(event)); } else { @@ -118,7 +125,13 @@ const favouriteController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } const status = await toStatus(target); From 719b703e7cb42092e56da8199b59301e42e8d8ba Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Thu, 17 Aug 2023 20:54:11 -0500 Subject: [PATCH 9/9] client: remove `publish` function, create statuses through pipeline --- src/client.ts | 14 ++------------ src/controllers/api/accounts.ts | 2 -- src/controllers/api/statuses.ts | 1 - 3 files changed, 2 insertions(+), 15 deletions(-) diff --git a/src/client.ts b/src/client.ts index a99ce8ee..e3361163 100644 --- a/src/client.ts +++ b/src/client.ts @@ -38,7 +38,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts Conf.poolRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { - pipeline.handleEvent(event); + pipeline.handleEvent(event).catch(() => {}); results.push({ id: event.id, kind: event.kind, @@ -72,14 +72,4 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts }); } -/** Publish an event to the Nostr relay. */ -function publish(event: Event, relays = Conf.publishRelays): void { - console.log('Publishing event', event, relays); - try { - getPool().publish(event, relays); - } catch (e) { - console.error(e); - } -} - -export { getFilters, publish }; +export { getFilters }; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 96b3ac28..da464efe 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,6 +1,5 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; -import { publish } from '@/client.ts'; import * as mixer from '@/mixer.ts'; import * as pipeline from '@/pipeline.ts'; import { getAuthor, getFollows } from '@/queries.ts'; @@ -8,7 +7,6 @@ import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; import { buildLinkHeader, eventDateComparator, lookupAccount, nostrNow, paginationSchema, parseBody } from '@/utils.ts'; -import pipe from 'https://deno.land/x/ramda@v0.27.2/source/pipe.js'; const createAccountController: AppController = (c) => { return c.json({ error: 'Please log in with Nostr.' }, 405); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 2a78f22e..aae7f6cc 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -1,5 +1,4 @@ import { type AppController } from '@/app.ts'; -import { publish } from '@/client.ts'; import { ISO6391, Kind, z } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; import { getAncestors, getDescendants, getEvent } from '@/queries.ts';