mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
replace nostr-relaypool with NPool; delete pool-store
This commit is contained in:
parent
d2cf2e2e27
commit
2fd9524e3b
3 changed files with 13 additions and 125 deletions
|
|
@ -1,18 +1,16 @@
|
|||
// deno-lint-ignore-file require-await
|
||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { PoolStore } from '@/storages/pool-store.ts';
|
||||
import { SearchStore } from '@/storages/search-store.ts';
|
||||
import { InternalRelay } from '@/storages/InternalRelay.ts';
|
||||
import { UserStore } from '@/storages/UserStore.ts';
|
||||
import { NPool, NRelay1 } from '@nostrify/nostrify';
|
||||
|
||||
export class Storages {
|
||||
private static _db: Promise<EventsDB> | undefined;
|
||||
private static _admin: Promise<UserStore> | undefined;
|
||||
private static _client: Promise<PoolStore> | undefined;
|
||||
private static _client: Promise<NPool> | undefined;
|
||||
private static _pubsub: Promise<InternalRelay> | undefined;
|
||||
private static _search: Promise<SearchStore> | undefined;
|
||||
|
||||
|
|
@ -44,7 +42,7 @@ export class Storages {
|
|||
}
|
||||
|
||||
/** Relay pool storage. */
|
||||
public static async client(): Promise<PoolStore> {
|
||||
public static async client(): Promise<NPool> {
|
||||
if (!this._client) {
|
||||
this._client = (async () => {
|
||||
const db = await this.db();
|
||||
|
|
@ -64,23 +62,14 @@ export class Storages {
|
|||
|
||||
console.log(`pool: connecting to ${activeRelays.length} relays.`);
|
||||
|
||||
const worker = new Worker('https://unpkg.com/nostr-relaypool2@0.6.34/lib/nostr-relaypool.worker.js', {
|
||||
type: 'module',
|
||||
});
|
||||
|
||||
// @ts-ignore Wrong types.
|
||||
const pool = new RelayPoolWorker(worker, activeRelays, {
|
||||
autoReconnect: true,
|
||||
// The pipeline verifies events.
|
||||
skipVerification: true,
|
||||
// The logging feature overwhelms the CPU and creates too many logs.
|
||||
logErrorsAndNotices: false,
|
||||
});
|
||||
|
||||
return new PoolStore({
|
||||
pool,
|
||||
relays: activeRelays,
|
||||
const pool = new NPool({
|
||||
open(url: string) {
|
||||
return new NRelay1(url);
|
||||
},
|
||||
reqRelays: async () => activeRelays,
|
||||
eventRelays: async () => activeRelays,
|
||||
});
|
||||
return pool;
|
||||
})();
|
||||
}
|
||||
return this._client;
|
||||
|
|
|
|||
|
|
@ -1,103 +0,0 @@
|
|||
import {
|
||||
NostrEvent,
|
||||
NostrFilter,
|
||||
NostrRelayCLOSED,
|
||||
NostrRelayEOSE,
|
||||
NostrRelayEVENT,
|
||||
NRelay,
|
||||
NSet,
|
||||
} from '@nostrify/nostrify';
|
||||
import { Machina } from '@nostrify/nostrify/utils';
|
||||
import Debug from '@soapbox/stickynotes/debug';
|
||||
import { RelayPoolWorker } from 'nostr-relaypool';
|
||||
import { getFilterLimit, matchFilters } from 'nostr-tools';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
import { abortError } from '@/utils/abort.ts';
|
||||
import { getRelays } from '@/utils/outbox.ts';
|
||||
|
||||
interface PoolStoreOpts {
|
||||
pool: InstanceType<typeof RelayPoolWorker>;
|
||||
relays: WebSocket['url'][];
|
||||
}
|
||||
|
||||
class PoolStore implements NRelay {
|
||||
private debug = Debug('ditto:client');
|
||||
private pool: InstanceType<typeof RelayPoolWorker>;
|
||||
private relays: WebSocket['url'][];
|
||||
|
||||
constructor(opts: PoolStoreOpts) {
|
||||
this.pool = opts.pool;
|
||||
this.relays = opts.relays;
|
||||
}
|
||||
|
||||
async event(event: NostrEvent, opts: { signal?: AbortSignal } = {}): Promise<void> {
|
||||
if (opts.signal?.aborted) return Promise.reject(abortError());
|
||||
|
||||
const relaySet = await getRelays(await Storages.db(), event.pubkey);
|
||||
relaySet.delete(Conf.relay);
|
||||
|
||||
const relays = [...relaySet].slice(0, 4);
|
||||
|
||||
event = purifyEvent(event);
|
||||
this.debug('EVENT', event, relays);
|
||||
|
||||
this.pool.publish(event, relays);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
async *req(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal; limit?: number } = {},
|
||||
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
this.debug('REQ', JSON.stringify(filters));
|
||||
|
||||
const uuid = crypto.randomUUID();
|
||||
const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(opts.signal);
|
||||
|
||||
const unsub = this.pool.subscribe(
|
||||
filters,
|
||||
this.relays,
|
||||
(event: NostrEvent | null) => {
|
||||
if (event && matchFilters(filters, event)) {
|
||||
machina.push(['EVENT', uuid, purifyEvent(event)]);
|
||||
}
|
||||
},
|
||||
undefined,
|
||||
() => {
|
||||
machina.push(['EOSE', uuid]);
|
||||
},
|
||||
);
|
||||
|
||||
try {
|
||||
for await (const msg of machina) {
|
||||
yield msg;
|
||||
}
|
||||
} finally {
|
||||
unsub();
|
||||
}
|
||||
}
|
||||
|
||||
async query(filters: NostrFilter[], opts: { signal?: AbortSignal; limit?: number } = {}): Promise<NostrEvent[]> {
|
||||
const events = new NSet();
|
||||
|
||||
const limit = filters.reduce((result, filter) => result + getFilterLimit(filter), 0);
|
||||
if (limit === 0) return [];
|
||||
|
||||
for await (const msg of this.req(filters, opts)) {
|
||||
if (msg[0] === 'EOSE') break;
|
||||
if (msg[0] === 'EVENT') events.add(msg[2]);
|
||||
if (msg[0] === 'CLOSED') throw new Error('Subscription closed');
|
||||
|
||||
if (events.size >= limit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return [...events];
|
||||
}
|
||||
}
|
||||
|
||||
export { PoolStore };
|
||||
|
|
@ -13,6 +13,7 @@ import { RelayError } from '@/RelayError.ts';
|
|||
import { AdminSigner } from '@/signers/AdminSigner.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { nostrNow } from '@/utils.ts';
|
||||
import { purifyEvent } from '@/storages/hydrate.ts';
|
||||
|
||||
const debug = Debug('ditto:api');
|
||||
|
||||
|
|
@ -113,7 +114,8 @@ async function publishEvent(event: NostrEvent, c: AppContext): Promise<NostrEven
|
|||
try {
|
||||
await pipeline.handleEvent(event, c.req.raw.signal);
|
||||
const client = await Storages.client();
|
||||
await client.event(event);
|
||||
console.log('publishing event', event, 'to', client);
|
||||
await client.event(purifyEvent(event));
|
||||
} catch (e) {
|
||||
if (e instanceof RelayError) {
|
||||
throw new HTTPException(422, {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue