diff --git a/fixtures/events/kind-10002-alex.json b/fixtures/events/kind-10002-alex.json new file mode 100644 index 00000000..629e56cc --- /dev/null +++ b/fixtures/events/kind-10002-alex.json @@ -0,0 +1,34 @@ +{ + "kind": 10002, + "id": "68fc04e23b07219f153a10947663b9dd7b271acbc03b82200e364e35de3e0bdd", + "pubkey": "0461fcbecc4c3374439932d6b8f11269ccdb7cc973ad7a50ae362db135a474dd", + "created_at": 1714969354, + "tags": [ + [ + "r", + "wss://gleasonator.dev/relay" + ], + [ + "r", + "wss://nosdrive.app/relay" + ], + [ + "r", + "wss://relay.mostr.pub/" + ], + [ + "r", + "wss://relay.primal.net/" + ], + [ + "r", + "wss://relay.snort.social/" + ], + [ + "r", + "wss://relay.damus.io/" + ] + ], + "content": "", + "sig": "cb7b1a75fe015d5c9481651379365bd5d098665b1bc7a453522177e2686eaa83581ec36f7a17429aad2541dad02c2c81023b81612f87f28fc57447fef1efab13" +} diff --git a/src/RelayError.test.ts b/src/RelayError.test.ts new file mode 100644 index 00000000..742b799f --- /dev/null +++ b/src/RelayError.test.ts @@ -0,0 +1,23 @@ +import { assertThrows } from '@std/assert'; + +import { RelayError } from '@/RelayError.ts'; + +Deno.test('Construct a RelayError from the reason message', () => { + assertThrows( + () => { + throw RelayError.fromReason('duplicate: already exists'); + }, + RelayError, + 'duplicate: already exists', + ); +}); + +Deno.test('Throw a new RelayError if the OK message is false', () => { + assertThrows( + () => { + RelayError.assert(['OK', 'yolo', false, 'error: bla bla bla']); + }, + RelayError, + 'error: bla bla bla', + ); +}); diff --git a/src/storages.ts b/src/storages.ts index 4aaca1c2..e0039973 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,18 +1,17 @@ // deno-lint-ignore-file require-await -import { RelayPoolWorker } from 'nostr-relaypool'; - import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; import { AdminStore } from '@/storages/AdminStore.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; -import { PoolStore } from '@/storages/pool-store.ts'; import { SearchStore } from '@/storages/search-store.ts'; import { InternalRelay } from '@/storages/InternalRelay.ts'; +import { NPool, NRelay1 } from '@nostrify/nostrify'; +import { getRelays } from '@/utils/outbox.ts'; export class Storages { private static _db: Promise | undefined; private static _admin: Promise | undefined; - private static _client: Promise | undefined; + private static _client: Promise | undefined; private static _pubsub: Promise | undefined; private static _search: Promise | undefined; @@ -44,7 +43,7 @@ export class Storages { } /** Relay pool storage. */ - public static async client(): Promise { + public static async client(): Promise { if (!this._client) { this._client = (async () => { const db = await this.db(); @@ -56,7 +55,7 @@ export class Storages { const tags = relayList?.tags ?? []; const activeRelays = tags.reduce((acc, [name, url, marker]) => { - if (name === 'r' && !marker) { + if (name === 'r' && (!marker || marker === 'write')) { acc.push(url); } return acc; @@ -64,22 +63,22 @@ export class Storages { console.log(`pool: connecting to ${activeRelays.length} relays.`); - const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { - type: 'module', - }); + return new NPool({ + open(url) { + return new NRelay1(url); + }, + reqRouter: async (filters) => { + return new Map(activeRelays.map((relay) => { + return [relay, filters]; + })); + }, + eventRouter: async (event) => { + const relaySet = await getRelays(await Storages.db(), event.pubkey); + relaySet.delete(Conf.relay); - // @ts-ignore Wrong types. - const pool = new RelayPoolWorker(worker, activeRelays, { - autoReconnect: true, - // The pipeline verifies events. - skipVerification: true, - // The logging feature overwhelms the CPU and creates too many logs. - logErrorsAndNotices: false, - }); - - return new PoolStore({ - pool, - relays: activeRelays, + const relays = [...relaySet].slice(0, 4); + return relays; + }, }); })(); } diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts deleted file mode 100644 index 54565091..00000000 --- a/src/storages/pool-store.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NRelay, - NSet, -} from '@nostrify/nostrify'; -import { Machina } from '@nostrify/nostrify/utils'; -import Debug from '@soapbox/stickynotes/debug'; -import { RelayPoolWorker } from 'nostr-relaypool'; -import { getFilterLimit, matchFilters } from 'nostr-tools'; - -import { Conf } from '@/config.ts'; -import { Storages } from '@/storages.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { abortError } from '@/utils/abort.ts'; -import { getRelays } from '@/utils/outbox.ts'; - -interface PoolStoreOpts { - pool: InstanceType; - relays: WebSocket['url'][]; -} - -class PoolStore implements NRelay { - private debug = Debug('ditto:client'); - private pool: InstanceType; - private relays: WebSocket['url'][]; - - constructor(opts: PoolStoreOpts) { - this.pool = opts.pool; - this.relays = opts.relays; - } - - async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise { - if (opts.signal?.aborted) return Promise.reject(abortError()); - - const relaySet = await getRelays(await Storages.db(), event.pubkey); - relaySet.delete(Conf.relay); - - const relays = [...relaySet].slice(0, 4); - - event = purifyEvent(event); - this.debug('EVENT', event, relays); - - this.pool.publish(event, relays); - return Promise.resolve(); - } - - async *req( - filters: NostrFilter[], - opts: { signal?: AbortSignal; limit?: number } = {}, - ): AsyncIterable { - this.debug('REQ', JSON.stringify(filters)); - - const uuid = crypto.randomUUID(); - const machina = new Machina(opts.signal); - - const unsub = this.pool.subscribe( - filters, - this.relays, - (event: NostrEvent | null) => { - if (event && matchFilters(filters, event)) { - machina.push(['EVENT', uuid, purifyEvent(event)]); - } - }, - undefined, - () => { - machina.push(['EOSE', uuid]); - }, - ); - - try { - for await (const msg of machina) { - yield msg; - } - } finally { - unsub(); - } - } - - async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - const events = new NSet(); - - const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0); - if (limit === 0) return []; - - for await (const msg of this.req(filters, opts)) { - if (msg[0] === 'EOSE') break; - if (msg[0] === 'EVENT') events.add(msg[2]); - if (msg[0] === 'CLOSED') throw new Error('Subscription closed'); - - if (events.size >= limit) { - break; - } - } - - return [...events]; - } -} - -export { PoolStore }; diff --git a/src/utils/api.ts b/src/utils/api.ts index 355a7a12..069442c9 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -14,6 +14,7 @@ import { RelayError } from '@/RelayError.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Storages } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; const debug = Debug('ditto:api'); @@ -152,7 +153,7 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise { + const db = new MockRelay(); + + const relayListMetadata = await eventFixture('kind-10002-alex'); + + await db.event(relayListMetadata); + + const relays = await getRelays(db, relayListMetadata.pubkey); + + assertEquals(relays.size, 6); +}); + +Deno.test('Get write relays with invalid URL - kind 10002', async () => { + const db = new MockRelay(); + + const relayListMetadata = await eventFixture('kind-10002-alex'); + relayListMetadata.tags[0] = ['r', 'yolo']; + + await db.event(relayListMetadata); + + const relays = await getRelays(db, relayListMetadata.pubkey); + + assertEquals(relays.size, 5); +}); diff --git a/src/utils/outbox.ts b/src/utils/outbox.ts index e5d7029c..72b83388 100644 --- a/src/utils/outbox.ts +++ b/src/utils/outbox.ts @@ -2,11 +2,11 @@ import { NStore } from '@nostrify/nostrify'; import { Conf } from '@/config.ts'; -export async function getRelays(store: NStore, _pubkey: string): Promise> { +export async function getRelays(store: NStore, pubkey: string): Promise> { const relays = new Set<`wss://${string}`>(); const events = await store.query([ - { kinds: [10002], authors: [/*pubkey, */ Conf.pubkey], limit: 2 }, + { kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 }, ]); for (const event of events) {