diff --git a/src/client.ts b/src/client.ts index abb5fdce..e3361163 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,5 +1,6 @@ import { Conf } from '@/config.ts'; import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; +import * as pipeline from '@/pipeline.ts'; import { Time } from '@/utils.ts'; import type { GetFiltersOpts } from '@/types.ts'; @@ -37,6 +38,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts Conf.poolRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { + pipeline.handleEvent(event).catch(() => {}); results.push({ id: event.id, kind: event.kind, @@ -70,14 +72,4 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts }); } -/** Publish an event to the Nostr relay. */ -function publish(event: Event, relays = Conf.publishRelays): void { - console.log('Publishing event', event, relays); - try { - getPool().publish(event, relays); - } catch (e) { - console.error(e); - } -} - -export { getFilters, publish }; +export { getFilters }; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 544447b2..da464efe 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,7 +1,7 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; -import { publish } from '@/client.ts'; import * as mixer from '@/mixer.ts'; +import * as pipeline from '@/pipeline.ts'; import { getAuthor, getFollows } from '@/queries.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; @@ -167,7 +167,13 @@ const updateCredentialsController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } const account = await toAccount(event); return c.json(account); diff --git a/src/controllers/api/statuses.ts b/src/controllers/api/statuses.ts index 2f594ac7..aae7f6cc 100644 --- a/src/controllers/api/statuses.ts +++ b/src/controllers/api/statuses.ts @@ -1,6 +1,6 @@ import { type AppController } from '@/app.ts'; -import { publish } from '@/client.ts'; import { ISO6391, Kind, z } from '@/deps.ts'; +import * as pipeline from '@/pipeline.ts'; import { getAncestors, getDescendants, getEvent } from '@/queries.ts'; import { signEvent } from '@/sign.ts'; import { toStatus } from '@/transformers/nostr-to-mastoapi.ts'; @@ -77,7 +77,13 @@ const createStatusController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } return c.json(await toStatus(event)); } else { @@ -118,7 +124,13 @@ const favouriteController: AppController = async (c) => { created_at: nostrNow(), }, c); - publish(event); + try { + await pipeline.handleEvent(event); + } catch (e) { + if (e instanceof pipeline.RelayError) { + return c.json({ error: e.message }, 422); + } + } const status = await toStatus(target); diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts index 24316e7c..8befa566 100644 --- a/src/controllers/nostr/relay.ts +++ b/src/controllers/nostr/relay.ts @@ -1,5 +1,5 @@ import * as eventsDB from '@/db/events.ts'; -import { findUser } from '@/db/users.ts'; +import * as pipeline from '@/pipeline.ts'; import { jsonSchema } from '@/schema.ts'; import { type ClientCLOSE, @@ -15,12 +15,14 @@ import type { Event, Filter } from '@/deps.ts'; /** Limit of events returned per-filter. */ const FILTER_LIMIT = 100; +/** NIP-01 relay to client message. */ type RelayMsg = | ['EVENT', string, Event] | ['NOTICE', string] | ['EOSE', string] | ['OK', string, boolean, string]; +/** Set up the Websocket connection. */ function connectStream(socket: WebSocket) { socket.onmessage = (e) => { const result = jsonSchema.pipe(clientMsgSchema).safeParse(e.data); @@ -31,6 +33,7 @@ function connectStream(socket: WebSocket) { } }; + /** Handle client message. */ function handleMsg(msg: ClientMsg) { switch (msg[0]) { case 'REQ': @@ -45,6 +48,7 @@ function connectStream(socket: WebSocket) { } } + /** Handle REQ. Start a subscription. */ async function handleReq([_, sub, ...filters]: ClientREQ) { for (const event of await eventsDB.getFilters(prepareFilters(filters))) { send(['EVENT', sub, event]); @@ -52,20 +56,28 @@ function connectStream(socket: WebSocket) { send(['EOSE', sub]); } + /** Handle EVENT. Store the event. */ async function handleEvent([_, event]: ClientEVENT) { - if (await findUser({ pubkey: event.pubkey })) { - eventsDB.insertEvent(event); + try { + // This will store it (if eligible) and run other side-effects. + await pipeline.handleEvent(event); send(['OK', event.id, true, '']); - } else { - send(['OK', event.id, false, 'blocked: only registered users can post']); + } catch (e) { + if (e instanceof pipeline.RelayError) { + send(['OK', event.id, false, e.message]); + } else { + send(['OK', event.id, false, 'error: something went wrong']); + } } } + /** Handle CLOSE. Close the subscription. */ function handleClose([_, _sub]: ClientCLOSE) { // TODO: ??? return; } + /** Send a message back to the client. */ function send(msg: RelayMsg) { return socket.send(JSON.stringify(msg)); } diff --git a/src/db/events.ts b/src/db/events.ts index cb04b235..7910a400 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -119,16 +119,4 @@ async function getFilters( )); } -/** Returns whether the pubkey is followed by a local user. */ -async function isLocallyFollowed(pubkey: string): Promise { - return Boolean( - await getFilterQuery({ - kinds: [3], - '#p': [pubkey], - limit: 1, - local: true, - }).executeTakeFirst(), - ); -} - -export { getFilters, insertEvent, isLocallyFollowed }; +export { getFilters, insertEvent }; diff --git a/src/firehose.ts b/src/firehose.ts index c717c075..57cc1022 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,9 +1,8 @@ -import { insertEvent, isLocallyFollowed } from '@/db/events.ts'; -import { addRelays, getActiveRelays } from '@/db/relays.ts'; -import { findUser } from '@/db/users.ts'; +import { getActiveRelays } from '@/db/relays.ts'; import { type Event, RelayPool } from '@/deps.ts'; -import { trends } from '@/trends.ts'; -import { isRelay, nostrDate, nostrNow } from '@/utils.ts'; +import { nostrNow } from '@/utils.ts'; + +import * as pipeline from './pipeline.ts'; const relays = await getActiveRelays(); const pool = new RelayPool(relays); @@ -20,48 +19,10 @@ pool.subscribe( ); /** Handle events through the firehose pipeline. */ -async function handleEvent(event: Event): Promise { +function handleEvent(event: Event): Promise { console.info(`firehose: Event<${event.kind}> ${event.id}`); - trackHashtags(event); - trackRelays(event); - - if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { - insertEvent(event).catch(console.warn); - } -} - -/** Track whenever a hashtag is used, for processing trending tags. */ -function trackHashtags(event: Event): void { - const date = nostrDate(event.created_at); - - const tags = event.tags - .filter((tag) => tag[0] === 't') - .map((tag) => tag[1]) - .slice(0, 5); - - if (!tags.length) return; - - try { - console.info('tracking tags:', tags); - trends.addTagUsages(event.pubkey, tags, date); - } catch (_e) { - // do nothing - } -} - -/** Tracks known relays in the database. */ -function trackRelays(event: Event) { - const relays = new Set<`wss://${string}`>(); - - event.tags.forEach((tag) => { - if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { - relays.add(tag[2]); - } - if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { - relays.add(tag[1]); - } - }); - - return addRelays([...relays]); + return pipeline + .handleEvent(event) + .catch(() => {}); } diff --git a/src/pipeline.ts b/src/pipeline.ts new file mode 100644 index 00000000..af72b794 --- /dev/null +++ b/src/pipeline.ts @@ -0,0 +1,73 @@ +import * as eventsDB from '@/db/events.ts'; +import { addRelays } from '@/db/relays.ts'; +import { findUser } from '@/db/users.ts'; +import { type Event } from '@/deps.ts'; +import { isLocallyFollowed } from '@/queries.ts'; +import { trends } from '@/trends.ts'; +import { isRelay, nostrDate } from '@/utils.ts'; + +/** + * Common pipeline function to process (and maybe store) events. + * It is idempotent, so it can be called multiple times for the same event. + */ +async function handleEvent(event: Event): Promise { + await Promise.all([ + storeEvent(event), + trackRelays(event), + trackHashtags(event), + ]); +} + +/** Maybe store the event, if eligible. */ +async function storeEvent(event: Event): Promise { + if (await findUser({ pubkey: event.pubkey }) || await isLocallyFollowed(event.pubkey)) { + await eventsDB.insertEvent(event).catch(console.warn); + } else { + return Promise.reject(new RelayError('blocked', 'only registered users can post')); + } +} + +/** Track whenever a hashtag is used, for processing trending tags. */ +// deno-lint-ignore require-await +async function trackHashtags(event: Event): Promise { + const date = nostrDate(event.created_at); + + const tags = event.tags + .filter((tag) => tag[0] === 't') + .map((tag) => tag[1]) + .slice(0, 5); + + if (!tags.length) return; + + try { + console.info('tracking tags:', tags); + trends.addTagUsages(event.pubkey, tags, date); + } catch (_e) { + // do nothing + } +} + +/** Tracks known relays in the database. */ +function trackRelays(event: Event) { + const relays = new Set<`wss://${string}`>(); + + event.tags.forEach((tag) => { + if (['p', 'e', 'a'].includes(tag[0]) && isRelay(tag[2])) { + relays.add(tag[2]); + } + if (event.kind === 10002 && tag[0] === 'r' && isRelay(tag[1])) { + relays.add(tag[1]); + } + }); + + return addRelays([...relays]); +} + +/** NIP-20 command line result. */ +class RelayError extends Error { + constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { + super(`${prefix}: ${message}`); + } +} + +export { handleEvent, RelayError }; diff --git a/src/queries.ts b/src/queries.ts index 7d03c308..0f4b3ec8 100644 --- a/src/queries.ts +++ b/src/queries.ts @@ -1,8 +1,8 @@ +import * as eventsDB from '@/db/events.ts'; import { type Event, type Filter, findReplyTag } from '@/deps.ts'; +import * as mixer from '@/mixer.ts'; import { type PaginationParams } from '@/utils.ts'; -import * as mixer from './mixer.ts'; - interface GetEventOpts { /** Timeout in milliseconds. */ timeout?: number; @@ -83,4 +83,10 @@ function getDescendants(eventId: string): Promise[]> { return mixer.getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 }); } -export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed }; +/** Returns whether the pubkey is followed by a local user. */ +async function isLocallyFollowed(pubkey: string): Promise { + const [event] = await eventsDB.getFilters([{ kinds: [3], '#p': [pubkey], local: true }], { limit: 1 }); + return Boolean(event); +} + +export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFollows, getPublicFeed, isLocallyFollowed };