diff --git a/deno.json b/deno.json index aa9729e1..964490c6 100644 --- a/deno.json +++ b/deno.json @@ -5,6 +5,8 @@ "start": "deno run -A src/server.ts", "dev": "deno run -A --watch src/server.ts", "hook": "deno run --allow-read --allow-run --allow-write https://deno.land/x/deno_hooks@0.1.1/mod.ts", + "db:export": "deno run -A scripts/db-export.ts", + "db:import": "deno run -A scripts/db-import.ts", "db:migrate": "deno run -A scripts/db-migrate.ts", "nostr:pull": "deno run -A scripts/nostr-pull.ts", "debug": "deno run -A --inspect src/server.ts", @@ -28,7 +30,7 @@ "@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1", "@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1", "@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0", - "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.28.0", + "@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.29.0", "@scure/base": "npm:@scure/base@^1.1.6", "@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs", "@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", @@ -43,6 +45,7 @@ "@std/media-types": "jsr:@std/media-types@^0.224.1", "@std/streams": "jsr:@std/streams@^0.223.0", "comlink": "npm:comlink@^4.4.1", + "comlink-async-generator": "npm:comlink-async-generator@^0.0.1", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", "deno.json": "./deno.json", "entities": "npm:entities@^4.5.0", diff --git a/deno.lock b/deno.lock index 2f1f6090..5b403d44 100644 --- a/deno.lock +++ b/deno.lock @@ -14,7 +14,7 @@ "jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5", "jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4", "jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5", - "jsr:@nostrify/nostrify@^0.28.0": "jsr:@nostrify/nostrify@0.28.0", + "jsr:@nostrify/nostrify@^0.29.0": "jsr:@nostrify/nostrify@0.29.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0", "jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0", "jsr:@std/assert@^0.217.0": "jsr:@std/assert@0.217.0", @@ -24,6 +24,7 @@ "jsr:@std/bytes@^0.224.0": "jsr:@std/bytes@0.224.0", "jsr:@std/bytes@^1.0.0-rc.3": "jsr:@std/bytes@1.0.0", "jsr:@std/bytes@^1.0.1-rc.3": "jsr:@std/bytes@1.0.2", + "jsr:@std/bytes@^1.0.2-rc.3": "jsr:@std/bytes@1.0.2", "jsr:@std/crypto@^0.224.0": "jsr:@std/crypto@0.224.0", "jsr:@std/dotenv@^0.224.0": "jsr:@std/dotenv@0.224.2", "jsr:@std/encoding@^0.221.0": "jsr:@std/encoding@0.221.0", @@ -33,16 +34,20 @@ "jsr:@std/fs@^0.221.0": "jsr:@std/fs@0.221.0", "jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3", "jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.1", - "jsr:@std/io@^0.224": "jsr:@std/io@0.224.3", + "jsr:@std/io@^0.224": "jsr:@std/io@0.224.4", + "jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0", "jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1", "jsr:@std/path@0.217": "jsr:@std/path@0.217.0", "jsr:@std/path@^0.221.0": "jsr:@std/path@0.221.0", + "jsr:@std/streams@^0.223.0": "jsr:@std/streams@0.223.0", "npm:@isaacs/ttlcache@^1.4.1": "npm:@isaacs/ttlcache@1.4.1", "npm:@noble/hashes@^1.4.0": "npm:@noble/hashes@1.4.0", "npm:@scure/base@^1.1.6": "npm:@scure/base@1.1.6", "npm:@scure/bip32@^1.4.0": "npm:@scure/bip32@1.4.0", "npm:@scure/bip39@^1.3.0": "npm:@scure/bip39@1.3.0", "npm:@types/node": "npm:@types/node@18.16.19", + "npm:comlink-async-generator": "npm:comlink-async-generator@0.0.1", + "npm:comlink-async-generator@^0.0.1": "npm:comlink-async-generator@0.0.1", "npm:comlink@^4.4.1": "npm:comlink@4.4.1", "npm:entities@^4.5.0": "npm:entities@4.5.0", "npm:fast-stable-stringify@^1.0.0": "npm:fast-stable-stringify@1.0.0", @@ -159,8 +164,8 @@ "npm:zod@^3.23.8" ] }, - "@nostrify/nostrify@0.28.0": { - "integrity": "abaacd679e2a00a4394d60858d67e5b9a11605c785526d7f6ba354ccd8df087d", + "@nostrify/nostrify@0.29.0": { + "integrity": "d0489b62441c891324cce60c14bb398013259494b5ad9d21ec6dfbf0ca7368c9", "dependencies": [ "jsr:@std/crypto@^0.224.0", "jsr:@std/encoding@^0.224.1", @@ -263,6 +268,15 @@ "jsr:@std/bytes@^1.0.1-rc.3" ] }, + "@std/io@0.224.4": { + "integrity": "bce1151765e4e70e376039fd72c71672b4d4aae363878a5ee3e58361b81197ec", + "dependencies": [ + "jsr:@std/bytes@^1.0.2-rc.3" + ] + }, + "@std/json@0.223.0": { + "integrity": "9a4a255931dd0397924c6b10bb6a72fe3e28ddd876b981ada2e3b8dd0764163f" + }, "@std/media-types@0.224.1": { "integrity": "9e69a5daed37c5b5c6d3ce4731dc191f80e67f79bed392b0957d1d03b87f11e1" }, @@ -277,6 +291,9 @@ "dependencies": [ "jsr:@std/assert@^0.221.0" ] + }, + "@std/streams@0.223.0": { + "integrity": "d6b28e498ced3960b04dc5d251f2dcfc1df244b5ec5a48dc23a8f9b490be3b99" } }, "npm": { @@ -439,6 +456,12 @@ "delayed-stream": "delayed-stream@1.0.0" } }, + "comlink-async-generator@0.0.1": { + "integrity": "sha512-RjOPv6Tb7cL9FiIgwanUJuFG9aW4myAFyyzxZoEkEegeDQrZqr92d1Njv2WIgi7nbGpTiyy5GdNTUubDaNgZ6A==", + "dependencies": { + "comlink": "comlink@4.4.1" + } + }, "comlink@4.4.1": { "integrity": "sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==", "dependencies": {} @@ -1785,7 +1808,7 @@ "jsr:@db/sqlite@^0.11.1", "jsr:@hono/hono@^4.4.6", "jsr:@lambdalisue/async@^2.1.1", - "jsr:@nostrify/nostrify@^0.28.0", + "jsr:@nostrify/nostrify@^0.29.0", "jsr:@soapbox/kysely-deno-sqlite@^2.1.0", "jsr:@soapbox/stickynotes@^0.4.0", "jsr:@std/assert@^0.225.1", @@ -1800,6 +1823,7 @@ "npm:@isaacs/ttlcache@^1.4.1", "npm:@noble/secp256k1@^2.0.0", "npm:@scure/base@^1.1.6", + "npm:comlink-async-generator@^0.0.1", "npm:comlink@^4.4.1", "npm:entities@^4.5.0", "npm:fast-stable-stringify@^1.0.0", diff --git a/scripts/db-export.ts b/scripts/db-export.ts new file mode 100644 index 00000000..fbdac1b7 --- /dev/null +++ b/scripts/db-export.ts @@ -0,0 +1,24 @@ +import { Storages } from '@/storages.ts'; + +const store = await Storages.db(); + +console.warn('Exporting events...'); + +let count = 0; + +for await (const msg of store.req([{}])) { + if (msg[0] === 'EOSE') { + break; + } + if (msg[0] === 'EVENT') { + console.log(JSON.stringify(msg[2])); + count++; + } + if (msg[0] === 'CLOSED') { + console.error('Database closed unexpectedly'); + break; + } +} + +console.warn(`Exported ${count} events`); +Deno.exit(); diff --git a/scripts/db-import.ts b/scripts/db-import.ts new file mode 100644 index 00000000..f33e7e56 --- /dev/null +++ b/scripts/db-import.ts @@ -0,0 +1,24 @@ +import { NostrEvent } from '@nostrify/nostrify'; +import { JsonParseStream } from '@std/json/json-parse-stream'; +import { TextLineStream } from '@std/streams/text-line-stream'; + +import { Storages } from '@/storages.ts'; + +const store = await Storages.db(); + +console.warn('Importing events...'); + +let count = 0; + +const readable = Deno.stdin.readable + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream()) + .pipeThrough(new JsonParseStream()); + +for await (const event of readable) { + await store.event(event as unknown as NostrEvent); + count++; +} + +console.warn(`Imported ${count} events`); +Deno.exit(); diff --git a/src/db/DittoDB.ts b/src/db/DittoDB.ts index 5607cf93..c9756cf7 100644 --- a/src/db/DittoDB.ts +++ b/src/db/DittoDB.ts @@ -62,19 +62,19 @@ export class DittoDB { }), }); - console.info('Running migrations...'); - const results = await migrator.migrateToLatest(); + console.warn('Running migrations...'); + const { results, error } = await migrator.migrateToLatest(); - if (results.error) { - console.error(results.error); + if (error) { + console.error(error); Deno.exit(1); } else { - if (!results.results?.length) { - console.info('Everything up-to-date.'); + if (!results?.length) { + console.warn('Everything up-to-date.'); } else { - console.info('Migrations finished!'); - for (const { migrationName, status } of results.results!) { - console.info(` - ${migrationName}: ${status}`); + console.warn('Migrations finished!'); + for (const { migrationName, status } of results!) { + console.warn(` - ${migrationName}: ${status}`); } } } diff --git a/src/db/KyselyLogger.ts b/src/db/KyselyLogger.ts index 5d285cc6..3b5b4398 100644 --- a/src/db/KyselyLogger.ts +++ b/src/db/KyselyLogger.ts @@ -1,6 +1,6 @@ import { Stickynotes } from '@soapbox/stickynotes'; import { Logger } from 'kysely'; -import { dbQueryTimeHistogram } from '@/metrics.ts'; +import { dbQueryCounter, dbQueryTimeHistogram } from '@/metrics.ts'; /** Log the SQL for queries. */ export const KyselyLogger: Logger = (event) => { @@ -9,6 +9,7 @@ export const KyselyLogger: Logger = (event) => { const { query, queryDurationMillis } = event; const { sql, parameters } = query; + dbQueryCounter.inc(); dbQueryTimeHistogram.observe(queryDurationMillis); console.debug( diff --git a/src/storages/EventsDB.ts b/src/storages/EventsDB.ts index abf076c7..011f5a02 100644 --- a/src/storages/EventsDB.ts +++ b/src/storages/EventsDB.ts @@ -1,13 +1,24 @@ // deno-lint-ignore-file require-await -import { NDatabase, NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify'; +import { + NDatabase, + NIP50, + NKinds, + NostrEvent, + NostrFilter, + NostrRelayCLOSED, + NostrRelayEOSE, + NostrRelayEVENT, + NSchema as n, + NStore, +} from '@nostrify/nostrify'; import { Stickynotes } from '@soapbox/stickynotes'; import { Kysely } from 'kysely'; import { nip27 } from 'nostr-tools'; import { Conf } from '@/config.ts'; import { DittoTables } from '@/db/DittoTables.ts'; -import { dbEventCounter, dbQueryCounter } from '@/metrics.ts'; +import { dbEventCounter } from '@/metrics.ts'; import { RelayError } from '@/RelayError.ts'; import { purifyEvent } from '@/storages/hydrate.ts'; import { isNostrId, isURL } from '@/utils.ts'; @@ -137,13 +148,20 @@ class EventsDB implements NStore { } } + /** Stream events from the database. */ + req( + filters: NostrFilter[], + opts: { signal?: AbortSignal } = {}, + ): AsyncIterable { + return this.store.req(filters, opts); + } + /** Get events for filters from the database. */ async query( filters: NostrFilter[], opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, ): Promise { filters = await this.expandFilters(filters); - dbQueryCounter.inc(); for (const filter of filters) { if (filter.since && filter.since >= 2_147_483_647) { diff --git a/src/workers/sqlite.ts b/src/workers/sqlite.ts index 37c33b43..154ec556 100644 --- a/src/workers/sqlite.ts +++ b/src/workers/sqlite.ts @@ -1,8 +1,11 @@ import * as Comlink from 'comlink'; +import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; import { CompiledQuery, QueryResult } from 'kysely'; import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts'; +Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); + class SqliteWorker { #worker: Worker; #client: ReturnType>; @@ -33,8 +36,12 @@ class SqliteWorker { return this.#client.executeQuery(query) as Promise>; } - streamQuery(): AsyncIterableIterator { - throw new Error('Streaming queries are not supported in the web worker'); + async *streamQuery(query: CompiledQuery): AsyncIterableIterator> { + await this.#ready; + + for await (const result of await this.#client.streamQuery(query)) { + yield result as QueryResult; + } } destroy(): Promise { diff --git a/src/workers/sqlite.worker.ts b/src/workers/sqlite.worker.ts index 68c70d6c..23839dbd 100644 --- a/src/workers/sqlite.worker.ts +++ b/src/workers/sqlite.worker.ts @@ -2,6 +2,7 @@ import { Database as SQLite } from '@db/sqlite'; import * as Comlink from 'comlink'; import { CompiledQuery, QueryResult } from 'kysely'; +import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; import '@/sentry.ts'; @@ -20,11 +21,22 @@ export const SqliteWorker = { insertId: BigInt(db!.lastInsertRowId), }; }, + async *streamQuery({ sql, parameters }: CompiledQuery): AsyncIterableIterator> { + if (!db) throw new Error('Database not open'); + + const stmt = db.prepare(sql).bind(...parameters as any[]); + for (const row of stmt) { + yield { + rows: [row], + }; + } + }, destroy() { db?.close(); }, }; +Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); Comlink.expose(SqliteWorker); self.postMessage(['ready']);