diff --git a/packages/ditto/controllers/api/oauth.ts b/packages/ditto/controllers/api/oauth.ts index 7ac2c2b2..c48963a9 100644 --- a/packages/ditto/controllers/api/oauth.ts +++ b/packages/ditto/controllers/api/oauth.ts @@ -123,7 +123,7 @@ async function getToken( encryption: 'nip44', pubkey: bunkerPubkey, signer: new NSecSigner(nip46Seckey), - relay: await Storages.pubsub(), // TODO: Use the relays from the request. + relay: await Storages.db(), // TODO: Use the relays from the request. timeout: 60_000, }); diff --git a/packages/ditto/controllers/api/streaming.ts b/packages/ditto/controllers/api/streaming.ts index 01eaaed8..25fe877d 100644 --- a/packages/ditto/controllers/api/streaming.ts +++ b/packages/ditto/controllers/api/streaming.ts @@ -94,8 +94,6 @@ const streamingController: AppController = async (c) => { const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 }); const store = await Storages.db(); - const pubsub = await Storages.pubsub(); - const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined; function send(e: StreamingEvent) { @@ -107,7 +105,7 @@ const streamingController: AppController = async (c) => { async function sub(filters: NostrFilter[], render: (event: NostrEvent) => Promise) { try { - for await (const msg of pubsub.req(filters, { signal: controller.signal })) { + for await (const msg of store.req(filters, { signal: controller.signal })) { if (msg[0] === 'EVENT') { const event = msg[2]; diff --git a/packages/ditto/controllers/nostr/relay.ts b/packages/ditto/controllers/nostr/relay.ts index b4924f22..c549c594 100644 --- a/packages/ditto/controllers/nostr/relay.ts +++ b/packages/ditto/controllers/nostr/relay.ts @@ -23,9 +23,6 @@ import { errorJson } from '@/utils/log.ts'; import { purifyEvent } from '@/utils/purify.ts'; import { Time } from '@/utils/time.ts'; -/** Limit of initial events returned for a subscription. */ -const FILTER_LIMIT = 100; - const limiters = { msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }), req: new MultiRateLimiter([ @@ -126,11 +123,10 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon controllers.set(subId, controller); const store = await Storages.db(); - const pubsub = await Storages.pubsub(); try { - for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: conf.db.timeouts.relay })) { - send(['EVENT', subId, purifyEvent(event)]); + for await (const [verb, , ...rest] of store.req(filters, { timeout: conf.db.timeouts.relay })) { + send([verb, subId, ...rest] as NostrRelayMsg); } } catch (e) { if (e instanceof RelayError) { @@ -143,18 +139,6 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon controllers.delete(subId); return; } - - send(['EOSE', subId]); - - try { - for await (const msg of pubsub.req(filters, { signal: controller.signal })) { - if (msg[0] === 'EVENT') { - send(['EVENT', subId, msg[2]]); - } - } - } catch { - controllers.delete(subId); - } } /** Handle EVENT. Store the event. */ diff --git a/packages/ditto/pipeline.ts b/packages/ditto/pipeline.ts index d3168c0e..1c49e930 100644 --- a/packages/ditto/pipeline.ts +++ b/packages/ditto/pipeline.ts @@ -77,42 +77,21 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise // NIP-46 events get special treatment. // They are exempt from policies and other side-effects, and should be streamed out immediately. // If streaming fails, an error should be returned. - if (event.kind === 24133) { - await streamOut(event); - return; - } + if (event.kind !== 24133) { + // Ensure the event doesn't violate the policy. + if (event.pubkey !== Conf.pubkey) { + await policyFilter(event, opts.signal); + } - // Ensure the event doesn't violate the policy. - if (event.pubkey !== Conf.pubkey) { - await policyFilter(event, opts.signal); - } + // Prepare the event for additional checks. + // FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage. + await hydrateEvent(event, opts.signal); - // Prepare the event for additional checks. - // FIXME: This is kind of hacky. Should be reorganized to fetch only what's needed for each stage. - await hydrateEvent(event, opts.signal); - - // Ensure that the author is not banned. - const n = getTagSet(event.user?.tags ?? [], 'n'); - if (n.has('disabled')) { - throw new RelayError('blocked', 'author is blocked'); - } - - // Ephemeral events must throw if they are not streamed out. - if (NKinds.ephemeral(event.kind)) { - await Promise.all([ - streamOut(event), - webPush(event), - ]); - return; - } - - // Events received through notify are thought to already be in the database, so they only need to be streamed. - if (opts.source === 'notify') { - await Promise.all([ - streamOut(event), - webPush(event), - ]); - return; + // Ensure that the author is not banned. + const n = getTagSet(event.user?.tags ?? [], 'n'); + if (n.has('disabled')) { + throw new RelayError('blocked', 'author is blocked'); + } } const kysely = await Storages.kysely(); @@ -127,12 +106,7 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise prewarmLinkPreview(event, opts.signal), generateSetEvents(event), ]) - .then(() => - Promise.allSettled([ - streamOut(event), - webPush(event), - ]) - ); + .then(() => webPush(event)); } } @@ -165,12 +139,13 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise { - if (NKinds.ephemeral(event.kind)) return; const store = await Storages.db(); try { await store.transaction(async (store, kysely) => { - await updateStats({ event, store, kysely }); + if (!NKinds.ephemeral(event.kind)) { + await updateStats({ event, store, kysely }); + } await store.event(event, { signal }); }); } catch (e) { @@ -274,16 +249,6 @@ function isFresh(event: NostrEvent): boolean { return eventAge(event) < Time.minutes(1); } -/** Distribute the event through active subscriptions. */ -async function streamOut(event: NostrEvent): Promise { - if (!isFresh(event)) { - throw new RelayError('invalid', 'event too old'); - } - - const pubsub = await Storages.pubsub(); - await pubsub.event(event); -} - async function webPush(event: NostrEvent): Promise { if (!isFresh(event)) { throw new RelayError('invalid', 'event too old'); diff --git a/packages/ditto/signers/ConnectSigner.ts b/packages/ditto/signers/ConnectSigner.ts index 89c62679..c6d23d37 100644 --- a/packages/ditto/signers/ConnectSigner.ts +++ b/packages/ditto/signers/ConnectSigner.ts @@ -28,7 +28,7 @@ export class ConnectSigner implements NostrSigner { encryption: 'nip44', pubkey: this.opts.bunkerPubkey, // TODO: use a remote relay for `nprofile` signing (if present and `Conf.relay` isn't already in the list) - relay: await Storages.pubsub(), + relay: await Storages.db(), signer, timeout: 60_000, }); diff --git a/packages/ditto/storages.ts b/packages/ditto/storages.ts index f7bde886..1fe46e83 100644 --- a/packages/ditto/storages.ts +++ b/packages/ditto/storages.ts @@ -1,14 +1,12 @@ // deno-lint-ignore-file require-await import { type DittoDatabase, DittoDB } from '@ditto/db'; -import { internalSubscriptionsSizeGauge } from '@ditto/metrics'; +import { NPool, NRelay1 } from '@nostrify/nostrify'; import { logi } from '@soapbox/logi'; import { Conf } from '@/config.ts'; import { wsUrlSchema } from '@/schema.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { DittoPgStore } from '@/storages/DittoPgStore.ts'; -import { InternalRelay } from '@/storages/InternalRelay.ts'; -import { NPool, NRelay1 } from '@nostrify/nostrify'; import { getRelays } from '@/utils/outbox.ts'; import { seedZapSplits } from '@/utils/zap-split.ts'; @@ -17,7 +15,6 @@ export class Storages { private static _database: Promise | undefined; private static _admin: Promise | undefined; private static _client: Promise> | undefined; - private static _pubsub: Promise | undefined; public static async database(): Promise { if (!this._database) { @@ -59,14 +56,6 @@ export class Storages { return this._admin; } - /** Internal pubsub relay between controllers and the pipeline. */ - public static async pubsub(): Promise { - if (!this._pubsub) { - this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge })); - } - return this._pubsub; - } - /** Relay pool storage. */ public static async client(): Promise> { if (!this._client) { diff --git a/packages/ditto/storages/InternalRelay.test.ts b/packages/ditto/storages/InternalRelay.test.ts deleted file mode 100644 index c97dcd39..00000000 --- a/packages/ditto/storages/InternalRelay.test.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { assertEquals } from '@std/assert'; - -import { eventFixture } from '@/test.ts'; - -import { InternalRelay } from './InternalRelay.ts'; - -Deno.test('InternalRelay', async () => { - const relay = new InternalRelay(); - const event1 = await eventFixture('event-1'); - - const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0)); - - for await (const msg of relay.req([{}])) { - if (msg[0] === 'EVENT') { - assertEquals(relay.subs.size, 1); - assertEquals(msg[2], event1); - break; - } - } - - await promise; - assertEquals(relay.subs.size, 0); // cleanup -}); diff --git a/packages/ditto/storages/InternalRelay.ts b/packages/ditto/storages/InternalRelay.ts deleted file mode 100644 index 9ab942fb..00000000 --- a/packages/ditto/storages/InternalRelay.ts +++ /dev/null @@ -1,86 +0,0 @@ -// deno-lint-ignore-file require-await -import { - NIP50, - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NRelay, -} from '@nostrify/nostrify'; -import { Machina } from '@nostrify/nostrify/utils'; -import { matchFilter } from 'nostr-tools'; -import { Gauge } from 'prom-client'; - -import { DittoEvent } from '@/interfaces/DittoEvent.ts'; -import { purifyEvent } from '@/utils/purify.ts'; - -interface InternalRelayOpts { - gauge?: Gauge; -} - -/** - * PubSub event store for streaming events within the application. - * The pipeline should push events to it, then anything in the application can subscribe to it. - */ -export class InternalRelay implements NRelay { - readonly subs = new Map }>(); - - constructor(private opts: InternalRelayOpts = {}) {} - - async *req( - filters: NostrFilter[], - opts?: { signal?: AbortSignal }, - ): AsyncGenerator { - const id = crypto.randomUUID(); - const machina = new Machina(opts?.signal); - - yield ['EOSE', id]; - - this.subs.set(id, { filters, machina }); - this.opts.gauge?.set(this.subs.size); - - try { - for await (const event of machina) { - yield ['EVENT', id, event]; - } - } finally { - this.subs.delete(id); - this.opts.gauge?.set(this.subs.size); - } - } - - async event(event: DittoEvent): Promise { - for (const { filters, machina } of this.subs.values()) { - for (const filter of filters) { - if (matchFilter(filter, event)) { - if (filter.search) { - const tokens = NIP50.parseInput(filter.search); - - const domain = (tokens.find((t) => - typeof t === 'object' && t.key === 'domain' - ) as { key: 'domain'; value: string } | undefined)?.value; - - if (domain === event.author_stats?.nip05_hostname) { - machina.push(purifyEvent(event)); - break; - } - } else { - machina.push(purifyEvent(event)); - break; - } - } - } - } - - return Promise.resolve(); - } - - async query(): Promise { - return []; - } - - async close(): Promise { - return Promise.resolve(); - } -}