Merge branch 'db-export' into 'main'

Add db:export command

See merge request soapbox-pub/ditto!441
This commit is contained in:
Alex Gleason 2024-08-02 22:00:15 +00:00
commit 3e6f4bb256
9 changed files with 134 additions and 21 deletions

View file

@ -5,6 +5,8 @@
"start": "deno run -A src/server.ts",
"dev": "deno run -A --watch src/server.ts",
"hook": "deno run --allow-read --allow-run --allow-write https://deno.land/x/deno_hooks@0.1.1/mod.ts",
"db:export": "deno run -A scripts/db-export.ts",
"db:import": "deno run -A scripts/db-import.ts",
"db:migrate": "deno run -A scripts/db-migrate.ts",
"nostr:pull": "deno run -A scripts/nostr-pull.ts",
"debug": "deno run -A --inspect src/server.ts",
@ -28,7 +30,7 @@
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.28.0",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.29.0",
"@scure/base": "npm:@scure/base@^1.1.6",
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",
"@soapbox/kysely-deno-sqlite": "jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
@ -43,6 +45,7 @@
"@std/media-types": "jsr:@std/media-types@^0.224.1",
"@std/streams": "jsr:@std/streams@^0.223.0",
"comlink": "npm:comlink@^4.4.1",
"comlink-async-generator": "npm:comlink-async-generator@^0.0.1",
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
"deno.json": "./deno.json",
"entities": "npm:entities@^4.5.0",

34
deno.lock generated
View file

