From 2fd9524e3b0037b276c46a5d9a1626dfa28d4bc3 Mon Sep 17 00:00:00 2001 From: Siddharth Singh Date: Sat, 8 Jun 2024 00:55:02 +0530 Subject: [PATCH] replace nostr-relaypool with NPool; delete pool-store --- src/storages.ts | 31 ++++------- src/storages/pool-store.ts | 103 ------------------------------------- src/utils/api.ts | 4 +- 3 files changed, 13 insertions(+), 125 deletions(-) delete mode 100644 src/storages/pool-store.ts diff --git a/src/storages.ts b/src/storages.ts index f8f206d1..d57eb04a 100644 --- a/src/storages.ts +++ b/src/storages.ts @@ -1,18 +1,16 @@ // deno-lint-ignore-file require-await -import { RelayPoolWorker } from 'nostr-relaypool'; - import { Conf } from '@/config.ts'; import { DittoDB } from '@/db/DittoDB.ts'; import { EventsDB } from '@/storages/EventsDB.ts'; -import { PoolStore } from '@/storages/pool-store.ts'; import { SearchStore } from '@/storages/search-store.ts'; import { InternalRelay } from '@/storages/InternalRelay.ts'; import { UserStore } from '@/storages/UserStore.ts'; +import { NPool, NRelay1 } from '@nostrify/nostrify'; export class Storages { private static _db: Promise | undefined; private static _admin: Promise | undefined; - private static _client: Promise | undefined; + private static _client: Promise | undefined; private static _pubsub: Promise | undefined; private static _search: Promise | undefined; @@ -44,7 +42,7 @@ export class Storages { } /** Relay pool storage. */ - public static async client(): Promise { + public static async client(): Promise { if (!this._client) { this._client = (async () => { const db = await this.db(); @@ -64,23 +62,14 @@ export class Storages { console.log(`pool: connecting to ${activeRelays.length} relays.`); - const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', { - type: 'module', - }); - - // @ts-ignore Wrong types. - const pool = new RelayPoolWorker(worker, activeRelays, { - autoReconnect: true, - // The pipeline verifies events. - skipVerification: true, - // The logging feature overwhelms the CPU and creates too many logs. - logErrorsAndNotices: false, - }); - - return new PoolStore({ - pool, - relays: activeRelays, + const pool = new NPool({ + open(url: string) { + return new NRelay1(url); + }, + reqRelays: async () => activeRelays, + eventRelays: async () => activeRelays, }); + return pool; })(); } return this._client; diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts deleted file mode 100644 index 54565091..00000000 --- a/src/storages/pool-store.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { - NostrEvent, - NostrFilter, - NostrRelayCLOSED, - NostrRelayEOSE, - NostrRelayEVENT, - NRelay, - NSet, -} from '@nostrify/nostrify'; -import { Machina } from '@nostrify/nostrify/utils'; -import Debug from '@soapbox/stickynotes/debug'; -import { RelayPoolWorker } from 'nostr-relaypool'; -import { getFilterLimit, matchFilters } from 'nostr-tools'; - -import { Conf } from '@/config.ts'; -import { Storages } from '@/storages.ts'; -import { purifyEvent } from '@/storages/hydrate.ts'; -import { abortError } from '@/utils/abort.ts'; -import { getRelays } from '@/utils/outbox.ts'; - -interface PoolStoreOpts { - pool: InstanceType; - relays: WebSocket['url'][]; -} - -class PoolStore implements NRelay { - private debug = Debug('ditto:client'); - private pool: InstanceType; - private relays: WebSocket['url'][]; - - constructor(opts: PoolStoreOpts) { - this.pool = opts.pool; - this.relays = opts.relays; - } - - async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise { - if (opts.signal?.aborted) return Promise.reject(abortError()); - - const relaySet = await getRelays(await Storages.db(), event.pubkey); - relaySet.delete(Conf.relay); - - const relays = [...relaySet].slice(0, 4); - - event = purifyEvent(event); - this.debug('EVENT', event, relays); - - this.pool.publish(event, relays); - return Promise.resolve(); - } - - async *req( - filters: NostrFilter[], - opts: { signal?: AbortSignal; limit?: number } = {}, - ): AsyncIterable { - this.debug('REQ', JSON.stringify(filters)); - - const uuid = crypto.randomUUID(); - const machina = new Machina(opts.signal); - - const unsub = this.pool.subscribe( - filters, - this.relays, - (event: NostrEvent | null) => { - if (event && matchFilters(filters, event)) { - machina.push(['EVENT', uuid, purifyEvent(event)]); - } - }, - undefined, - () => { - machina.push(['EOSE', uuid]); - }, - ); - - try { - for await (const msg of machina) { - yield msg; - } - } finally { - unsub(); - } - } - - async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise { - const events = new NSet(); - - const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0); - if (limit === 0) return []; - - for await (const msg of this.req(filters, opts)) { - if (msg[0] === 'EOSE') break; - if (msg[0] === 'EVENT') events.add(msg[2]); - if (msg[0] === 'CLOSED') throw new Error('Subscription closed'); - - if (events.size >= limit) { - break; - } - } - - return [...events]; - } -} - -export { PoolStore }; diff --git a/src/utils/api.ts b/src/utils/api.ts index 3cc8b7d2..3f99fef6 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -13,6 +13,7 @@ import { RelayError } from '@/RelayError.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { Storages } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; +import { purifyEvent } from '@/storages/hydrate.ts'; const debug = Debug('ditto:api'); @@ -113,7 +114,8 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise