Add DittoPool class

This commit is contained in:
Alex Gleason 2025-02-22 12:59:36 -06:00
parent 2f0dbc44e4
commit 79fc568548
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
6 changed files with 106 additions and 120 deletions

View file

@ -10,7 +10,7 @@ import { getInstanceMetadata } from '@/utils/instance.ts';
import { deleteTag } from '@/utils/tags.ts'; import { deleteTag } from '@/utils/tags.ts';
import { DittoZapSplits, getZapSplits } from '@/utils/zap-split.ts'; import { DittoZapSplits, getZapSplits } from '@/utils/zap-split.ts';
import { screenshotsSchema } from '@/schemas/nostr.ts'; import { screenshotsSchema } from '@/schemas/nostr.ts';
import { booleanParamSchema, percentageSchema, wsUrlSchema } from '@/schema.ts'; import { booleanParamSchema, percentageSchema } from '@/schema.ts';
import { hydrateEvents } from '@/storages/hydrate.ts'; import { hydrateEvents } from '@/storages/hydrate.ts';
import { renderNameRequest } from '@/views/ditto.ts'; import { renderNameRequest } from '@/views/ditto.ts';
import { accountFromPubkey } from '@/views/mastodon/accounts.ts'; import { accountFromPubkey } from '@/views/mastodon/accounts.ts';
@ -20,6 +20,16 @@ import { updateListAdminEvent } from '@/utils/api.ts';
const markerSchema = z.enum(['read', 'write']); const markerSchema = z.enum(['read', 'write']);
/** WebSocket URL. */
const wsUrlSchema = z.string().refine((val): val is `wss://${string}` | `ws://${string}` => {
try {
const { protocol } = new URL(val);
return protocol === 'wss:' || protocol === 'ws:';
} catch {
return false;
}
}, 'Invalid WebSocket URL');
const relaySchema = z.object({ const relaySchema = z.object({
url: wsUrlSchema, url: wsUrlSchema,
marker: markerSchema.optional(), marker: markerSchema.optional(),
@ -62,7 +72,7 @@ function renderRelays(event: NostrEvent): RelayEntity[] {
return event.tags.reduce((acc, [name, url, marker]) => { return event.tags.reduce((acc, [name, url, marker]) => {
if (name === 'r') { if (name === 'r') {
const relay: RelayEntity = { const relay: RelayEntity = {
url, url: url as `wss://${string}`,
marker: markerSchema.safeParse(marker).success ? marker as 'read' | 'write' : undefined, marker: markerSchema.safeParse(marker).success ? marker as 'read' | 'write' : undefined,
}; };
acc.push(relay); acc.push(relay);

View file

@ -22,16 +22,6 @@ const hashtagSchema = z.string().regex(/^\w{1,30}$/);
*/ */
const safeUrlSchema = z.string().max(2048).url(); const safeUrlSchema = z.string().max(2048).url();
/** WebSocket URL. */
const wsUrlSchema = z.string().refine((val) => {
try {
const { protocol } = new URL(val);
return protocol === 'wss:' || protocol === 'ws:';
} catch {
return false;
}
}, 'Invalid WebSocket URL');
/** https://github.com/colinhacks/zod/issues/1630#issuecomment-1365983831 */ /** https://github.com/colinhacks/zod/issues/1630#issuecomment-1365983831 */
const booleanParamSchema = z.enum(['true', 'false']).transform((value) => value === 'true'); const booleanParamSchema = z.enum(['true', 'false']).transform((value) => value === 'true');
@ -93,5 +83,4 @@ export {
safeUrlSchema, safeUrlSchema,
sizesSchema, sizesSchema,
walletSchema, walletSchema,
wsUrlSchema,
}; };

View file

@ -1,13 +1,11 @@
// deno-lint-ignore-file require-await // deno-lint-ignore-file require-await
import { type DittoDB, DittoPolyPg } from '@ditto/db'; import { type DittoDB, DittoPolyPg } from '@ditto/db';
import { NPool, NRelay1 } from '@nostrify/nostrify'; import { NPool, NRelay1 } from '@nostrify/nostrify';
import { logi } from '@soapbox/logi';
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { wsUrlSchema } from '@/schema.ts';
import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { DittoPgStore } from '@/storages/DittoPgStore.ts';
import { getRelays } from '@/utils/outbox.ts';
import { seedZapSplits } from '@/utils/zap-split.ts'; import { seedZapSplits } from '@/utils/zap-split.ts';
import { DittoPool } from '@/storages/DittoPool.ts';
export class Storages { export class Storages {
private static _db: Promise<DittoPgStore> | undefined; private static _db: Promise<DittoPgStore> | undefined;
@ -55,53 +53,8 @@ export class Storages {
public static async client(): Promise<NPool<NRelay1>> { public static async client(): Promise<NPool<NRelay1>> {
if (!this._client) { if (!this._client) {
this._client = (async () => { this._client = (async () => {
const db = await this.db(); const relay = await this.db();
return new DittoPool({ conf: Conf, relay });
const [relayList] = await db.query([
{ kinds: [10002], authors: [await Conf.signer.getPublicKey()], limit: 1 },
]);
const tags = relayList?.tags ?? [];
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
const valid = wsUrlSchema.safeParse(url).success;
if (valid && name === 'r' && (!marker || marker === 'write')) {
acc.push(url);
}
return acc;
}, []);
logi({
level: 'info',
ns: 'ditto.pool',
msg: `connecting to ${activeRelays.length} relays`,
relays: activeRelays,
});
return new NPool({
open(url) {
return new NRelay1(url, {
// Skip event verification (it's done in the pipeline).
verifyEvent: () => true,
log(log) {
logi(log);
},
});
},
reqRouter: async (filters) => {
return new Map(activeRelays.map((relay) => {
return [relay, filters];
}));
},
eventRouter: async (event) => {
const relaySet = await getRelays(await Storages.db(), event.pubkey);
relaySet.delete(Conf.relay);
const relays = [...relaySet].slice(0, 4);
return relays;
},
});
})(); })();
} }
return this._client; return this._client;

View file

@ -0,0 +1,91 @@
// deno-lint-ignore-file require-await
import { DittoConf } from '@ditto/conf';
import { NostrEvent, NostrFilter, NPool, type NRelay, NRelay1 } from '@nostrify/nostrify';
import { logi } from '@soapbox/logi';
interface DittoPoolOpts {
conf: DittoConf;
relay: NRelay;
maxEventRelays?: number;
}
export class DittoPool extends NPool<NRelay1> {
private _opts: DittoPoolOpts;
constructor(opts: DittoPoolOpts) {
super({
open(url) {
return new NRelay1(url, {
// Skip event verification (it's done in the pipeline).
verifyEvent: () => true,
log: logi,
});
},
reqRouter: (filters) => {
return this.reqRouter(filters);
},
eventRouter: async (event) => {
return this.eventRouter(event);
},
});
this._opts = opts;
}
private async reqRouter(filters: NostrFilter[]): Promise<Map<string, NostrFilter[]>> {
const routes = new Map<string, NostrFilter[]>();
for (const relayUrl of await this.getRelayUrls({ marker: 'read' })) {
routes.set(relayUrl, filters);
}
return routes;
}
private async eventRouter(event: NostrEvent): Promise<string[]> {
const { conf, maxEventRelays = 4 } = this._opts;
const { pubkey } = event;
const relaySet = await this.getRelayUrls({ pubkey, marker: 'write' });
relaySet.delete(conf.relay);
return [...relaySet].slice(0, maxEventRelays);
}
private async getRelayUrls(opts: { pubkey?: string; marker?: 'read' | 'write' } = {}): Promise<Set<string>> {
const { conf, relay } = this._opts;
const relays = new Set<`wss://${string}`>();
const authors = new Set<string>([await conf.signer.getPublicKey()]);
if (opts.pubkey) {
authors.add(opts.pubkey);
}
const events = await relay.query([
{ kinds: [10002], authors: [...authors] },
]);
// Ensure user's own relay list is counted first.
if (opts.pubkey) {
events.sort((a) => a.pubkey === opts.pubkey ? -1 : 1);
}
for (const event of events) {
for (const [name, relayUrl, marker] of event.tags) {
if (name === 'r' && (!marker || !opts.marker || marker === opts.marker)) {
try {
const url = new URL(relayUrl);
if (url.protocol === 'wss:') {
relays.add(url.toString() as `wss://${string}`);
}
} catch {
// fallthrough
}
}
}
}
return relays;
}
}

View file

@ -1,29 +0,0 @@
import { MockRelay } from '@nostrify/nostrify/test';
import { eventFixture } from '@/test.ts';
import { getRelays } from '@/utils/outbox.ts';
import { assertEquals } from '@std/assert';
Deno.test('Get write relays - kind 10002', async () => {
const db = new MockRelay();
const relayListMetadata = await eventFixture('kind-10002-alex');
await db.event(relayListMetadata);
const relays = await getRelays(db, relayListMetadata.pubkey);
assertEquals(relays.size, 6);
});
Deno.test('Get write relays with invalid URL - kind 10002', async () => {
const db = new MockRelay();
const relayListMetadata = await eventFixture('kind-10002-alex');
relayListMetadata.tags[0] = ['r', 'yolo'];
await db.event(relayListMetadata);
const relays = await getRelays(db, relayListMetadata.pubkey);
assertEquals(relays.size, 5);
});

View file

@ -1,28 +0,0 @@
import { NStore } from '@nostrify/nostrify';
import { Conf } from '@/config.ts';
export async function getRelays(store: NStore, pubkey: string): Promise<Set<string>> {
const relays = new Set<`wss://${string}`>();
const events = await store.query([
{ kinds: [10002], authors: [pubkey, await Conf.signer.getPublicKey()], limit: 2 },
]);
for (const event of events) {
for (const [name, relay, marker] of event.tags) {
if (name === 'r' && (marker === 'write' || !marker)) {
try {
const url = new URL(relay);
if (url.protocol === 'wss:') {
relays.add(url.toString() as `wss://${string}`);
}
} catch (_e) {
// fall through
}
}
}
}
return relays;
}