@ -14,7 +14,7 @@
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.28.0": "jsr:@nostrify/nostrify@0.28.0",
"jsr:@nostrify/nostrify@^0.29.0": "jsr:@nostrify/nostrify@0.29.0",
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0": "jsr:@soapbox/kysely-deno-sqlite@2.2.0",
"jsr:@soapbox/stickynotes@^0.4.0": "jsr:@soapbox/stickynotes@0.4.0",
"jsr:@std/assert@^0.217.0": "jsr:@std/assert@0.217.0",
@ -24,6 +24,7 @@
"jsr:@std/bytes@^0.224.0": "jsr:@std/bytes@0.224.0",
"jsr:@std/bytes@^1.0.0-rc.3": "jsr:@std/bytes@1.0.0",
"jsr:@std/bytes@^1.0.1-rc.3": "jsr:@std/bytes@1.0.2",
"jsr:@std/bytes@^1.0.2-rc.3": "jsr:@std/bytes@1.0.2",
"jsr:@std/crypto@^0.224.0": "jsr:@std/crypto@0.224.0",
"jsr:@std/dotenv@^0.224.0": "jsr:@std/dotenv@0.224.2",
"jsr:@std/encoding@^0.221.0": "jsr:@std/encoding@0.221.0",
@ -33,16 +34,20 @@
"jsr:@std/fs@^0.221.0": "jsr:@std/fs@0.221.0",
"jsr:@std/fs@^0.229.3": "jsr:@std/fs@0.229.3",
"jsr:@std/internal@^1.0.0": "jsr:@std/internal@1.0.1",
"jsr:@std/io@^0.224": "jsr:@std/io@0.224.3",
"jsr:@std/io@^0.224": "jsr:@std/io@0.224.4",
"jsr:@std/json@^0.223.0": "jsr:@std/json@0.223.0",
"jsr:@std/media-types@^0.224.1": "jsr:@std/media-types@0.224.1",
"jsr:@std/path@0.217": "jsr:@std/path@0.217.0",
"jsr:@std/path@^0.221.0": "jsr:@std/path@0.221.0",
"jsr:@std/streams@^0.223.0": "jsr:@std/streams@0.223.0",
"npm:@isaacs/ttlcache@^1.4.1": "npm:@isaacs/ttlcache@1.4.1",
"npm:@noble/hashes@^1.4.0": "npm:@noble/hashes@1.4.0",
"npm:@scure/base@^1.1.6": "npm:@scure/base@1.1.6",
"npm:@scure/bip32@^1.4.0": "npm:@scure/bip32@1.4.0",
"npm:@scure/bip39@^1.3.0": "npm:@scure/bip39@1.3.0",
"npm:@types/node": "npm:@types/node@18.16.19",
"npm:comlink-async-generator": "npm:comlink-async-generator@0.0.1",
"npm:comlink-async-generator@^0.0.1": "npm:comlink-async-generator@0.0.1",
"npm:comlink@^4.4.1": "npm:comlink@4.4.1",
"npm:entities@^4.5.0": "npm:entities@4.5.0",
"npm:fast-stable-stringify@^1.0.0": "npm:fast-stable-stringify@1.0.0",
@ -159,8 +164,8 @@
"npm:zod@^3.23.8"
]
},
"@nostrify/nostrify@0.28.0": {
"integrity": "abaacd679e2a00a4394d60858d67e5b9a11605c785526d7f6ba354ccd8df087d",
"@nostrify/nostrify@0.29.0": {
"integrity": "d0489b62441c891324cce60c14bb398013259494b5ad9d21ec6dfbf0ca7368c9",
"dependencies": [
"jsr:@std/crypto@^0.224.0",
"jsr:@std/encoding@^0.224.1",
@ -263,6 +268,15 @@
"jsr:@std/bytes@^1.0.1-rc.3"
]
},
"@std/io@0.224.4": {
"integrity": "bce1151765e4e70e376039fd72c71672b4d4aae363878a5ee3e58361b81197ec",
"dependencies": [
"jsr:@std/bytes@^1.0.2-rc.3"
]
},
"@std/json@0.223.0": {
"integrity": "9a4a255931dd0397924c6b10bb6a72fe3e28ddd876b981ada2e3b8dd0764163f"
},
"@std/media-types@0.224.1": {
"integrity": "9e69a5daed37c5b5c6d3ce4731dc191f80e67f79bed392b0957d1d03b87f11e1"
},
@ -277,6 +291,9 @@
"dependencies": [
"jsr:@std/assert@^0.221.0"
]
},
"@std/streams@0.223.0": {
"integrity": "d6b28e498ced3960b04dc5d251f2dcfc1df244b5ec5a48dc23a8f9b490be3b99"
}
},
"npm": {
@ -439,6 +456,12 @@
"delayed-stream": "delayed-stream@1.0.0"
}
},
"comlink-async-generator@0.0.1": {
"integrity": "sha512-RjOPv6Tb7cL9FiIgwanUJuFG9aW4myAFyyzxZoEkEegeDQrZqr92d1Njv2WIgi7nbGpTiyy5GdNTUubDaNgZ6A==",
"dependencies": {
"comlink": "comlink@4.4.1"
}
},
"comlink@4.4.1": {
"integrity": "sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==",
"dependencies": {}
@ -1785,7 +1808,7 @@
"jsr:@db/sqlite@^0.11.1",
"jsr:@hono/hono@^4.4.6",
"jsr:@lambdalisue/async@^2.1.1",
"jsr:@nostrify/nostrify@^0.28.0",
"jsr:@nostrify/nostrify@^0.29.0",
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
"jsr:@soapbox/stickynotes@^0.4.0",
"jsr:@std/assert@^0.225.1",
@ -1800,6 +1823,7 @@
"npm:@isaacs/ttlcache@^1.4.1",
"npm:@noble/secp256k1@^2.0.0",
"npm:@scure/base@^1.1.6",
"npm:comlink-async-generator@^0.0.1",
"npm:comlink@^4.4.1",
"npm:entities@^4.5.0",
"npm:fast-stable-stringify@^1.0.0",

24
scripts/db-export.ts Normal file
View file

@ -0,0 +1,24 @@
import { Storages } from '@/storages.ts';

// Dump every event in the database to stdout as newline-delimited JSON.
// Status messages go to stderr (console.warn/console.error) so stdout
// stays clean for piping into `db:import` or a file.
const store = await Storages.db();

console.warn('Exporting events...');

let exported = 0;

stream:
for await (const msg of store.req([{}])) {
  switch (msg[0]) {
    case 'EVENT':
      console.log(JSON.stringify(msg[2]));
      exported++;
      break;
    case 'EOSE':
      // All stored events have been delivered; stop streaming.
      break stream;
    case 'CLOSED':
      console.error('Database closed unexpectedly');
      break stream;
  }
}

console.warn(`Exported ${exported} events`);
Deno.exit();

24
scripts/db-import.ts Normal file
View file

@ -0,0 +1,24 @@
import { NostrEvent } from '@nostrify/nostrify';
import { JsonParseStream } from '@std/json/json-parse-stream';
import { TextLineStream } from '@std/streams/text-line-stream';

import { Storages } from '@/storages.ts';

// Read newline-delimited JSON events (the `db:export` format) from stdin
// and insert each one into the database, one at a time.
const store = await Storages.db();

console.warn('Importing events...');

// stdin bytes -> text -> individual lines -> parsed JSON values.
const events = Deno.stdin.readable
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new TextLineStream())
  .pipeThrough(new JsonParseStream());

let imported = 0;

for await (const event of events) {
  // JsonParseStream yields untyped JSON; trusted here as NostrEvent without validation.
  await store.event(event as unknown as NostrEvent);
  imported++;
}

console.warn(`Imported ${imported} events`);
Deno.exit();

View file

