diff --git a/src/app.ts b/src/app.ts index 82e083cc..2a36a83e 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,3 +1,4 @@ +import '@/cron.ts'; import { type Context, cors, diff --git a/src/client.ts b/src/client.ts index dd0efcc3..c3d7cd9a 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,32 +1,9 @@ -import { Conf } from '@/config.ts'; -import { type Event, type Filter, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; +import { type Event, type Filter, matchFilters } from '@/deps.ts'; import * as pipeline from '@/pipeline.ts'; -import { Time } from '@/utils.ts'; +import { allRelays, pool } from '@/pool.ts'; import type { GetFiltersOpts } from '@/filter.ts'; -type Pool = InstanceType; - -/** HACK: Websockets in Deno are finnicky... get a new pool every 30 minutes. */ -const poolCache = new TTLCache<0, Pool>({ - ttl: Time.minutes(30), - max: 2, - dispose: (pool) => { - console.log('Closing pool.'); - pool.close(); - }, -}); - -function getPool(): Pool { - const cached = poolCache.get(0); - if (cached !== undefined) return cached; - - console.log('Creating new pool.'); - const pool = new RelayPool(Conf.poolRelays); - poolCache.set(0, pool); - return pool; -} - /** Get events from a NIP-01 filter. */ function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { if (!filters.length) return Promise.resolve([]); @@ -34,9 +11,9 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts let tid: number; const results: Event[] = []; - const unsub = getPool().subscribe( + const unsub = pool.subscribe( filters, - Conf.poolRelays, + allRelays, (event: Event | null) => { if (event && matchFilters(filters, event)) { pipeline.handleEvent(event).catch(() => {}); diff --git a/src/config.ts b/src/config.ts index e03ef8bc..dc870b16 100644 --- a/src/config.ts +++ b/src/config.ts @@ -58,14 +58,6 @@ const Conf = { get adminEmail() { return Deno.env.get('ADMIN_EMAIL') || 'webmaster@localhost'; }, - /** @deprecated Use relays from the database instead. */ - get poolRelays() { - return (Deno.env.get('RELAY_POOL') || '').split(',').filter(Boolean); - }, - /** @deprecated Publish only to the local relay unless users are mentioned, then try to also send to the relay of those users. Deletions should also be fanned out. */ - get publishRelays() { - return ['wss://relay.mostr.pub']; - }, /** Domain of the Ditto server as a `URL` object, for easily grabbing the `hostname`, etc. */ get url() { return new URL(Conf.localDomain); diff --git a/src/cron.ts b/src/cron.ts new file mode 100644 index 00000000..a08fdf64 --- /dev/null +++ b/src/cron.ts @@ -0,0 +1,19 @@ +import * as eventsDB from '@/db/events.ts'; +import { cron } from '@/deps.ts'; +import { Time } from '@/utils/time.ts'; + +/** Clean up old remote events. */ +async function cleanupEvents() { + console.log('Cleaning up old remote events...'); + + const [result] = await eventsDB.deleteFilters([{ + until: Math.floor((Date.now() - Time.days(7)) / 1000), + local: false, + }]); + + console.log(`Cleaned up ${result?.numDeletedRows ?? 0} old remote events.`); +} + +await cleanupEvents(); + +cron.every15Minute(cleanupEvents); diff --git a/src/db.ts b/src/db.ts index 2deca9a0..e371ac94 100644 --- a/src/db.ts +++ b/src/db.ts @@ -29,9 +29,7 @@ interface EventFTSRow { interface TagRow { tag: string; - value_1: string | null; - value_2: string | null; - value_3: string | null; + value: string; event_id: string; } @@ -63,8 +61,26 @@ const migrator = new Migrator({ }), }); -console.log('Running migrations...'); -const results = await migrator.migrateToLatest(); -console.log('Migrations finished:', results); +/** Migrate the database to the latest version. */ +async function migrate() { + console.log('Running migrations...'); + const results = await migrator.migrateToLatest(); + + if (results.error) { + console.error(results.error); + Deno.exit(1); + } else { + if (!results.results?.length) { + console.log('Everything up-to-date.'); + } else { + console.log('Migrations finished!'); + for (const { migrationName, status } of results.results) { + console.log(` - ${migrationName}: ${status}`); + } + } + } +} + +await migrate(); export { db, type DittoDB, type EventRow, type TagRow, type UserRow }; diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 94a4e235..a9ab016d 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -1,7 +1,14 @@ import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import { assertEquals } from '@/deps-test.ts'; -import { getFilters, insertEvent } from './events.ts'; +import { countFilters, deleteFilters, getFilters, insertEvent } from './events.ts'; +import { insertUser } from '@/db/users.ts'; + +Deno.test('count filters', async () => { + assertEquals(await countFilters([{ kinds: [1] }]), 0); + await insertEvent(event55920b75); + assertEquals(await countFilters([{ kinds: [1] }]), 1); +}); Deno.test('insert and filter events', async () => { await insertEvent(event55920b75); @@ -15,3 +22,28 @@ Deno.test('insert and filter events', async () => { [event55920b75], ); }); + +Deno.test('delete events', async () => { + await insertEvent(event55920b75); + assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); + await deleteFilters([{ kinds: [1] }]); + assertEquals(await getFilters([{ kinds: [1] }]), []); +}); + +Deno.test('query events with local filter', async () => { + await insertEvent(event55920b75); + + assertEquals(await getFilters([{}]), [event55920b75]); + assertEquals(await getFilters([{ local: true }]), []); + assertEquals(await getFilters([{ local: false }]), [event55920b75]); + + await insertUser({ + username: 'alex', + pubkey: event55920b75.pubkey, + inserted_at: new Date(), + admin: 0, + }); + + assertEquals(await getFilters([{ local: true }]), [event55920b75]); + assertEquals(await getFilters([{ local: false }]), []); +}); diff --git a/src/db/events.ts b/src/db/events.ts index 1b30f5b3..b86e70d9 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -34,17 +34,14 @@ function insertEvent(event: Event): Promise { } const tagCounts: Record = {}; - const tags = event.tags.reduce[]>((results, tag) => { - const tagName = tag[0]; - tagCounts[tagName] = (tagCounts[tagName] || 0) + 1; + const tags = event.tags.reduce[]>((results, [name, value]) => { + tagCounts[name] = (tagCounts[name] || 0) + 1; - if (tagConditions[tagName]?.({ event, count: tagCounts[tagName] - 1 })) { + if (value && tagConditions[name]?.({ event, count: tagCounts[name] - 1 })) { results.push({ event_id: event.id, - tag: tagName, - value_1: tag[1] || null, - value_2: tag[2] || null, - value_3: tag[3] || null, + tag: name, + value, }); } @@ -111,12 +108,14 @@ function getFilterQuery(filter: DittoFilter) { query = query .leftJoin('tags', 'tags.event_id', 'events.id') .where('tags.tag', '=', tag) - .where('tags.value_1', 'in', value) as typeof query; + .where('tags.value', 'in', value) as typeof query; } } - if (filter.local) { - query = query.innerJoin('users', 'users.pubkey', 'events.pubkey'); + if (typeof filter.local === 'boolean') { + query = filter.local + ? query.innerJoin('users', 'users.pubkey', 'events.pubkey') as typeof query + : query.leftJoin('users', 'users.pubkey', 'events.pubkey').where('users.pubkey', 'is', null) as typeof query; } if (filter.search) { @@ -128,13 +127,20 @@ function getFilterQuery(filter: DittoFilter) { return query; } +/** Combine filter queries into a single union query. */ +function getFiltersQuery(filters: DittoFilter[]) { + return filters + .map(getFilterQuery) + .reduce((result, query) => result.union(query)); +} + /** Get events for filters from the database. */ async function getFilters( filters: DittoFilter[], opts: GetFiltersOpts = {}, ): Promise[]> { if (!filters.length) return Promise.resolve([]); - let query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); + let query = getFiltersQuery(filters); if (typeof opts.limit === 'number') { query = query.limit(opts.limit); @@ -145,10 +151,27 @@ async function getFilters( )); } +/** Delete events based on filters from the database. */ +function deleteFilters(filters: DittoFilter[]) { + if (!filters.length) return Promise.resolve([]); + + return db.transaction().execute(async (trx) => { + const query = getFiltersQuery(filters).clearSelect().select('id'); + + await trx.deleteFrom('events_fts') + .where('id', 'in', () => query) + .execute(); + + return trx.deleteFrom('events') + .where('id', 'in', () => query) + .execute(); + }); +} + /** Get number of events that would be returned by filters. */ async function countFilters(filters: DittoFilter[]): Promise { if (!filters.length) return Promise.resolve(0); - const query = filters.map(getFilterQuery).reduce((acc, curr) => acc.union(curr)); + const query = getFiltersQuery(filters); const [{ count }] = await query .clearSelect() @@ -176,4 +199,4 @@ function buildUserSearchContent(event: Event<0>): string { return [name, nip05, about].filter(Boolean).join('\n'); } -export { countFilters, getFilters, insertEvent }; +export { countFilters, deleteFilters, getFilters, insertEvent }; diff --git a/src/db/migrations/003_events_admin.ts b/src/db/migrations/003_events_admin.ts index 9183322c..e481a59a 100644 --- a/src/db/migrations/003_events_admin.ts +++ b/src/db/migrations/003_events_admin.ts @@ -8,5 +8,5 @@ export async function up(db: Kysely): Promise { } export async function down(db: Kysely): Promise { - await db.schema.dropTable('relays').execute(); + await db.schema.alterTable('users').dropColumn('admin').execute(); } diff --git a/src/db/migrations/004_add_user_indexes.ts b/src/db/migrations/004_add_user_indexes.ts new file mode 100644 index 00000000..e77693b9 --- /dev/null +++ b/src/db/migrations/004_add_user_indexes.ts @@ -0,0 +1,20 @@ +import { Kysely } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await db.schema + .createIndex('idx_users_pubkey') + .on('users') + .column('pubkey') + .execute(); + + await db.schema + .createIndex('idx_users_username') + .on('users') + .column('username') + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropIndex('idx_users_pubkey').execute(); + await db.schema.dropIndex('idx_users_username').execute(); +} diff --git a/src/db/migrations/005_rework_tags.ts b/src/db/migrations/005_rework_tags.ts new file mode 100644 index 00000000..f2746701 --- /dev/null +++ b/src/db/migrations/005_rework_tags.ts @@ -0,0 +1,68 @@ +import { Kysely, sql } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('tags_new') + .addColumn('tag', 'text', (col) => col.notNull()) + .addColumn('value', 'text', (col) => col.notNull()) + .addColumn('event_id', 'text', (col) => col.references('events.id').onDelete('cascade')) + .execute(); + + await sql` + INSERT INTO tags_new (tag, value, event_id) + SELECT tag, value_1 as value, event_id + FROM tags + WHERE value_1 IS NOT NULL + `.execute(db); + + await db.schema + .dropTable('tags') + .execute(); + + await db.schema + .alterTable('tags_new') + .renameTo('tags').execute(); + + await db.schema + .createIndex('idx_tags_tag') + .on('tags') + .column('tag') + .execute(); + + await db.schema + .createIndex('idx_tags_value') + .on('tags') + .column('value') + .execute(); + + await db.schema + .createIndex('idx_tags_event_id') + .on('tags') + .column('event_id') + .execute(); +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('tags').execute(); + + await db.schema + .createTable('tags') + .addColumn('tag', 'text', (col) => col.notNull()) + .addColumn('value_1', 'text') + .addColumn('value_2', 'text') + .addColumn('value_3', 'text') + .addColumn('event_id', 'text', (col) => col.notNull()) + .execute(); + + await db.schema + .createIndex('idx_tags_tag') + .on('tags') + .column('tag') + .execute(); + + await db.schema + .createIndex('idx_tags_value_1') + .on('tags') + .column('value_1') + .execute(); +} diff --git a/src/db/migrations/006_pragma.ts b/src/db/migrations/006_pragma.ts new file mode 100644 index 00000000..e9dbad15 --- /dev/null +++ b/src/db/migrations/006_pragma.ts @@ -0,0 +1,12 @@ +import { Kysely, sql } from '@/deps.ts'; + +export async function up(db: Kysely): Promise { + await sql`PRAGMA foreign_keys = ON`.execute(db); + await sql`PRAGMA auto_vacuum = FULL`.execute(db); + await sql`VACUUM`.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`PRAGMA foreign_keys = OFF`.execute(db); + await sql`PRAGMA auto_vacuum = NONE`.execute(db); +} diff --git a/src/db/relays.ts b/src/db/relays.ts index d6f99c38..d41948f2 100644 --- a/src/db/relays.ts +++ b/src/db/relays.ts @@ -6,7 +6,7 @@ function addRelays(relays: `wss://${string}`[]) { if (!relays.length) return Promise.resolve(); const values = relays.map((url) => ({ - url, + url: new URL(url).toString(), domain: tldts.getDomain(url)!, active: true, })); diff --git a/src/deps.ts b/src/deps.ts index b26bd8f4..f4efc62c 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -65,5 +65,6 @@ export { } from 'npm:kysely@^0.25.0'; export { DenoSqliteDialect } from 'https://gitlab.com/soapbox-pub/kysely-deno-sqlite/-/raw/v1.0.1/mod.ts'; export { default as tldts } from 'npm:tldts@^6.0.14'; +export * as cron from 'https://deno.land/x/deno_cron@v1.0.0/cron.ts'; export type * as TypeFest from 'npm:type-fest@^4.3.0'; diff --git a/src/firehose.ts b/src/firehose.ts index b6d7ad4d..a510d656 100644 --- a/src/firehose.ts +++ b/src/firehose.ts @@ -1,18 +1,15 @@ -import { getActiveRelays } from '@/db/relays.ts'; -import { type Event, RelayPool } from '@/deps.ts'; +import { type Event } from '@/deps.ts'; +import { allRelays, pool } from '@/pool.ts'; import { nostrNow } from '@/utils.ts'; import * as pipeline from './pipeline.ts'; -const relays = await getActiveRelays(); -const pool = new RelayPool(relays); - // This file watches events on all known relays and performs // side-effects based on them, such as trending hashtag tracking // and storing events for notifications and the home feed. pool.subscribe( [{ kinds: [0, 1, 3, 5, 6, 7, 10002], limit: 0, since: nostrNow() }], - relays, + allRelays, handleEvent, undefined, undefined, diff --git a/src/pipeline.ts b/src/pipeline.ts index 573dafbf..38e32141 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -4,10 +4,13 @@ import { addRelays } from '@/db/relays.ts'; import { findUser } from '@/db/users.ts'; import { type Event, LRUCache } from '@/deps.ts'; import { isEphemeralKind } from '@/kinds.ts'; +import * as mixer from '@/mixer.ts'; +import { publish } from '@/pool.ts'; import { isLocallyFollowed } from '@/queries.ts'; import { Sub } from '@/subs.ts'; +import { getTagSet } from '@/tags.ts'; import { trends } from '@/trends.ts'; -import { isRelay, nostrDate, nostrNow, Time } from '@/utils.ts'; +import { eventAge, isRelay, nostrDate, Time } from '@/utils.ts'; import type { EventData } from '@/types.ts'; @@ -21,9 +24,11 @@ async function handleEvent(event: Event): Promise { await Promise.all([ storeEvent(event, data), + processDeletions(event), trackRelays(event), trackHashtags(event), streamOut(event, data), + broadcast(event, data), ]); } @@ -49,16 +54,39 @@ const isAdminEvent = ({ pubkey }: Event): boolean => pubkey === Conf.pubkey; /** Maybe store the event, if eligible. */ async function storeEvent(event: Event, data: EventData): Promise { if (isEphemeralKind(event.kind)) return; + if (data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) { - await eventsDB.insertEvent(event).catch(console.warn); + const [deletion] = await mixer.getFilters( + [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }], + { limit: 1, timeout: Time.seconds(1) }, + ); + + if (deletion) { + return Promise.reject(new RelayError('blocked', 'event was deleted')); + } else { + await eventsDB.insertEvent(event).catch(console.warn); + } } else { return Promise.reject(new RelayError('blocked', 'only registered users can post')); } } +/** Query to-be-deleted events, ensure their pubkey matches, then delete them from the database. */ +async function processDeletions(event: Event): Promise { + if (event.kind === 5) { + const ids = getTagSet(event.tags, 'e'); + const events = await eventsDB.getFilters([{ ids: [...ids] }]); + + const deleteIds = events + .filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id)) + .map((event) => event.id); + + await eventsDB.deleteFilters([{ ids: deleteIds }]); + } +} + /** Track whenever a hashtag is used, for processing trending tags. */ -// deno-lint-ignore require-await -async function trackHashtags(event: Event): Promise { +function trackHashtags(event: Event): void { const date = nostrDate(event.created_at); const tags = event.tags @@ -93,7 +121,7 @@ function trackRelays(event: Event) { } /** Determine if the event is being received in a timely manner. */ -const isFresh = ({ created_at }: Event): boolean => created_at >= nostrNow() - Time.seconds(10); +const isFresh = (event: Event): boolean => eventAge(event) < Time.seconds(10); /** Distribute the event through active subscriptions. */ function streamOut(event: Event, data: EventData) { @@ -104,6 +132,18 @@ function streamOut(event: Event, data: EventData) { } } +/** + * Publish the event to other relays. + * This should only be done in certain circumstances, like mentioning a user or publishing deletions. + */ +function broadcast(event: Event, data: EventData) { + if (!data.user || !isFresh(event)) return; + + if (event.kind === 5) { + publish(event); + } +} + /** NIP-20 command line result. */ class RelayError extends Error { constructor(prefix: 'duplicate' | 'pow' | 'blocked' | 'rate-limited' | 'invalid' | 'error', message: string) { diff --git a/src/pool.ts b/src/pool.ts new file mode 100644 index 00000000..5b5eea7c --- /dev/null +++ b/src/pool.ts @@ -0,0 +1,12 @@ +import { getActiveRelays } from '@/db/relays.ts'; +import { type Event, RelayPool } from '@/deps.ts'; + +const allRelays = await getActiveRelays(); +const pool = new RelayPool(allRelays); + +/** Publish an event to the given relays, or the entire pool. */ +function publish(event: Event, relays: string[] = allRelays) { + return pool.publish(event, relays); +} + +export { allRelays, pool, publish }; diff --git a/src/tags.ts b/src/tags.ts new file mode 100644 index 00000000..98efc7d2 --- /dev/null +++ b/src/tags.ts @@ -0,0 +1,14 @@ +/** Get the values for a tag in a `Set`. */ +function getTagSet(tags: string[][], tagName: string): Set { + const set = new Set(); + + tags.forEach((tag) => { + if (tag[0] === tagName) { + set.add(tag[1]); + } + }); + + return set; +} + +export { getTagSet };