diff --git a/packages/ditto/storages/DittoAPIStore.ts b/packages/ditto/storages/DittoAPIStore.ts index 26f3c2c6..42e08011 100644 --- a/packages/ditto/storages/DittoAPIStore.ts +++ b/packages/ditto/storages/DittoAPIStore.ts @@ -1,4 +1,54 @@ -import { DittoRelayStore } from '@/storages/DittoRelayStore.ts'; +import { DittoConf } from '@ditto/conf'; +import { DittoDB } from '@ditto/db'; +import { logi } from '@soapbox/logi'; +import { NostrEvent, NRelay } from '@nostrify/nostrify'; -export class DittoAPIStore extends DittoRelayStore { +import { DittoRelayStore } from '@/storages/DittoRelayStore.ts'; +import { errorJson } from '@/utils/log.ts'; +import { purifyEvent } from '@/utils/purify.ts'; + +interface DittoAPIStoreOpts { + db: DittoDB; + conf: DittoConf; + pool: NRelay; + relay: NRelay; + fetch?: typeof fetch; +} + +/** + * Store used by Ditto's Mastodon API implementation. + * It extends the RelayStore to publish events to the wider Nostr network. + */ +export class DittoAPIStore extends DittoRelayStore { + _opts: DittoAPIStoreOpts; + + private _ns = 'ditto.relay.store'; + + constructor(opts: DittoAPIStoreOpts) { + super(opts); + this._opts = opts; + } + + override async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise { + const { pool } = this._opts; + const { id, kind } = event; + + await super.event(event, opts); + + (async () => { + try { + // `purifyEvent` is important, or you will suffer. + await pool.event(purifyEvent(event), opts); + } catch (e) { + logi({ level: 'error', ns: this._ns, source: 'publish', id, kind, error: errorJson(e) }); + } + })(); + } + + override async close(): Promise { + const { pool } = this._opts; + + await pool.close(); + await super.close(); + } } diff --git a/packages/ditto/storages/DittoRelayStore.test.ts b/packages/ditto/storages/DittoRelayStore.test.ts index e589490b..66690efa 100644 --- a/packages/ditto/storages/DittoRelayStore.test.ts +++ b/packages/ditto/storages/DittoRelayStore.test.ts @@ -41,8 +41,6 @@ Deno.test('updateAuthorData sets nip05', async () => { function setupTest(cb: (req: Request) => Response | Promise) { const conf = new DittoConf(Deno.env); const db = new DittoPolyPg(conf.databaseUrl); - - const pool = new MockRelay(); const relay = new MockRelay(); const mockFetch: typeof fetch = async (input, init) => { @@ -50,7 +48,7 @@ function setupTest(cb: (req: Request) => Response | Promise) { return await cb(req); }; - const store = new DittoRelayStore({ conf, db, relay, pool, fetch: mockFetch }); + const store = new DittoRelayStore({ conf, db, relay, fetch: mockFetch }); return { db, diff --git a/packages/ditto/storages/DittoRelayStore.ts b/packages/ditto/storages/DittoRelayStore.ts index a6f14025..1553d422 100644 --- a/packages/ditto/storages/DittoRelayStore.ts +++ b/packages/ditto/storages/DittoRelayStore.ts @@ -46,11 +46,11 @@ import { nip19 } from 'nostr-tools'; interface DittoRelayStoreOpts { db: DittoDB; conf: DittoConf; - pool: NRelay; relay: NRelay; fetch?: typeof fetch; } +/** Backing storage class for Ditto relay implementation at `/relay`. */ export class DittoRelayStore implements NRelay { private push: DittoPush; private encounters = new LRUCache({ max: 5000 }); @@ -60,7 +60,7 @@ export class DittoRelayStore implements NRelay { private faviconCache: SimpleLRU; private nip05Cache: SimpleLRU; - private ns = 'ditto.apistore'; + private ns = 'ditto.api.store'; constructor(private opts: DittoRelayStoreOpts) { const { conf, db } = this.opts; @@ -95,31 +95,6 @@ export class DittoRelayStore implements NRelay { ); } - req( - filters: NostrFilter[], - opts?: { signal?: AbortSignal }, - ): AsyncIterable { - const { relay } = this.opts; - return relay.req(filters, opts); - } - - async event(event: NostrEvent, opts?: { publish?: boolean; signal?: AbortSignal }): Promise { - const { pool } = this.opts; - const { id, kind } = event; - - await this.handleEvent(event, opts); - - if (opts?.publish) { - (async () => { - try { - await pool.event(purifyEvent(event), opts); - } catch (e) { - logi({ level: 'error', ns: this.ns, source: 'publish', id, kind, error: errorJson(e) }); - } - })(); - } - } - /** Open a firehose to the relay. */ private async listen(): Promise { const { relay } = this.opts; @@ -128,16 +103,24 @@ export class DittoRelayStore implements NRelay { for await (const msg of relay.req([{ limit: 0 }], { signal })) { if (msg[0] === 'EVENT') { const [, , event] = msg; - await this.handleEvent(event, { signal }); + await this.event(event, { signal }); } } } + req( + filters: NostrFilter[], + opts?: { signal?: AbortSignal }, + ): AsyncIterable { + const { relay } = this.opts; + return relay.req(filters, opts); + } + /** * Common pipeline function to process (and maybe store) events. * It is idempotent, so it can be called multiple times for the same event. */ - private async handleEvent(event: DittoEvent, opts: { signal?: AbortSignal } = {}): Promise { + async event(event: DittoEvent, opts: { publish?: boolean; signal?: AbortSignal } = {}): Promise { const { conf, relay } = this.opts; const { signal } = opts; @@ -474,11 +457,10 @@ export class DittoRelayStore implements NRelay { } async close(): Promise { - const { relay, pool } = this.opts; + const { relay } = this.opts; this.controller.abort(); - await pool.close(); await relay.close(); }