From 8f45f3a7adea6adad239761d4f54008ba0eb60b8 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 12:54:17 -0500 Subject: [PATCH 1/7] Move DittoFilter to @/types.ts, refactor db/events query --- src/db/events.ts | 17 ++++++----------- src/types.ts | 8 ++++++++ 2 files changed, 14 insertions(+), 11 deletions(-) create mode 100644 src/types.ts diff --git a/src/db/events.ts b/src/db/events.ts index bac26580..71bc7657 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -1,7 +1,9 @@ import { db, type TagRow } from '@/db.ts'; -import { type Filter, type Insertable } from '@/deps.ts'; +import { type Insertable } from '@/deps.ts'; import { type SignedEvent } from '@/event.ts'; +import type { DittoFilter } from '@/types.ts'; + type TagCondition = ({ event, count }: { event: SignedEvent; count: number }) => boolean; /** Conditions for when to index certain tags. */ @@ -42,19 +44,12 @@ function insertEvent(event: SignedEvent): Promise { return results; }, []); - await Promise.all(tags.map((tag) => { - return trx.insertInto('tags') - .values(tag) - .execute(); - })); + await trx.insertInto('tags') + .values(tags) + .execute(); }); } -/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ -interface DittoFilter extends Filter { - local?: boolean; -} - /** Build the query for a filter. */ function getFilterQuery(filter: DittoFilter) { let query = db diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 00000000..6c7feba0 --- /dev/null +++ b/src/types.ts @@ -0,0 +1,8 @@ +import { type Filter } from '@/deps.ts'; + +/** Custom filter interface that extends Nostr filters with extra options for Ditto. */ +interface DittoFilter extends Filter { + local?: boolean; +} + +export { type DittoFilter }; From 4602b85afe8cd50dfa2b0d6be6ac654e71a0b732 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 12:57:02 -0500 Subject: [PATCH 2/7] db/events: remove unnecessary overloads --- src/db/events.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index 71bc7657..2f291896 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -105,9 +105,7 @@ function getFilterQuery(filter: DittoFilter) { } /** Get events for filters from the database. */ -async function getFilters(filters: [DittoFilter]): Promise[]>; -async function getFilters(filters: DittoFilter[]): Promise; -async function getFilters(filters: DittoFilter[]) { +async function getFilters(filters: DittoFilter[]): Promise[]> { const queries = filters .map(getFilterQuery) .map((query) => query.execute()); @@ -115,7 +113,7 @@ async function getFilters(filters: DittoFilter[]) { const events = (await Promise.all(queries)).flat(); return events.map((event) => ( - { ...event, tags: JSON.parse(event.tags) } + { ...event, tags: JSON.parse(event.tags) } as SignedEvent )); } From 59b7a3eed8119be463475e21fc1046ff1dfbcc8d Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 15:32:02 -0500 Subject: [PATCH 3/7] db/events: perform multiple filters with one union query, greatly simplify logic --- src/db/events.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/db/events.ts b/src/db/events.ts index 2f291896..15e24f45 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -106,11 +106,10 @@ function getFilterQuery(filter: DittoFilter) { /** Get events for filters from the database. */ async function getFilters(filters: DittoFilter[]): Promise[]> { - const queries = filters + const events = await filters .map(getFilterQuery) - .map((query) => query.execute()); - - const events = (await Promise.all(queries)).flat(); + .reduce((acc, curr) => acc.union(curr)) + .execute(); return events.map((event) => ( { ...event, tags: JSON.parse(event.tags) } as SignedEvent From d4721fb82dfc407a1260c1726174a60c55cce052 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 16:12:27 -0500 Subject: [PATCH 4/7] Add mixer module to combine results from DB and pool --- src/client.ts | 29 +++++++++++---- src/controllers/api/accounts.ts | 5 +-- src/deps.ts | 2 +- src/mixer.ts | 64 +++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 src/mixer.ts diff --git a/src/client.ts b/src/client.ts index 9c5e74a6..71a644fb 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,4 +1,4 @@ -import { Author, type Filter, findReplyTag, matchFilter, RelayPool, TTLCache } from '@/deps.ts'; +import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; import { type Event, type SignedEvent } from '@/event.ts'; import { Conf } from './config.ts'; @@ -34,16 +34,16 @@ interface GetFilterOpts { } /** Get events from a NIP-01 filter. */ -function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { +function getFilters(filters: Filter[], opts: GetFilterOpts = {}): Promise[]> { return new Promise((resolve) => { let tid: number; const results: SignedEvent[] = []; const unsub = getPool().subscribe( - [filter], + filters, Conf.poolRelays, (event: SignedEvent | null) => { - if (event && matchFilter(filter, event)) { + if (event && matchFilters(filters, event)) { results.push({ id: event.id, kind: event.kind, @@ -54,7 +54,8 @@ function getFilter(filter: Filter, opts: GetFilterOpts = {} sig: event.sig, }); } - if (filter.limit && results.length >= filter.limit) { + // HACK + if (filters.length === 1 && filters[0].limit && results.length >= filters[0].limit) { unsub(); clearTimeout(tid); resolve(results as SignedEvent[]); @@ -77,6 +78,11 @@ function getFilter(filter: Filter, opts: GetFilterOpts = {} }); } +/** @deprecated Use `getFilters` instead. */ +function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { + return getFilters([filter], opts); +} + /** Get a Nostr event by its ID. */ const getEvent = async (id: string, kind?: K): Promise | undefined> => { const event = await (getPool().getEventById(id, Conf.poolRelays, 0) as Promise); @@ -169,4 +175,15 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void { } } -export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilter, getFollows, getPublicFeed, publish }; +export { + getAncestors, + getAuthor, + getDescendants, + getEvent, + getFeed, + getFilter, + getFilters, + getFollows, + getPublicFeed, + publish, +}; diff --git a/src/controllers/api/accounts.ts b/src/controllers/api/accounts.ts index 7c8b0a0c..d65c9e1e 100644 --- a/src/controllers/api/accounts.ts +++ b/src/controllers/api/accounts.ts @@ -1,6 +1,7 @@ import { type AppController } from '@/app.ts'; import { type Filter, findReplyTag, z } from '@/deps.ts'; -import { getAuthor, getFilter, getFollows, publish } from '@/client.ts'; +import { getAuthor, getFollows, publish } from '@/client.ts'; +import { getFilters } from '@/mixer.ts'; import { jsonMetaContentSchema } from '@/schemas/nostr.ts'; import { signEvent } from '@/sign.ts'; import { toAccount, toStatus } from '@/transformers/nostr-to-mastoapi.ts'; @@ -115,7 +116,7 @@ const accountStatusesController: AppController = async (c) => { filter['#t'] = [tagged]; } - let events = await getFilter(filter); + let events = await getFilters([filter]); events.sort(eventDateComparator); if (exclude_replies) { diff --git a/src/deps.ts b/src/deps.ts index 2c7d67d5..a65503c4 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -17,7 +17,7 @@ export { getPublicKey, getSignature, Kind, - matchFilter, + matchFilters, nip05, nip19, nip21, diff --git a/src/mixer.ts b/src/mixer.ts new file mode 100644 index 00000000..ee123ccd --- /dev/null +++ b/src/mixer.ts @@ -0,0 +1,64 @@ +import { matchFilters } from '@/deps.ts'; + +import { getFilters as getFiltersClient } from '@/client.ts'; +import { getFilters as getFiltersDB } from '@/db/events.ts'; +import { eventDateComparator } from '@/utils.ts'; + +import type { SignedEvent } from '@/event.ts'; +import type { DittoFilter } from '@/types.ts'; + +/** Get filters from the database and pool, and mix the best results together. */ +async function getFilters(filters: DittoFilter[]): Promise[]> { + const results = await Promise.allSettled([ + getFiltersClient(filters), + getFiltersDB(filters), + ]); + + const events = results + .filter((result): result is PromiseFulfilledResult[]> => result.status === 'fulfilled') + .flatMap((result) => result.value); + + return unmixEvents(events, filters); +} + +/** Combine and sort events to match the filters. */ +function unmixEvents(events: SignedEvent[], filters: DittoFilter[]): SignedEvent[] { + events = dedupeEvents(events); + events = takeNewestEvents(events); + events = events.filter((event) => matchFilters(filters, event)); + events.sort(eventDateComparator); + + return events; +} + +/** Deduplicate events by ID. */ +function dedupeEvents(events: SignedEvent[]): SignedEvent[] { + return [...new Map(events.map((event) => [event.id, event])).values()]; +} + +/** Take the newest events among replaceable ones. */ +function takeNewestEvents(events: SignedEvent[]): SignedEvent[] { + const isReplaceable = (kind: number) => + kind === 0 || kind === 3 || (10000 <= kind && kind < 20000) || (30000 <= kind && kind < 40000); + + // Group events by author and kind. + const groupedEvents = events.reduce[]>>((acc, event) => { + const key = `${event.pubkey}:${event.kind}`; + const group = acc.get(key) || []; + acc.set(key, [...group, event]); + return acc; + }, new Map()); + + // Process each group. + const processedEvents = Array.from(groupedEvents.values()).flatMap((group) => { + if (isReplaceable(group[0].kind)) { + // Sort by `created_at` and take the latest event. + return group.sort(eventDateComparator)[0]; + } + return group; + }); + + return processedEvents; +} + +export { getFilters }; From bbaf1cb012659a8ceab7511e16ee416a4cc913e7 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 21:21:29 -0500 Subject: [PATCH 5/7] mixer: pass opts through to all getFilters functions --- src/client.ts | 14 +++++--------- src/db/events.ts | 9 ++++++--- src/mixer.ts | 11 +++++++---- src/types.ts | 7 ++++++- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/client.ts b/src/client.ts index 71a644fb..ce3b9e14 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,9 +1,9 @@ +import { Conf } from '@/config.ts'; import { Author, type Filter, findReplyTag, matchFilters, RelayPool, TTLCache } from '@/deps.ts'; import { type Event, type SignedEvent } from '@/event.ts'; +import { eventDateComparator, type PaginationParams, Time } from '@/utils.ts'; -import { Conf } from './config.ts'; - -import { eventDateComparator, type PaginationParams, Time } from './utils.ts'; +import type { GetFiltersOpts } from '@/types.ts'; const db = await Deno.openKv(); @@ -29,12 +29,8 @@ function getPool(): Pool { return pool; } -interface GetFilterOpts { - timeout?: number; -} - /** Get events from a NIP-01 filter. */ -function getFilters(filters: Filter[], opts: GetFilterOpts = {}): Promise[]> { +function getFilters(filters: Filter[], opts: GetFiltersOpts = {}): Promise[]> { return new Promise((resolve) => { let tid: number; const results: SignedEvent[] = []; @@ -79,7 +75,7 @@ function getFilters(filters: Filter[], opts: GetFilterOpts } /** @deprecated Use `getFilters` instead. */ -function getFilter(filter: Filter, opts: GetFilterOpts = {}): Promise[]> { +function getFilter(filter: Filter, opts: GetFiltersOpts = {}): Promise[]> { return getFilters([filter], opts); } diff --git a/src/db/events.ts b/src/db/events.ts index 15e24f45..7bd94ce7 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -2,7 +2,7 @@ import { db, type TagRow } from '@/db.ts'; import { type Insertable } from '@/deps.ts'; import { type SignedEvent } from '@/event.ts'; -import type { DittoFilter } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; type TagCondition = ({ event, count }: { event: SignedEvent; count: number }) => boolean; @@ -105,7 +105,10 @@ function getFilterQuery(filter: DittoFilter) { } /** Get events for filters from the database. */ -async function getFilters(filters: DittoFilter[]): Promise[]> { +async function getFilters( + filters: DittoFilter[], + _opts?: GetFiltersOpts, +): Promise[]> { const events = await filters .map(getFilterQuery) .reduce((acc, curr) => acc.union(curr)) @@ -116,7 +119,7 @@ async function getFilters(filters: DittoFilter[]): Promise< )); } -/** Get events for a filter from the database. */ +/** @deprecated Use `getFilters` instead. */ function getFilter(filter: DittoFilter): Promise[]> { return getFilters([filter]); } diff --git a/src/mixer.ts b/src/mixer.ts index ee123ccd..426c6125 100644 --- a/src/mixer.ts +++ b/src/mixer.ts @@ -5,13 +5,16 @@ import { getFilters as getFiltersDB } from '@/db/events.ts'; import { eventDateComparator } from '@/utils.ts'; import type { SignedEvent } from '@/event.ts'; -import type { DittoFilter } from '@/types.ts'; +import type { DittoFilter, GetFiltersOpts } from '@/types.ts'; /** Get filters from the database and pool, and mix the best results together. */ -async function getFilters(filters: DittoFilter[]): Promise[]> { +async function getFilters( + filters: DittoFilter[], + opts?: GetFiltersOpts, +): Promise[]> { const results = await Promise.allSettled([ - getFiltersClient(filters), - getFiltersDB(filters), + getFiltersClient(filters, opts), + getFiltersDB(filters, opts), ]); const events = results diff --git a/src/types.ts b/src/types.ts index 6c7feba0..e3338180 100644 --- a/src/types.ts +++ b/src/types.ts @@ -5,4 +5,9 @@ interface DittoFilter extends Filter { local?: boolean; } -export { type DittoFilter }; +/** Additional options to apply to the whole subscription. */ +interface GetFiltersOpts { + timeout?: number; +} + +export type { DittoFilter, GetFiltersOpts }; From 96641a6fa0def85f89c4c6d812c4f01d9de4ab49 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 21:38:21 -0500 Subject: [PATCH 6/7] Move limit to GetFilterOpts --- src/client.ts | 3 +-- src/types.ts | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/client.ts b/src/client.ts index ce3b9e14..6e298767 100644 --- a/src/client.ts +++ b/src/client.ts @@ -50,8 +50,7 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts sig: event.sig, }); } - // HACK - if (filters.length === 1 && filters[0].limit && results.length >= filters[0].limit) { + if (typeof opts.limit === 'number' && results.length >= opts.limit) { unsub(); clearTimeout(tid); resolve(results as SignedEvent[]); diff --git a/src/types.ts b/src/types.ts index e3338180..096b847a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -7,7 +7,10 @@ interface DittoFilter extends Filter { /** Additional options to apply to the whole subscription. */ interface GetFiltersOpts { + /** How long to wait (in milliseconds) until aborting the request. */ timeout?: number; + /** Event limit for the whole subscription. */ + limit?: number; } export type { DittoFilter, GetFiltersOpts }; From 5515c40df34431718b204f84474906b9a0402986 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 16 Aug 2023 21:41:18 -0500 Subject: [PATCH 7/7] Remove deprecated getFilter (singular) functions --- src/client.ts | 26 +++++--------------------- src/db/events.test.ts | 12 ++++++------ src/db/events.ts | 7 +------ 3 files changed, 12 insertions(+), 33 deletions(-) diff --git a/src/client.ts b/src/client.ts index 6e298767..6959b9f6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -73,11 +73,6 @@ function getFilters(filters: Filter[], opts: GetFiltersOpts }); } -/** @deprecated Use `getFilters` instead. */ -function getFilter(filter: Filter, opts: GetFiltersOpts = {}): Promise[]> { - return getFilters([filter], opts); -} - /** Get a Nostr event by its ID. */ const getEvent = async (id: string, kind?: K): Promise | undefined> => { const event = await (getPool().getEventById(id, Conf.poolRelays, 0) as Promise); @@ -102,7 +97,7 @@ const getAuthor = async (pubkey: string, timeout = 1000): Promise /** Get users the given pubkey follows. */ const getFollows = async (pubkey: string): Promise | undefined> => { - const [event] = await getFilter({ authors: [pubkey], kinds: [3] }, { timeout: 5000 }); + const [event] = await getFilters([{ authors: [pubkey], kinds: [3] }], { timeout: 5000 }); // TODO: figure out a better, more generic & flexible way to handle event cache (and timeouts?) // Prewarm cache in GET `/api/v1/accounts/verify_credentials` @@ -128,13 +123,13 @@ async function getFeed(event3: Event<3>, params: PaginationParams): Promise[]; + const results = await getFilters([filter], { timeout: 5000 }) as SignedEvent<1>[]; return results.sort(eventDateComparator); } /** Get a feed of all known text notes. */ async function getPublicFeed(params: PaginationParams): Promise[]> { - const results = await getFilter({ kinds: [1], ...params }, { timeout: 5000 }); + const results = await getFilters([{ kinds: [1], ...params }], { timeout: 5000 }); return results.sort(eventDateComparator); } @@ -157,7 +152,7 @@ async function getAncestors(event: Event<1>, result = [] as Event<1>[]): Promise } function getDescendants(eventId: string): Promise[]> { - return getFilter({ kinds: [1], '#e': [eventId], limit: 200 }, { timeout: 2000 }) as Promise[]>; + return getFilters([{ kinds: [1], '#e': [eventId] }], { limit: 200, timeout: 2000 }) as Promise[]>; } /** Publish an event to the Nostr relay. */ @@ -170,15 +165,4 @@ function publish(event: SignedEvent, relays = Conf.publishRelays): void { } } -export { - getAncestors, - getAuthor, - getDescendants, - getEvent, - getFeed, - getFilter, - getFilters, - getFollows, - getPublicFeed, - publish, -}; +export { getAncestors, getAuthor, getDescendants, getEvent, getFeed, getFilters, getFollows, getPublicFeed, publish }; diff --git a/src/db/events.test.ts b/src/db/events.test.ts index 26fe7bff..94a4e235 100644 --- a/src/db/events.test.ts +++ b/src/db/events.test.ts @@ -1,17 +1,17 @@ import event55920b75 from '~/fixtures/events/55920b75.json' assert { type: 'json' }; import { assertEquals } from '@/deps-test.ts'; -import { getFilter, insertEvent } from './events.ts'; +import { getFilters, insertEvent } from './events.ts'; Deno.test('insert and filter events', async () => { await insertEvent(event55920b75); - assertEquals(await getFilter({ kinds: [1] }), [event55920b75]); - assertEquals(await getFilter({ kinds: [3] }), []); - assertEquals(await getFilter({ since: 1691091000 }), [event55920b75]); - assertEquals(await getFilter({ until: 1691091000 }), []); + assertEquals(await getFilters([{ kinds: [1] }]), [event55920b75]); + assertEquals(await getFilters([{ kinds: [3] }]), []); + assertEquals(await getFilters([{ since: 1691091000 }]), [event55920b75]); + assertEquals(await getFilters([{ until: 1691091000 }]), []); assertEquals( - await getFilter({ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }), + await getFilters([{ '#proxy': ['https://gleasonator.com/objects/8f6fac53-4f66-4c6e-ac7d-92e5e78c3e79'] }]), [event55920b75], ); }); diff --git a/src/db/events.ts b/src/db/events.ts index 7bd94ce7..8178710f 100644 --- a/src/db/events.ts +++ b/src/db/events.ts @@ -119,11 +119,6 @@ async function getFilters( )); } -/** @deprecated Use `getFilters` instead. */ -function getFilter(filter: DittoFilter): Promise[]> { - return getFilters([filter]); -} - /** Returns whether the pubkey is followed by a local user. */ async function isLocallyFollowed(pubkey: string): Promise { return Boolean( @@ -136,4 +131,4 @@ async function isLocallyFollowed(pubkey: string): Promise { ); } -export { getFilter, getFilters, insertEvent, isLocallyFollowed }; +export { getFilters, insertEvent, isLocallyFollowed };