Make DittoPgStore pubsub capable

This commit is contained in:
Alex Gleason 2025-02-18 13:34:32 -06:00
parent 37f418899b
commit 6fb873e72f
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
5 changed files with 178 additions and 70 deletions

View file

@ -44,8 +44,8 @@ export class Storages {
public static async db(): Promise<DittoPgStore> { public static async db(): Promise<DittoPgStore> {
if (!this._db) { if (!this._db) {
this._db = (async () => { this._db = (async () => {
const kysely = await this.kysely(); const db = await this.database();
const store = new DittoPgStore({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default }); const store = new DittoPgStore({ db, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default });
await seedZapSplits(store); await seedZapSplits(store);
return store; return store;
})(); })();

View file

@ -7,6 +7,26 @@ import { Conf } from '@/config.ts';
import { DittoPgStore } from '@/storages/DittoPgStore.ts'; import { DittoPgStore } from '@/storages/DittoPgStore.ts';
import { createTestDB } from '@/test.ts'; import { createTestDB } from '@/test.ts';
Deno.test('req streaming', async () => {
await using db = await createTestDB({ pure: true });
const { store: relay } = db;
const event1 = await eventFixture('event-1');
const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0));
for await (const msg of relay.req([{ since: 0 }])) {
if (msg[0] === 'EVENT') {
assertEquals(relay.subs.size, 1);
assertEquals(msg[2], event1);
break;
}
}
await promise;
assertEquals(relay.subs.size, 0); // cleanup
});
Deno.test('count filters', async () => { Deno.test('count filters', async () => {
await using db = await createTestDB({ pure: true }); await using db = await createTestDB({ pure: true });
const { store } = db; const { store } = db;

View file

@ -1,16 +1,27 @@
// deno-lint-ignore-file require-await // deno-lint-ignore-file require-await
import { DittoTables } from '@ditto/db'; import { DittoDatabase, DittoTables } from '@ditto/db';
import { detectLanguage } from '@ditto/lang'; import { detectLanguage } from '@ditto/lang';
import { NPostgres, NPostgresSchema } from '@nostrify/db'; import { NPostgres, NPostgresSchema } from '@nostrify/db';
import { dbEventsCounter } from '@ditto/metrics'; import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics';
import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify'; import {
NIP50,
NKinds,
NostrEvent,
NostrFilter,
NostrRelayCLOSED,
NostrRelayEOSE,
NostrRelayEVENT,
NSchema as n,
} from '@nostrify/nostrify';
import { Machina } from '@nostrify/nostrify/utils';
import { logi } from '@soapbox/logi'; import { logi } from '@soapbox/logi';
import { JsonValue } from '@std/json'; import { JsonValue } from '@std/json';
import { LanguageCode } from 'iso-639-1'; import { LanguageCode } from 'iso-639-1';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import linkify from 'linkifyjs'; import linkify from 'linkifyjs';
import { nip27 } from 'nostr-tools'; import { LRUCache } from 'lru-cache';
import { matchFilter, nip27 } from 'nostr-tools';
import tldts from 'tldts'; import tldts from 'tldts';
import { z } from 'zod'; import { z } from 'zod';
@ -36,20 +47,25 @@ interface TagConditionOpts {
value: string; value: string;
} }
/** Options for the DittoPgStore store. */ /** Options for the EventsDB store. */
interface DittoPgStoreOpts { interface DittoPgStoreOpts {
/** Kysely instance to use. */ /** Kysely instance to use. */
kysely: Kysely<DittoTables>; db: DittoDatabase;
/** Pubkey of the admin account. */ /** Pubkey of the admin account. */
pubkey: string; pubkey: string;
/** Timeout in milliseconds for database queries. */ /** Timeout in milliseconds for database queries. */
timeout: number; timeout: number;
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */ /** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
pure?: boolean; pure?: boolean;
/** Chunk size for streaming events. Defaults to 100. */
chunkSize?: number;
} }
/** SQL database storage adapter for Nostr events. */ /** SQL database storage adapter for Nostr events. */
class DittoPgStore extends NPostgres { export class DittoPgStore extends NPostgres {
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
readonly encounters = new LRUCache<string, boolean>({ max: 100 });
/** Conditions for when to index certain tags. */ /** Conditions for when to index certain tags. */
static tagConditions: Record<string, TagCondition> = { static tagConditions: Record<string, TagCondition> = {
'a': ({ count }) => count < 15, 'a': ({ count }) => count < 15,
@ -72,65 +88,33 @@ class DittoPgStore extends NPostgres {
}, },
}; };
static indexExtensions(event: NostrEvent): Record<string, string> {
const ext: Record<string, string> = {};
if (event.kind === 1) {
ext.reply = event.tags.some(([name]) => name === 'e').toString();
} else if (event.kind === 1111) {
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
} else if (event.kind === 6) {
ext.reply = 'false';
}
if ([1, 20, 30023].includes(event.kind)) {
const language = detectLanguage(event.content, 0.90);
if (language) {
ext.language = language;
}
}
const imeta: string[][][] = event.tags
.filter(([name]) => name === 'imeta')
.map(([_, ...entries]) =>
entries.map((entry) => {
const split = entry.split(' ');
return [split[0], split.splice(1).join(' ')];
})
);
// quirks mode
if (!imeta.length && event.kind === 1) {
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
imeta.push(...getMediaLinks(links));
}
if (imeta.length) {
ext.media = 'true';
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
ext.video = 'true';
}
}
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
return ext;
}
constructor(private opts: DittoPgStoreOpts) { constructor(private opts: DittoPgStoreOpts) {
super(opts.kysely, { super(opts.db.kysely, {
indexTags: DittoPgStore.indexTags, indexTags: DittoPgStore.indexTags,
indexSearch: DittoPgStore.searchText, indexSearch: DittoPgStore.searchText,
indexExtensions: DittoPgStore.indexExtensions, indexExtensions: DittoPgStore.indexExtensions,
chunkSize: opts.chunkSize,
});
opts.db.listen('nostr_event', async (id) => {
if (this.encounters.has(id)) return;
this.encounters.set(id, true);
const [event] = await this.query([{ ids: [id] }]);
if (event) {
this.streamOut(event);
}
}); });
} }
/** Insert an event (and its tags) into the database. */ /** Insert an event (and its tags) into the database. */
override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> { override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
event = purifyEvent(event); event = purifyEvent(event);
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind }); logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
this.encounters.set(event.id, true);
dbEventsCounter.inc({ kind: event.kind }); dbEventsCounter.inc({ kind: event.kind });
if (await this.isDeletedAdmin(event)) { if (await this.isDeletedAdmin(event)) {
@ -141,6 +125,7 @@ class DittoPgStore extends NPostgres {
try { try {
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout }); await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
this.streamOut(event);
} catch (e) { } catch (e) {
if (e instanceof Error && e.message === 'Cannot add a deleted event') { if (e instanceof Error && e.message === 'Cannot add a deleted event') {
throw new RelayError('blocked', 'event deleted by user'); throw new RelayError('blocked', 'event deleted by user');
@ -152,6 +137,21 @@ class DittoPgStore extends NPostgres {
} }
} }
protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean {
// TODO: support streaming by search.
return matchFilter(filter, event) && filter.search === undefined;
}
protected streamOut(event: NostrEvent): void {
for (const { filters, machina } of this.subs.values()) {
for (const filter of filters) {
if (this.matchesFilter(event, filter)) {
machina.push(event);
}
}
}
}
/** Check if an event has been deleted by the admin. */ /** Check if an event has been deleted by the admin. */
private async isDeletedAdmin(event: NostrEvent): Promise<boolean> { private async isDeletedAdmin(event: NostrEvent): Promise<boolean> {
const filters: NostrFilter[] = [ const filters: NostrFilter[] = [
@ -213,10 +213,53 @@ class DittoPgStore extends NPostgres {
} }
} }
override async *req(
filters: NostrFilter[],
opts?: { signal?: AbortSignal },
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
const subId = crypto.randomUUID();
const normalFilters = this.normalizeFilters(filters);
if (normalFilters.length) {
const { db, chunkSize = 100 } = this.opts;
const rows = this.getEventsQuery(db.kysely as unknown as Kysely<NPostgresSchema>, normalFilters).stream(
chunkSize,
);
for await (const row of rows) {
const event = this.parseEventRow(row);
yield ['EVENT', subId, event];
if (opts?.signal?.aborted) {
yield ['CLOSED', subId, 'aborted'];
return;
}
}
}
yield ['EOSE', subId];
const machina = new Machina<DittoEvent>(opts?.signal);
this.subs.set(subId, { filters, machina });
internalSubscriptionsSizeGauge.set(this.subs.size);
try {
for await (const event of machina) {
yield ['EVENT', subId, event];
}
} catch {
yield ['CLOSED', subId, 'error: something went wrong'];
} finally {
this.subs.delete(subId);
internalSubscriptionsSizeGauge.set(this.subs.size);
}
}
/** Get events for filters from the database. */ /** Get events for filters from the database. */
override async query( override async query(
filters: NostrFilter[], filters: NostrFilter[],
opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {}, opts: { signal?: AbortSignal; pure?: boolean; timeout?: number; limit?: number } = {},
): Promise<DittoEvent[]> { ): Promise<DittoEvent[]> {
filters = await this.expandFilters(filters); filters = await this.expandFilters(filters);
@ -334,6 +377,53 @@ class DittoPgStore extends NPostgres {
}, []); }, []);
} }
static indexExtensions(event: NostrEvent): Record<string, string> {
const ext: Record<string, string> = {};
if (event.kind === 1) {
ext.reply = event.tags.some(([name]) => name === 'e').toString();
} else if (event.kind === 1111) {
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
} else if (event.kind === 6) {
ext.reply = 'false';
}
if ([1, 20, 30023].includes(event.kind)) {
const language = detectLanguage(event.content, 0.90);
if (language) {
ext.language = language;
}
}
const imeta: string[][][] = event.tags
.filter(([name]) => name === 'imeta')
.map(([_, ...entries]) =>
entries.map((entry) => {
const split = entry.split(' ');
return [split[0], split.splice(1).join(' ')];
})
);
// quirks mode
if (!imeta.length && event.kind === 1) {
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
imeta.push(...getMediaLinks(links));
}
if (imeta.length) {
ext.media = 'true';
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
ext.video = 'true';
}
}
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
return ext;
}
/** Build a search index from the event. */ /** Build a search index from the event. */
static searchText(event: NostrEvent): string { static searchText(event: NostrEvent): string {
switch (event.kind) { switch (event.kind) {
@ -385,7 +475,7 @@ class DittoPgStore extends NPostgres {
} }
if (domains.size || hostnames.size) { if (domains.size || hostnames.size) {
let query = this.opts.kysely let query = this.opts.db.kysely
.selectFrom('author_stats') .selectFrom('author_stats')
.select('pubkey') .select('pubkey')
.where((eb) => { .where((eb) => {
@ -433,5 +523,3 @@ class DittoPgStore extends NPostgres {
return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>)); return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>));
} }
} }
export { DittoPgStore };

View file

@ -34,31 +34,31 @@ export function genEvent(t: Partial<NostrEvent> = {}, sk: Uint8Array = generateS
/** Create a database for testing. It uses `DATABASE_URL`, or creates an in-memory database by default. */ /** Create a database for testing. It uses `DATABASE_URL`, or creates an in-memory database by default. */
export async function createTestDB(opts?: { pure?: boolean }) { export async function createTestDB(opts?: { pure?: boolean }) {
const { kysely } = DittoDB.create(Conf.databaseUrl, { poolSize: 1 }); const db = DittoDB.create(Conf.databaseUrl, { poolSize: 1 });
await DittoDB.migrate(kysely); await DittoDB.migrate(db.kysely);
const store = new DittoPgStore({ const store = new DittoPgStore({
kysely, db,
timeout: Conf.db.timeouts.default, timeout: Conf.db.timeouts.default,
pubkey: Conf.pubkey, pubkey: Conf.pubkey,
pure: opts?.pure ?? false, pure: opts?.pure ?? false,
}); });
return { return {
...db,
store, store,
kysely,
[Symbol.asyncDispose]: async () => { [Symbol.asyncDispose]: async () => {
const { rows } = await sql< const { rows } = await sql<
{ tablename: string } { tablename: string }
>`select tablename from pg_tables where schemaname = current_schema()`.execute(kysely); >`select tablename from pg_tables where schemaname = current_schema()`.execute(db.kysely);
for (const { tablename } of rows) { for (const { tablename } of rows) {
if (tablename.startsWith('kysely_')) continue; if (tablename.startsWith('kysely_')) continue;
await sql`truncate table ${sql.ref(tablename)} cascade`.execute(kysely); await sql`truncate table ${sql.ref(tablename)} cascade`.execute(db.kysely);
} }
await kysely.destroy(); await db.kysely.destroy();
}, },
}; };
} }

View file

@ -30,10 +30,10 @@ export class CustomPolicy implements NPolicy {
async init({ path, databaseUrl, pubkey }: PolicyInit): Promise<void> { async init({ path, databaseUrl, pubkey }: PolicyInit): Promise<void> {
const Policy = (await import(path)).default; const Policy = (await import(path)).default;
const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 }); const db = DittoDB.create(databaseUrl, { poolSize: 1 });
const store = new DittoPgStore({ const store = new DittoPgStore({
kysely, db,
pubkey, pubkey,
timeout: 5_000, timeout: 5_000,
}); });