@ -62,19 +62,19 @@ export class DittoDB {
}),
});
console.info('Running migrations...');
const results = await migrator.migrateToLatest();
console.warn('Running migrations...');
const { results, error } = await migrator.migrateToLatest();
if (results.error) {
console.error(results.error);
if (error) {
console.error(error);
Deno.exit(1);
} else {
if (!results.results?.length) {
console.info('Everything up-to-date.');
if (!results?.length) {
console.warn('Everything up-to-date.');
} else {
console.info('Migrations finished!');
for (const { migrationName, status } of results.results!) {
console.info(` - ${migrationName}: ${status}`);
console.warn('Migrations finished!');
for (const { migrationName, status } of results!) {
console.warn(` - ${migrationName}: ${status}`);
}
}
}

View file

@ -1,6 +1,6 @@
import { Stickynotes } from '@soapbox/stickynotes';
import { Logger } from 'kysely';
import { dbQueryTimeHistogram } from '@/metrics.ts';
import { dbQueryCounter, dbQueryTimeHistogram } from '@/metrics.ts';
/** Log the SQL for queries. */
export const KyselyLogger: Logger = (event) => {
@ -9,6 +9,7 @@ export const KyselyLogger: Logger = (event) => {
const { query, queryDurationMillis } = event;
const { sql, parameters } = query;
dbQueryCounter.inc();
dbQueryTimeHistogram.observe(queryDurationMillis);
console.debug(

View file

@ -1,13 +1,24 @@
// deno-lint-ignore-file require-await
import { NDatabase, NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n, NStore } from '@nostrify/nostrify';
import {
NDatabase,
NIP50,
NKinds,
NostrEvent,
NostrFilter,
NostrRelayCLOSED,
NostrRelayEOSE,
NostrRelayEVENT,
NSchema as n,
NStore,
} from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely';
import { nip27 } from 'nostr-tools';
import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { dbEventCounter, dbQueryCounter } from '@/metrics.ts';
import { dbEventCounter } from '@/metrics.ts';
import { RelayError } from '@/RelayError.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
import { isNostrId, isURL } from '@/utils.ts';
@ -137,13 +148,20 @@ class EventsDB implements NStore {
}
}
/** Stream events from the database. */
req(
  filters: NostrFilter[],
  opts: { signal?: AbortSignal } = {},
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
  // Thin delegation to the underlying NDatabase store. Note that, unlike
  // query() below, this path does NOT run the filters through expandFilters().
  return this.store.req(filters, opts);
}
/** Get events for filters from the database. */
async query(
filters: NostrFilter[],
opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {},
): Promise<NostrEvent[]> {
filters = await this.expandFilters(filters);
dbQueryCounter.inc();
for (const filter of filters) {
if (filter.since && filter.since >= 2_147_483_647) {

View file

@ -1,8 +1,11 @@
import * as Comlink from 'comlink';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import { CompiledQuery, QueryResult } from 'kysely';
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
class SqliteWorker {
#worker: Worker;
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
@ -33,8 +36,12 @@ class SqliteWorker {
return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
}
streamQuery<R>(): AsyncIterableIterator<R> {
throw new Error('Streaming queries are not supported in the web worker');
async *streamQuery<R>(query: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
  // Block until the worker has posted its 'ready' message before querying.
  await this.#ready;
  // The remote streamQuery is an async generator proxied over Comlink via the
  // asyncGenerator transfer handler registered above; each yielded item is a
  // QueryResult batch (the worker side appears to yield one row per batch —
  // TODO confirm against sqlite.worker.ts).
  for await (const result of await this.#client.streamQuery(query)) {
    yield result as QueryResult<R>;
  }
}
destroy(): Promise<void> {

View file

@ -2,6 +2,7 @@
import { Database as SQLite } from '@db/sqlite';
import * as Comlink from 'comlink';
import { CompiledQuery, QueryResult } from 'kysely';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import '@/sentry.ts';
@ -20,11 +21,22 @@ export const SqliteWorker = {
insertId: BigInt(db!.lastInsertRowId),
};
},
// Stream query results row-by-row instead of materializing the full result
// set, so large exports don't hold every row in memory at once.
async *streamQuery<R>({ sql, parameters }: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
  if (!db) throw new Error('Database not open');
  // NOTE(review): CompiledQuery.parameters is readonly unknown[]; the
  // 'as any[]' cast is here to satisfy bind()'s signature — consider a
  // narrower cast if @db/sqlite's types allow it.
  const stmt = db.prepare(sql).bind(...parameters as any[]);
  // Each iteration yields a QueryResult wrapping a single row.
  for (const row of stmt) {
    yield {
      rows: [row],
    };
  }
}
destroy() {
db?.close();
},
};
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
Comlink.expose(SqliteWorker);
self.postMessage(['ready']);