diff --git a/scripts/db-import.ts b/scripts/db-import.ts index f33e7e56..45a530d3 100644 --- a/scripts/db-import.ts +++ b/scripts/db-import.ts @@ -1,10 +1,13 @@ +import { Semaphore } from '@lambdalisue/async'; import { NostrEvent } from '@nostrify/nostrify'; import { JsonParseStream } from '@std/json/json-parse-stream'; import { TextLineStream } from '@std/streams/text-line-stream'; +import { Conf } from '@/config.ts'; import { Storages } from '@/storages.ts'; const store = await Storages.db(); +const sem = new Semaphore(Conf.pg.poolSize); console.warn('Importing events...'); @@ -15,9 +18,26 @@ const readable = Deno.stdin.readable .pipeThrough(new TextLineStream()) .pipeThrough(new JsonParseStream()); -for await (const event of readable) { - await store.event(event as unknown as NostrEvent); - count++; +for await (const line of readable) { + const event = line as unknown as NostrEvent; + + while (sem.locked) { + await new Promise((resolve) => setTimeout(resolve, 0)); + } + + sem.lock(async () => { + try { + await store.event(event); + console.warn(`(${count}) Event<${event.kind}> ${event.id}`); + } catch (error) { + if (error.message.includes('violates unique constraint')) { + console.warn(`(${count}) Skipping existing event... ${event.id}`); + } else { + console.error(error); + } + } + count++; + }); } console.warn(`Imported ${count} events`); diff --git a/src/config.ts b/src/config.ts index d5b94da3..dabb3bb7 100644 --- a/src/config.ts +++ b/src/config.ts @@ -235,7 +235,7 @@ class Conf { static pg = { /** Number of connections to use in the pool. */ get poolSize(): number { - return Number(Deno.env.get('PG_POOL_SIZE') ?? 10); + return Number(Deno.env.get('PG_POOL_SIZE') ?? 20); }, }; /** Whether to enable requesting events from known relays. */