From e377d7b3cedc3f8918dd9887407f0b302cf49ed2 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 12:58:35 -0500 Subject: [PATCH 01/24] events/db: add a deleteFilters function to delete all events from the local db matching the filters --- src/db/events.test.ts | 15 ++++++++++++++- src/db/events.ts | 22 +++++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 94a4e235..6193960a 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -1,7 +1,13 @@ import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import { assertEquals } from '@/deps-test.ts'; -import { getFilters, insertEvent } from './events.ts'; +import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts'; + +Deno.test('count filters', async () => { + assertEquals(await countFilters([{ kinds: [1] }]), 0); + await insertEvent(event55920b75); + assertEquals(await countFilters([{ kinds: [1] }]), 1); +}); Deno.test('insert and filter events', async () => { await insertEvent(event55920b75); @@ -15,3 +21,10 @@ Deno.test('insert and filter events', async () => { [event55920b75], ); }); + +Deno.test('delete events', async () => { + await insertEvent(event55920b75); + assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); + await deleteFilters([{ kinds: [1] }]); + assertEquals(await getFilters([{ kinds: [1] }]), []); +}); diff --git a/src/db/events.ts b/src/db/events.ts index 1b30f5b3..62db3d54 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -128,13 +128,18 @@ function getFilterQuery(filter: DittoFilter) { return query; } +/** Combine filter queries into a single union query. */ +function getFiltersQuery(filters: DittoFilter[]) { + return filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); +} + /** Get events for filters from the database. */ async function getFilters( filters: DittoFilter[], opts: GetFiltersOpts = {}, ): Promise[]> { if (!filters.length) return Promise.resolve([]); - let query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); + let query = getFiltersQuery(filters); if (typeof opts.limit === 'number') { query = query.limit(opts.limit); @@ -145,10 +150,21 @@ async function getFilters( )); } +/** Delete events based on filters from the database. */ +function deleteFilters(filters: DittoFilter[]) { + if (!filters.length) return Promise.resolve(); + const query = getFiltersQuery(filters); + + return db + .deleteFrom('events') + .where('id', 'in', () => query.clearSelect().select('id')) + .execute(); +} + /** Get number of events that would be returned by filters. */ async function countFilters(filters: DittoFilter[]): Promise { if (!filters.length) return Promise.resolve(0); - const query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); + const query = getFiltersQuery(filters); const [{ count }] = await query .clearSelect() @@ -176,4 +192,4 @@ function buildUserSearchContent(event: Event<0>): string { return [name, nip05, about].filter(Boolean).join('\n'); } -export { countFilters, getFilters, insertEvent }; +export { countFilters, deleteFilters, getFilters, insertEvent }; From 1f06035bf26509d72e394cdf5305b1647c0a6b58 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 13:03:10 -0500 Subject: [PATCH 02/24] db/events: getFiltersQuery, improve variable names --- src/db/events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/events.ts b/src/db/events.ts index 62db3d54..87c8c3f9 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -130,7 +130,7 @@ function getFilterQuery(filter: DittoFilter) { /** Combine filter queries into a single union query. */ function getFiltersQuery(filters: DittoFilter[]) { - return filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); + return filters.map(getFilterQuery).reduce((result, query) => result.union(query)); } /** Get events for filters from the database. */ From 561ae9532af78472893fc5430a2c380eb828cc70 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 13:04:20 -0500 Subject: [PATCH 03/24] db/events: getFiltersQuery, break to multiple lines for readability --- src/db/events.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/db/events.ts b/src/db/events.ts index 87c8c3f9..3edefecb 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -130,7 +130,9 @@ function getFilterQuery(filter: DittoFilter) { /** Combine filter queries into a single union query. */ function getFiltersQuery(filters: DittoFilter[]) { - return filters.map(getFilterQuery).reduce((result, query) => result.union(query)); + return filters + .map(getFilterQuery) + .reduce((result, query) => result.union(query)); } /** Get events for filters from the database. */ From 25e023aaf261a1a05e9da7c5a1b9bbc6258bcdd9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 13:18:11 -0500 Subject: [PATCH 04/24] db/users: add indexes on users.pubkey and users.username --- src/db/migrations/004_add_user_indexes.ts | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 src/db/migrations/004_add_user_indexes.ts diff --git a/src/db/migrations/004_add_user_indexes.ts b/src/db/migrations/004_add_user_indexes.ts new file mode 100644 index 00000000..e77693b9 --- /dev/null +++ b/src/db/migrations/004_add_user_indexes.ts @@ -0,0 +1,20 @@ +import { Kysely } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await db.schema + .createIndex('idx_users_pubkey') + .on('users') + .column('pubkey') + .execute(); + + await db.schema + .createIndex('idx_users_username') + .on('users') + .column('username') + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropIndex('idx_users_pubkey').execute(); + await db.schema.dropIndex('idx_users_username').execute(); +} From 8ec215402f7eb0d6e02958478e638cc15f255bc9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 13:19:27 -0500 Subject: [PATCH 05/24] db/migrations: fix `down` in 003_events_admin to drop the new "admin" column, not the "relays" table (whoops) --- src/db/migrations/003_events_admin.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/migrations/003_events_admin.ts b/src/db/migrations/003_events_admin.ts index 9183322c..e481a59a 100644 --- a/src/db/migrations/003_events_admin.ts +++ b/src/db/migrations/003_events_admin.ts @@ -8,5 +8,5 @@ export async function up(db: Kysely): Promise { } export async function down(db: Kysely): Promise { - await db.schema.dropTable('relays').execute(); + await db.schema.alterTable('users').dropColumn('admin').execute(); } From 08dd5fa4ebece7a5051d981a9ba5c7c2b7a33887 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 17:17:47 -0500 Subject: [PATCH 06/24] db/events: fix `local: false` filter --- src/db/events.test.ts | 19 +++++++++++++++++++ src/db/events.ts | 6 ++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 6193960a..a9ab016d 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -2,6 +2,7 @@ import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json import { assertEquals } from '@/deps-test.ts'; import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts'; +import { insertUser } from '@/db/users.ts'; Deno.test('count filters', async () => { assertEquals(await countFilters([{ kinds: [1] }]), 0); @@ -28,3 +29,21 @@ Deno.test('delete events', async () => { await deleteFilters([{ kinds: [1] }]); assertEquals(await getFilters([{ kinds: [1] }]), []); }); + +Deno.test('query events with local filter', async () => { + await insertEvent(event55920b75); + + assertEquals(await getFilters([{}]), [event55920b75]); + assertEquals(await getFilters([{ local: true }]), []); + assertEquals(await getFilters([{ local: false }]), [event55920b75]); + + await insertUser({ + username: 'alex', + pubkey: event55920b75.pubkey, + inserted_at: new Date(), + admin: 0, + }); + + assertEquals(await getFilters([{ local: true }]), [event55920b75]); + assertEquals(await getFilters([{ local: false }]), []); +}); diff --git a/src/db/events.ts b/src/db/events.ts index 3edefecb..59480d38 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -115,8 +115,10 @@ function getFilterQuery(filter: DittoFilter) { } } - if (filter.local) { - query = query.innerJoin('users', 'users.pubkey', 'events.pubkey'); + if (typeof filter.local === 'boolean') { + query = filter.local + ? query.innerJoin('users', 'users.pubkey', 'events.pubkey') + : query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null); } if (filter.search) { From 5d19c21f51ee8e63bfe88b57efed8d10c26a4b0e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 19:45:33 -0500 Subject: [PATCH 07/24] cron: delete old remote events every hour --- src/app.ts | 1 + src/cron.ts | 17 +++++++++++++++++ src/db/events.ts | 2 +- src/deps.ts | 1 + 4 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 src/cron.ts diff --git a/src/app.ts b/src/app.ts index 82e083cc..2a36a83e 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,3 +1,4 @@ +import '@/cron.ts'; import { type Context, cors, diff --git a/src/cron.ts b/src/cron.ts new file mode 100644 index 00000000..1e08fdb8 --- /dev/null +++ b/src/cron.ts @@ -0,0 +1,17 @@ +import * as eventsDB from '@/db/events.ts'; +import { cron } from '@/deps.ts'; +import { Time } from '@/utils/time.ts'; + +/** Clean up old remote events. */ +async function cleanupEvents() { + console.log('Cleaning up old remote events...'); + + const [result] = await eventsDB.deleteFilters([{ + until: Math.floor((Date.now() - Time.days(7)) / 1000), + local: false, + }]); + + console.log(`Deleted ${result?.numDeletedRows ?? 0} events.`); +} + +cron.hourly(cleanupEvents); diff --git a/src/db/events.ts b/src/db/events.ts index 59480d38..0e03fbd6 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -156,7 +156,7 @@ async function getFilters( /** Delete events based on filters from the database. */ function deleteFilters(filters: DittoFilter[]) { - if (!filters.length) return Promise.resolve(); + if (!filters.length) return Promise.resolve([]); const query = getFiltersQuery(filters); return db diff --git a/src/deps.ts b/src/deps.ts index b26bd8f4..f4efc62c 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -65,5 +65,6 @@ export { } from 'npm:kysely@^0.25.0'; export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/v1.0.1/mod.ts'; export { default as tldts } from 'npm:tldts@^6.0.14'; +export * as cron from 'https://deno.land/x/deno_cron@v1.0.0/cron.ts'; export type * as TypeFest from 'npm:type-fest@^4.3.0'; From aea07fd28a279560a433a24cf633d61ee6b2f31b Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Mon, 4 Sep 2023 22:16:43 -0500 Subject: [PATCH 08/24] db/events: fix type error --- src/db/events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/events.ts b/src/db/events.ts index 0e03fbd6..ea610a9a 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -117,7 +117,7 @@ function getFilterQuery(filter: DittoFilter) { if (typeof filter.local === 'boolean') { query = filter.local - ? query.innerJoin('users', 'users.pubkey', 'events.pubkey') + ? query.innerJoin('users', 'users.pubkey', 'events.pubkey') as typeof query : query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null); } From df17f62ecb139cfeed938f43ca0ba7a08e7e1f20 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 12:31:59 -0500 Subject: [PATCH 09/24] db/events: try fixing types in CI? --- src/db/events.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/events.ts b/src/db/events.ts index ea610a9a..b901c10d 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -118,7 +118,7 @@ function getFilterQuery(filter: DittoFilter) { if (typeof filter.local === 'boolean') { query = filter.local ? query.innerJoin('users', 'users.pubkey', 'events.pubkey') as typeof query - : query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null); + : query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null) as typeof query; } if (filter.search) { From cdffe42cfd1a575085b68eb9ae1449f5a1fda50e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 13:23:45 -0500 Subject: [PATCH 10/24] db/events: delete tags during deleteFilters --- src/db/events.ts | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index b901c10d..efb5afcd 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -157,12 +157,19 @@ async function getFilters( /** Delete events based on filters from the database. */ function deleteFilters(filters: DittoFilter[]) { if (!filters.length) return Promise.resolve([]); - const query = getFiltersQuery(filters); - return db - .deleteFrom('events') - .where('id', 'in', () => query.clearSelect().select('id')) - .execute(); + return db.transaction().execute(async (trx) => { + const query = getFiltersQuery(filters).clearSelect().select('id'); + + await trx.deleteFrom('tags') + .where('event_id', 'in', () => query) + .where('tag', 'not in', ['d', 'proxy']) + .execute(); + + return trx.deleteFrom('events') + .where('id', 'in', () => query) + .execute(); + }); } /** Get number of events that would be returned by filters. */ From 4708839fd6a0ed29af464a899951245bc0ea37da Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 16:04:23 -0500 Subject: [PATCH 11/24] db: improve output of migrations, exit on failure --- src/db.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/db.ts b/src/db.ts index 2deca9a0..a5f52afb 100644 --- a/src/db.ts +++ b/src/db.ts @@ -63,8 +63,26 @@ const migrator = new Migrator({ }), }); -console.log('Running migrations...'); -const results = await migrator.migrateToLatest(); -console.log('Migrations finished:', results); +/** Migrate the database to the latest version. */ +async function migrate() { + console.log('Running migrations...'); + const results = await migrator.migrateToLatest(); + + if (results.error) { + console.error(results.error); + Deno.exit(1); + } else { + if (!results.results?.length) { + console.log('Everything up-to-date.'); + } else { + console.log('Migrations finished!'); + for (const { migrationName, status } of results.results) { + console.log(` - ${migrationName}: ${status}`); + } + } + } +} + +await migrate(); export { db, type DittoDB, type EventRow, type TagRow, type UserRow }; From 2ff40c8fc5572bbd45910aeb7131ca41ebc64eb7 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 16:11:38 -0500 Subject: [PATCH 12/24] db/events: rework tags, make event_id a FK to events again, drop value_2 and value_3 --- src/db.ts | 4 +- src/db/events.ts | 31 ++++--------- src/db/migrations/005_rework_tags.ts | 68 ++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 24 deletions(-) create mode 100644 src/db/migrations/005_rework_tags.ts diff --git a/src/db.ts b/src/db.ts index a5f52afb..e371ac94 100644 --- a/src/db.ts +++ b/src/db.ts @@ -29,9 +29,7 @@ interface EventFTSRow { interface TagRow { tag: string; - value_1: string | null; - value_2: string | null; - value_3: string | null; + value: string; event_id: string; } diff --git a/src/db/events.ts b/src/db/events.ts index efb5afcd..fdca7ca5 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -34,17 +34,14 @@ function insertEvent(event: Event): Promise { } const tagCounts: Record = {}; - const tags = event.tags.reduce[]>((results, tag) => { - const tagName = tag[0]; - tagCounts[tagName] = (tagCounts[tagName] || 0) + 1; + const tags = event.tags.reduce[]>((results, [name, value]) => { + tagCounts[name] = (tagCounts[name] || 0) + 1; - if (tagConditions[tagName]?.({ event, count: tagCounts[tagName] - 1 })) { + if (value && tagConditions[name]?.({ event, count: tagCounts[name] - 1 })) { results.push({ event_id: event.id, - tag: tagName, - value_1: tag[1] || null, - value_2: tag[2] || null, - value_3: tag[3] || null, + tag: name, + value, }); } @@ -111,7 +108,7 @@ function getFilterQuery(filter: DittoFilter) { query = query .leftJoin('tags', 'tags.event_id', 'events.id') .where('tags.tag', '=', tag) - .where('tags.value_1', 'in', value) as typeof query; + .where('tags.value', 'in', value) as typeof query; } } @@ -157,19 +154,11 @@ async function getFilters( /** Delete events based on filters from the database. */ function deleteFilters(filters: DittoFilter[]) { if (!filters.length) return Promise.resolve([]); + const query = getFiltersQuery(filters); - return db.transaction().execute(async (trx) => { - const query = getFiltersQuery(filters).clearSelect().select('id'); - - await trx.deleteFrom('tags') - .where('event_id', 'in', () => query) - .where('tag', 'not in', ['d', 'proxy']) - .execute(); - - return trx.deleteFrom('events') - .where('id', 'in', () => query) - .execute(); - }); + return db.deleteFrom('events') + .where('id', 'in', () => query.clearSelect().select('id')) + .execute(); } /** Get number of events that would be returned by filters. */ diff --git a/src/db/migrations/005_rework_tags.ts b/src/db/migrations/005_rework_tags.ts new file mode 100644 index 00000000..f2746701 --- /dev/null +++ b/src/db/migrations/005_rework_tags.ts @@ -0,0 +1,68 @@ +import { Kysely, sql } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('tags_new') + .addColumn('tag', 'text', (col) => col.notNull()) + .addColumn('value', 'text', (col) => col.notNull()) + .addColumn('event_id', 'text', (col) => col.references('events.id').onDelete('cascade')) + .execute(); + + await sql` + INSERT INTO tags_new (tag, value, event_id) + SELECT tag, value_1 as value, event_id + FROM tags + WHERE value_1 IS NOT NULL + `.execute(db); + + await db.schema + .dropTable('tags') + .execute(); + + await db.schema + .alterTable('tags_new') + .renameTo('tags').execute(); + + await db.schema + .createIndex('idx_tags_tag') + .on('tags') + .column('tag') + .execute(); + + await db.schema + .createIndex('idx_tags_value') + .on('tags') + .column('value') + .execute(); + + await db.schema + .createIndex('idx_tags_event_id') + .on('tags') + .column('event_id') + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('tags').execute(); + + await db.schema + .createTable('tags') + .addColumn('tag', 'text', (col) => col.notNull()) + .addColumn('value_1', 'text') + .addColumn('value_2', 'text') + .addColumn('value_3', 'text') + .addColumn('event_id', 'text', (col) => col.notNull()) + .execute(); + + await db.schema + .createIndex('idx_tags_tag') + .on('tags') + .column('tag') + .execute(); + + await db.schema + .createIndex('idx_tags_value_1') + .on('tags') + .column('value_1') + .execute(); +} From 5c02fd07730be2f28ce4fd1cb32611f78f0108ed Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 16:58:34 -0500 Subject: [PATCH 13/24] db/relays: normalize url before inserting --- src/db/relays.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db/relays.ts b/src/db/relays.ts index d6f99c38..d41948f2 100644 --- a/src/db/relays.ts +++ b/src/db/relays.ts @@ -6,7 +6,7 @@ function addRelays(relays: `wss://${string}`[]) { if (!relays.length) return Promise.resolve(); const values = relays.map((url) => ({ - url, + url: new URL(url).toString(), domain: tldts.getDomain(url)!, active: true, })); From 02049ed9d1f83253d030b4edb33001f506861994 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 17:10:20 -0500 Subject: [PATCH 14/24] pipeline: fix isFresh comparison --- src/pipeline.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 573dafbf..c1974d6f 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -7,7 +7,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; -import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts'; +import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import type { EventData } from '@/types.ts'; @@ -93,7 +93,7 @@ function trackRelays(event: Event) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - Time.seconds(10); +const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10); /** Distribute the event through active subscriptions. */ function streamOut(event: Event, data: EventData) { From a25d6c975588282fbe3701ce79c9b8693e4608c3 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 17:25:14 -0500 Subject: [PATCH 15/24] db: pragma, enable fk constraints, enable autovacuum full --- src/db/migrations/006_pragma.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 src/db/migrations/006_pragma.ts diff --git a/src/db/migrations/006_pragma.ts b/src/db/migrations/006_pragma.ts new file mode 100644 index 00000000..e9dbad15 --- /dev/null +++ b/src/db/migrations/006_pragma.ts @@ -0,0 +1,12 @@ +import { Kysely, sql } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await sql`PRAGMA foreign_keys = ON`.execute(db); + await sql`PRAGMA auto_vacuum = FULL`.execute(db); + await sql`VACUUM`.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`PRAGMA foreign_keys = OFF`.execute(db); + await sql`PRAGMA auto_vacuum = NONE`.execute(db); +} From a46381849f6009430dd3bb8e56f89fea382f98fe Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 17:25:54 -0500 Subject: [PATCH 16/24] db/events: delete FTS rows when deleting events --- src/db/events.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index fdca7ca5..b86e70d9 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -154,11 +154,18 @@ async function getFilters( /** Delete events based on filters from the database. */ function deleteFilters(filters: DittoFilter[]) { if (!filters.length) return Promise.resolve([]); - const query = getFiltersQuery(filters); - return db.deleteFrom('events') - .where('id', 'in', () => query.clearSelect().select('id')) - .execute(); + return db.transaction().execute(async (trx) => { + const query = getFiltersQuery(filters).clearSelect().select('id'); + + await trx.deleteFrom('events_fts') + .where('id', 'in', () => query) + .execute(); + + return trx.deleteFrom('events') + .where('id', 'in', () => query) + .execute(); + }); } /** Get number of events that would be returned by filters. */ From 97a3478b1a4138f464fe463882c6bacabfd5060b Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 17:48:50 -0500 Subject: [PATCH 17/24] cron: change cleanupEvents to every 15 minutes --- src/cron.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cron.ts b/src/cron.ts index 1e08fdb8..0085c59d 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -14,4 +14,4 @@ async function cleanupEvents() { console.log(`Deleted ${result?.numDeletedRows ?? 0} events.`); } -cron.hourly(cleanupEvents); +cron.every15Minute(cleanupEvents); From 48195f02b3fcb27d486f2b8427f41495be82f63a Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 20:29:35 -0500 Subject: [PATCH 18/24] pipeline: don't store event if a deletion for it exists --- src/pipeline.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index c1974d6f..1bbed40a 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -4,6 +4,7 @@ import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event, LRUCache } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; +import * as mixer from '@/mixer.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { trends } from '@/trends.ts'; @@ -49,8 +50,18 @@ const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; /** Maybe store the event, if eligible. */ async function storeEvent(event: Event, data: EventData): Promise { if (isEphemeralKind(event.kind)) return; + if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { - await eventsDB.insertEvent(event).catch(console.warn); + const [deletion] = await mixer.getFilters( + [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], + { limit: 1, timeout: Time.seconds(1) }, + ); + + if (deletion) { + return Promise.reject(new RelayError('blocked', 'event was deleted')); + } else { + await eventsDB.insertEvent(event).catch(console.warn); + } } else { return Promise.reject(new RelayError('blocked', 'only registered users can post')); } From 22ddc7b1a89c1daa8513905944f4c4cc15428187 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 20:35:58 -0500 Subject: [PATCH 19/24] pipeline: remove unnecessary async keyword from trackHashtags --- src/pipeline.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 1bbed40a..4bdec81d 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -68,8 +68,7 @@ async function storeEvent(event: Event, data: EventData): Promise { } /** Track whenever a hashtag is used, for processing trending tags. */ -// deno-lint-ignore require-await -async function trackHashtags(event: Event): Promise { +function trackHashtags(event: Event): void { const date = nostrDate(event.created_at); const tags = event.tags From e2b88d57d965b17dcf24d1e203ef67de7955a76e Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 21:24:59 -0500 Subject: [PATCH 20/24] pipeline: process kind 5 deletion events --- src/pipeline.ts | 16 ++++++++++++++++ src/tags.ts | 14 ++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 src/tags.ts diff --git a/src/pipeline.ts b/src/pipeline.ts index 4bdec81d..dddc0f65 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -7,6 +7,7 @@ import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; +import { getTagSet } from '@/tags.ts'; import { trends } from '@/trends.ts'; import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; @@ -22,6 +23,7 @@ async function handleEvent(event: Event): Promise { await Promise.all([ storeEvent(event, data), + processDeletions(event), trackRelays(event), trackHashtags(event), streamOut(event, data), @@ -67,6 +69,20 @@ async function storeEvent(event: Event, data: EventData): Promise { } } +/** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */ +async function processDeletions(event: Event): Promise { + if (event.kind === 5) { + const ids = getTagSet(event.tags, 'e'); + const events = await eventsDB.getFilters([{ ids: [...ids] }]); + + const deleteIds = events + .filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id)) + .map((event) => event.id); + + await eventsDB.deleteFilters([{ ids: deleteIds }]); + } +} + /** Track whenever a hashtag is used, for processing trending tags. */ function trackHashtags(event: Event): void { const date = nostrDate(event.created_at); diff --git a/src/tags.ts b/src/tags.ts new file mode 100644 index 00000000..98efc7d2 --- /dev/null +++ b/src/tags.ts @@ -0,0 +1,14 @@ +/** Get the values for a tag in a `Set`. */ +function getTagSet(tags: string[][], tagName: string): Set { + const set = new Set(); + + tags.forEach((tag) => { + if (tag[0] === tagName) { + set.add(tag[1]); + } + }); + + return set; +} + +export { getTagSet }; From a69b7f54f84c617ae2b04c77cd5ebf0c980793a9 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 21:38:15 -0500 Subject: [PATCH 21/24] pipeline: broadcast deletions to all known relays --- src/firehose.ts | 13 ++++++++++--- src/pipeline.ts | 14 ++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/firehose.ts b/src/firehose.ts index b6d7ad4d..a685160b 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -4,15 +4,15 @@ import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; -const relays = await getActiveRelays(); -const pool = new RelayPool(relays); +const _relays = await getActiveRelays(); +const pool = new RelayPool(_relays); // This file watches events on all known relays and performs // side-effects based on them, such as trending hashtag tracking // and storing events for notifications and the home feed. pool.subscribe( [{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }], - relays, + _relays, handleEvent, undefined, undefined, @@ -26,3 +26,10 @@ function handleEvent(event: Event): Promise { .handleEvent(event) .catch(() => {}); } + +/** Publish an event to the given relays, or the entire pool. */ +function publish(event: Event, relays: string[] = _relays) { + return pool.publish(event, relays); +} + +export { publish }; diff --git a/src/pipeline.ts b/src/pipeline.ts index dddc0f65..2b24fece 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -3,6 +3,7 @@ import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event, LRUCache } from '@/deps.ts'; +import { publish } from '@/firehose.ts'; import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; import { isLocallyFollowed } from '@/queries.ts'; @@ -27,6 +28,7 @@ async function handleEvent(event: Event): Promise { trackRelays(event), trackHashtags(event), streamOut(event, data), + broadcast(event, data), ]); } @@ -130,6 +132,18 @@ function streamOut(event: Event, data: EventData) { } } +/** + * Publish the event to other relays. + * This should only be done in certain circumstances, like mentioning a user or publishing deletions. + */ +function broadcast(event: Event, data: EventData) { + if (!data.user || !isFresh(event)) return; + + if (event.kind === 5) { + publish(event); + } +} + /** NIP-20 command line result. */ class RelayError extends Error { constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { From f2ccb5254e8db3030a08c9cba6a4a4098931f6fc Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 21:52:16 -0500 Subject: [PATCH 22/24] cron: run cleanupEvents on startup --- src/cron.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cron.ts b/src/cron.ts index 0085c59d..a08fdf64 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -11,7 +11,9 @@ async function cleanupEvents() { local: false, }]); - console.log(`Deleted ${result?.numDeletedRows ?? 0} events.`); + console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`); } +await cleanupEvents(); + cron.every15Minute(cleanupEvents); From 17c75e67616a79e62644d4f1fb62d605e69817f1 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 5 Sep 2023 22:00:32 -0500 Subject: [PATCH 23/24] Move pool to a separate module (to avoid importing firehose in tests) --- src/firehose.ts | 16 +++------------- src/pipeline.ts | 2 +- src/pool.ts | 12 ++++++++++++ 3 files changed, 16 insertions(+), 14 deletions(-) create mode 100644 src/pool.ts diff --git a/src/firehose.ts b/src/firehose.ts index a685160b..a510d656 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,18 +1,15 @@ -import { getActiveRelays } from '@/db/relays.ts'; -import { type Event, RelayPool } from '@/deps.ts'; +import { type Event } from '@/deps.ts'; +import { allRelays, pool } from '@/pool.ts'; import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; -const _relays = await getActiveRelays(); -const pool = new RelayPool(_relays); - // This file watches events on all known relays and performs // side-effects based on them, such as trending hashtag tracking // and storing events for notifications and the home feed. pool.subscribe( [{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }], - _relays, + allRelays, handleEvent, undefined, undefined, @@ -26,10 +23,3 @@ function handleEvent(event: Event): Promise { .handleEvent(event) .catch(() => {}); } - -/** Publish an event to the given relays, or the entire pool. */ -function publish(event: Event, relays: string[] = _relays) { - return pool.publish(event, relays); -} - -export { publish }; diff --git a/src/pipeline.ts b/src/pipeline.ts index 2b24fece..38e32141 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -3,9 +3,9 @@ import * as eventsDB from '@/db/events.ts'; import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event, LRUCache } from '@/deps.ts'; -import { publish } from '@/firehose.ts'; import { isEphemeralKind } from '@/kinds.ts'; import * as mixer from '@/mixer.ts'; +import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; import { getTagSet } from '@/tags.ts'; diff --git a/src/pool.ts b/src/pool.ts new file mode 100644 index 00000000..5b5eea7c --- /dev/null +++ b/src/pool.ts @@ -0,0 +1,12 @@ +import { getActiveRelays } from '@/db/relays.ts'; +import { type Event, RelayPool } from '@/deps.ts'; + +const allRelays = await getActiveRelays(); +const pool = new RelayPool(allRelays); + +/** Publish an event to the given relays, or the entire pool. */ +function publish(event: Event, relays: string[] = allRelays) { + return pool.publish(event, relays); +} + +export { allRelays, pool, publish }; From 5f82f4f11b494fdf5b152044c6ba4766c69e1d79 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 6 Sep 2023 01:18:07 -0500 Subject: [PATCH 24/24] client: use the same pool as the firehose --- src/client.ts | 31 ++++--------------------------- src/config.ts | 8 -------- 2 files changed, 4 insertions(+), 35 deletions(-) diff --git a/src/client.ts b/src/client.ts index dd0efcc3..c3d7cd9a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,32 +1,9 @@ -import { Conf } from '@/config.ts'; -import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; +import { type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; -import { Time } from '@/utils.ts'; +import { allRelays, pool } from '@/pool.ts'; import type { GetFiltersOpts } from '@/filter.ts'; -type Pool = InstanceType; - -/** HACK: Websockets in Deno are finnicky... get a new pool every 30 minutes. */ -const poolCache = new TTLCache<0, Pool>({ - ttl: Time.minutes(30), - max: 2, - dispose: (pool) => { - console.log('Closing pool.'); - pool.close(); - }, -}); - -function getPool(): Pool { - const cached = poolCache.get(0); - if (cached !== undefined) return cached; - - console.log('Creating new pool.'); - const pool = new RelayPool(Conf.poolRelays); - poolCache.set(0, pool); - return pool; -} - /** Get events from a NIP-01 filter. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { if (!filters.length) return Promise.resolve([]); @@ -34,9 +11,9 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts let tid: number; const results: Event[] = []; - const unsub = getPool().subscribe( + const unsub = pool.subscribe( filters, - Conf.poolRelays, + allRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); diff --git a/src/config.ts b/src/config.ts index e03ef8bc..dc870b16 100644 --- a/src/config.ts +++ b/src/config.ts @@ -58,14 +58,6 @@ const Conf = { get adminEmail() { return Deno.env.get('ADMIN_EMAIL') || 'webmaster@localhost'; }, - /** @deprecated Use relays from the database instead. */ - get poolRelays() { - return (Deno.env.get('RELAY_POOL') || '').split(',').filter(Boolean); - }, - /** @deprecated Publish only to the local relay unless users are mentioned, then try to also send to the relay of those users. Deletions should also be fanned out. */ - get publishRelays() { - return ['wss://relay.mostr.pub']; - }, /** Domain of the Ditto server as a `URL` object, for easily grabbing the `hostname`, etc. */ get url() { return new URL(Conf.localDomain);