Make DittoAPIStore and DittoRelay separate things

This commit is contained in:
Alex Gleason 2025-02-23 18:54:04 -06:00
parent 52a9017730
commit cce78f2b0c
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
3 changed files with 66 additions and 36 deletions

View file

@ -1,4 +1,54 @@
import { DittoRelayStore } from '@/storages/DittoRelayStore.ts'; import { DittoConf } from '@ditto/conf';
import { DittoDB } from '@ditto/db';
import { logi } from '@soapbox/logi';
import { NostrEvent, NRelay } from '@nostrify/nostrify';
export class DittoAPIStore extends DittoRelayStore { import { DittoRelayStore } from '@/storages/DittoRelayStore.ts';
import { errorJson } from '@/utils/log.ts';
import { purifyEvent } from '@/utils/purify.ts';
interface DittoAPIStoreOpts {
db: DittoDB;
conf: DittoConf;
pool: NRelay;
relay: NRelay;
fetch?: typeof fetch;
}
/**
* Store used by Ditto's Mastodon API implementation.
* It extends the RelayStore to publish events to the wider Nostr network.
*/
export class DittoAPIStore extends DittoRelayStore {
_opts: DittoAPIStoreOpts;
private _ns = 'ditto.relay.store';
constructor(opts: DittoAPIStoreOpts) {
super(opts);
this._opts = opts;
}
override async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise<void> {
const { pool } = this._opts;
const { id, kind } = event;
await super.event(event, opts);
(async () => {
try {
// `purifyEvent` is important, or you will suffer.
await pool.event(purifyEvent(event), opts);
} catch (e) {
logi({ level: 'error', ns: this._ns, source: 'publish', id, kind, error: errorJson(e) });
}
})();
}
override async close(): Promise<void> {
const { pool } = this._opts;
await pool.close();
await super.close();
}
} }

View file

@ -41,8 +41,6 @@ Deno.test('updateAuthorData sets nip05', async () => {
function setupTest(cb: (req: Request) => Response | Promise<Response>) { function setupTest(cb: (req: Request) => Response | Promise<Response>) {
const conf = new DittoConf(Deno.env); const conf = new DittoConf(Deno.env);
const db = new DittoPolyPg(conf.databaseUrl); const db = new DittoPolyPg(conf.databaseUrl);
const pool = new MockRelay();
const relay = new MockRelay(); const relay = new MockRelay();
const mockFetch: typeof fetch = async (input, init) => { const mockFetch: typeof fetch = async (input, init) => {
@ -50,7 +48,7 @@ function setupTest(cb: (req: Request) => Response | Promise<Response>) {
return await cb(req); return await cb(req);
}; };
const store = new DittoRelayStore({ conf, db, relay, pool, fetch: mockFetch }); const store = new DittoRelayStore({ conf, db, relay, fetch: mockFetch });
return { return {
db, db,

View file

@ -46,11 +46,11 @@ import { nip19 } from 'nostr-tools';
interface DittoRelayStoreOpts { interface DittoRelayStoreOpts {
db: DittoDB; db: DittoDB;
conf: DittoConf; conf: DittoConf;
pool: NRelay;
relay: NRelay; relay: NRelay;
fetch?: typeof fetch; fetch?: typeof fetch;
} }
/** Backing storage class for Ditto relay implementation at `/relay`. */
export class DittoRelayStore implements NRelay { export class DittoRelayStore implements NRelay {
private push: DittoPush; private push: DittoPush;
private encounters = new LRUCache<string, true>({ max: 5000 }); private encounters = new LRUCache<string, true>({ max: 5000 });
@ -60,7 +60,7 @@ export class DittoRelayStore implements NRelay {
private faviconCache: SimpleLRU<string, URL>; private faviconCache: SimpleLRU<string, URL>;
private nip05Cache: SimpleLRU<string, nip19.ProfilePointer>; private nip05Cache: SimpleLRU<string, nip19.ProfilePointer>;
private ns = 'ditto.apistore'; private ns = 'ditto.api.store';
constructor(private opts: DittoRelayStoreOpts) { constructor(private opts: DittoRelayStoreOpts) {
const { conf, db } = this.opts; const { conf, db } = this.opts;
@ -95,31 +95,6 @@ export class DittoRelayStore implements NRelay {
); );
} }
req(
filters: NostrFilter[],
opts?: { signal?: AbortSignal },
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
const { relay } = this.opts;
return relay.req(filters, opts);
}
async event(event: NostrEvent, opts?: { publish?: boolean; signal?: AbortSignal }): Promise<void> {
const { pool } = this.opts;
const { id, kind } = event;
await this.handleEvent(event, opts);
if (opts?.publish) {
(async () => {
try {
await pool.event(purifyEvent(event), opts);
} catch (e) {
logi({ level: 'error', ns: this.ns, source: 'publish', id, kind, error: errorJson(e) });
}
})();
}
}
/** Open a firehose to the relay. */ /** Open a firehose to the relay. */
private async listen(): Promise<void> { private async listen(): Promise<void> {
const { relay } = this.opts; const { relay } = this.opts;
@ -128,16 +103,24 @@ export class DittoRelayStore implements NRelay {
for await (const msg of relay.req([{ limit: 0 }], { signal })) { for await (const msg of relay.req([{ limit: 0 }], { signal })) {
if (msg[0] === 'EVENT') { if (msg[0] === 'EVENT') {
const [, , event] = msg; const [, , event] = msg;
await this.handleEvent(event, { signal }); await this.event(event, { signal });
} }
} }
} }
req(
filters: NostrFilter[],
opts?: { signal?: AbortSignal },
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
const { relay } = this.opts;
return relay.req(filters, opts);
}
/** /**
* Common pipeline function to process (and maybe store) events. * Common pipeline function to process (and maybe store) events.
* It is idempotent, so it can be called multiple times for the same event. * It is idempotent, so it can be called multiple times for the same event.
*/ */
private async handleEvent(event: DittoEvent, opts: { signal?: AbortSignal } = {}): Promise<void> { async event(event: DittoEvent, opts: { publish?: boolean; signal?: AbortSignal } = {}): Promise<void> {
const { conf, relay } = this.opts; const { conf, relay } = this.opts;
const { signal } = opts; const { signal } = opts;
@ -474,11 +457,10 @@ export class DittoRelayStore implements NRelay {
} }
async close(): Promise<void> { async close(): Promise<void> {
const { relay, pool } = this.opts; const { relay } = this.opts;
this.controller.abort(); this.controller.abort();
await pool.close();
await relay.close(); await relay.close();
} }