diff --git a/src/pipeline.ts b/src/pipeline.ts index 1c7a891d..8ab6b894 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -9,7 +9,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { DVM } from '@/pipeline/DVM.ts'; import { updateStats } from '@/stats.ts'; import { hydrateEvents, purifyEvent } from '@/storages/hydrate.ts'; -import { cache, client, eventsDB, reqmeister } from '@/storages.ts'; +import { cache, eventsDB, reqmeister } from '@/storages.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; import { eventAge, isRelay, nostrDate, nostrNow, parseNip05, Time } from '@/utils.ts'; @@ -43,7 +43,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise; @@ -34,12 +36,16 @@ class PoolStore implements NStore { this.#publisher = opts.publisher; } - event(event: NostrEvent, opts: NStoreOpts = {}): Promise { + async event(event: NostrEvent, opts: NStoreOpts = {}): Promise { if (opts.signal?.aborted) return Promise.reject(abortError()); - const { relays = this.#relays } = opts; + + const relaySet = await getRelays(event.pubkey); + relaySet.delete(Conf.relay); + + const relays = [...relaySet].slice(0, 4); event = purifyEvent(event); - this.#debug('EVENT', event); + this.#debug('EVENT', event, relays); this.#pool.publish(event, relays); return Promise.resolve(); diff --git a/src/utils/api.ts b/src/utils/api.ts index 476f854f..c458c50e 100644 --- a/src/utils/api.ts +++ b/src/utils/api.ts @@ -14,7 +14,7 @@ import { import * as pipeline from '@/pipeline.ts'; import { AdminSigner } from '@/signers/AdminSigner.ts'; import { APISigner } from '@/signers/APISigner.ts'; -import { eventsDB } from '@/storages.ts'; +import { client, eventsDB } from '@/storages.ts'; import { nostrNow } from '@/utils.ts'; const debug = Debug('ditto:api'); @@ -89,7 +89,10 @@ async function createAdminEvent(t: EventStub, c: AppContext): Promise { debug('EVENT', event); try { - await pipeline.handleEvent(event, c.req.raw.signal); + await Promise.all([ + pipeline.handleEvent(event, c.req.raw.signal), + client.event(event), + ]); } catch (e) { if (e instanceof pipeline.RelayError) { throw new HTTPException(422, { diff --git a/src/utils/outbox.ts b/src/utils/outbox.ts new file mode 100644 index 00000000..8189fe25 --- /dev/null +++ b/src/utils/outbox.ts @@ -0,0 +1,27 @@ +import { Conf } from '@/config.ts'; +import { eventsDB } from '@/storages.ts'; + +export async function getRelays(pubkey: string): Promise> { + const relays = new Set<`wss://${string}`>(); + + const events = await eventsDB.query([ + { kinds: [10002], authors: [pubkey, Conf.pubkey], limit: 2 }, + ]); + + for (const event of events) { + for (const [name, relay, marker] of event.tags) { + if (name === 'r' && (marker === 'write' || !marker)) { + try { + const url = new URL(relay); + if (url.protocol === 'wss:') { + relays.add(url.toString() as `wss://${string}`); + } + } catch (_e) { + // do nothing + } + } + } + } + + return relays; +}