diff --git a/src/notify.ts b/src/notify.ts index cda22718..e62f7c50 100644 --- a/src/notify.ts +++ b/src/notify.ts @@ -1,36 +1,41 @@ -import { Semaphore } from '@lambdalisue/async'; +import { NostrEvent } from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { pipelineEncounters } from '@/caches/pipelineEncounters.ts'; import { Conf } from '@/config.ts'; import * as pipeline from '@/pipeline.ts'; import { Storages } from '@/storages.ts'; +import { AsyncBatcher } from '@/utils/AsyncBatcher.ts'; -const sem = new Semaphore(1); const console = new Stickynotes('ditto:notify'); +const batcher = new AsyncBatcher( + async (ids) => { + const store = await Storages.db(); + const signal = AbortSignal.timeout(Conf.db.timeouts.default); + const events = await store.query([{ ids }], { signal }); + return ids.map((id) => events.find((e) => e.id === id)); + }, +); + export async function startNotify(): Promise { const { listen } = await Storages.database(); - const store = await Storages.db(); - listen('nostr_event', (id) => { + listen('nostr_event', async (id) => { if (pipelineEncounters.has(id)) { console.debug(`Skip event ${id} because it was already in the pipeline`); return; } - sem.lock(async () => { - try { - const signal = AbortSignal.timeout(Conf.db.timeouts.default); + try { + const signal = AbortSignal.timeout(Conf.db.timeouts.default); + const event = await batcher.add(id); - const [event] = await store.query([{ ids: [id], limit: 1 }], { signal }); - - if (event) { - await pipeline.handleEvent(event, { source: 'notify', signal }); - } - } catch (e) { - console.warn(e); + if (event) { + await pipeline.handleEvent(event, { source: 'notify', signal }); } - }); + } catch (e) { + console.warn(e); + } }); } diff --git a/src/utils/AsyncBatcher.test.ts b/src/utils/AsyncBatcher.test.ts new file mode 100644 index 00000000..f80b3684 --- /dev/null +++ b/src/utils/AsyncBatcher.test.ts @@ -0,0 +1,6 @@ +import { assert, assertEquals } from '@std/assert'; + +import { AsyncBatcher } from '@/utils/AsyncBatcher.ts'; + +Deno.test('AsyncBatcher', async () => { +}); diff --git a/src/utils/AsyncBatcher.ts b/src/utils/AsyncBatcher.ts new file mode 100644 index 00000000..79478cd7 --- /dev/null +++ b/src/utils/AsyncBatcher.ts @@ -0,0 +1,58 @@ +interface Query { + resolve: (result: O) => void; + reject: (error: unknown) => void; + payload: I; +} + +export class AsyncBatcher implements Disposable { + private queue: Query[] = []; + private tid: number | undefined; + + constructor( + private batchFunction: (payloads: I[]) => Promise, + private batchInterval: number = 1000, + ) {} + + add(payload: I): Promise { + const { promise, resolve, reject } = Promise.withResolvers(); + + this.queue.push({ resolve, reject, payload }); + + // Start the timer if it's not already running. + if (!this.tid) { + this.tid = setTimeout(() => this.processBatch(), this.batchInterval); + } + + return promise; + } + + private async processBatch(): Promise { + const batch = [...this.queue]; + this.queue = []; + + clearTimeout(this.tid); + this.tid = undefined; + + try { + const payloads = batch.map((q) => q.payload); + const results = await this.batchFunction(payloads); + + if (results.length !== batch.length) { + throw new Error('Batch function returned mismatched results.'); + } + + batch.forEach((q, i) => q.resolve(results[i])); + } catch (error) { + batch.forEach((q) => q.reject(error)); + } + } + + close(): void { + clearTimeout(this.tid); + this.queue.forEach((q) => q.reject(new Error('Batcher closed.'))); + } + + [Symbol.dispose]() { + this.close(); + } +}