diff --git a/scripts/db.ts b/scripts/db.ts
index cbaca91f..7893fd59 100644
--- a/scripts/db.ts
+++ b/scripts/db.ts
@@ -39,6 +39,6 @@ async function usersToEvents() {
       created_at: Math.floor(new Date(row.inserted_at).getTime() / 1000),
     });
 
-    await eventsDB.storeEvent(event);
+    await eventsDB.add(event);
   }
 }
diff --git a/src/client.ts b/src/client.ts
deleted file mode 100644
index 34d45441..00000000
--- a/src/client.ts
+++ /dev/null
@@ -1,74 +0,0 @@
-import { Debug, type Event, type Filter, matchFilters } from '@/deps.ts';
-import { normalizeFilters } from '@/filter.ts';
-import * as pipeline from '@/pipeline.ts';
-import { activeRelays, pool } from '@/pool.ts';
-import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
-import { EventSet } from '@/utils/event-set.ts';
-
-const debug = Debug('ditto:client');
-
-/** Get events from a NIP-01 filter. */
-function getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
-  filters = normalizeFilters(filters);
-
-  if (opts.signal?.aborted) return Promise.resolve([]);
-  if (!filters.length) return Promise.resolve([]);
-
-  debug('REQ', JSON.stringify(filters));
-
-  return new Promise((resolve) => {
-    const results = new EventSet<Event<K>>();
-
-    const unsub = pool.subscribe(
-      filters,
-      opts.relays ?? activeRelays,
-      (event: Event<K> | null) => {
-        if (event && matchFilters(filters, event)) {
-          pipeline.handleEvent(event).catch(() => {});
-          results.add({
-            id: event.id,
-            kind: event.kind as K,
-            pubkey: event.pubkey,
-            content: event.content,
-            tags: event.tags,
-            created_at: event.created_at,
-            sig: event.sig,
-          });
-        }
-        if (typeof opts.limit === 'number' && results.size >= opts.limit) {
-          unsub();
-          resolve([...results]);
-        }
-      },
-      undefined,
-      () => {
-        unsub();
-        resolve([...results]);
-      },
-    );
-
-    opts.signal?.addEventListener('abort', () => {
-      unsub();
-      resolve([...results]);
-    });
-  });
-}
-
-/** Publish an event to the given relays, or the entire pool. */
-function storeEvent<K extends number>(event: Event<K>, opts: StoreEventOpts = {}): Promise<void> {
-  const { relays = activeRelays } = opts;
-  const debug = Debug('ditto:client:publish');
-  debug('EVENT', event);
-  pool.publish(event, relays);
-  return Promise.resolve();
-}
-
-const client: EventStore = {
-  supportedNips: [1],
-  getEvents,
-  storeEvent,
-  countEvents: () => Promise.reject(new Error('COUNT not implemented')),
-  deleteEvents: () => Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.')),
-};
-
-export { client };
diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts
index 34518cda..2424deb0 100644
--- a/src/controllers/api/accounts.ts
+++ b/src/controllers/api/accounts.ts
@@ -134,7 +134,7 @@ const accountStatusesController: AppController = async (c) => {
   const { pinned, limit, exclude_replies, tagged } = accountStatusesQuerySchema.parse(c.req.query());
 
   if (pinned) {
-    const [pinEvent] = await eventsDB.getEvents([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
+    const [pinEvent] = await eventsDB.filter([{ kinds: [10001], authors: [pubkey], limit: 1 }]);
     if (pinEvent) {
       const pinnedEventIds = getTagSet(pinEvent.tags, 'e');
       return renderStatuses(c, [...pinnedEventIds].reverse());
@@ -156,7 +156,7 @@ const accountStatusesController: AppController = async (c) => {
     filter['#t'] = [tagged];
   }
 
-  let events = await eventsDB.getEvents([filter]);
+  let events = await eventsDB.filter([filter]);
 
   if (exclude_replies) {
     events = events.filter((event) => !findReplyTag(event));
@@ -293,7 +293,7 @@ const favouritesController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const params = paginationSchema.parse(c.req.query());
 
-  const events7 = await eventsDB.getEvents(
+  const events7 = await eventsDB.filter(
     [{ kinds: [7], authors: [pubkey], ...params }],
     { signal: AbortSignal.timeout(1000) },
   );
@@ -302,7 +302,7 @@ const favouritesController: AppController = async (c) => {
     .map((event) => event.tags.find((tag) => tag[0] === 'e')?.[1])
     .filter((id): id is string => !!id);
 
-  const events1 = await eventsDB.getEvents(
+  const events1 = await eventsDB.filter(
     [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'] }],
     {
       signal: AbortSignal.timeout(1000),
diff --git a/src/controllers/api/admin.ts b/src/controllers/api/admin.ts
index b90c0514..32a97333 100644
--- a/src/controllers/api/admin.ts
+++ b/src/controllers/api/admin.ts
@@ -39,9 +39,9 @@ const adminAccountsController: AppController = async (c) => {
   const { since, until, limit } = paginationSchema.parse(c.req.query());
 
-  const events = await eventsDB.getEvents([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
+  const events = await eventsDB.filter([{ kinds: [30361], authors: [Conf.pubkey], since, until, limit }]);
   const pubkeys = events.map((event) => event.tags.find(([name]) => name === 'd')?.[1]!);
 
-  const authors = await eventsDB.getEvents([{ kinds: [0], authors: pubkeys }]);
+  const authors = await eventsDB.filter([{ kinds: [0], authors: pubkeys }]);
 
   for (const event of events) {
     const d = event.tags.find(([name]) => name === 'd')?.[1];
diff --git a/src/controllers/api/blocks.ts b/src/controllers/api/blocks.ts
index b45e1c6d..84229151 100644
--- a/src/controllers/api/blocks.ts
+++ b/src/controllers/api/blocks.ts
@@ -7,7 +7,7 @@ import { renderAccounts } from '@/views.ts';
 
 const blocksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
 
-  const [event10000] = await eventsDB.getEvents([
+  const [event10000] = await eventsDB.filter([
     { kinds: [10000], authors: [pubkey], limit: 1 },
   ]);
diff --git a/src/controllers/api/bookmarks.ts b/src/controllers/api/bookmarks.ts
index 81cd011d..5722d97b 100644
--- a/src/controllers/api/bookmarks.ts
+++ b/src/controllers/api/bookmarks.ts
@@ -7,7 +7,7 @@ import { renderStatuses } from '@/views.ts';
 
 const bookmarksController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
 
-  const [event10003] = await eventsDB.getEvents([
+  const [event10003] = await eventsDB.filter([
     { kinds: [10003], authors: [pubkey], limit: 1 },
   ]);
diff --git a/src/controllers/api/notifications.ts b/src/controllers/api/notifications.ts
index 43c974ac..c618a6a8 100644
--- a/src/controllers/api/notifications.ts
+++ b/src/controllers/api/notifications.ts
@@ -7,7 +7,7 @@ const notificationsController: AppController = async (c) => {
   const pubkey = c.get('pubkey')!;
   const { since, until } = paginationSchema.parse(c.req.query());
 
-  const events = await eventsDB.getEvents(
+  const events = await eventsDB.filter(
     [{ kinds: [1], '#p': [pubkey], since, until }],
     { signal: AbortSignal.timeout(3000) },
   );
diff --git a/src/controllers/api/pleroma.ts b/src/controllers/api/pleroma.ts
index 04370e43..91c6cc7c 100644
--- a/src/controllers/api/pleroma.ts
+++ b/src/controllers/api/pleroma.ts
@@ -6,7 +6,7 @@ import { createAdminEvent } from '@/utils/api.ts';
 import { Conf } from '@/config.ts';
 
 const frontendConfigController: AppController = async (c) => {
-  const [event] = await eventsDB.getEvents([{
+  const [event] = await eventsDB.filter([{
     kinds: [30078],
     authors: [Conf.pubkey],
     '#d': ['pub.ditto.frontendConfig'],
diff --git a/src/controllers/api/search.ts b/src/controllers/api/search.ts
index d18694ca..5feeef67 100644
--- a/src/controllers/api/search.ts
+++ b/src/controllers/api/search.ts
@@ -78,7 +78,7 @@ function searchEvents({ q, type, limit, account_id }: SearchQuery, signal: Abort
     filter.authors = [account_id];
   }
 
-  return searchStore.getEvents([filter], { signal });
+  return searchStore.filter([filter], { signal });
 }
 
 /** Get event kinds to search from `type` query param. */
@@ -96,7 +96,7 @@ function typeToKinds(type: SearchQuery['type']): number[] {
 
 /** Resolve a searched value into an event, if applicable. */
 async function lookupEvent(query: SearchQuery, signal: AbortSignal): Promise<Event | undefined> {
   const filters = await getLookupFilters(query);
-  const [event] = await searchStore.getEvents(filters, { limit: 1, signal });
+  const [event] = await searchStore.filter(filters, { limit: 1, signal });
   return event;
 }
diff --git a/src/controllers/api/timelines.ts b/src/controllers/api/timelines.ts
index 47496a00..00a4a55f 100644
--- a/src/controllers/api/timelines.ts
+++ b/src/controllers/api/timelines.ts
@@ -33,7 +33,7 @@ const hashtagTimelineController: AppController = (c) => {
 
 /** Render statuses for timelines. */
 async function renderStatuses(c: AppContext, filters: DittoFilter<1>[], signal = AbortSignal.timeout(1000)) {
-  const events = await eventsDB.getEvents(
+  const events = await eventsDB.filter(
     filters.map((filter) => ({ ...filter, relations: ['author', 'event_stats', 'author_stats'] })),
     { signal },
   );
diff --git a/src/controllers/nostr/relay.ts b/src/controllers/nostr/relay.ts
index f1c35111..cc21b3c2 100644
--- a/src/controllers/nostr/relay.ts
+++ b/src/controllers/nostr/relay.ts
@@ -63,7 +63,7 @@ function connectStream(socket: WebSocket) {
   async function handleReq([_, subId, ...rest]: ClientREQ): Promise<void> {
     const filters = prepareFilters(rest);
 
-    for (const event of await eventsDB.getEvents(filters, { limit: FILTER_LIMIT })) {
+    for (const event of await eventsDB.filter(filters, { limit: FILTER_LIMIT })) {
      send(['EVENT', subId, event]);
    }
 
@@ -96,7 +96,7 @@ function connectStream(socket: WebSocket) {
 
   /** Handle COUNT. Return the number of events matching the filters. */
   async function handleCount([_, subId, ...rest]: ClientCOUNT): Promise<void> {
-    const count = await eventsDB.countEvents(prepareFilters(rest));
+    const count = await eventsDB.count(prepareFilters(rest));
     send(['COUNT', subId, { count, approximate: false }]);
   }
diff --git a/src/db/users.ts b/src/db/users.ts
index 62340e1e..53f5ff88 100644
--- a/src/db/users.ts
+++ b/src/db/users.ts
@@ -65,7 +65,7 @@ async function findUser(user: Partial): Promise {
     }
   }
 
-  const [event] = await eventsDB.getEvents([filter]);
+  const [event] = await eventsDB.filter([filter]);
 
   if (event) {
     return {
diff --git a/src/pipeline.ts b/src/pipeline.ts
index a237b72b..79b88c7b 100644
--- a/src/pipeline.ts
+++ b/src/pipeline.ts
@@ -1,4 +1,3 @@
-import { client } from '@/client.ts';
 import { Conf } from '@/config.ts';
 import { addRelays } from '@/db/relays.ts';
 import { deleteAttachedMedia } from '@/db/unattached-media.ts';
@@ -6,9 +5,8 @@ import { findUser } from '@/db/users.ts';
 import { Debug, type Event } from '@/deps.ts';
 import { isEphemeralKind } from '@/kinds.ts';
 import { isLocallyFollowed } from '@/queries.ts';
-import { reqmeister } from '@/reqmeister.ts';
 import { updateStats } from '@/stats.ts';
-import { eventsDB, memorelay } from '@/storages.ts';
+import { client, eventsDB, memorelay, reqmeister } from '@/storages.ts';
 import { Sub } from '@/subs.ts';
 import { getTagSet } from '@/tags.ts';
 import { type EventData } from '@/types.ts';
@@ -43,9 +41,9 @@ async function handleEvent(event: Event): Promise {
 
 /** Encounter the event, and return whether it has already been encountered. */
 async function encounterEvent(event: Event): Promise<boolean> {
-  const preexisting = (await memorelay.countEvents([{ ids: [event.id] }])) > 0;
-  memorelay.storeEvent(event);
-  reqmeister.encounter(event);
+  const preexisting = (await memorelay.count([{ ids: [event.id] }])) > 0;
+  memorelay.add(event);
+  reqmeister.add(event);
   return preexisting;
 }
 
@@ -68,7 +66,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
   const { force = false } = opts;
 
   if (force || data.user || isAdminEvent(event) || await isLocallyFollowed(event.pubkey)) {
-    const [deletion] = await eventsDB.getEvents(
+    const [deletion] = await eventsDB.filter(
       [{ kinds: [5], authors: [event.pubkey], '#e': [event.id], limit: 1 }],
       { limit: 1 },
     );
@@ -77,7 +75,7 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
       return Promise.reject(new RelayError('blocked', 'event was deleted'));
     } else {
       await Promise.all([
-        eventsDB.storeEvent(event, { data }).catch(debug),
+        eventsDB.add(event, { data }).catch(debug),
         updateStats(event).catch(debug),
       ]);
     }
@@ -90,13 +88,13 @@ async function storeEvent(event: Event, data: EventData, opts: StoreEventOpts =
 
 async function processDeletions(event: Event): Promise<void> {
   if (event.kind === 5) {
     const ids = getTagSet(event.tags, 'e');
-    const events = await eventsDB.getEvents([{ ids: [...ids] }]);
+    const events = await eventsDB.filter([{ ids: [...ids] }]);
 
     const deleteIds = events
       .filter(({ pubkey, id }) => pubkey === event.pubkey && ids.has(id))
       .map((event) => event.id);
 
-    await eventsDB.deleteEvents([{ ids: deleteIds }]);
+    await eventsDB.deleteFilters([{ ids: deleteIds }]);
   }
 }
@@ -141,7 +139,7 @@ function fetchRelatedEvents(event: Event, data: EventData) {
     reqmeister.req({ kinds: [0], authors: [event.pubkey] }).catch(() => {});
   }
   for (const [name, id, relay] of event.tags) {
-    if (name === 'e' && !memorelay.countEvents([{ ids: [id] }])) {
+    if (name === 'e' && !memorelay.count([{ ids: [id] }])) {
       reqmeister.req({ ids: [id] }, { relays: [relay] }).catch(() => {});
     }
   }
@@ -175,7 +173,7 @@ function broadcast(event: Event, data: EventData) {
   if (!data.user || !isFresh(event)) return;
 
   if (event.kind === 5) {
-    client.storeEvent(event);
+    client.add(event);
   }
 }
diff --git a/src/queries.ts b/src/queries.ts
index 56a8b679..3dc27d74 100644
--- a/src/queries.ts
+++ b/src/queries.ts
@@ -1,7 +1,6 @@
-import { eventsDB, memorelay } from '@/storages.ts';
+import { eventsDB, memorelay, reqmeister } from '@/storages.ts';
 import { Debug, type Event, findReplyTag } from '@/deps.ts';
 import { type AuthorMicrofilter, type DittoFilter, type IdMicrofilter, type Relation } from '@/filter.ts';
-import { reqmeister } from '@/reqmeister.ts';
 import { type DittoEvent } from '@/storages/types.ts';
 import { getTagSet } from '@/tags.ts';
 
@@ -25,7 +24,7 @@ const getEvent = async (
   const { kind, relations, signal = AbortSignal.timeout(1000) } = opts;
   const microfilter: IdMicrofilter = { ids: [id] };
 
-  const [memoryEvent] = await memorelay.getEvents([microfilter], opts) as DittoEvent[];
+  const [memoryEvent] = await memorelay.filter([microfilter], opts) as DittoEvent[];
 
   if (memoryEvent && !relations) {
     debug(`getEvent: ${id.slice(0, 8)} found in memory`);
@@ -37,13 +36,13 @@ const getEvent = async (
     filter.kinds = [kind];
   }
 
-  const dbEvent = await eventsDB.getEvents([filter], { limit: 1, signal })
+  const dbEvent = await eventsDB.filter([filter], { limit: 1, signal })
     .then(([event]) => event);
 
   // TODO: make this DRY-er.
 
   if (dbEvent && !dbEvent.author) {
-    const [author] = await memorelay.getEvents([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
+    const [author] = await memorelay.filter([{ kinds: [0], authors: [dbEvent.pubkey] }], opts);
     dbEvent.author = author;
   }
 
@@ -53,7 +52,7 @@ const getEvent = async (
   }
 
   if (memoryEvent && !memoryEvent.author) {
-    const [author] = await memorelay.getEvents([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
+    const [author] = await memorelay.filter([{ kinds: [0], authors: [memoryEvent.pubkey] }], opts);
     memoryEvent.author = author;
   }
 
@@ -77,13 +76,13 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise event);
@@ -96,7 +95,7 @@ const getAuthor = async (pubkey: string, opts: GetEventOpts<0> = {}): Promise | undefined> => {
-  const [event] = await eventsDB.getEvents([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
+  const [event] = await eventsDB.filter([{ authors: [pubkey], kinds: [3], limit: 1 }], { limit: 1, signal });
   return event;
 };
 
@@ -132,7 +131,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise
 }
 
 function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Promise<Event<1>[]> {
-  return eventsDB.getEvents(
+  return eventsDB.filter(
     [{ kinds: [1], '#e': [eventId], relations: ['author', 'event_stats', 'author_stats'] }],
     { limit: 200, signal },
   );
@@ -140,7 +139,7 @@ function getDescendants(eventId: string, signal = AbortSignal.timeout(2000)): Pr
 
 /** Returns whether the pubkey is followed by a local user. */
 async function isLocallyFollowed(pubkey: string): Promise<boolean> {
-  const [event] = await eventsDB.getEvents([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
+  const [event] = await eventsDB.filter([{ kinds: [3], '#p': [pubkey], local: true, limit: 1 }], { limit: 1 });
   return Boolean(event);
 }
diff --git a/src/stats.ts b/src/stats.ts
index fd08cf52..568fac61 100644
--- a/src/stats.ts
+++ b/src/stats.ts
@@ -125,7 +125,7 @@ function eventStatsQuery(diffs: EventStatDiff[]) {
 
 /** Get the last version of the event, if any. */
 async function maybeGetPrev<K extends number>(event: Event<K>): Promise<Event<K>> {
-  const [prev] = await eventsDB.getEvents([
+  const [prev] = await eventsDB.filter([
     { kinds: [event.kind], authors: [event.pubkey], limit: 1 },
   ]);
diff --git a/src/storages.ts b/src/storages.ts
index db00b519..0388518a 100644
--- a/src/storages.ts
+++ b/src/storages.ts
@@ -1,10 +1,21 @@
 import { Conf } from '@/config.ts';
 import { db } from '@/db.ts';
+import * as pipeline from '@/pipeline.ts';
+import { activeRelays, pool } from '@/pool.ts';
 import { EventsDB } from '@/storages/events-db.ts';
 import { Memorelay } from '@/storages/memorelay.ts';
 import { Optimizer } from '@/storages/optimizer.ts';
+import { PoolStore } from '@/storages/pool-store.ts';
+import { Reqmeister } from '@/storages/reqmeister.ts';
 import { SearchStore } from '@/storages/search-store.ts';
-import { reqmeister } from '@/reqmeister.ts';
+import { Time } from '@/utils/time.ts';
+
+/** Relay pool storage. */
+const client = new PoolStore({
+  pool,
+  relays: activeRelays,
+  publisher: pipeline,
+});
 
 /** SQLite database to store events this Ditto server cares about. */
 const eventsDB = new EventsDB(db);
@@ -12,6 +23,13 @@ const eventsDB = new EventsDB(db);
 /** In-memory data store for cached events. */
 const memorelay = new Memorelay({ max: 3000 });
 
+/** Batches requests for single events. */
+const reqmeister = new Reqmeister({
+  client,
+  delay: Time.seconds(1),
+  timeout: Time.seconds(1),
+});
+
 /** Main Ditto storage adapter */
 const optimizer = new Optimizer({
   db: eventsDB,
@@ -25,4 +43,4 @@ const searchStore = new SearchStore({
   fallback: optimizer,
 });
 
-export { eventsDB, memorelay, optimizer, searchStore };
+export { client, eventsDB, memorelay, optimizer, reqmeister, searchStore };
diff --git a/src/storages/events-db.test.ts b/src/storages/events-db.test.ts
index 349f82b9..9b561717 100644
--- a/src/storages/events-db.test.ts
+++ b/src/storages/events-db.test.ts
@@ -10,37 +10,37 @@ import { EventsDB } from './events-db.ts';
 const eventsDB = new EventsDB(db);
 
 Deno.test('count filters', async () => {
-  assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 0);
-  await eventsDB.storeEvent(event1);
-  assertEquals(await eventsDB.countEvents([{ kinds: [1] }]), 1);
+  assertEquals(await eventsDB.count([{ kinds: [1] }]), 0);
+  await eventsDB.add(event1);
+  assertEquals(await eventsDB.count([{ kinds: [1] }]), 1);
 });
 
 Deno.test('insert and filter events', async () => {
-  await eventsDB.storeEvent(event1);
+  await eventsDB.add(event1);
 
-  assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
-  assertEquals(await eventsDB.getEvents([{ kinds: [3] }]), []);
-  assertEquals(await eventsDB.getEvents([{ since: 1691091000 }]), [event1]);
-  assertEquals(await eventsDB.getEvents([{ until: 1691091000 }]), []);
+  assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]);
+  assertEquals(await eventsDB.filter([{ kinds: [3] }]), []);
+  assertEquals(await eventsDB.filter([{ since: 1691091000 }]), [event1]);
+  assertEquals(await eventsDB.filter([{ until: 1691091000 }]), []);
   assertEquals(
-    await eventsDB.getEvents([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
+    await eventsDB.filter([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]),
     [event1],
   );
 });
 
 Deno.test('delete events', async () => {
-  await eventsDB.storeEvent(event1);
-  assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), [event1]);
-  await eventsDB.deleteEvents([{ kinds: [1] }]);
-  assertEquals(await eventsDB.getEvents([{ kinds: [1] }]), []);
+  await eventsDB.add(event1);
+  assertEquals(await eventsDB.filter([{ kinds: [1] }]), [event1]);
+  await eventsDB.deleteFilters([{ kinds: [1] }]);
+  assertEquals(await eventsDB.filter([{ kinds: [1] }]), []);
 });
 
 Deno.test('query events with local filter', async () => {
-  await eventsDB.storeEvent(event1);
+  await eventsDB.add(event1);
 
-  assertEquals(await eventsDB.getEvents([{}]), [event1]);
-  assertEquals(await eventsDB.getEvents([{ local: true }]), []);
-  assertEquals(await eventsDB.getEvents([{ local: false }]), [event1]);
+  assertEquals(await eventsDB.filter([{}]), [event1]);
+  assertEquals(await eventsDB.filter([{ local: true }]), []);
+  assertEquals(await eventsDB.filter([{ local: false }]), [event1]);
 
   const userEvent = await buildUserEvent({
     username: 'alex',
@@ -48,20 +48,20 @@ Deno.test('query events with local filter', async () => {
     inserted_at: new Date(),
     admin: false,
   });
-  await eventsDB.storeEvent(userEvent);
+  await eventsDB.add(userEvent);
 
-  assertEquals(await eventsDB.getEvents([{ kinds: [1], local: true }]), [event1]);
-  assertEquals(await eventsDB.getEvents([{ kinds: [1], local: false }]), []);
+  assertEquals(await eventsDB.filter([{ kinds: [1], local: true }]), [event1]);
+  assertEquals(await eventsDB.filter([{ kinds: [1], local: false }]), []);
 });
 
 Deno.test('inserting replaceable events', async () => {
-  assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 0);
+  assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 0);
 
-  await eventsDB.storeEvent(event0);
-  await assertRejects(() => eventsDB.storeEvent(event0));
-  assertEquals(await eventsDB.countEvents([{ kinds: [0], authors: [event0.pubkey] }]), 1);
+  await eventsDB.add(event0);
+  await assertRejects(() => eventsDB.add(event0));
+  assertEquals(await eventsDB.count([{ kinds: [0], authors: [event0.pubkey] }]), 1);
 
   const changeEvent = { ...event0, id: '123', created_at: event0.created_at + 1 };
-  await eventsDB.storeEvent(changeEvent);
-  assertEquals(await eventsDB.getEvents([{ kinds: [0] }]), [changeEvent]);
+  await eventsDB.add(changeEvent);
+  assertEquals(await eventsDB.filter([{ kinds: [0] }]), [changeEvent]);
 });
diff --git a/src/storages/events-db.ts b/src/storages/events-db.ts
index 0bff2bd3..f6ed16fb 100644
--- a/src/storages/events-db.ts
+++ b/src/storages/events-db.ts
@@ -65,7 +65,7 @@ class EventsDB implements EventStore {
   }
 
   /** Insert an event (and its tags) into the database. */
-  async storeEvent<K extends number>(event: Event<K>, opts: StoreEventOpts = {}): Promise<void> {
+  async add<K extends number>(event: Event<K>, opts: StoreEventOpts = {}): Promise<void> {
     this.#debug('EVENT', JSON.stringify(event));
 
     if (isDittoInternalKind(event.kind) && event.pubkey !== Conf.pubkey) {
@@ -264,7 +264,7 @@ class EventsDB implements EventStore {
   }
 
   /** Get events for filters from the database. */
-  async getEvents<K extends number>(filters: DittoFilter<K>[], opts: GetEventsOpts = {}): Promise<DittoEvent<K>[]> {
+  async filter<K extends number>(filters: DittoFilter<K>[], opts: GetEventsOpts = {}): Promise<DittoEvent<K>[]> {
     filters = normalizeFilters(filters); // Improves performance of `{ kinds: [0], authors: ['...'] }` queries.
 
     if (opts.signal?.aborted) return Promise.resolve([]);
@@ -337,7 +337,7 @@ class EventsDB implements EventStore {
   }
 
   /** Delete events based on filters from the database. */
-  async deleteEvents(filters: DittoFilter[]): Promise<void> {
+  async deleteFilters(filters: DittoFilter[]): Promise<void> {
     if (!filters.length) return Promise.resolve();
 
     this.#debug('DELETE', JSON.stringify(filters));
@@ -345,7 +345,7 @@ class EventsDB implements EventStore {
   }
 
   /** Get number of events that would be returned by filters. */
-  async countEvents(filters: DittoFilter[]): Promise<number> {
+  async count(filters: DittoFilter[]): Promise<number> {
     if (!filters.length) return Promise.resolve(0);
     this.#debug('COUNT', JSON.stringify(filters));
     const query = this.getEventsQuery(filters);
diff --git a/src/storages/hydrate.ts b/src/storages/hydrate.ts
index 4d8edcba..1e45f40e 100644
--- a/src/storages/hydrate.ts
+++ b/src/storages/hydrate.ts
@@ -14,7 +14,7 @@ async function hydrateEvents(opts: HydrateEventOpts): Promi
   if (filters.some((filter) => filter.relations?.includes('author'))) {
     const pubkeys = new Set([...events].map((event) => event.pubkey));
 
-    const authors = await storage.getEvents([{ kinds: [0], authors: [...pubkeys] }], { signal });
+    const authors = await storage.filter([{ kinds: [0], authors: [...pubkeys] }], { signal });
 
     for (const event of events) {
       event.author = authors.find((author) => author.pubkey === event.pubkey);
diff --git a/src/storages/memorelay.test.ts b/src/storages/memorelay.test.ts
index fbf3ba32..536c6ebd 100644
--- a/src/storages/memorelay.test.ts
+++ b/src/storages/memorelay.test.ts
@@ -11,12 +11,12 @@ const memorelay = new Memorelay({
 });
 
 Deno.test('memorelay', async () => {
-  assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 0);
+  assertEquals(await memorelay.count([{ ids: [event1.id] }]), 0);
 
-  await memorelay.storeEvent(event1);
+  await memorelay.add(event1);
 
-  assertEquals(await memorelay.countEvents([{ ids: [event1.id] }]), 1);
+  assertEquals(await memorelay.count([{ ids: [event1.id] }]), 1);
 
-  const result = await memorelay.getEvents([{ ids: [event1.id] }]);
+  const result = await memorelay.filter([{ ids: [event1.id] }]);
   assertEquals(result[0], event1);
 });
diff --git a/src/storages/memorelay.ts b/src/storages/memorelay.ts
index bfa2a320..550faf67 100644
--- a/src/storages/memorelay.ts
+++ b/src/storages/memorelay.ts
@@ -26,7 +26,7 @@ class Memorelay implements EventStore {
   }
 
   /** Get events from memory. */
-  getEvents<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
+  filter<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
     filters = normalizeFilters(filters);
 
     if (opts.signal?.aborted) return Promise.resolve([]);
@@ -91,20 +91,20 @@ class Memorelay implements EventStore {
   }
 
   /** Insert an event into memory. */
-  storeEvent(event: Event): Promise<void> {
+  add(event: Event): Promise<void> {
     this.#cache.set(event.id, event);
     return Promise.resolve();
   }
 
   /** Count events in memory for the filters. */
-  async countEvents(filters: Filter[]): Promise<number> {
-    const events = await this.getEvents(filters);
+  async count(filters: Filter[]): Promise<number> {
+    const events = await this.filter(filters);
     return events.length;
   }
 
   /** Delete events from memory. */
-  async deleteEvents(filters: Filter[]): Promise<void> {
-    for (const event of await this.getEvents(filters)) {
+  async deleteFilters(filters: Filter[]): Promise<void> {
+    for (const event of await this.filter(filters)) {
       this.#cache.delete(event.id);
     }
     return Promise.resolve();
diff --git a/src/storages/optimizer.ts b/src/storages/optimizer.ts
index e0d18fb6..966dff25 100644
--- a/src/storages/optimizer.ts
+++ b/src/storages/optimizer.ts
@@ -25,14 +25,14 @@ class Optimizer implements EventStore {
     this.#client = opts.client;
   }
 
-  async storeEvent(event: DittoEvent, opts?: StoreEventOpts | undefined): Promise<void> {
+  async add(event: DittoEvent, opts?: StoreEventOpts | undefined): Promise<void> {
     await Promise.all([
-      this.#db.storeEvent(event, opts),
-      this.#cache.storeEvent(event, opts),
+      this.#db.add(event, opts),
+      this.#cache.add(event, opts),
     ]);
   }
 
-  async getEvents<K extends number>(
+  async filter<K extends number>(
     filters: DittoFilter<K>[],
     opts: GetEventsOpts | undefined = {},
   ): Promise<DittoEvent<K>[]> {
@@ -52,7 +52,7 @@ class Optimizer implements EventStore {
       if (filter.ids) {
         this.#debug(`Filter[${i}] is an IDs filter; querying cache...`);
         const ids = new Set(filter.ids);
-        for (const event of await this.#cache.getEvents([filter], opts)) {
+        for (const event of await this.#cache.filter([filter], opts)) {
           ids.delete(event.id);
           results.add(event);
           if (results.size >= limit) return getResults();
@@ -66,7 +66,7 @@ class Optimizer implements EventStore {
 
     // Query the database for events.
     this.#debug('Querying database...');
-    for (const dbEvent of await this.#db.getEvents(filters, opts)) {
+    for (const dbEvent of await this.#db.filter(filters, opts)) {
       results.add(dbEvent);
       if (results.size >= limit) return getResults();
     }
@@ -79,14 +79,14 @@ class Optimizer implements EventStore {
 
     // Query the cache again.
     this.#debug('Querying cache...');
-    for (const cacheEvent of await this.#cache.getEvents(filters, opts)) {
+    for (const cacheEvent of await this.#cache.filter(filters, opts)) {
      results.add(cacheEvent);
      if (results.size >= limit) return getResults();
    }
 
    // Finally, query the client.
    this.#debug('Querying client...');
-    for (const clientEvent of await this.#client.getEvents(filters, opts)) {
+    for (const clientEvent of await this.#client.filter(filters, opts)) {
      results.add(clientEvent);
      if (results.size >= limit) return getResults();
    }
diff --git a/src/storages/pool-store.ts b/src/storages/pool-store.ts
new file mode 100644
index 00000000..b97fe3d2
--- /dev/null
+++ b/src/storages/pool-store.ts
@@ -0,0 +1,92 @@
+import { Debug, type Event, type Filter, matchFilters, type RelayPoolWorker } from '@/deps.ts';
+import { normalizeFilters } from '@/filter.ts';
+import { type EventStore, type GetEventsOpts, type StoreEventOpts } from '@/storages/types.ts';
+import { EventSet } from '@/utils/event-set.ts';
+
+interface PoolStoreOpts {
+  pool: InstanceType<typeof RelayPoolWorker>;
+  relays: WebSocket['url'][];
+  publisher: {
+    handleEvent(event: Event): Promise<void>;
+  };
+}
+
+class PoolStore implements EventStore {
+  #debug = Debug('ditto:client');
+  #pool: InstanceType<typeof RelayPoolWorker>;
+  #relays: WebSocket['url'][];
+  #publisher: {
+    handleEvent(event: Event): Promise<void>;
+  };
+
+  supportedNips = [1];
+
+  constructor(opts: PoolStoreOpts) {
+    this.#pool = opts.pool;
+    this.#relays = opts.relays;
+    this.#publisher = opts.publisher;
+  }
+
+  add(event: Event, opts: StoreEventOpts = {}): Promise<void> {
+    const { relays = this.#relays } = opts;
+    this.#debug('EVENT', event);
+    this.#pool.publish(event, relays);
+    return Promise.resolve();
+  }
+
+  filter<K extends number>(filters: Filter<K>[], opts: GetEventsOpts = {}): Promise<Event<K>[]> {
+    filters = normalizeFilters(filters);
+
+    if (opts.signal?.aborted) return Promise.resolve([]);
+    if (!filters.length) return Promise.resolve([]);
+
+    this.#debug('REQ', JSON.stringify(filters));
+
+    return new Promise((resolve) => {
+      const results = new EventSet<Event<K>>();
+
+      const unsub = this.#pool.subscribe(
+        filters,
+        opts.relays ?? this.#relays,
+        (event: Event<K> | null) => {
+          if (event && matchFilters(filters, event)) {
+            this.#publisher.handleEvent(event).catch(() => {});
+            results.add({
+              id: event.id,
+              kind: event.kind as K,
+              pubkey: event.pubkey,
+              content: event.content,
+              tags: event.tags,
+              created_at: event.created_at,
+              sig: event.sig,
+            });
+          }
+          if (typeof opts.limit === 'number' && results.size >= opts.limit) {
+            unsub();
+            resolve([...results]);
+          }
+        },
+        undefined,
+        () => {
+          unsub();
+          resolve([...results]);
+        },
+      );
+
+      opts.signal?.addEventListener('abort', () => {
+        unsub();
+        resolve([...results]);
+      });
+    });
+  }
+
+  count() {
+    return Promise.reject(new Error('COUNT not implemented'));
+  }
+
+  deleteFilters() {
+    return Promise.reject(new Error('Cannot delete events from relays. Create a kind 5 event instead.'));
+  }
+}
+
+export { PoolStore };
diff --git a/src/reqmeister.ts b/src/storages/reqmeister.ts
similarity index 82%
rename from src/reqmeister.ts
rename to src/storages/reqmeister.ts
index 48b1370d..b43ac14d 100644
--- a/src/reqmeister.ts
+++ b/src/storages/reqmeister.ts
@@ -1,4 +1,3 @@
-import { client } from '@/client.ts';
 import { Debug, type Event, EventEmitter, type Filter } from '@/deps.ts';
 import {
   AuthorMicrofilter,
@@ -11,9 +10,8 @@
 import { type EventStore, GetEventsOpts } from '@/storages/types.ts';
 import { Time } from '@/utils/time.ts';
 
-const debug = Debug('ditto:reqmeister');
-
 interface ReqmeisterOpts {
+  client: EventStore;
   delay?: number;
   timeout?: number;
 }
@@ -27,6 +25,8 @@ type ReqmeisterQueueItem = [string, MicroFilter, WebSocket['url'][]];
 
 /** Batches requests to Nostr relays using microfilters. */
 class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => any }> implements EventStore {
+  #debug = Debug('ditto:reqmeister');
+
   #opts: ReqmeisterOpts;
   #queue: ReqmeisterQueueItem[] = [];
   #promise!: Promise<void>;
@@ -34,7 +34,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
 
   supportedNips = [];
 
-  constructor(opts: ReqmeisterOpts = {}) {
+  constructor(opts: ReqmeisterOpts) {
     super();
     this.#opts = opts;
     this.#tick();
@@ -49,7 +49,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
   }
 
   async #perform() {
-    const { delay, timeout = Time.seconds(1) } = this.#opts;
+    const { client, delay, timeout = Time.seconds(1) } = this.#opts;
     await new Promise((resolve) => setTimeout(resolve, delay));
 
     const queue = this.#queue;
@@ -73,11 +73,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     if (wantedAuthors.size) filters.push({ kinds: [0], authors: [...wantedAuthors] });
 
     if (filters.length) {
-      debug('REQ', JSON.stringify(filters));
-      const events = await client.getEvents(filters, { signal: AbortSignal.timeout(timeout) });
+      this.#debug('REQ', JSON.stringify(filters));
+      const events = await client.filter(filters, { signal: AbortSignal.timeout(timeout) });
 
       for (const event of events) {
-        this.encounter(event);
+        this.add(event);
       }
     }
 
@@ -119,10 +119,11 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     });
   }
 
-  encounter(event: Event): void {
+  add(event: Event): Promise<void> {
     const filterId = getFilterId(eventToMicroFilter(event));
     this.#queue = this.#queue.filter(([id]) => id !== filterId);
     this.emit(filterId, event);
+    return Promise.resolve();
   }
 
   isWanted(event: Event): boolean {
@@ -130,7 +131,7 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     return this.#queue.some(([id]) => id === filterId);
   }
 
-  getEvents<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
+  filter<K extends number>(filters: Filter<K>[], opts?: GetEventsOpts | undefined): Promise<Event<K>[]> {
    if (opts?.signal?.aborted) return Promise.resolve([]);
    if (!filters.length) return Promise.resolve([]);
 
@@ -144,23 +145,13 @@ class Reqmeister extends EventEmitter<{ [filterId: string]: (event: Event) => an
     return Promise.all(promises);
   }
 
-  storeEvent(event: Event): Promise<void> {
-    this.encounter(event);
-    return Promise.resolve();
-  }
-
-  countEvents(_filters: Filter[]): Promise<number> {
+  count(_filters: Filter[]): Promise<number> {
     throw new Error('COUNT not implemented.');
   }
 
-  deleteEvents(_filters: Filter[]): Promise<void> {
+  deleteFilters(_filters: Filter[]): Promise<void> {
     throw new Error('DELETE not implemented.');
   }
 }
 
-const reqmeister = new Reqmeister({
-  delay: Time.seconds(1),
-  timeout: Time.seconds(1),
-});
-
-export { reqmeister };
+export { Reqmeister };
diff --git a/src/storages/search-store.ts b/src/storages/search-store.ts
index 5afba6df..65176fb7 100644
--- a/src/storages/search-store.ts
+++ b/src/storages/search-store.ts
@@ -30,11 +30,11 @@ class SearchStore implements EventStore {
     }
   }
 
-  storeEvent(_event: Event, _opts?: StoreEventOpts | undefined): Promise<void> {
+  add(_event: Event, _opts?: StoreEventOpts | undefined): Promise<void> {
     throw new Error('EVENT not implemented.');
   }
 
-  async getEvents<K extends number>(
+  async filter<K extends number>(
     filters: DittoFilter<K>[],
     opts?: GetEventsOpts | undefined,
   ): Promise<DittoEvent<K>[]> {
@@ -69,15 +69,15 @@ class SearchStore implements EventStore {
       return hydrateEvents({ events: [...events], filters, storage: this.#hydrator, signal: opts?.signal });
     } else {
       this.#debug(`Searching for "${query}" locally...`);
-      return this.#fallback.getEvents(filters, opts);
+      return this.#fallback.filter(filters, opts);
     }
   }
 
-  countEvents(_filters: Filter[]): Promise<number> {
+  count(_filters: Filter[]): Promise<number> {
     throw new Error('COUNT not implemented.');
   }
 
-  deleteEvents(_filters: Filter[]): Promise<void> {
+  deleteFilters(_filters: Filter[]): Promise<void> {
     throw new Error('DELETE not implemented.');
   }
 }
diff --git a/src/storages/types.ts b/src/storages/types.ts
index dda2205a..65a37258 100644
--- a/src/storages/types.ts
+++ b/src/storages/types.ts
@@ -38,13 +38,13 @@ interface EventStore {
   /** Indicates NIPs supported by this data store, similar to NIP-11. For example, `50` would indicate support for `search` filters. */
   supportedNips: readonly number[];
   /** Add an event to the store. */
-  storeEvent<K extends number>(event: Event<K>, opts?: StoreEventOpts): Promise<void>;
+  add<K extends number>(event: Event<K>, opts?: StoreEventOpts): Promise<void>;
   /** Get events from filters. */
-  getEvents<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
+  filter<K extends number>(filters: DittoFilter<K>[], opts?: GetEventsOpts): Promise<DittoEvent<K>[]>;
   /** Get the number of events from filters. */
-  countEvents(filters: DittoFilter[]): Promise<number>;
+  count?(filters: DittoFilter[]): Promise<number>;
   /** Delete events from filters. */
-  deleteEvents(filters: DittoFilter[]): Promise<void>;
+  deleteFilters?(filters: DittoFilter[]): Promise<void>;
 }
 
 export type { DittoEvent, EventStore, GetEventsOpts, StoreEventOpts };
diff --git a/src/utils/api.ts b/src/utils/api.ts
index dbedc6c9..af0333a0 100644
--- a/src/utils/api.ts
+++ b/src/utils/api.ts
@@ -51,7 +51,7 @@ async function updateEvent>(
   fn: (prev: Event<K> | undefined) => E,
   c: AppContext,
 ): Promise<Event<K>> {
-  const [prev] = await eventsDB.getEvents([filter], { limit: 1 });
+  const [prev] = await eventsDB.filter([filter], { limit: 1 });
   return createEvent(fn(prev), c);
 }
diff --git a/src/views.ts b/src/views.ts
index 9ade6001..4995c827 100644
--- a/src/views.ts
+++ b/src/views.ts
@@ -11,14 +11,14 @@ async function renderEventAccounts(c: AppContext, filters: Filter[], signal = Ab
     return c.json([]);
   }
 
-  const events = await eventsDB.getEvents(filters, { signal });
+  const events = await eventsDB.filter(filters, { signal });
   const pubkeys = new Set(events.map(({ pubkey }) => pubkey));
 
   if (!pubkeys.size) {
     return c.json([]);
   }
 
-  const authors = await eventsDB.getEvents(
+  const authors = await eventsDB.filter(
     [{ kinds: [0], authors: [...pubkeys], relations: ['author_stats'] }],
     { signal },
   );
@@ -33,7 +33,7 @@ async function renderEventAccounts(c: AppContext, filters: Filter[], signal = Ab
 async function renderAccounts(c: AppContext, authors: string[], signal = AbortSignal.timeout(1000)) {
   const { since, until, limit } = paginationSchema.parse(c.req.query());
 
-  const events = await eventsDB.getEvents(
+  const events = await eventsDB.filter(
     [{ kinds: [0], authors, relations: ['author_stats'], since, until, limit }],
     { signal },
   );
@@ -53,7 +53,7 @@ async function renderStatuses(c: AppContext, ids: string[], signal = AbortSignal
 
   const { limit } = paginationSchema.parse(c.req.query());
 
-  const events = await eventsDB.getEvents(
+  const events = await eventsDB.filter(
     [{ kinds: [1], ids, relations: ['author', 'event_stats', 'author_stats'], limit }],
     { signal },
   );
diff --git a/src/views/mastodon/relationships.ts b/src/views/mastodon/relationships.ts
index 43ed7b83..9f20024a 100644
--- a/src/views/mastodon/relationships.ts
+++ b/src/views/mastodon/relationships.ts
@@ -2,7 +2,7 @@ import { eventsDB } from '@/storages.ts';
 import { hasTag } from '@/tags.ts';
 
 async function renderRelationship(sourcePubkey: string, targetPubkey: string) {
-  const events = await eventsDB.getEvents([
+  const events = await eventsDB.filter([
     { kinds: [3], authors: [sourcePubkey], limit: 1 },
     { kinds: [3], authors: [targetPubkey], limit: 1 },
     { kinds: [10000], authors: [sourcePubkey], limit: 1 },
diff --git a/src/views/mastodon/statuses.ts b/src/views/mastodon/statuses.ts
index 9e45f495..bb8b95e3 100644
--- a/src/views/mastodon/statuses.ts
+++ b/src/views/mastodon/statuses.ts
@@ -35,7 +35,7 @@ async function renderStatus(event: DittoEvent<1>, viewerPubkey?: string) {
     Promise.all(mentionedPubkeys.map(toMention)),
     firstUrl ? unfurlCardCached(firstUrl) : null,
     viewerPubkey
-      ? await eventsDB.getEvents([
+      ? await eventsDB.filter([
        { kinds: [6], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
        { kinds: [7], '#e': [event.id], authors: [viewerPubkey], limit: 1 },
        { kinds: [10001], '#e': [event.id], authors: [viewerPubkey], limit: 1 },