Pass DittoAPIStore to MastoAPI endpoints, DittoRelayStore to /relay

This commit is contained in:
Alex Gleason 2025-02-23 19:39:43 -06:00
parent cce78f2b0c
commit 751c09035c
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
4 changed files with 50 additions and 33 deletions

View file

@ -148,6 +148,7 @@ import { rateLimitMiddleware } from '@/middleware/rateLimitMiddleware.ts';
import { uploaderMiddleware } from '@/middleware/uploaderMiddleware.ts'; import { uploaderMiddleware } from '@/middleware/uploaderMiddleware.ts';
import { translatorMiddleware } from '@/middleware/translatorMiddleware.ts'; import { translatorMiddleware } from '@/middleware/translatorMiddleware.ts';
import { logiMiddleware } from '@/middleware/logiMiddleware.ts'; import { logiMiddleware } from '@/middleware/logiMiddleware.ts';
import { DittoRelayStore } from '@/storages/DittoRelayStore.ts';
export interface AppEnv extends DittoEnv { export interface AppEnv extends DittoEnv {
Variables: { Variables: {
@ -188,32 +189,33 @@ const db = new DittoPolyPg(conf.databaseUrl, {
await db.migrate(); await db.migrate();
const store = new DittoPgStore({ const pgstore = new DittoPgStore({
db, db,
pubkey: await conf.signer.getPublicKey(), pubkey: await conf.signer.getPublicKey(),
timeout: conf.db.timeouts.default, timeout: conf.db.timeouts.default,
notify: conf.notifyEnabled, notify: conf.notifyEnabled,
}); });
const pool = new DittoPool({ conf, relay: store }); const pool = new DittoPool({ conf, relay: pgstore });
const relay = new DittoAPIStore({ db, conf, relay: store, pool }); const relaystore = new DittoRelayStore({ db, conf, relay: pgstore });
const apistore = new DittoAPIStore({ relay: relaystore, pool });
await seedZapSplits(relay); await seedZapSplits(apistore);
if (conf.firehoseEnabled) { if (conf.firehoseEnabled) {
startFirehose({ startFirehose({
pool, pool,
store: relay, store: relaystore,
concurrency: conf.firehoseConcurrency, concurrency: conf.firehoseConcurrency,
kinds: conf.firehoseKinds, kinds: conf.firehoseKinds,
}); });
} }
if (conf.cronEnabled) { if (conf.cronEnabled) {
cron({ conf, db, relay }); cron({ conf, db, relay: relaystore });
} }
const app = new DittoApp({ conf, db, relay }, { strict: false }); const app = new DittoApp({ conf, db, relay: relaystore }, { strict: false });
/** User-provided files in the gitignored `public/` directory. */ /** User-provided files in the gitignored `public/` directory. */
const publicFiles = serveStatic({ root: './public/' }); const publicFiles = serveStatic({ root: './public/' });
@ -240,7 +242,17 @@ app.use('/nodeinfo/*', metricsMiddleware, ratelimit, logiMiddleware);
app.use('/oauth/*', metricsMiddleware, ratelimit, logiMiddleware); app.use('/oauth/*', metricsMiddleware, ratelimit, logiMiddleware);
app.get('/api/v1/streaming', socketTokenMiddleware, metricsMiddleware, ratelimit, streamingController); app.get('/api/v1/streaming', socketTokenMiddleware, metricsMiddleware, ratelimit, streamingController);
app.get('/relay', metricsMiddleware, ratelimit, relayController);
app.get(
'/relay',
(c, next) => {
c.set('relay', relaystore);
return next();
},
metricsMiddleware,
ratelimit,
relayController,
);
app.use( app.use(
cspMiddleware(), cspMiddleware(),

View file

@ -1,54 +1,60 @@
import { DittoConf } from '@ditto/conf';
import { DittoDB } from '@ditto/db';
import { logi } from '@soapbox/logi'; import { logi } from '@soapbox/logi';
import { NostrEvent, NRelay } from '@nostrify/nostrify'; import { NostrEvent, NostrFilter, NostrRelayCLOSED, NostrRelayEOSE, NostrRelayEVENT, NRelay } from '@nostrify/nostrify';
import { DittoRelayStore } from '@/storages/DittoRelayStore.ts';
import { errorJson } from '@/utils/log.ts'; import { errorJson } from '@/utils/log.ts';
import { purifyEvent } from '@/utils/purify.ts'; import { purifyEvent } from '@/utils/purify.ts';
interface DittoAPIStoreOpts { interface DittoAPIStoreOpts {
db: DittoDB;
conf: DittoConf;
pool: NRelay; pool: NRelay;
relay: NRelay; relay: NRelay;
fetch?: typeof fetch;
} }
/** /**
* Store used by Ditto's Mastodon API implementation. * Store used by Ditto's Mastodon API implementation.
* It extends the RelayStore to publish events to the wider Nostr network. * It extends the RelayStore to publish events to the wider Nostr network.
*/ */
export class DittoAPIStore extends DittoRelayStore { export class DittoAPIStore implements NRelay {
_opts: DittoAPIStoreOpts; private ns = 'ditto.api.store';
private _ns = 'ditto.relay.store'; constructor(private opts: DittoAPIStoreOpts) {}
constructor(opts: DittoAPIStoreOpts) { req(
super(opts); filters: NostrFilter[],
this._opts = opts; opts?: { signal?: AbortSignal },
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
const { relay } = this.opts;
return relay.req(filters, opts);
} }
override async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise<void> { query(filters: NostrFilter[], opts?: { signal?: AbortSignal }): Promise<NostrEvent[]> {
const { pool } = this._opts; const { relay } = this.opts;
return relay.query(filters, opts);
}
async event(event: NostrEvent, opts?: { signal?: AbortSignal }): Promise<void> {
const { pool, relay } = this.opts;
const { id, kind } = event; const { id, kind } = event;
await super.event(event, opts); await relay.event(event, opts);
(async () => { (async () => {
try { try {
// `purifyEvent` is important, or you will suffer. // `purifyEvent` is important, or you will suffer.
await pool.event(purifyEvent(event), opts); await pool.event(purifyEvent(event), opts);
} catch (e) { } catch (e) {
logi({ level: 'error', ns: this._ns, source: 'publish', id, kind, error: errorJson(e) }); logi({ level: 'error', ns: this.ns, source: 'publish', id, kind, error: errorJson(e) });
} }
})(); })();
} }
override async close(): Promise<void> { async close(): Promise<void> {
const { pool } = this._opts; const { pool, relay } = this.opts;
await pool.close(); await pool.close();
await super.close(); await relay.close();
}
[Symbol.asyncDispose](): Promise<void> {
return this.close();
} }
} }

View file

@ -60,7 +60,7 @@ export class DittoRelayStore implements NRelay {
private faviconCache: SimpleLRU<string, URL>; private faviconCache: SimpleLRU<string, URL>;
private nip05Cache: SimpleLRU<string, nip19.ProfilePointer>; private nip05Cache: SimpleLRU<string, nip19.ProfilePointer>;
private ns = 'ditto.api.store'; private ns = 'ditto.relay.store';
constructor(private opts: DittoRelayStoreOpts) { constructor(private opts: DittoRelayStoreOpts) {
const { conf, db } = this.opts; const { conf, db } = this.opts;

View file

@ -1,18 +1,17 @@
import { Semaphore } from '@core/asyncutil'; import { Semaphore } from '@core/asyncutil';
import { NostrEvent } from '@nostrify/nostrify'; import { NostrEvent } from '@nostrify/nostrify';
import { MockRelay } from '@nostrify/nostrify/test';
import { DittoConf } from '@ditto/conf'; import { DittoConf } from '@ditto/conf';
import { DittoPolyPg } from '@ditto/db'; import { DittoPolyPg } from '@ditto/db';
import { DittoAPIStore } from '../packages/ditto/storages/DittoAPIStore.ts';
import { DittoPgStore } from '../packages/ditto/storages/DittoPgStore.ts'; import { DittoPgStore } from '../packages/ditto/storages/DittoPgStore.ts';
import { DittoRelayStore } from '../packages/ditto/storages/DittoRelayStore.ts';
const conf = new DittoConf(Deno.env); const conf = new DittoConf(Deno.env);
const db = new DittoPolyPg(conf.databaseUrl); const db = new DittoPolyPg(conf.databaseUrl);
const pgstore = new DittoPgStore({ db, pubkey: await conf.signer.getPublicKey() }); const pgstore = new DittoPgStore({ db, pubkey: await conf.signer.getPublicKey() });
const apistore = new DittoAPIStore({ conf, db, relay: pgstore, pool: new MockRelay() }); const relaystore = new DittoRelayStore({ conf, db, relay: pgstore });
const sem = new Semaphore(5); const sem = new Semaphore(5);
@ -28,7 +27,7 @@ for await (const row of query.stream(100)) {
sem.lock(async () => { sem.lock(async () => {
const event: NostrEvent = { ...row, created_at: Number(row.created_at) }; const event: NostrEvent = { ...row, created_at: Number(row.created_at) };
await apistore.updateAuthorData(event, AbortSignal.timeout(3000)); await relaystore.updateAuthorData(event, AbortSignal.timeout(3000));
}); });
} }