Merge branch 'main' into feat-domain-api

Update local branch
P. Reis 2024-08-13 12:16:51 -03:00
commit 9b74a8c5e8
25 changed files with 349 additions and 181 deletions

View file

@ -30,7 +30,7 @@
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
"@lambdalisue/async": "jsr:@lambdalisue/async@^2.1.1",
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
"@nostrify/db": "jsr:@nostrify/db@^0.30.0",
"@nostrify/db": "jsr:@nostrify/db@^0.31.2",
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.30.0",
"@scure/base": "npm:@scure/base@^1.1.6",
"@sentry/deno": "https://deno.land/x/sentry@7.112.2/index.mjs",

deno.lock (generated): 16 lines changed
View file

@ -9,9 +9,9 @@
"jsr:@gleasonator/policy@0.2.0": "jsr:@gleasonator/policy@0.2.0",
"jsr:@gleasonator/policy@0.4.0": "jsr:@gleasonator/policy@0.4.0",
"jsr:@gleasonator/policy@0.4.1": "jsr:@gleasonator/policy@0.4.1",
"jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.3",
"jsr:@hono/hono@^4.4.6": "jsr:@hono/hono@4.5.5",
"jsr:@lambdalisue/async@^2.1.1": "jsr:@lambdalisue/async@2.1.1",
"jsr:@nostrify/db@^0.30.0": "jsr:@nostrify/db@0.30.0",
"jsr:@nostrify/db@^0.31.2": "jsr:@nostrify/db@0.31.2",
"jsr:@nostrify/nostrify@^0.22.1": "jsr:@nostrify/nostrify@0.22.5",
"jsr:@nostrify/nostrify@^0.22.4": "jsr:@nostrify/nostrify@0.22.4",
"jsr:@nostrify/nostrify@^0.22.5": "jsr:@nostrify/nostrify@0.22.5",
@ -135,11 +135,17 @@
"@hono/hono@4.5.3": {
"integrity": "429923b2b3c6586a1450862328d61a1346fee5841e8ae86c494250475057213c"
},
"@hono/hono@4.5.4": {
"integrity": "3792780b8460d5df0959b07c059db9325e4fa1a49f8b5aff7ab9bc870bdec8e3"
},
"@hono/hono@4.5.5": {
"integrity": "e5a63b5f535475cd80974b65fed23a138d0cbb91fe1cc9a17a7c7278e835c308"
},
"@lambdalisue/async@2.1.1": {
"integrity": "1fc9bc6f4ed50215cd2f7217842b18cea80f81c25744f88f8c5eb4be5a1c9ab4"
},
"@nostrify/db@0.30.0": {
"integrity": "a75ba78be89d57c54c3d47e9e94c7142817c5b50daec27bf7f9a4af4629be20b",
"@nostrify/db@0.31.2": {
"integrity": "a906b64edbf84a6b482cd7c9f5df2d2237c4ec42589116097d99ceb41347b1f5",
"dependencies": [
"jsr:@nostrify/nostrify@^0.30.0",
"jsr:@nostrify/types@^0.30.0",
@ -1827,7 +1833,7 @@
"jsr:@db/sqlite@^0.11.1",
"jsr:@hono/hono@^4.4.6",
"jsr:@lambdalisue/async@^2.1.1",
"jsr:@nostrify/db@^0.30.0",
"jsr:@nostrify/db@^0.31.2",
"jsr:@nostrify/nostrify@^0.30.0",
"jsr:@soapbox/kysely-deno-sqlite@^2.1.0",
"jsr:@soapbox/stickynotes@^0.4.0",

View file

@ -9,8 +9,8 @@ import { nostrNow } from '@/utils.ts';
const signer = new AdminSigner();
const kysely = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
const readable = Deno.stdin.readable
.pipeThrough(new TextDecoderStream())

View file

@ -6,8 +6,8 @@ import { AdminSigner } from '@/signers/AdminSigner.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
import { nostrNow } from '@/utils.ts';
const kysely = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
const [pubkeyOrNpub, role] = Deno.args;
const pubkey = pubkeyOrNpub.startsWith('npub1') ? nip19.decode(pubkeyOrNpub as `npub1${string}`).data : pubkeyOrNpub;

View file

@ -7,7 +7,10 @@ if (Deno.env.get('CI') && Conf.db.dialect === 'postgres') {
await sleep(1_000);
}
const kysely = await DittoDB.getInstance();
// This migrates kysely internally.
const { kysely } = await DittoDB.getInstance();
// Close the connection before exiting.
await kysely.destroy();
Deno.exit();

View file

@ -9,8 +9,8 @@ import { nip19 } from 'nostr-tools';
import { DittoDB } from '@/db/DittoDB.ts';
import { EventsDB } from '@/storages/EventsDB.ts';
const kysely = await DittoDB.getInstance();
const eventsDB = new EventsDB(kysely);
const db = await DittoDB.getInstance();
const eventsDB = new EventsDB(db);
interface ImportEventsOpts {
profilesOnly: boolean;

View file

@ -82,7 +82,7 @@ const createTokenController: AppController = async (c) => {
async function getToken(
{ pubkey, secret, relays = [] }: { pubkey: string; secret?: string; relays?: string[] },
): Promise<`token1${string}`> {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const token = generateToken();
const serverSeckey = generateSecretKey();

View file

@ -63,7 +63,7 @@ const statusController: AppController = async (c) => {
const createStatusController: AppController = async (c) => {
const body = await parseBody(c.req.raw);
const result = createStatusSchema.safeParse(body);
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const store = c.get('store');
if (!result.success) {
@ -565,9 +565,9 @@ const zappedByController: AppController = async (c) => {
const id = c.req.param('id');
const params = c.get('listPagination');
const store = await Storages.db();
const db = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const zaps = await db.selectFrom('event_zaps')
const zaps = await kysely.selectFrom('event_zaps')
.selectAll()
.where('target_event_id', '=', id)
.orderBy('amount_millisats', 'desc')

View file

@ -189,7 +189,7 @@ async function topicToFilter(
async function getTokenPubkey(token: string): Promise<string | undefined> {
if (token.startsWith('token1')) {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { user_pubkey } = await kysely
.selectFrom('nip46_tokens')

View file

@ -106,6 +106,8 @@ function connectStream(socket: WebSocket, ip: string | undefined) {
} catch (e) {
if (e instanceof RelayError) {
send(['CLOSED', subId, e.message]);
} else if (e.message.includes('timeout')) {
send(['CLOSED', subId, 'error: the relay could not respond fast enough']);
} else {
send(['CLOSED', subId, 'error: something went wrong']);
}

View file

@ -1,6 +1,7 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { Conf } from '@/config.ts';
@ -8,33 +9,43 @@ import { DittoPostgres } from '@/db/adapters/DittoPostgres.ts';
import { DittoSQLite } from '@/db/adapters/DittoSQLite.ts';
import { DittoTables } from '@/db/DittoTables.ts';
export type DittoDatabase = {
dialect: 'sqlite';
kysely: Kysely<DittoTables> & Kysely<NDatabaseSchema>;
} | {
dialect: 'postgres';
kysely: Kysely<DittoTables> & Kysely<NPostgresSchema>;
};
export class DittoDB {
private static kysely: Promise<Kysely<DittoTables>> | undefined;
private static db: Promise<DittoDatabase> | undefined;
static getInstance(): Promise<Kysely<DittoTables>> {
if (!this.kysely) {
this.kysely = this._getInstance();
static getInstance(): Promise<DittoDatabase> {
if (!this.db) {
this.db = this._getInstance();
}
return this.kysely;
return this.db;
}
static async _getInstance(): Promise<Kysely<DittoTables>> {
let kysely: Kysely<DittoTables>;
static async _getInstance(): Promise<DittoDatabase> {
const result = {} as DittoDatabase;
switch (Conf.db.dialect) {
case 'sqlite':
kysely = await DittoSQLite.getInstance();
result.dialect = 'sqlite';
result.kysely = await DittoSQLite.getInstance();
break;
case 'postgres':
kysely = await DittoPostgres.getInstance();
result.dialect = 'postgres';
result.kysely = await DittoPostgres.getInstance();
break;
default:
throw new Error('Unsupported database URL.');
}
await this.migrate(kysely);
await this.migrate(result.kysely);
return kysely;
return result;
}
static get poolSize(): number {
@ -52,7 +63,7 @@ export class DittoDB {
}
/** Migrate the database to the latest version. */
static async migrate(kysely: Kysely<DittoTables>) {
static async migrate(kysely: DittoDatabase['kysely']) {
const migrator = new Migrator({
db: kysely,
provider: new FileMigrationProvider({

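As a usage note (a minimal sketch, not part of the diff itself, assuming the repo's '@/' import map): getInstance() now resolves to a DittoDatabase object instead of a bare Kysely instance, so call sites destructure kysely from it and can branch on dialect where a query differs per backend.

import { DittoDB } from '@/db/DittoDB.ts';

// Previously callers received the Kysely instance directly:
//   const kysely = await DittoDB.getInstance();
// Now they destructure it from the DittoDatabase object:
const { dialect, kysely } = await DittoDB.getInstance();

console.info(`Database dialect: ${dialect}`); // 'sqlite' | 'postgres'

// kysely is typed against DittoTables plus the @nostrify/db schema for the dialect.
const tokens = await kysely
  .selectFrom('nip46_tokens')
  .selectAll()
  .execute();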
View file

@ -1,7 +1,4 @@
export interface DittoTables {
nostr_events: EventRow;
nostr_tags: TagRow;
nostr_fts5: EventFTSRow;
nip46_tokens: NIP46TokenRow;
unattached_media: UnattachedMediaRow;
author_stats: AuthorStatsRow;
@ -27,30 +24,6 @@ interface EventStatsRow {
zaps_amount: number;
}
interface EventRow {
id: string;
kind: number;
pubkey: string;
content: string;
created_at: number;
tags: string;
sig: string;
}
interface EventFTSRow {
event_id: string;
content: string;
}
interface TagRow {
event_id: string;
name: string;
value: string;
kind: number;
pubkey: string;
created_at: number;
}
interface NIP46TokenRow {
api_token: string;
user_pubkey: string;

View file

@ -1,3 +1,4 @@
import { NPostgresSchema } from '@nostrify/db';
import {
BinaryOperationNode,
FunctionNode,
@ -17,11 +18,11 @@ import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
export class DittoPostgres {
static db: Kysely<DittoTables> | undefined;
static db: Kysely<DittoTables> & Kysely<NPostgresSchema> | undefined;
static postgres?: postgres.Sql;
// deno-lint-ignore require-await
static async getInstance(): Promise<Kysely<DittoTables>> {
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NPostgresSchema>> {
if (!this.postgres) {
this.postgres = postgres(Conf.databaseUrl, { max: Conf.pg.poolSize });
}
@ -45,7 +46,7 @@ export class DittoPostgres {
},
},
log: KyselyLogger,
});
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
}
return this.db;

View file

@ -1,3 +1,4 @@
import { NDatabaseSchema } from '@nostrify/db';
import { PolySqliteDialect } from '@soapbox/kysely-deno-sqlite';
import { Kysely, sql } from 'kysely';
@ -7,19 +8,19 @@ import { KyselyLogger } from '@/db/KyselyLogger.ts';
import SqliteWorker from '@/workers/sqlite.ts';
export class DittoSQLite {
static db: Kysely<DittoTables> | undefined;
static db: Kysely<DittoTables> & Kysely<NDatabaseSchema> | undefined;
static async getInstance(): Promise<Kysely<DittoTables>> {
static async getInstance(): Promise<Kysely<DittoTables> & Kysely<NDatabaseSchema>> {
if (!this.db) {
const sqliteWorker = new SqliteWorker();
await sqliteWorker.open(this.path);
this.db = new Kysely<DittoTables>({
this.db = new Kysely({
dialect: new PolySqliteDialect({
database: sqliteWorker,
}),
log: KyselyLogger,
});
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
// Set PRAGMA values.
await Promise.all([

View file

@ -0,0 +1,141 @@
import { Kysely, sql } from 'kysely';
import { Conf } from '@/config.ts';
export async function up(db: Kysely<any>): Promise<void> {
if (Conf.db.dialect !== 'postgres') return;
// Create new table and indexes.
await db.schema
.createTable('nostr_events_new')
.addColumn('id', 'char(64)', (col) => col.primaryKey())
.addColumn('kind', 'integer', (col) => col.notNull())
.addColumn('pubkey', 'char(64)', (col) => col.notNull())
.addColumn('content', 'text', (col) => col.notNull())
.addColumn('created_at', 'bigint', (col) => col.notNull())
.addColumn('tags', 'jsonb', (col) => col.notNull())
.addColumn('tags_index', 'jsonb', (col) => col.notNull())
.addColumn('sig', 'char(128)', (col) => col.notNull())
.addColumn('d', 'text')
.addColumn('search', sql`tsvector`)
.addCheckConstraint('nostr_events_kind_chk', sql`kind >= 0`)
.addCheckConstraint('nostr_events_created_chk', sql`created_at >= 0`)
.addCheckConstraint(
'nostr_events_d_chk',
sql`(kind >= 30000 and kind < 40000 and d is not null) or ((kind < 30000 or kind >= 40000) and d is null)`,
)
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_created_kind_idx')
.on('nostr_events_new')
.columns(['created_at desc', 'id asc', 'kind', 'pubkey'])
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_pubkey_created_idx')
.on('nostr_events_new')
.columns(['pubkey', 'created_at desc', 'id asc', 'kind'])
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_tags_idx').using('gin')
.on('nostr_events_new')
.column('tags_index')
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_replaceable_idx')
.on('nostr_events_new')
.columns(['kind', 'pubkey'])
.where(() => sql`kind >= 10000 and kind < 20000 or (kind in (0, 3))`)
.unique()
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_parameterized_idx')
.on('nostr_events_new')
.columns(['kind', 'pubkey', 'd'])
.where(() => sql`kind >= 30000 and kind < 40000`)
.unique()
.ifNotExists()
.execute();
await db.schema
.createIndex('nostr_events_search_idx').using('gin')
.on('nostr_events_new')
.column('search')
.ifNotExists()
.execute();
let iid: number | undefined;
const tid = setTimeout(() => {
console.warn(`Recreating the database to improve performance. This will take several hours.
If you don't want to wait, you can create a fresh database and then import your old events:
1. Revert to a prior commit: e789e08c
2. Export your events: "deno task db:export > events.jsonl"
3. Checkout the latest commit: "git checkout main && git pull"
4. Drop your old database: "dropdb ditto"
5. Create a new database: "createdb ditto"
6. Start Ditto
7. While Ditto is running, import your events: "cat events.jsonl | deno task db:import"`);
const emojis = ['⚡', '🐛', '🔎', '😂', '😅', '😬', '😭', '🙃', '🤔', '🧐', '🧐', '🫠'];
iid = setInterval(() => {
const emoji = emojis[Math.floor(Math.random() * emojis.length)];
console.info(`Recreating the database... ${emoji}`);
}, 60_000);
}, 10_000);
// Copy data to the new table.
await sql`
INSERT INTO nostr_events_new(id, kind, pubkey, content, created_at, tags, sig, d, tags_index, search)
SELECT
e.id,
e.kind,
e.pubkey,
e.content,
e.created_at,
e.tags::jsonb,
e.sig,
t_agg.tags_index->'d'->>0 as d,
COALESCE(t_agg.tags_index, '{}'::jsonb) as tags_index,
fts.search_vec
FROM
nostr_events AS e
LEFT JOIN
(SELECT event_id, jsonb_object_agg(name, values_array) as tags_index
FROM (
SELECT event_id, name, jsonb_agg(value) as values_array
FROM nostr_tags
GROUP BY event_id, name
) sub
GROUP BY event_id) AS t_agg ON e.id = t_agg.event_id
LEFT JOIN
nostr_pgfts AS fts ON e.id = fts.event_id
WHERE
(e.kind >= 30000 AND e.kind < 40000 AND t_agg.tags_index->'d'->>0 IS NOT NULL)
OR ((e.kind < 30000 OR e.kind >= 40000) AND t_agg.tags_index->'d'->>0 IS NULL)
ON CONFLICT DO NOTHING;
`.execute(db);
clearTimeout(tid);
if (iid) clearInterval(iid);
await db.schema.dropTable('nostr_events').cascade().execute();
await db.schema.dropTable('nostr_tags').execute();
await db.schema.dropTable('nostr_pgfts').execute();
await db.schema.alterTable('nostr_events_new').renameTo('nostr_events').execute();
}
export function down(_db: Kysely<any>): Promise<void> {
throw new Error("Sorry, you can't migrate back from here.");
}

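As a usage note (a hypothetical sketch, not part of this migration): the purpose of the new tags_index column and its GIN index is that tag values are grouped by tag name into one jsonb object per event, so a tag filter becomes a single jsonb containment check. The eventsWithHashtag helper below is illustrative only.

import { Kysely, sql } from 'kysely';

/** Illustrative only: fetch recent events carrying a given "t" (hashtag) tag. */
async function eventsWithHashtag(db: Kysely<any>, hashtag: string) {
  // tags_index looks like {"e": ["…"], "p": ["…"], "t": ["nostr", "ditto"]},
  // so the jsonb containment operator (served by the GIN index) can match it.
  return await sql`
    SELECT id, kind, pubkey, created_at
    FROM nostr_events
    WHERE tags_index @> ${JSON.stringify({ t: [hashtag] })}::jsonb
    ORDER BY created_at DESC
    LIMIT 20
  `.execute(db);
}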
View file

@ -14,7 +14,7 @@ interface UnattachedMedia {
/** Add unattached media into the database. */
async function insertUnattachedMedia(media: UnattachedMedia) {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await kysely.insertInto('unattached_media')
.values({ ...media, data: JSON.stringify(media.data) })
.execute();
@ -34,17 +34,9 @@ function selectUnattachedMediaQuery(kysely: Kysely<DittoTables>) {
]);
}
/** Find attachments that exist but aren't attached to any events. */
function getUnattachedMedia(kysely: Kysely<DittoTables>, until: Date) {
return selectUnattachedMediaQuery(kysely)
.leftJoin('nostr_tags', 'unattached_media.url', 'nostr_tags.value')
.where('uploaded_at', '<', until.getTime())
.execute();
}
/** Delete unattached media by URL. */
async function deleteUnattachedMediaByUrl(url: string) {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
return kysely.deleteFrom('unattached_media')
.where('url', '=', url)
.execute();
@ -67,7 +59,7 @@ async function getUnattachedMediaByIds(kysely: Kysely<DittoTables>, ids: string[
/** Delete rows as an event with media is being created. */
async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void> {
if (!urls.length) return;
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await kysely.deleteFrom('unattached_media')
.where('pubkey', '=', pubkey)
.where('url', 'in', urls)
@ -77,7 +69,6 @@ async function deleteAttachedMedia(pubkey: string, urls: string[]): Promise<void
export {
deleteAttachedMedia,
deleteUnattachedMediaByUrl,
getUnattachedMedia,
getUnattachedMediaByIds,
insertUnattachedMedia,
type UnattachedMedia,

View file

@ -20,7 +20,7 @@ export const signerMiddleware: AppMiddleware = async (c, next) => {
if (bech32.startsWith('token1')) {
try {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { user_pubkey, server_seckey, relays } = await kysely
.selectFrom('nip46_tokens')

View file

@ -37,7 +37,7 @@ Deno.test('store one zap receipt in nostr_events; convert it into event_zaps tab
await handleZaps(kysely, event);
await handleZaps(kysely, event);
const zapReceipts = await kysely.selectFrom('nostr_events').selectAll().execute();
const zapReceipts = await db.store.query([{}]);
const customEventZaps = await kysely.selectFrom('event_zaps').selectAll().execute();
assertEquals(zapReceipts.length, 1); // basic check

View file

@ -19,8 +19,8 @@ import { verifyEventWorker } from '@/workers/verify.ts';
import { nip05Cache } from '@/utils/nip05.ts';
import { updateStats } from '@/utils/stats.ts';
import { getTagSet } from '@/utils/tags.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { getAmount } from '@/utils/bolt11.ts';
import { DittoTables } from '@/db/DittoTables.ts';
const debug = Debug('ditto:pipeline');
@ -54,7 +54,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
throw new RelayError('blocked', 'user is disabled');
}
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await Promise.all([
storeEvent(event, signal),
@ -106,7 +106,7 @@ async function existsInDB(event: DittoEvent): Promise<boolean> {
async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<void> {
await hydrateEvents({ events: [event], store: await Storages.db(), signal });
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const domain = await kysely
.selectFrom('pubkey_domains')
.select('domain')
@ -120,7 +120,7 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<undefined> {
if (NKinds.ephemeral(event.kind)) return;
const store = await Storages.db();
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
await updateStats({ event, store, kysely }).catch(debug);
await store.event(event, { signal });
@ -148,7 +148,7 @@ async function parseMetadata(event: NostrEvent, signal: AbortSignal): Promise<vo
// Track pubkey domain.
try {
const kysely = await DittoDB.getInstance();
const { kysely } = await DittoDB.getInstance();
const { domain } = parseNip05(nip05);
await sql`

View file

@ -20,8 +20,8 @@ export class Storages {
public static async db(): Promise<EventsDB> {
if (!this._db) {
this._db = (async () => {
const kysely = await DittoDB.getInstance();
const store = new EventsDB(kysely);
const db = await DittoDB.getInstance();
const store = new EventsDB(db);
await seedZapSplits(store);
return store;
})();

View file

@ -58,9 +58,11 @@ Deno.test('delete events', async () => {
await using db = await createTestDB();
const { store } = db;
const sk = generateSecretKey();
const [one, two] = [
{ id: '1', kind: 1, pubkey: 'abc', content: 'hello world', created_at: 1, sig: '', tags: [] },
{ id: '2', kind: 1, pubkey: 'abc', content: 'yolo fam', created_at: 2, sig: '', tags: [] },
genEvent({ kind: 1, content: 'hello world', created_at: 1 }, sk),
genEvent({ kind: 1, content: 'yolo fam', created_at: 2 }, sk),
];
await store.event(one);
@ -69,15 +71,9 @@ Deno.test('delete events', async () => {
// Sanity check
assertEquals(await store.query([{ kinds: [1] }]), [two, one]);
await store.event({
kind: 5,
pubkey: one.pubkey,
tags: [['e', one.id]],
created_at: 0,
content: '',
id: '',
sig: '',
});
await store.event(
genEvent({ kind: 5, tags: [['e', one.id]] }, sk),
);
assertEquals(await store.query([{ kinds: [1] }]), [two]);
});
@ -86,21 +82,15 @@ Deno.test("user cannot delete another user's event", async () => {
await using db = await createTestDB();
const { store } = db;
const event = { id: '1', kind: 1, pubkey: 'abc', content: 'hello world', created_at: 1, sig: '', tags: [] };
const event = genEvent({ kind: 1, content: 'hello world', created_at: 1 });
await store.event(event);
// Sanity check
assertEquals(await store.query([{ kinds: [1] }]), [event]);
await store.event({
kind: 5,
pubkey: 'def', // different pubkey
tags: [['e', event.id]],
created_at: 0,
content: '',
id: '',
sig: '',
});
await store.event(
genEvent({ kind: 5, tags: [['e', event.id]] }), // different sk
);
assertEquals(await store.query([{ kinds: [1] }]), [event]);
});
@ -109,9 +99,11 @@ Deno.test('admin can delete any event', async () => {
await using db = await createTestDB();
const { store } = db;
const sk = generateSecretKey();
const [one, two] = [
{ id: '1', kind: 1, pubkey: 'abc', content: 'hello world', created_at: 1, sig: '', tags: [] },
{ id: '2', kind: 1, pubkey: 'abc', content: 'yolo fam', created_at: 2, sig: '', tags: [] },
genEvent({ kind: 1, content: 'hello world', created_at: 1 }, sk),
genEvent({ kind: 1, content: 'yolo fam', created_at: 2 }, sk),
];
await store.event(one);
@ -120,15 +112,9 @@ Deno.test('admin can delete any event', async () => {
// Sanity check
assertEquals(await store.query([{ kinds: [1] }]), [two, one]);
await store.event({
kind: 5,
pubkey: Conf.pubkey, // Admin pubkey
tags: [['e', one.id]],
created_at: 0,
content: '',
id: '',
sig: '',
});
await store.event(
genEvent({ kind: 5, tags: [['e', one.id]] }, Conf.seckey), // admin sk
);
assertEquals(await store.query([{ kinds: [1] }]), [two]);
});
@ -173,14 +159,15 @@ Deno.test('inserting replaceable events', async () => {
await using db = await createTestDB();
const { store } = db;
const event = await eventFixture('event-0');
const sk = generateSecretKey();
const event = genEvent({ kind: 0, created_at: 100 }, sk);
await store.event(event);
const olderEvent = { ...event, id: '123', created_at: event.created_at - 1 };
const olderEvent = genEvent({ kind: 0, created_at: 50 }, sk);
await store.event(olderEvent);
assertEquals(await store.query([{ kinds: [0], authors: [event.pubkey] }]), [event]);
const newerEvent = { ...event, id: '123', created_at: event.created_at + 1 };
const newerEvent = genEvent({ kind: 0, created_at: 999 }, sk);
await store.event(newerEvent);
assertEquals(await store.query([{ kinds: [0] }]), [newerEvent]);
});

View file

@ -1,6 +1,6 @@
// deno-lint-ignore-file require-await
import { NDatabase } from '@nostrify/db';
import { NDatabase, NPostgres } from '@nostrify/db';
import {
NIP50,
NKinds,
@ -13,11 +13,10 @@ import {
NStore,
} from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely';
import { nip27 } from 'nostr-tools';
import { Conf } from '@/config.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { DittoDatabase } from '@/db/DittoDB.ts';
import { dbEventCounter } from '@/metrics.ts';
import { RelayError } from '@/RelayError.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
@ -33,7 +32,7 @@ type TagCondition = ({ event, count, value }: {
/** SQLite database storage adapter for Nostr events. */
class EventsDB implements NStore {
private store: NDatabase;
private store: NDatabase | NPostgres;
private console = new Stickynotes('ditto:db:events');
/** Conditions for when to index certain tags. */
@ -53,14 +52,22 @@ class EventsDB implements NStore {
't': ({ event, count, value }) => (event.kind === 1985 ? count < 20 : count < 5) && value.length < 50,
};
constructor(private kysely: Kysely<DittoTables>) {
constructor(private database: DittoDatabase) {
const { dialect, kysely } = database;
if (dialect === 'postgres') {
this.store = new NPostgres(kysely, {
indexTags: EventsDB.indexTags,
indexSearch: EventsDB.searchText,
});
} else {
this.store = new NDatabase(kysely, {
fts: Conf.db.dialect,
timeoutStrategy: Conf.db.dialect === 'postgres' ? 'setStatementTimeout' : undefined,
fts: 'sqlite',
indexTags: EventsDB.indexTags,
searchText: EventsDB.searchText,
});
}
}
/** Insert an event (and its tags) into the database. */
async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
@ -279,7 +286,7 @@ class EventsDB implements NStore {
) as { key: 'domain'; value: string } | undefined)?.value;
if (domain) {
const query = this.kysely
const query = this.database.kysely
.selectFrom('pubkey_domains')
.select('pubkey')
.where('domain', '=', domain);

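As a usage note (a minimal sketch, not from the diff): EventsDB now takes the whole DittoDatabase and selects NPostgres or NDatabase internally from its dialect, so constructing and querying a store looks roughly like this.

import { DittoDB } from '@/db/DittoDB.ts';
import { EventsDB } from '@/storages/EventsDB.ts';

const db = await DittoDB.getInstance();

// EventsDB inspects db.dialect and wraps db.kysely accordingly.
const eventsDB = new EventsDB(db); // previously: new EventsDB(kysely)

const notes = await eventsDB.query([{ kinds: [1], limit: 10 }]);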
View file

@ -18,7 +18,7 @@ interface HydrateOpts {
/** Hydrate events using the provided storage. */
async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
const { events, store, signal, kysely = await DittoDB.getInstance() } = opts;
const { events, store, signal, kysely = (await DittoDB.getInstance()).kysely } = opts;
if (!events.length) {
return events;
@ -59,8 +59,8 @@ async function hydrateEvents(opts: HydrateOpts): Promise<DittoEvent[]> {
}
const stats = {
authors: await gatherAuthorStats(cache, kysely),
events: await gatherEventStats(cache, kysely),
authors: await gatherAuthorStats(cache, kysely as Kysely<DittoTables>),
events: await gatherEventStats(cache, kysely as Kysely<DittoTables>),
};
// Dedupe events.

View file

@ -2,7 +2,7 @@ import fs from 'node:fs/promises';
import path from 'node:path';
import { Database as Sqlite } from '@db/sqlite';
import { NDatabase } from '@nostrify/db';
import { NDatabase, NDatabaseSchema, NPostgresSchema } from '@nostrify/db';
import { NostrEvent } from '@nostrify/nostrify';
import { DenoSqlite3Dialect } from '@soapbox/kysely-deno-sqlite';
import { finalizeEvent, generateSecretKey } from 'nostr-tools';
@ -10,7 +10,7 @@ import { FileMigrationProvider, Kysely, Migrator } from 'kysely';
import { PostgresJSDialect, PostgresJSDialectConfig } from 'kysely-postgres-js';
import postgres from 'postgres';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { purifyEvent } from '@/storages/hydrate.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
@ -99,7 +99,7 @@ export const createTestDB = async (databaseUrl?: string) => {
console.warn(`Using: ${dialect}`);
let kysely: Kysely<DittoTables>;
const db: DittoDatabase = { dialect } as DittoDatabase;
if (dialect === 'sqlite') {
// migration 021_pgfts_index.ts calls 'Conf.db.dialect',
@ -107,13 +107,13 @@ export const createTestDB = async (databaseUrl?: string) => {
// The following line ensures to NOT use the DATABASE_URL that may exist in an .env file.
Deno.env.set('DATABASE_URL', 'sqlite://:memory:');
kysely = new Kysely<DittoTables>({
db.kysely = new Kysely({
dialect: new DenoSqlite3Dialect({
database: new Sqlite(':memory:'),
}),
});
}) as Kysely<DittoTables> & Kysely<NDatabaseSchema>;
} else {
kysely = new Kysely({
db.kysely = new Kysely({
// @ts-ignore Kysely version mismatch.
dialect: new PostgresJSDialect({
postgres: postgres(Conf.databaseUrl, {
@ -121,15 +121,16 @@ export const createTestDB = async (databaseUrl?: string) => {
}) as unknown as PostgresJSDialectConfig['postgres'],
}),
log: KyselyLogger,
});
}) as Kysely<DittoTables> & Kysely<NPostgresSchema>;
}
await DittoDB.migrate(kysely);
const store = new EventsDB(kysely);
await DittoDB.migrate(db.kysely);
const store = new EventsDB(db);
return {
dialect,
store,
kysely,
kysely: db.kysely,
[Symbol.asyncDispose]: async () => {
if (dialect === 'postgres') {
for (
@ -148,9 +149,9 @@ export const createTestDB = async (databaseUrl?: string) => {
'event_zaps',
]
) {
await kysely.schema.dropTable(table).ifExists().cascade().execute();
await db.kysely.schema.dropTable(table).ifExists().cascade().execute();
}
await kysely.destroy();
await db.kysely.destroy();
}
},
};

View file

@ -1,10 +1,9 @@
import { NostrFilter } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import { Kysely } from 'kysely';
import { sql } from 'kysely';
import { Conf } from '@/config.ts';
import { DittoDB } from '@/db/DittoDB.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { DittoDatabase, DittoDB } from '@/db/DittoDB.ts';
import { handleEvent } from '@/pipeline.ts';
import { AdminSigner } from '@/signers/AdminSigner.ts';
import { Time } from '@/utils/time.ts';
@ -14,12 +13,54 @@ const console = new Stickynotes('ditto:trends');
/** Get trending tag values for a given tag in the given time frame. */
export async function getTrendingTagValues(
/** Kysely instance to execute queries on. */
kysely: Kysely<DittoTables>,
{ dialect, kysely }: DittoDatabase,
/** Tag name to filter by, eg `t` or `r`. */
tagNames: string[],
/** Filter of eligible events. */
filter: NostrFilter,
): Promise<{ value: string; authors: number; uses: number }[]> {
if (dialect === 'postgres') {
let query = kysely
.selectFrom([
'nostr_events',
sql<{ key: string; value: string }>`jsonb_each_text(nostr_events.tags_index)`.as('kv'),
sql<{ key: string; value: string }>`jsonb_array_elements_text(kv.value::jsonb)`.as('element'),
])
.select(({ fn }) => [
fn<string>('lower', ['element.value']).as('value'),
fn.agg<number>('count', ['nostr_events.pubkey']).distinct().as('authors'),
fn.countAll<number>().as('uses'),
])
.where('kv.key', '=', (eb) => eb.fn.any(eb.val(tagNames)))
.groupBy((eb) => eb.fn<string>('lower', ['element.value']))
.orderBy((eb) => eb.fn.agg('count', ['nostr_events.pubkey']).distinct(), 'desc');
if (filter.kinds) {
query = query.where('nostr_events.kind', '=', ({ fn, val }) => fn.any(val(filter.kinds)));
}
if (filter.authors) {
query = query.where('nostr_events.pubkey', '=', ({ fn, val }) => fn.any(val(filter.authors)));
}
if (typeof filter.since === 'number') {
query = query.where('nostr_events.created_at', '>=', filter.since);
}
if (typeof filter.until === 'number') {
query = query.where('nostr_events.created_at', '<=', filter.until);
}
if (typeof filter.limit === 'number') {
query = query.limit(filter.limit);
}
const rows = await query.execute();
return rows.map((row) => ({
value: row.value,
authors: Number(row.authors),
uses: Number(row.uses),
}));
}
if (dialect === 'sqlite') {
let query = kysely
.selectFrom('nostr_tags')
.select(({ fn }) => [
@ -51,6 +92,9 @@ export async function getTrendingTagValues(
authors: Number(row.authors),
uses: Number(row.uses),
}));
}
return [];
}
/** Get trending tags and publish an event with them. */
@ -63,7 +107,7 @@ export async function updateTrendingTags(
aliases?: string[],
) {
console.info(`Updating trending ${l}...`);
const kysely = await DittoDB.getInstance();
const db = await DittoDB.getInstance();
const signal = AbortSignal.timeout(1000);
const yesterday = Math.floor((Date.now() - Time.days(1)) / 1000);
@ -72,7 +116,7 @@ export async function updateTrendingTags(
const tagNames = aliases ? [tagName, ...aliases] : [tagName];
try {
const trends = await getTrendingTagValues(kysely, tagNames, {
const trends = await getTrendingTagValues(db, tagNames, {
kinds,
since: yesterday,
until: now,