mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge branch 'use-npool-nostrify' into 'main'
Replace PoolStore with NPool & add tests for outbox.ts file and RelayError.ts file Closes #148, #180, and #181 See merge request soapbox-pub/ditto!424
This commit is contained in:
commit
9cb85e42d2
7 changed files with 110 additions and 127 deletions
34
fixtures/events/kind-10002-alex.json
Normal file
34
fixtures/events/kind-10002-alex.json
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
{
|
||||||
|
"kind": 10002,
|
||||||
|
"id": "68fc04e23b07219f153a10947663b9dd7b271acbc03b82200e364e35de3e0bdd",
|
||||||
|
"pubkey": "0461fcbecc4c3374439932d6b8f11269ccdb7cc973ad7a50ae362db135a474dd",
|
||||||
|
"created_at": 1714969354,
|
||||||
|
"tags": [
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://gleasonator.dev/relay"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://nosdrive.app/relay"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://relay.mostr.pub/"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://relay.primal.net/"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://relay.snort.social/"
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"r",
|
||||||
|
"wss://relay.damus.io/"
|
||||||
|
]
|
||||||
|
],
|
||||||
|
"content": "",
|
||||||
|
"sig": "cb7b1a75fe015d5c9481651379365bd5d098665b1bc7a453522177e2686eaa83581ec36f7a17429aad2541dad02c2c81023b81612f87f28fc57447fef1efab13"
|
||||||
|
}
|
||||||
23
src/RelayError.test.ts
Normal file
23
src/RelayError.test.ts
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
import { assertThrows } from '@std/assert';
|
||||||
|
|
||||||
|
import { RelayError } from '@/RelayError.ts';
|
||||||
|
|
||||||
|
Deno.test('Construct a RelayError from the reason message', () => {
|
||||||
|
assertThrows(
|
||||||
|
() => {
|
||||||
|
throw RelayError.fromReason('duplicate: already exists');
|
||||||
|
},
|
||||||
|
RelayError,
|
||||||
|
'duplicate: already exists',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.test('Throw a new RelayError if the OK message is false', () => {
|
||||||
|
assertThrows(
|
||||||
|
() => {
|
||||||
|
RelayError.assert(['OK', 'yolo', false, 'error: bla bla bla']);
|
||||||
|
},
|
||||||
|
RelayError,
|
||||||
|
'error: bla bla bla',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
@ -1,18 +1,17 @@
|
||||||
// deno-lint-ignore-file require-await
|
// deno-lint-ignore-file require-await
|
||||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
import { DittoDB } from '@/db/DittoDB.ts';
|
import { DittoDB } from '@/db/DittoDB.ts';
|
||||||
import { AdminStore } from '@/storages/AdminStore.ts';
|
import { AdminStore } from '@/storages/AdminStore.ts';
|
||||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||||
import { PoolStore } from '@/storages/pool-store.ts';
|
|
||||||
import { SearchStore } from '@/storages/search-store.ts';
|
import { SearchStore } from '@/storages/search-store.ts';
|
||||||
import { InternalRelay } from '@/storages/InternalRelay.ts';
|
import { InternalRelay } from '@/storages/InternalRelay.ts';
|
||||||
|
import { NPool, NRelay1 } from '@nostrify/nostrify';
|
||||||
|
import { getRelays } from '@/utils/outbox.ts';
|
||||||
|
|
||||||
export class Storages {
|
export class Storages {
|
||||||
private static _db: Promise<EventsDB> | undefined;
|
private static _db: Promise<EventsDB> | undefined;
|
||||||
private static _admin: Promise<AdminStore> | undefined;
|
private static _admin: Promise<AdminStore> | undefined;
|
||||||
private static _client: Promise<PoolStore> | undefined;
|
private static _client: Promise<NPool> | undefined;
|
||||||
private static _pubsub: Promise<InternalRelay> | undefined;
|
private static _pubsub: Promise<InternalRelay> | undefined;
|
||||||
private static _search: Promise<SearchStore> | undefined;
|
private static _search: Promise<SearchStore> | undefined;
|
||||||
|
|
||||||
|
|
@ -44,7 +43,7 @@ export class Storages {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Relay pool storage. */
|
/** Relay pool storage. */
|
||||||
public static async client(): Promise<PoolStore> {
|
public static async client(): Promise<NPool> {
|
||||||
if (!this._client) {
|
if (!this._client) {
|
||||||
this._client = (async () => {
|
this._client = (async () => {
|
||||||
const db = await this.db();
|
const db = await this.db();
|
||||||
|
|
@ -56,7 +55,7 @@ export class Storages {
|
||||||
const tags = relayList?.tags ?? [];
|
const tags = relayList?.tags ?? [];
|
||||||
|
|
||||||
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
|
const activeRelays = tags.reduce((acc, [name, url, marker]) => {
|
||||||
if (name === 'r' && !marker) {
|
if (name === 'r' && (!marker || marker === 'write')) {
|
||||||
acc.push(url);
|
acc.push(url);
|
||||||
}
|
}
|
||||||
return acc;
|
return acc;
|
||||||
|
|
@ -64,22 +63,22 @@ export class Storages {
|
||||||
|
|
||||||
console.log(`pool: connecting to ${activeRelays.length} relays.`);
|
console.log(`pool: connecting to ${activeRelays.length} relays.`);
|
||||||
|
|
||||||
const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
|
return new NPool({
|
||||||
type: 'module',
|
open(url) {
|
||||||
});
|
return new NRelay1(url);
|
||||||
|
},
|
||||||
|
reqRouter: async (filters) => {
|
||||||
|
return new Map(activeRelays.map((relay) => {
|
||||||
|
return [relay, filters];
|
||||||
|
}));
|
||||||
|
},
|
||||||
|
eventRouter: async (event) => {
|
||||||
|
const relaySet = await getRelays(await Storages.db(), event.pubkey);
|
||||||
|
relaySet.delete(Conf.relay);
|
||||||
|
|
||||||
// @ts-ignore Wrong types.
|
const relays = [...relaySet].slice(0, 4);
|
||||||
const pool = new RelayPoolWorker(worker, activeRelays, {
|
return relays;
|
||||||
autoReconnect: true,
|
},
|
||||||
// The pipeline verifies events.
|
|
||||||
skipVerification: true,
|
|
||||||
// The logging feature overwhelms the CPU and creates too many logs.
|
|
||||||
logErrorsAndNotices: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
return new PoolStore({
|
|
||||||
pool,
|
|
||||||
relays: activeRelays,
|
|
||||||
});
|
});
|
||||||
})();
|
})();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,103 +0,0 @@
|
||||||
import {
|
|
||||||
NostrEvent,
|
|
||||||
NostrFilter,
|
|
||||||
NostrRelayCLOSED,
|
|
||||||
NostrRelayEOSE,
|
|
||||||
NostrRelayEVENT,
|
|
||||||
NRelay,
|
|
||||||
NSet,
|
|
||||||
} from '@nostrify/nostrify';
|
|
||||||
import { Machina } from '@nostrify/nostrify/utils';
|
|
||||||
import Debug from '@soapbox/stickynotes/debug';
|
|
||||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
|
||||||
import { getFilterLimit, matchFilters } from 'nostr-tools';
|
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
|
||||||
import { Storages } from '@/storages.ts';
|
|
||||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
|
||||||
import { abortError } from '@/utils/abort.ts';
|
|
||||||
import { getRelays } from '@/utils/outbox.ts';
|
|
||||||
|
|
||||||
interface PoolStoreOpts {
|
|
||||||
pool: InstanceType<typeof RelayPoolWorker>;
|
|
||||||
relays: WebSocket['url'][];
|
|
||||||
}
|
|
||||||
|
|
||||||
class PoolStore implements NRelay {
|
|
||||||
private debug = Debug('ditto:client');
|
|
||||||
private pool: InstanceType<typeof RelayPoolWorker>;
|
|
||||||
private relays: WebSocket['url'][];
|
|
||||||
|
|
||||||
constructor(opts: PoolStoreOpts) {
|
|
||||||
this.pool = opts.pool;
|
|
||||||
this.relays = opts.relays;
|
|
||||||
}
|
|
||||||
|
|
||||||
async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> {
|
|
||||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
|
||||||
|
|
||||||
const relaySet = await getRelays(await Storages.db(), event.pubkey);
|
|
||||||
relaySet.delete(Conf.relay);
|
|
||||||
|
|
||||||
const relays = [...relaySet].slice(0, 4);
|
|
||||||
|
|
||||||
event = purifyEvent(event);
|
|
||||||
this.debug('EVENT', event, relays);
|
|
||||||
|
|
||||||
this.pool.publish(event, relays);
|
|
||||||
return Promise.resolve();
|
|
||||||
}
|
|
||||||
|
|
||||||
async *req(
|
|
||||||
filters: NostrFilter[],
|
|
||||||
opts: { signal?: AbortSignal; limit?: number } = {},
|
|
||||||
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
|
||||||
this.debug('REQ', JSON.stringify(filters));
|
|
||||||
|
|
||||||
const uuid = crypto.randomUUID();
|
|
||||||
const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(opts.signal);
|
|
||||||
|
|
||||||
const unsub = this.pool.subscribe(
|
|
||||||
filters,
|
|
||||||
this.relays,
|
|
||||||
(event: NostrEvent | null) => {
|
|
||||||
if (event && matchFilters(filters, event)) {
|
|
||||||
machina.push(['EVENT', uuid, purifyEvent(event)]);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
undefined,
|
|
||||||
() => {
|
|
||||||
machina.push(['EOSE', uuid]);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
|
||||||
for await (const msg of machina) {
|
|
||||||
yield msg;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
unsub();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
|
|
||||||
const events = new NSet();
|
|
||||||
|
|
||||||
const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0);
|
|
||||||
if (limit === 0) return [];
|
|
||||||
|
|
||||||
for await (const msg of this.req(filters, opts)) {
|
|
||||||
if (msg[0] === 'EOSE') break;
|
|
||||||
if (msg[0] === 'EVENT') events.add(msg[2]);
|
|
||||||
if (msg[0] === 'CLOSED') throw new Error('Subscription closed');
|
|
||||||
|
|
||||||
if (events.size >= limit) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return [...events];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export { PoolStore };
|
|
||||||
|
|
@ -14,6 +14,7 @@ import { RelayError } from '@/RelayError.ts';
|
||||||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||||
import { Storages } from '@/storages.ts';
|
import { Storages } from '@/storages.ts';
|
||||||
import { nostrNow } from '@/utils.ts';
|
import { nostrNow } from '@/utils.ts';
|
||||||
|
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||||
|
|
||||||
const debug = Debug('ditto:api');
|
const debug = Debug('ditto:api');
|
||||||
|
|
||||||
|
|
@ -152,7 +153,7 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEven
|
||||||
try {
|
try {
|
||||||
await pipeline.handleEvent(event, c.req.raw.signal);
|
await pipeline.handleEvent(event, c.req.raw.signal);
|
||||||
const client = await Storages.client();
|
const client = await Storages.client();
|
||||||
await client.event(event);
|
await client.event(purifyEvent(event));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof RelayError) {
|
if (e instanceof RelayError) {
|
||||||
throw new HTTPException(422, {
|
throw new HTTPException(422, {
|
||||||
|
|
|
||||||
29
src/utils/outbox.test.ts
Normal file
29
src/utils/outbox.test.ts
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
import { MockRelay } from '@nostrify/nostrify/test';
|
||||||
|
import { eventFixture } from '@/test.ts';
|
||||||
|
import { getRelays } from '@/utils/outbox.ts';
|
||||||
|
import { assertEquals } from '@std/assert';
|
||||||
|
|
||||||
|
Deno.test('Get write relays - kind 10002', async () => {
|
||||||
|
const db = new MockRelay();
|
||||||
|
|
||||||
|
const relayListMetadata = await eventFixture('kind-10002-alex');
|
||||||
|
|
||||||
|
await db.event(relayListMetadata);
|
||||||
|
|
||||||
|
const relays = await getRelays(db, relayListMetadata.pubkey);
|
||||||
|
|
||||||
|
assertEquals(relays.size, 6);
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.test('Get write relays with invalid URL - kind 10002', async () => {
|
||||||
|
const db = new MockRelay();
|
||||||
|
|
||||||
|
const relayListMetadata = await eventFixture('kind-10002-alex');
|
||||||
|
relayListMetadata.tags[0] = ['r', 'yolo'];
|
||||||
|
|
||||||
|
await db.event(relayListMetadata);
|
||||||
|
|
||||||
|
const relays = await getRelays(db, relayListMetadata.pubkey);
|
||||||
|
|
||||||
|
assertEquals(relays.size, 5);
|
||||||
|
});
|
||||||
|
|
@ -2,11 +2,11 @@ import { NStore } from '@nostrify/nostrify';
|
||||||
|
|
||||||
import { Conf } from '@/config.ts';
|
import { Conf } from '@/config.ts';
|
||||||
|
|
||||||
export async function getRelays(store: NStore, _pubkey: string): Promise<Set<string>> {
|
export async function getRelays(store: NStore, pubkey: string): Promise<Set<string>> {
|
||||||
const relays = new Set<`wss://${string}`>();
|
const relays = new Set<`wss://${string}`>();
|
||||||
|
|
||||||
const events = await store.query([
|
const events = await store.query([
|
||||||
{ kinds: [10002], authors: [/*pubkey, */ Conf.pubkey], limit: 2 },
|
{ kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 },
|
||||||
]);
|
]);
|
||||||
|
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue