diff --git a/src/deps.ts b/src/deps.ts index a6b893cd..d0bf79c9 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -9,7 +9,7 @@ export { } from 'https://deno.land/x/hono@v3.10.1/mod.ts'; export { cors, logger, serveStatic } from 'https://deno.land/x/hono@v3.10.1/middleware.ts'; export { z } from 'https://deno.land/x/zod@v3.21.4/mod.ts'; -export { Author, RelayPool } from 'https://dev.jspm.io/nostr-relaypool@0.6.30'; +export { Author } from 'https://dev.jspm.io/nostr-relaypool@0.6.30'; export { type Event, type EventTemplate, @@ -77,7 +77,10 @@ export * as cron from 'https://deno.land/x/deno_cron@v1.0.0/cron.ts'; export { S3Client } from 'https://deno.land/x/s3_lite_client@0.6.1/mod.ts'; export { default as IpfsHash } from 'npm:ipfs-only-hash@^4.0.0'; export { default as uuid62 } from 'npm:uuid62@^1.0.2'; -export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a157d39f2741c9a3a4364cb97db36e71d8c03a/mod.ts'; +export { + Machina, + RelayPool, +} from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/9773dbf00f29338f4bde17b136b3a54569d66afa/mod.ts'; export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js'; export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0'; export * as Comlink from 'npm:comlink@^4.4.1'; diff --git a/src/firehose.ts b/src/firehose.ts index ea6be62a..ee5c38d1 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,5 +1,7 @@ import { type Event } from '@/deps.ts'; import { activeRelays, pool } from '@/pool.ts'; +import { jsonSchema } from '@/schema.ts'; +import { relayEventSchema } from '@/schemas/nostr.ts'; import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; @@ -7,18 +9,21 @@ import * as pipeline from './pipeline.ts'; // This file watches events on all known relays and performs // side-effects based on them, such as trending hashtag tracking // and storing events for notifications and the home feed. -pool.subscribe( - [{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }], - activeRelays, - handleEvent, - undefined, - undefined, -); +const sub = pool.req({ + filtersFn: () => [{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }], + relays: activeRelays, +}); + +sub.addEventListener('message', (event) => { + const result = jsonSchema.pipe(relayEventSchema).safeParse(event.data); + if (result.success) { + const [_cmd, _subId, event] = result.data; + handleEvent(event); + } +}); /** Handle events through the firehose pipeline. */ function handleEvent(event: Event): Promise { - console.info(`firehose: Event<${event.kind}> ${event.id}`); - return pipeline .handleEvent(event) .catch(() => {}); diff --git a/src/pipeline.ts b/src/pipeline.ts index 4f0c51ab..cf721c49 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -24,6 +24,7 @@ import type { EventData } from '@/types.ts'; async function handleEvent(event: Event): Promise { if (!(await verifySignatureWorker(event))) return; if (encounterEvent(event)) return; + console.info(`pipeline: Event<${event.kind}> ${event.id}`); const data = await getEventData(event); await Promise.all([ diff --git a/src/pool.ts b/src/pool.ts index 07ac6b62..d7ca99e0 100644 --- a/src/pool.ts +++ b/src/pool.ts @@ -5,16 +5,12 @@ const activeRelays = await getActiveRelays(); console.log(`pool: connecting to ${activeRelays.length} relays.`); -const pool = new RelayPool(activeRelays, { - // The pipeline verifies events. - skipVerification: true, - // The logging feature overwhelms the CPU and creates too many logs. - logErrorsAndNotices: false, -}); +const pool = new RelayPool(); /** Publish an event to the given relays, or the entire pool. */ -function publish(event: Event, relays: string[] = activeRelays) { - return pool.publish(event, relays); +function publish(_event: Event, _relays: string[] = activeRelays) { + // TODO: + // return pool.publish(event, relays); } export { activeRelays, pool, publish }; diff --git a/src/schemas/nostr.ts b/src/schemas/nostr.ts index 7fb888c7..511d93c1 100644 --- a/src/schemas/nostr.ts +++ b/src/schemas/nostr.ts @@ -63,6 +63,8 @@ type ClientCOUNT = z.infer; /** Client message to a Nostr relay. */ type ClientMsg = z.infer; +const relayEventSchema = z.tuple([z.literal('EVENT'), z.string().min(1), eventSchema]); + /** Kind 0 content schema. */ const metaContentSchema = z.object({ name: z.string().optional().catch(undefined), @@ -134,6 +136,7 @@ export { mediaDataSchema, metaContentSchema, nostrIdSchema, + relayEventSchema, relayInfoDocSchema, signedEventSchema, };