diff --git a/src/client.ts b/src/client.ts index 9c5e74a6..6959b9f6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,9 +1,9 @@ -import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; +import { Conf } from '@/config.ts'; +import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; import { type Event, type SignedEvent } from '@/event.ts'; +import { eventDateComparator, type PaginationParams, Time } from '@/utils.ts'; -import { Conf } from './config.ts'; - -import { eventDateComparator, type PaginationParams, Time } from './utils.ts'; +import type { GetFiltersOpts } from '@/types.ts'; const db = await Deno.openKv(); @@ -29,21 +29,17 @@ function getPool(): Pool { return pool; } -interface GetFilterOpts { - timeout?: number; -} - /** Get events from a NIP-01 filter. */ -function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { +function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { return new Promise((resolve) => { let tid: number; const results: SignedEvent[] = []; const unsub = getPool().subscribe( - [filter], + filters, Conf.poolRelays, (event: SignedEvent | null) => { - if (event && matchFilter(filter, event)) { + if (event && matchFilters(filters, event)) { results.push({ id: event.id, kind: event.kind, @@ -54,7 +50,7 @@ function getFilter(filter: Filter, opts: GetFilterOpts = {} sig: event.sig, }); } - if (filter.limit && results.length >= filter.limit) { + if (typeof opts.limit === 'number' && results.length >= opts.limit) { unsub(); clearTimeout(tid); resolve(results as SignedEvent[]); @@ -101,7 +97,7 @@ const getAuthor = async (pubkey: string, timeout = 1000): Promise /** Get users the given pubkey follows. */ const getFollows = async (pubkey: string): Promise | undefined> => { - const [event] = await getFilter({ authors: [pubkey], kinds: [3] }, { timeout: 5000 }); + const [event] = await getFilters([{ authors: [pubkey], kinds: [3] }], { timeout: 5000 }); // TODO: figure out a better, more generic & flexible way to handle event cache (and timeouts?) // Prewarm cache in GET `/api/v1/accounts/verify_credentials` @@ -127,13 +123,13 @@ async function getFeed(event3: Event<3>, params: PaginationParams): Promise[]; + const results = await getFilters([filter], { timeout: 5000 }) as SignedEvent<1>[]; return results.sort(eventDateComparator); } /** Get a feed of all known text notes. */ async function getPublicFeed(params: PaginationParams): Promise[]> { - const results = await getFilter({ kinds: [1], ...params }, { timeout: 5000 }); + const results = await getFilters([{ kinds: [1], ...params }], { timeout: 5000 }); return results.sort(eventDateComparator); } @@ -156,7 +152,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise } function getDescendants(eventId: string): Promise[]> { - return getFilter({ kinds: [1], '#e': [eventId], limit: 200 }, { timeout: 2000 }) as Promise[]>; + return getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 }) as Promise[]>; } /** Publish an event to the Nostr relay. */ @@ -169,4 +165,4 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void { } } -export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilter, getFollows, getPublicFeed, publish }; +export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilters, getFollows, getPublicFeed, publish }; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 7c8b0a0c..d65c9e1e 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,6 +1,7 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; -import { getAuthor, getFilter, getFollows, publish } from '@/client.ts'; +import { getAuthor, getFollows, publish } from '@/client.ts'; +import { getFilters } from '@/mixer.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; @@ -115,7 +116,7 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - let events = await getFilter(filter); + let events = await getFilters([filter]); events.sort(eventDateComparator); if (exclude_replies) { diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 26fe7bff..94a4e235 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -1,17 +1,17 @@ import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import { assertEquals } from '@/deps-test.ts'; -import { getFilter, insertEvent } from './events.ts'; +import { getFilters, insertEvent } from './events.ts'; Deno.test('insert and filter events', async () => { await insertEvent(event55920b75); - assertEquals(await getFilter({ kinds: [1] }), [event55920b75]); - assertEquals(await getFilter({ kinds: [3] }), []); - assertEquals(await getFilter({ since: 1691091000 }), [event55920b75]); - assertEquals(await getFilter({ until: 1691091000 }), []); + assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); + assertEquals(await getFilters([{ kinds: [3] }]), []); + assertEquals(await getFilters([{ since: 1691091000 }]), [event55920b75]); + assertEquals(await getFilters([{ until: 1691091000 }]), []); assertEquals( - await getFilter({ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }), + await getFilters([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), [event55920b75], ); }); diff --git a/src/db/events.ts b/src/db/events.ts index bac26580..8178710f 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,7 +1,9 @@ import { db, type TagRow } from '@/db.ts'; -import { type Filter, type Insertable } from '@/deps.ts'; +import { type Insertable } from '@/deps.ts'; import { type SignedEvent } from '@/event.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; + type TagCondition = ({ event, count }: { event: SignedEvent; count: number }) => boolean; /** Conditions for when to index certain tags. */ @@ -42,19 +44,12 @@ function insertEvent(event: SignedEvent): Promise { return results; }, []); - await Promise.all(tags.map((tag) => { - return trx.insertInto('tags') - .values(tag) - .execute(); - })); + await trx.insertInto('tags') + .values(tags) + .execute(); }); } -/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ -interface DittoFilter extends Filter { - local?: boolean; -} - /** Build the query for a filter. */ function getFilterQuery(filter: DittoFilter) { let query = db @@ -110,25 +105,20 @@ function getFilterQuery(filter: DittoFilter) { } /** Get events for filters from the database. */ -async function getFilters(filters: [DittoFilter]): Promise[]>; -async function getFilters(filters: DittoFilter[]): Promise; -async function getFilters(filters: DittoFilter[]) { - const queries = filters +async function getFilters( + filters: DittoFilter[], + _opts?: GetFiltersOpts, +): Promise[]> { + const events = await filters .map(getFilterQuery) - .map((query) => query.execute()); - - const events = (await Promise.all(queries)).flat(); + .reduce((acc, curr) => acc.union(curr)) + .execute(); return events.map((event) => ( - { ...event, tags: JSON.parse(event.tags) } + { ...event, tags: JSON.parse(event.tags) } as SignedEvent )); } -/** Get events for a filter from the database. */ -function getFilter(filter: DittoFilter): Promise[]> { - return getFilters([filter]); -} - /** Returns whether the pubkey is followed by a local user. */ async function isLocallyFollowed(pubkey: string): Promise { return Boolean( @@ -141,4 +131,4 @@ async function isLocallyFollowed(pubkey: string): Promise { ); } -export { getFilter, getFilters, insertEvent, isLocallyFollowed }; +export { getFilters, insertEvent, isLocallyFollowed }; diff --git a/src/deps.ts b/src/deps.ts index 2c7d67d5..a65503c4 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -17,7 +17,7 @@ export { getPublicKey, getSignature, Kind, - matchFilter, + matchFilters, nip05, nip19, nip21, diff --git a/src/mixer.ts b/src/mixer.ts new file mode 100644 index 00000000..426c6125 --- /dev/null +++ b/src/mixer.ts @@ -0,0 +1,67 @@ +import { matchFilters } from '@/deps.ts'; + +import { getFilters as getFiltersClient } from '@/client.ts'; +import { getFilters as getFiltersDB } from '@/db/events.ts'; +import { eventDateComparator } from '@/utils.ts'; + +import type { SignedEvent } from '@/event.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; + +/** Get filters from the database and pool, and mix the best results together. */ +async function getFilters( + filters: DittoFilter[], + opts?: GetFiltersOpts, +): Promise[]> { + const results = await Promise.allSettled([ + getFiltersClient(filters, opts), + getFiltersDB(filters, opts), + ]); + + const events = results + .filter((result): result is PromiseFulfilledResult[]> => result.status === 'fulfilled') + .flatMap((result) => result.value); + + return unmixEvents(events, filters); +} + +/** Combine and sort events to match the filters. */ +function unmixEvents(events: SignedEvent[], filters: DittoFilter[]): SignedEvent[] { + events = dedupeEvents(events); + events = takeNewestEvents(events); + events = events.filter((event) => matchFilters(filters, event)); + events.sort(eventDateComparator); + + return events; +} + +/** Deduplicate events by ID. */ +function dedupeEvents(events: SignedEvent[]): SignedEvent[] { + return [...new Map(events.map((event) => [event.id, event])).values()]; +} + +/** Take the newest events among replaceable ones. */ +function takeNewestEvents(events: SignedEvent[]): SignedEvent[] { + const isReplaceable = (kind: number) => + kind === 0 || kind === 3 || (10000 <= kind && kind < 20000) || (30000 <= kind && kind < 40000); + + // Group events by author and kind. + const groupedEvents = events.reduce[]>>((acc, event) => { + const key = `${event.pubkey}:${event.kind}`; + const group = acc.get(key) || []; + acc.set(key, [...group, event]); + return acc; + }, new Map()); + + // Process each group. + const processedEvents = Array.from(groupedEvents.values()).flatMap((group) => { + if (isReplaceable(group[0].kind)) { + // Sort by `created_at` and take the latest event. + return group.sort(eventDateComparator)[0]; + } + return group; + }); + + return processedEvents; +} + +export { getFilters }; diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 00000000..096b847a --- /dev/null +++ b/src/types.ts @@ -0,0 +1,16 @@ +import { type Filter } from '@/deps.ts'; + +/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ +interface DittoFilter extends Filter { + local?: boolean; +} + +/** Additional options to apply to the whole subscription. */ +interface GetFiltersOpts { + /** How long to wait (in milliseconds) until aborting the request. */ + timeout?: number; + /** Event limit for the whole subscription. */ + limit?: number; +} + +export type { DittoFilter, GetFiltersOpts };