mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Merge branch 'rm-pubsub' into 'main'
Remove pubsub storage See merge request soapbox-pub/ditto!676
This commit is contained in:
commit
e2f23e51d2
21 changed files with 376 additions and 357 deletions
|
|
@ -61,7 +61,7 @@
|
|||
"@isaacs/ttlcache": "npm:@isaacs/ttlcache@^1.4.1",
|
||||
"@negrel/webpush": "jsr:@negrel/webpush@^0.3.0",
|
||||
"@noble/secp256k1": "npm:@noble/secp256k1@^2.0.0",
|
||||
"@nostrify/db": "jsr:@nostrify/db@^0.39.0",
|
||||
"@nostrify/db": "jsr:@nostrify/db@^0.39.3",
|
||||
"@nostrify/nostrify": "jsr:@nostrify/nostrify@^0.38.1",
|
||||
"@nostrify/policies": "jsr:@nostrify/policies@^0.36.1",
|
||||
"@nostrify/types": "jsr:@nostrify/types@^0.36.0",
|
||||
|
|
|
|||
8
deno.lock
generated
8
deno.lock
generated
|
|
@ -31,7 +31,7 @@
|
|||
"jsr:@hono/hono@^4.4.6": "4.6.15",
|
||||
"jsr:@negrel/http-ece@0.6.0": "0.6.0",
|
||||
"jsr:@negrel/webpush@0.3": "0.3.0",
|
||||
"jsr:@nostrify/db@0.39": "0.39.0",
|
||||
"jsr:@nostrify/db@~0.39.3": "0.39.3",
|
||||
"jsr:@nostrify/nostrify@0.31": "0.31.0",
|
||||
"jsr:@nostrify/nostrify@0.32": "0.32.0",
|
||||
"jsr:@nostrify/nostrify@0.36": "0.36.2",
|
||||
|
|
@ -363,8 +363,8 @@
|
|||
"jsr:@std/path@0.224.0"
|
||||
]
|
||||
},
|
||||
"@nostrify/db@0.39.0": {
|
||||
"integrity": "13a88c610eb15a5dd13848d5beec9170406376c9d05299ce5e5298452a5431ac",
|
||||
"@nostrify/db@0.39.3": {
|
||||
"integrity": "d1f1104316b33e0fd3c263086b325ee49f86859abc1a966b43bb9f9a21c15429",
|
||||
"dependencies": [
|
||||
"jsr:@nostrify/nostrify@~0.38.1",
|
||||
"jsr:@nostrify/types@0.36",
|
||||
|
|
@ -2460,7 +2460,7 @@
|
|||
"jsr:@gfx/canvas-wasm@~0.4.2",
|
||||
"jsr:@hono/hono@^4.4.6",
|
||||
"jsr:@negrel/webpush@0.3",
|
||||
"jsr:@nostrify/db@0.39",
|
||||
"jsr:@nostrify/db@~0.39.3",
|
||||
"jsr:@nostrify/nostrify@~0.38.1",
|
||||
"jsr:@nostrify/policies@~0.36.1",
|
||||
"jsr:@nostrify/types@0.36",
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import type { Kysely } from 'kysely';
|
|||
|
||||
import type { DittoTables } from './DittoTables.ts';
|
||||
|
||||
export interface DittoDatabase {
|
||||
export interface DittoDatabase extends AsyncDisposable {
|
||||
readonly kysely: Kysely<DittoTables>;
|
||||
readonly poolSize: number;
|
||||
readonly availableConnections: number;
|
||||
|
|
|
|||
|
|
@ -36,6 +36,10 @@ export class DittoPglite {
|
|||
poolSize: 1,
|
||||
availableConnections: 1,
|
||||
listen,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
await pglite.close();
|
||||
await kysely.destroy();
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,6 +54,10 @@ export class DittoPostgres {
|
|||
return pg.connections.idle;
|
||||
},
|
||||
listen,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
await pg.end();
|
||||
await kysely.destroy();
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,7 +140,10 @@ Deno.test('PUT /wallet must NOT be successful: wrong request body/schema', async
|
|||
assertObjectMatch(body, { error: 'Bad schema' });
|
||||
});
|
||||
|
||||
Deno.test('PUT /wallet must NOT be successful: wallet already exists', async () => {
|
||||
Deno.test('PUT /wallet must NOT be successful: wallet already exists', {
|
||||
sanitizeOps: false,
|
||||
sanitizeResources: false,
|
||||
}, async () => {
|
||||
using _mock = mockFetch();
|
||||
await using db = await createTestDB();
|
||||
const store = db.store;
|
||||
|
|
@ -178,7 +181,10 @@ Deno.test('PUT /wallet must NOT be successful: wallet already exists', async ()
|
|||
assertEquals(body2, { error: 'You already have a wallet 😏' });
|
||||
});
|
||||
|
||||
Deno.test('GET /wallet must be successful', async () => {
|
||||
Deno.test('GET /wallet must be successful', {
|
||||
sanitizeOps: false,
|
||||
sanitizeResources: false,
|
||||
}, async () => {
|
||||
using _mock = mockFetch();
|
||||
await using db = await createTestDB();
|
||||
const store = db.store;
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ async function getToken(
|
|||
encryption: 'nip44',
|
||||
pubkey: bunkerPubkey,
|
||||
signer: new NSecSigner(nip46Seckey),
|
||||
relay: await Storages.pubsub(), // TODO: Use the relays from the request.
|
||||
relay: await Storages.db(), // TODO: Use the relays from the request.
|
||||
timeout: 60_000,
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -94,8 +94,6 @@ const streamingController: AppController = async (c) => {
|
|||
const { socket, response } = Deno.upgradeWebSocket(c.req.raw, { protocol: token, idleTimeout: 30 });
|
||||
|
||||
const store = await Storages.db();
|
||||
const pubsub = await Storages.pubsub();
|
||||
|
||||
const policy = pubkey ? new MuteListPolicy(pubkey, await Storages.admin()) : undefined;
|
||||
|
||||
function send(e: StreamingEvent) {
|
||||
|
|
@ -105,9 +103,12 @@ const streamingController: AppController = async (c) => {
|
|||
}
|
||||
}
|
||||
|
||||
async function sub(filters: NostrFilter[], render: (event: NostrEvent) => Promise<StreamingEvent | undefined>) {
|
||||
async function sub(
|
||||
filter: NostrFilter & { limit: 0 },
|
||||
render: (event: NostrEvent) => Promise<StreamingEvent | undefined>,
|
||||
) {
|
||||
try {
|
||||
for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
|
||||
for await (const msg of store.req([filter], { signal: controller.signal })) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
const event = msg[2];
|
||||
|
||||
|
|
@ -140,7 +141,7 @@ const streamingController: AppController = async (c) => {
|
|||
const topicFilter = await topicToFilter(stream, c.req.query(), pubkey, conf.url.host);
|
||||
|
||||
if (topicFilter) {
|
||||
sub([topicFilter], async (event) => {
|
||||
sub(topicFilter, async (event) => {
|
||||
let payload: object | undefined;
|
||||
|
||||
if (event.kind === 1) {
|
||||
|
|
@ -161,7 +162,7 @@ const streamingController: AppController = async (c) => {
|
|||
}
|
||||
|
||||
if (['user', 'user:notification'].includes(stream) && pubkey) {
|
||||
sub([{ '#p': [pubkey] }], async (event) => {
|
||||
sub({ '#p': [pubkey], limit: 0 }, async (event) => {
|
||||
if (event.pubkey === pubkey) return; // skip own events
|
||||
const payload = await renderNotification(event, { viewerPubkey: pubkey });
|
||||
if (payload) {
|
||||
|
|
@ -209,23 +210,23 @@ async function topicToFilter(
|
|||
query: Record<string, string>,
|
||||
pubkey: string | undefined,
|
||||
host: string,
|
||||
): Promise<NostrFilter | undefined> {
|
||||
): Promise<(NostrFilter & { limit: 0 }) | undefined> {
|
||||
switch (topic) {
|
||||
case 'public':
|
||||
return { kinds: [1, 6, 20] };
|
||||
return { kinds: [1, 6, 20], limit: 0 };
|
||||
case 'public:local':
|
||||
return { kinds: [1, 6, 20], search: `domain:${host}` };
|
||||
return { kinds: [1, 6, 20], search: `domain:${host}`, limit: 0 };
|
||||
case 'hashtag':
|
||||
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag] };
|
||||
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], limit: 0 };
|
||||
break;
|
||||
case 'hashtag:local':
|
||||
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], search: `domain:${host}` };
|
||||
if (query.tag) return { kinds: [1, 6, 20], '#t': [query.tag], search: `domain:${host}`, limit: 0 };
|
||||
break;
|
||||
case 'user':
|
||||
// HACK: this puts the user's entire contacts list into RAM,
|
||||
// and then calls `matchFilters` over it. Refreshing the page
|
||||
// is required after following a new user.
|
||||
return pubkey ? { kinds: [1, 6, 20], authors: [...await getFeedPubkeys(pubkey)] } : undefined;
|
||||
return pubkey ? { kinds: [1, 6, 20], authors: [...await getFeedPubkeys(pubkey)], limit: 0 } : undefined;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,9 +23,6 @@ import { errorJson } from '@/utils/log.ts';
|
|||
import { purifyEvent } from '@/utils/purify.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
|
||||
/** Limit of initial events returned for a subscription. */
|
||||
const FILTER_LIMIT = 100;
|
||||
|
||||
const limiters = {
|
||||
msg: new MemoryRateLimiter({ limit: 300, window: Time.minutes(1) }),
|
||||
req: new MultiRateLimiter([
|
||||
|
|
@ -126,11 +123,10 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon
|
|||
controllers.set(subId, controller);
|
||||
|
||||
const store = await Storages.db();
|
||||
const pubsub = await Storages.pubsub();
|
||||
|
||||
try {
|
||||
for (const event of await store.query(filters, { limit: FILTER_LIMIT, timeout: conf.db.timeouts.relay })) {
|
||||
send(['EVENT', subId, purifyEvent(event)]);
|
||||
for await (const [verb, , ...rest] of store.req(filters, { limit: 100, timeout: conf.db.timeouts.relay })) {
|
||||
send([verb, subId, ...rest] as NostrRelayMsg);
|
||||
}
|
||||
} catch (e) {
|
||||
if (e instanceof RelayError) {
|
||||
|
|
@ -143,18 +139,6 @@ function connectStream(socket: WebSocket, ip: string | undefined, conf: DittoCon
|
|||
controllers.delete(subId);
|
||||
return;
|
||||
}
|
||||
|
||||
send(['EOSE', subId]);
|
||||
|
||||
try {
|
||||
for await (const msg of pubsub.req(filters, { signal: controller.signal })) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
send(['EVENT', subId, msg[2]]);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
controllers.delete(subId);
|
||||
}
|
||||
}
|
||||
|
||||
/** Handle EVENT. Store the event. */
|
||||
|
|
|
|||
|
|
@ -1,38 +0,0 @@
|
|||
import { Semaphore } from '@core/asyncutil';
|
||||
|
||||
import { pipelineEncounters } from '@/caches/pipelineEncounters.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
import { logi } from '@soapbox/logi';
|
||||
|
||||
const sem = new Semaphore(1);
|
||||
|
||||
export async function startNotify(): Promise<void> {
|
||||
const { listen } = await Storages.database();
|
||||
const store = await Storages.db();
|
||||
|
||||
listen('nostr_event', (id) => {
|
||||
if (pipelineEncounters.has(id)) {
|
||||
logi({ level: 'debug', ns: 'ditto.notify', id, skipped: true });
|
||||
return;
|
||||
}
|
||||
|
||||
logi({ level: 'debug', ns: 'ditto.notify', id, skipped: false });
|
||||
|
||||
sem.lock(async () => {
|
||||
try {
|
||||
const signal = AbortSignal.timeout(Conf.db.timeouts.default);
|
||||
|
||||
const [event] = await store.query([{ ids: [id], limit: 1 }], { signal });
|
||||
|
||||
if (event) {
|
||||
logi({ level: 'debug', ns: 'ditto.event', source: 'notify', id: event.id, kind: event.kind });
|
||||
await pipeline.handleEvent(event, { source: 'notify', signal });
|
||||
}
|
||||
} catch {
|
||||
// Ignore
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -78,8 +78,8 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise<void>
|
|||
// They are exempt from policies and other side-effects, and should be streamed out immediately.
|
||||
// If streaming fails, an error should be returned.
|
||||
if (event.kind === 24133) {
|
||||
await streamOut(event);
|
||||
return;
|
||||
const store = await Storages.db();
|
||||
await store.event(event, { signal: opts.signal });
|
||||
}
|
||||
|
||||
// Ensure the event doesn't violate the policy.
|
||||
|
|
@ -97,24 +97,6 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise<void>
|
|||
throw new RelayError('blocked', 'author is blocked');
|
||||
}
|
||||
|
||||
// Ephemeral events must throw if they are not streamed out.
|
||||
if (NKinds.ephemeral(event.kind)) {
|
||||
await Promise.all([
|
||||
streamOut(event),
|
||||
webPush(event),
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
// Events received through notify are thought to already be in the database, so they only need to be streamed.
|
||||
if (opts.source === 'notify') {
|
||||
await Promise.all([
|
||||
streamOut(event),
|
||||
webPush(event),
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
const kysely = await Storages.kysely();
|
||||
|
||||
try {
|
||||
|
|
@ -127,12 +109,8 @@ async function handleEvent(event: DittoEvent, opts: PipelineOpts): Promise<void>
|
|||
prewarmLinkPreview(event, opts.signal),
|
||||
generateSetEvents(event),
|
||||
])
|
||||
.then(() =>
|
||||
Promise.allSettled([
|
||||
streamOut(event),
|
||||
webPush(event),
|
||||
])
|
||||
);
|
||||
.then(() => webPush(event))
|
||||
.catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -165,12 +143,13 @@ async function hydrateEvent(event: DittoEvent, signal: AbortSignal): Promise<voi
|
|||
|
||||
/** Maybe store the event, if eligible. */
|
||||
async function storeEvent(event: NostrEvent, signal?: AbortSignal): Promise<undefined> {
|
||||
if (NKinds.ephemeral(event.kind)) return;
|
||||
const store = await Storages.db();
|
||||
|
||||
try {
|
||||
await store.transaction(async (store, kysely) => {
|
||||
if (!NKinds.ephemeral(event.kind)) {
|
||||
await updateStats({ event, store, kysely });
|
||||
}
|
||||
await store.event(event, { signal });
|
||||
});
|
||||
} catch (e) {
|
||||
|
|
@ -274,16 +253,6 @@ function isFresh(event: NostrEvent): boolean {
|
|||
return eventAge(event) < Time.minutes(1);
|
||||
}
|
||||
|
||||
/** Distribute the event through active subscriptions. */
|
||||
async function streamOut(event: NostrEvent): Promise<void> {
|
||||
if (!isFresh(event)) {
|
||||
throw new RelayError('invalid', 'event too old');
|
||||
}
|
||||
|
||||
const pubsub = await Storages.pubsub();
|
||||
await pubsub.event(event);
|
||||
}
|
||||
|
||||
async function webPush(event: NostrEvent): Promise<void> {
|
||||
if (!isFresh(event)) {
|
||||
throw new RelayError('invalid', 'event too old');
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ export class ConnectSigner implements NostrSigner {
|
|||
encryption: 'nip44',
|
||||
pubkey: this.opts.bunkerPubkey,
|
||||
// TODO: use a remote relay for `nprofile` signing (if present and `Conf.relay` isn't already in the list)
|
||||
relay: await Storages.pubsub(),
|
||||
relay: await Storages.db(),
|
||||
signer,
|
||||
timeout: 60_000,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,16 +2,11 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { cron } from '@/cron.ts';
|
||||
import { startFirehose } from '@/firehose.ts';
|
||||
import { startNotify } from '@/notify.ts';
|
||||
|
||||
if (Conf.firehoseEnabled) {
|
||||
startFirehose();
|
||||
}
|
||||
|
||||
if (Conf.notifyEnabled) {
|
||||
startNotify();
|
||||
}
|
||||
|
||||
if (Conf.cronEnabled) {
|
||||
cron();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,23 +1,20 @@
|
|||
// deno-lint-ignore-file require-await
|
||||
import { type DittoDatabase, DittoDB } from '@ditto/db';
|
||||
import { internalSubscriptionsSizeGauge } from '@ditto/metrics';
|
||||
import { NPool, NRelay1 } from '@nostrify/nostrify';
|
||||
import { logi } from '@soapbox/logi';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { wsUrlSchema } from '@/schema.ts';
|
||||
import { AdminStore } from '@/storages/AdminStore.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { InternalRelay } from '@/storages/InternalRelay.ts';
|
||||
import { NPool, NRelay1 } from '@nostrify/nostrify';
|
||||
import { DittoPgStore } from '@/storages/DittoPgStore.ts';
|
||||
import { getRelays } from '@/utils/outbox.ts';
|
||||
import { seedZapSplits } from '@/utils/zap-split.ts';
|
||||
|
||||
export class Storages {
|
||||
private static _db: Promise<EventsDB> | undefined;
|
||||
private static _db: Promise<DittoPgStore> | undefined;
|
||||
private static _database: Promise<DittoDatabase> | undefined;
|
||||
private static _admin: Promise<AdminStore> | undefined;
|
||||
private static _client: Promise<NPool<NRelay1>> | undefined;
|
||||
private static _pubsub: Promise<InternalRelay> | undefined;
|
||||
|
||||
public static async database(): Promise<DittoDatabase> {
|
||||
if (!this._database) {
|
||||
|
|
@ -39,11 +36,16 @@ export class Storages {
|
|||
}
|
||||
|
||||
/** SQL database to store events this Ditto server cares about. */
|
||||
public static async db(): Promise<EventsDB> {
|
||||
public static async db(): Promise<DittoPgStore> {
|
||||
if (!this._db) {
|
||||
this._db = (async () => {
|
||||
const kysely = await this.kysely();
|
||||
const store = new EventsDB({ kysely, pubkey: Conf.pubkey, timeout: Conf.db.timeouts.default });
|
||||
const db = await this.database();
|
||||
const store = new DittoPgStore({
|
||||
db,
|
||||
pubkey: Conf.pubkey,
|
||||
timeout: Conf.db.timeouts.default,
|
||||
notify: Conf.notifyEnabled,
|
||||
});
|
||||
await seedZapSplits(store);
|
||||
return store;
|
||||
})();
|
||||
|
|
@ -59,14 +61,6 @@ export class Storages {
|
|||
return this._admin;
|
||||
}
|
||||
|
||||
/** Internal pubsub relay between controllers and the pipeline. */
|
||||
public static async pubsub(): Promise<InternalRelay> {
|
||||
if (!this._pubsub) {
|
||||
this._pubsub = Promise.resolve(new InternalRelay({ gauge: internalSubscriptionsSizeGauge }));
|
||||
}
|
||||
return this._pubsub;
|
||||
}
|
||||
|
||||
/** Relay pool storage. */
|
||||
public static async client(): Promise<NPool<NRelay1>> {
|
||||
if (!this._client) {
|
||||
|
|
|
|||
|
|
@ -1,13 +1,41 @@
|
|||
import { assertEquals, assertRejects } from '@std/assert';
|
||||
import { NostrRelayMsg } from '@nostrify/nostrify';
|
||||
import { genEvent } from '@nostrify/nostrify/test';
|
||||
import { generateSecretKey } from 'nostr-tools';
|
||||
|
||||
import { RelayError } from '@/RelayError.ts';
|
||||
import { eventFixture } from '@/test.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { DittoPgStore } from '@/storages/DittoPgStore.ts';
|
||||
import { createTestDB } from '@/test.ts';
|
||||
|
||||
Deno.test('req streaming', async () => {
|
||||
await using db = await createTestDB({ pure: true });
|
||||
const { store: relay } = db;
|
||||
|
||||
const msgs: NostrRelayMsg[] = [];
|
||||
const controller = new AbortController();
|
||||
|
||||
const promise = (async () => {
|
||||
for await (const msg of relay.req([{ since: 0 }], { signal: controller.signal })) {
|
||||
msgs.push(msg);
|
||||
}
|
||||
})();
|
||||
|
||||
const event = genEvent({ created_at: Math.floor(Date.now() / 1000) });
|
||||
await relay.event(event);
|
||||
|
||||
controller.abort();
|
||||
|
||||
await promise;
|
||||
|
||||
const verbs = msgs.map(([verb]) => verb);
|
||||
|
||||
assertEquals(verbs, ['EOSE', 'EVENT', 'CLOSED']);
|
||||
assertEquals(msgs[1][2], event);
|
||||
assertEquals(relay.subs.size, 0); // cleanup
|
||||
});
|
||||
|
||||
Deno.test('count filters', async () => {
|
||||
await using db = await createTestDB({ pure: true });
|
||||
const { store } = db;
|
||||
|
|
@ -255,7 +283,7 @@ Deno.test('NPostgres.query with search', async (t) => {
|
|||
});
|
||||
});
|
||||
|
||||
Deno.test('EventsDB.indexTags indexes only the final `e` and `p` tag of kind 7 events', () => {
|
||||
Deno.test('DittoPgStore.indexTags indexes only the final `e` and `p` tag of kind 7 events', () => {
|
||||
const event = {
|
||||
kind: 7,
|
||||
id: 'a92549a442d306b32273aa9456ba48e3851a4e6203af3f567543298ab964b35b',
|
||||
|
|
@ -286,7 +314,7 @@ Deno.test('EventsDB.indexTags indexes only the final `e` and `p` tag of kind 7 e
|
|||
'44639d039a7f7fb8772fcfa13d134d3cda684ec34b6a777ead589676f9e8d81b08a24234066dcde1aacfbe193224940fba7586e7197c159757d3caf8f2b57e1b',
|
||||
};
|
||||
|
||||
const tags = EventsDB.indexTags(event);
|
||||
const tags = DittoPgStore.indexTags(event);
|
||||
|
||||
assertEquals(tags, [
|
||||
['e', 'e3653ae41ffb510e5fc071555ecfbc94d2fc31e355d61d941e39a97ac6acb15b'],
|
||||
|
|
@ -1,16 +1,27 @@
|
|||
// deno-lint-ignore-file require-await
|
||||
|
||||
import { DittoTables } from '@ditto/db';
|
||||
import { DittoDatabase, DittoTables } from '@ditto/db';
|
||||
import { detectLanguage } from '@ditto/lang';
|
||||
import { NPostgres, NPostgresSchema } from '@nostrify/db';
|
||||
import { dbEventsCounter } from '@ditto/metrics';
|
||||
import { NIP50, NKinds, NostrEvent, NostrFilter, NSchema as n } from '@nostrify/nostrify';
|
||||
import { dbEventsCounter, internalSubscriptionsSizeGauge } from '@ditto/metrics';
|
||||
import {
|
||||
NIP50,
|
||||
NKinds,
|
||||
NostrEvent,
|
||||
NostrFilter,
|
||||
NostrRelayCLOSED,
|
||||
NostrRelayEOSE,
|
||||
NostrRelayEVENT,
|
||||
NSchema as n,
|
||||
} from '@nostrify/nostrify';
|
||||
import { Machina } from '@nostrify/nostrify/utils';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { JsonValue } from '@std/json';
|
||||
import { LanguageCode } from 'iso-639-1';
|
||||
import { Kysely } from 'kysely';
|
||||
import linkify from 'linkifyjs';
|
||||
import { nip27 } from 'nostr-tools';
|
||||
import { LRUCache } from 'lru-cache';
|
||||
import { matchFilter, nip27 } from 'nostr-tools';
|
||||
import tldts from 'tldts';
|
||||
import { z } from 'zod';
|
||||
|
||||
|
|
@ -37,30 +48,47 @@ interface TagConditionOpts {
|
|||
}
|
||||
|
||||
/** Options for the EventsDB store. */
|
||||
interface EventsDBOpts {
|
||||
interface DittoPgStoreOpts {
|
||||
/** Kysely instance to use. */
|
||||
kysely: Kysely<DittoTables>;
|
||||
db: DittoDatabase;
|
||||
/** Pubkey of the admin account. */
|
||||
pubkey: string;
|
||||
/** Timeout in milliseconds for database queries. */
|
||||
timeout: number;
|
||||
/** Whether the event returned should be a Nostr event or a Ditto event. Defaults to false. */
|
||||
pure?: boolean;
|
||||
/** Chunk size for streaming events. Defaults to 20. */
|
||||
chunkSize?: number;
|
||||
/** Batch size for fulfilling subscriptions. Defaults to 500. */
|
||||
batchSize?: number;
|
||||
/** Max age (in **seconds**) an event can be to be fulfilled to realtime subscribers. */
|
||||
maxAge?: number;
|
||||
/** Whether to listen for events from the database with NOTIFY. */
|
||||
notify?: boolean;
|
||||
}
|
||||
|
||||
/** Realtime subscription. */
|
||||
interface Subscription {
|
||||
filters: NostrFilter[];
|
||||
machina: Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>;
|
||||
}
|
||||
|
||||
/** SQL database storage adapter for Nostr events. */
|
||||
class EventsDB extends NPostgres {
|
||||
export class DittoPgStore extends NPostgres {
|
||||
readonly subs = new Map<string, Subscription>();
|
||||
readonly encounters = new LRUCache<string, boolean>({ max: 1000 });
|
||||
|
||||
/** Conditions for when to index certain tags. */
|
||||
static tagConditions: Record<string, TagCondition> = {
|
||||
'a': ({ count }) => count < 15,
|
||||
'd': ({ event, count }) => count === 0 && NKinds.parameterizedReplaceable(event.kind),
|
||||
'e': EventsDB.eTagCondition,
|
||||
'e': DittoPgStore.eTagCondition,
|
||||
'k': ({ count, value }) => count === 0 && Number.isInteger(Number(value)),
|
||||
'L': ({ event, count }) => event.kind === 1985 || count === 0,
|
||||
'l': ({ event, count }) => event.kind === 1985 || count === 0,
|
||||
'n': ({ count, value }) => count < 50 && value.length < 50,
|
||||
'P': ({ count, value }) => count === 0 && isNostrId(value),
|
||||
'p': EventsDB.pTagCondition,
|
||||
'p': DittoPgStore.pTagCondition,
|
||||
'proxy': ({ count, value }) => count === 0 && value.length < 256,
|
||||
'q': ({ event, count, value }) => count === 0 && event.kind === 1 && isNostrId(value),
|
||||
'r': ({ event, count }) => (event.kind === 1985 ? count < 20 : count < 3),
|
||||
|
|
@ -72,67 +100,43 @@ class EventsDB extends NPostgres {
|
|||
},
|
||||
};
|
||||
|
||||
static indexExtensions(event: NostrEvent): Record<string, string> {
|
||||
const ext: Record<string, string> = {};
|
||||
|
||||
if (event.kind === 1) {
|
||||
ext.reply = event.tags.some(([name]) => name === 'e').toString();
|
||||
} else if (event.kind === 1111) {
|
||||
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
|
||||
} else if (event.kind === 6) {
|
||||
ext.reply = 'false';
|
||||
}
|
||||
|
||||
if ([1, 20, 30023].includes(event.kind)) {
|
||||
const language = detectLanguage(event.content, 0.90);
|
||||
|
||||
if (language) {
|
||||
ext.language = language;
|
||||
}
|
||||
}
|
||||
|
||||
const imeta: string[][][] = event.tags
|
||||
.filter(([name]) => name === 'imeta')
|
||||
.map(([_, ...entries]) =>
|
||||
entries.map((entry) => {
|
||||
const split = entry.split(' ');
|
||||
return [split[0], split.splice(1).join(' ')];
|
||||
})
|
||||
);
|
||||
|
||||
// quirks mode
|
||||
if (!imeta.length && event.kind === 1) {
|
||||
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
|
||||
imeta.push(...getMediaLinks(links));
|
||||
}
|
||||
|
||||
if (imeta.length) {
|
||||
ext.media = 'true';
|
||||
|
||||
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
|
||||
ext.video = 'true';
|
||||
}
|
||||
}
|
||||
|
||||
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
|
||||
|
||||
return ext;
|
||||
}
|
||||
|
||||
constructor(private opts: EventsDBOpts) {
|
||||
super(opts.kysely, {
|
||||
indexTags: EventsDB.indexTags,
|
||||
indexSearch: EventsDB.searchText,
|
||||
indexExtensions: EventsDB.indexExtensions,
|
||||
constructor(private opts: DittoPgStoreOpts) {
|
||||
super(opts.db.kysely, {
|
||||
indexTags: DittoPgStore.indexTags,
|
||||
indexSearch: DittoPgStore.searchText,
|
||||
indexExtensions: DittoPgStore.indexExtensions,
|
||||
chunkSize: opts.chunkSize,
|
||||
});
|
||||
|
||||
if (opts.notify) {
|
||||
opts.db.listen('nostr_event', async (id) => {
|
||||
if (this.encounters.has(id)) return;
|
||||
this.encounters.set(id, true);
|
||||
|
||||
const [event] = await this.query([{ ids: [id] }]);
|
||||
|
||||
if (event) {
|
||||
await this.fulfill(event);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/** Insert an event (and its tags) into the database. */
|
||||
override async event(event: NostrEvent, opts: { signal?: AbortSignal; timeout?: number } = {}): Promise<void> {
|
||||
event = purifyEvent(event);
|
||||
|
||||
logi({ level: 'debug', ns: 'ditto.event', source: 'db', id: event.id, kind: event.kind });
|
||||
dbEventsCounter.inc({ kind: event.kind });
|
||||
|
||||
if (NKinds.ephemeral(event.kind)) {
|
||||
return await this.fulfill(event);
|
||||
}
|
||||
|
||||
if (this.opts.notify) {
|
||||
this.encounters.set(event.id, true);
|
||||
}
|
||||
|
||||
if (await this.isDeletedAdmin(event)) {
|
||||
throw new RelayError('blocked', 'event deleted by admin');
|
||||
}
|
||||
|
|
@ -141,6 +145,7 @@ class EventsDB extends NPostgres {
|
|||
|
||||
try {
|
||||
await super.event(event, { ...opts, timeout: opts.timeout ?? this.opts.timeout });
|
||||
this.fulfill(event); // don't await or catch (should never reject)
|
||||
} catch (e) {
|
||||
if (e instanceof Error && e.message === 'Cannot add a deleted event') {
|
||||
throw new RelayError('blocked', 'event deleted by user');
|
||||
|
|
@ -152,6 +157,48 @@ class EventsDB extends NPostgres {
|
|||
}
|
||||
}
|
||||
|
||||
/** Fulfill active subscriptions with this event. */
|
||||
protected async fulfill(event: NostrEvent): Promise<void> {
|
||||
const { maxAge = 60, batchSize = 500 } = this.opts;
|
||||
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const age = now - event.created_at;
|
||||
|
||||
if (age > maxAge) {
|
||||
// Ephemeral events must be fulfilled, or else return an error to the client.
|
||||
if (NKinds.ephemeral(event.kind)) {
|
||||
throw new RelayError('invalid', 'event too old');
|
||||
} else {
|
||||
// Silently ignore old events.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let count = 0;
|
||||
|
||||
for (const [subId, { filters, machina }] of this.subs.entries()) {
|
||||
for (const filter of filters) {
|
||||
count++;
|
||||
|
||||
if (this.matchesFilter(event, filter)) {
|
||||
machina.push(['EVENT', subId, event]);
|
||||
break;
|
||||
}
|
||||
|
||||
// Yield to event loop.
|
||||
if (count % batchSize === 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Check if the event fulfills the filter, according to Ditto criteria. */
|
||||
protected matchesFilter(event: NostrEvent, filter: NostrFilter): boolean {
|
||||
// TODO: support streaming by search.
|
||||
return typeof filter.search !== 'string' && matchFilter(filter, event);
|
||||
}
|
||||
|
||||
/** Check if an event has been deleted by the admin. */
|
||||
private async isDeletedAdmin(event: NostrEvent): Promise<boolean> {
|
||||
const filters: NostrFilter[] = [
|
||||
|
|
@ -213,27 +260,89 @@ class EventsDB extends NPostgres {
|
|||
}
|
||||
}
|
||||
|
||||
override async *req(
|
||||
filters: NostrFilter[],
|
||||
opts: { timeout?: number; signal?: AbortSignal; limit?: number } = {},
|
||||
): AsyncIterable<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
const { db, chunkSize = 20 } = this.opts;
|
||||
const { limit, timeout = this.opts.timeout, signal } = opts;
|
||||
|
||||
filters = await this.expandFilters(filters);
|
||||
|
||||
const subId = crypto.randomUUID();
|
||||
const normalFilters = this.normalizeFilters(filters);
|
||||
const machina = new Machina<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED>(signal);
|
||||
|
||||
if (normalFilters.length && limit !== 0) {
|
||||
this.withTimeout(db.kysely as unknown as Kysely<NPostgresSchema>, timeout, async (trx) => {
|
||||
let query = this.getEventsQuery(trx, normalFilters);
|
||||
|
||||
if (typeof opts.limit === 'number') {
|
||||
query = query.limit(opts.limit);
|
||||
}
|
||||
|
||||
for await (const row of query.stream(chunkSize)) {
|
||||
const event = this.parseEventRow(row);
|
||||
machina.push(['EVENT', subId, event]);
|
||||
}
|
||||
|
||||
machina.push(['EOSE', subId]);
|
||||
}).catch((error) => {
|
||||
if (error instanceof Error && error.message.includes('timeout')) {
|
||||
machina.push(['CLOSED', subId, 'error: the relay could not respond fast enough']);
|
||||
} else {
|
||||
machina.push(['CLOSED', subId, 'error: something went wrong']);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
for await (const msg of machina) {
|
||||
const [verb] = msg;
|
||||
|
||||
yield msg;
|
||||
|
||||
if (verb === 'EOSE') {
|
||||
break;
|
||||
}
|
||||
|
||||
if (verb === 'CLOSED') {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
yield ['EOSE', subId];
|
||||
}
|
||||
|
||||
this.subs.set(subId, { filters, machina });
|
||||
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||
|
||||
try {
|
||||
for await (const msg of machina) {
|
||||
yield msg;
|
||||
}
|
||||
} catch (e) {
|
||||
if (e instanceof Error && e.name === 'AbortError') {
|
||||
yield ['CLOSED', subId, 'error: the relay could not respond fast enough'];
|
||||
} else {
|
||||
yield ['CLOSED', subId, 'error: something went wrong'];
|
||||
}
|
||||
} finally {
|
||||
this.subs.delete(subId);
|
||||
internalSubscriptionsSizeGauge.set(this.subs.size);
|
||||
}
|
||||
}
|
||||
|
||||
/** Get events for filters from the database. */
|
||||
override async query(
|
||||
filters: NostrFilter[],
|
||||
opts: { signal?: AbortSignal; timeout?: number; limit?: number } = {},
|
||||
opts: { signal?: AbortSignal; pure?: boolean; timeout?: number; limit?: number } = {},
|
||||
): Promise<DittoEvent[]> {
|
||||
filters = await this.expandFilters(filters);
|
||||
|
||||
for (const filter of filters) {
|
||||
if (filter.since && filter.since >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'since filter too far into the future');
|
||||
}
|
||||
if (filter.until && filter.until >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'until filter too far into the future');
|
||||
}
|
||||
for (const kind of filter.kinds ?? []) {
|
||||
if (kind >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'kind filter too far into the future');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (opts.signal?.aborted) return Promise.resolve([]);
|
||||
|
||||
logi({ level: 'debug', ns: 'ditto.req', source: 'db', filters: filters as JsonValue });
|
||||
|
|
@ -323,7 +432,7 @@ class EventsDB extends NPostgres {
|
|||
|
||||
return event.tags.reduce<string[][]>((results, tag, index) => {
|
||||
const [name, value] = tag;
|
||||
const condition = EventsDB.tagConditions[name] as TagCondition | undefined;
|
||||
const condition = DittoPgStore.tagConditions[name] as TagCondition | undefined;
|
||||
|
||||
if (value && condition && value.length < 200 && checkCondition(name, value, condition, index)) {
|
||||
results.push(tag);
|
||||
|
|
@ -334,16 +443,63 @@ class EventsDB extends NPostgres {
|
|||
}, []);
|
||||
}
|
||||
|
||||
static indexExtensions(event: NostrEvent): Record<string, string> {
|
||||
const ext: Record<string, string> = {};
|
||||
|
||||
if (event.kind === 1) {
|
||||
ext.reply = event.tags.some(([name]) => name === 'e').toString();
|
||||
} else if (event.kind === 1111) {
|
||||
ext.reply = event.tags.some(([name]) => ['e', 'E'].includes(name)).toString();
|
||||
} else if (event.kind === 6) {
|
||||
ext.reply = 'false';
|
||||
}
|
||||
|
||||
if ([1, 20, 30023].includes(event.kind)) {
|
||||
const language = detectLanguage(event.content, 0.90);
|
||||
|
||||
if (language) {
|
||||
ext.language = language;
|
||||
}
|
||||
}
|
||||
|
||||
const imeta: string[][][] = event.tags
|
||||
.filter(([name]) => name === 'imeta')
|
||||
.map(([_, ...entries]) =>
|
||||
entries.map((entry) => {
|
||||
const split = entry.split(' ');
|
||||
return [split[0], split.splice(1).join(' ')];
|
||||
})
|
||||
);
|
||||
|
||||
// quirks mode
|
||||
if (!imeta.length && event.kind === 1) {
|
||||
const links = linkify.find(event.content).filter(({ type }) => type === 'url');
|
||||
imeta.push(...getMediaLinks(links));
|
||||
}
|
||||
|
||||
if (imeta.length) {
|
||||
ext.media = 'true';
|
||||
|
||||
if (imeta.every((tags) => tags.some(([name, value]) => name === 'm' && value.startsWith('video/')))) {
|
||||
ext.video = 'true';
|
||||
}
|
||||
}
|
||||
|
||||
ext.protocol = event.tags.find(([name]) => name === 'proxy')?.[2] ?? 'nostr';
|
||||
|
||||
return ext;
|
||||
}
|
||||
|
||||
/** Build a search index from the event. */
|
||||
static searchText(event: NostrEvent): string {
|
||||
switch (event.kind) {
|
||||
case 0:
|
||||
return EventsDB.buildUserSearchContent(event);
|
||||
return DittoPgStore.buildUserSearchContent(event);
|
||||
case 1:
|
||||
case 20:
|
||||
return nip27.replaceAll(event.content, () => '');
|
||||
case 30009:
|
||||
return EventsDB.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt'));
|
||||
return DittoPgStore.buildTagsSearchContent(event.tags.filter(([t]) => t !== 'alt'));
|
||||
case 30360:
|
||||
return event.tags.find(([name]) => name === 'd')?.[1] || '';
|
||||
default:
|
||||
|
|
@ -367,6 +523,18 @@ class EventsDB extends NPostgres {
|
|||
filters = structuredClone(filters);
|
||||
|
||||
for (const filter of filters) {
|
||||
if (filter.since && filter.since >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'since filter too far into the future');
|
||||
}
|
||||
if (filter.until && filter.until >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'until filter too far into the future');
|
||||
}
|
||||
for (const kind of filter.kinds ?? []) {
|
||||
if (kind >= 2_147_483_647) {
|
||||
throw new RelayError('invalid', 'kind filter too far into the future');
|
||||
}
|
||||
}
|
||||
|
||||
if (filter.search) {
|
||||
const tokens = NIP50.parseInput(filter.search);
|
||||
|
||||
|
|
@ -385,7 +553,7 @@ class EventsDB extends NPostgres {
|
|||
}
|
||||
|
||||
if (domains.size || hostnames.size) {
|
||||
let query = this.opts.kysely
|
||||
let query = this.opts.db.kysely
|
||||
.selectFrom('author_stats')
|
||||
.select('pubkey')
|
||||
.where((eb) => {
|
||||
|
|
@ -417,21 +585,33 @@ class EventsDB extends NPostgres {
|
|||
.map((t) => typeof t === 'object' ? `${t.key}:${t.value}` : t)
|
||||
.join(' ');
|
||||
}
|
||||
|
||||
if (filter.kinds) {
|
||||
// Ephemeral events are not stored, so don't bother querying for them.
|
||||
// If this results in an empty kinds array, NDatabase will remove the filter before querying and return no results.
|
||||
filter.kinds = filter.kinds.filter((kind) => !NKinds.ephemeral(kind));
|
||||
}
|
||||
}
|
||||
|
||||
return filters;
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
override async transaction(callback: (store: NPostgres, kysely: Kysely<any>) => Promise<void>): Promise<void> {
|
||||
return super.transaction((store, kysely) => callback(store, kysely as unknown as Kysely<DittoTables>));
|
||||
/** Execute the callback in a new transaction, unless the Kysely instance is already a transaction. */
|
||||
private static override async trx<T = unknown>(
|
||||
db: Kysely<DittoTables>,
|
||||
callback: (trx: Kysely<DittoTables>) => Promise<T>,
|
||||
): Promise<T> {
|
||||
if (db.isTransaction) {
|
||||
return await callback(db);
|
||||
} else {
|
||||
return await db.transaction().execute((trx) => callback(trx));
|
||||
}
|
||||
}
|
||||
|
||||
export { EventsDB };
|
||||
/** Execute NPostgres functions in a transaction. */
|
||||
// @ts-ignore gg
|
||||
override async transaction(
|
||||
callback: (store: DittoPgStore, kysely: Kysely<DittoTables>) => Promise<void>,
|
||||
): Promise<void> {
|
||||
const { db } = this.opts;
|
||||
|
||||
await DittoPgStore.trx(db.kysely, async (trx) => {
|
||||
const store = new DittoPgStore({ ...this.opts, db: { ...db, kysely: trx }, notify: false });
|
||||
await callback(store, trx);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
import { assertEquals } from '@std/assert';
|
||||
|
||||
import { eventFixture } from '@/test.ts';
|
||||
|
||||
import { InternalRelay } from './InternalRelay.ts';
|
||||
|
||||
Deno.test('InternalRelay', async () => {
|
||||
const relay = new InternalRelay();
|
||||
const event1 = await eventFixture('event-1');
|
||||
|
||||
const promise = new Promise((resolve) => setTimeout(() => resolve(relay.event(event1)), 0));
|
||||
|
||||
for await (const msg of relay.req([{}])) {
|
||||
if (msg[0] === 'EVENT') {
|
||||
assertEquals(relay.subs.size, 1);
|
||||
assertEquals(msg[2], event1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await promise;
|
||||
assertEquals(relay.subs.size, 0); // cleanup
|
||||
});
|
||||
|
|
@ -1,86 +0,0 @@
|
|||
// deno-lint-ignore-file require-await
|
||||
import {
|
||||
NIP50,
|
||||
NostrEvent,
|
||||
NostrFilter,
|
||||
NostrRelayCLOSED,
|
||||
NostrRelayEOSE,
|
||||
NostrRelayEVENT,
|
||||
NRelay,
|
||||
} from '@nostrify/nostrify';
|
||||
import { Machina } from '@nostrify/nostrify/utils';
|
||||
import { matchFilter } from 'nostr-tools';
|
||||
import { Gauge } from 'prom-client';
|
||||
|
||||
import { DittoEvent } from '@/interfaces/DittoEvent.ts';
|
||||
import { purifyEvent } from '@/utils/purify.ts';
|
||||
|
||||
interface InternalRelayOpts {
|
||||
gauge?: Gauge;
|
||||
}
|
||||
|
||||
/**
|
||||
* PubSub event store for streaming events within the application.
|
||||
* The pipeline should push events to it, then anything in the application can subscribe to it.
|
||||
*/
|
||||
export class InternalRelay implements NRelay {
|
||||
readonly subs = new Map<string, { filters: NostrFilter[]; machina: Machina<NostrEvent> }>();
|
||||
|
||||
constructor(private opts: InternalRelayOpts = {}) {}
|
||||
|
||||
async *req(
|
||||
filters: NostrFilter[],
|
||||
opts?: { signal?: AbortSignal },
|
||||
): AsyncGenerator<NostrRelayEVENT | NostrRelayEOSE | NostrRelayCLOSED> {
|
||||
const id = crypto.randomUUID();
|
||||
const machina = new Machina<NostrEvent>(opts?.signal);
|
||||
|
||||
yield ['EOSE', id];
|
||||
|
||||
this.subs.set(id, { filters, machina });
|
||||
this.opts.gauge?.set(this.subs.size);
|
||||
|
||||
try {
|
||||
for await (const event of machina) {
|
||||
yield ['EVENT', id, event];
|
||||
}
|
||||
} finally {
|
||||
this.subs.delete(id);
|
||||
this.opts.gauge?.set(this.subs.size);
|
||||
}
|
||||
}
|
||||
|
||||
async event(event: DittoEvent): Promise<void> {
|
||||
for (const { filters, machina } of this.subs.values()) {
|
||||
for (const filter of filters) {
|
||||
if (matchFilter(filter, event)) {
|
||||
if (filter.search) {
|
||||
const tokens = NIP50.parseInput(filter.search);
|
||||
|
||||
const domain = (tokens.find((t) =>
|
||||
typeof t === 'object' && t.key === 'domain'
|
||||
) as { key: 'domain'; value: string } | undefined)?.value;
|
||||
|
||||
if (domain === event.author_stats?.nip05_hostname) {
|
||||
machina.push(purifyEvent(event));
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
machina.push(purifyEvent(event));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
async query(): Promise<NostrEvent[]> {
|
||||
return [];
|
||||
}
|
||||
|
||||
async close(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
}
|
||||
|
|
@ -2,7 +2,7 @@ import { DittoDB } from '@ditto/db';
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { DittoPgStore } from '@/storages/DittoPgStore.ts';
|
||||
import { sql } from 'kysely';
|
||||
|
||||
/** Import an event fixture by name in tests. */
|
||||
|
|
@ -13,31 +13,32 @@ export async function eventFixture(name: string): Promise<NostrEvent> {
|
|||
|
||||
/** Create a database for testing. It uses `DATABASE_URL`, or creates an in-memory database by default. */
|
||||
export async function createTestDB(opts?: { pure?: boolean }) {
|
||||
const { kysely } = DittoDB.create(Conf.databaseUrl, { poolSize: 1 });
|
||||
const db = DittoDB.create(Conf.databaseUrl, { poolSize: 1 });
|
||||
|
||||
await DittoDB.migrate(kysely);
|
||||
await DittoDB.migrate(db.kysely);
|
||||
|
||||
const store = new EventsDB({
|
||||
kysely,
|
||||
const store = new DittoPgStore({
|
||||
db,
|
||||
timeout: Conf.db.timeouts.default,
|
||||
pubkey: Conf.pubkey,
|
||||
pure: opts?.pure ?? false,
|
||||
notify: true,
|
||||
});
|
||||
|
||||
return {
|
||||
...db,
|
||||
store,
|
||||
kysely,
|
||||
[Symbol.asyncDispose]: async () => {
|
||||
const { rows } = await sql<
|
||||
{ tablename: string }
|
||||
>`select tablename from pg_tables where schemaname = current_schema()`.execute(kysely);
|
||||
>`select tablename from pg_tables where schemaname = current_schema()`.execute(db.kysely);
|
||||
|
||||
for (const { tablename } of rows) {
|
||||
if (tablename.startsWith('kysely_')) continue;
|
||||
await sql`truncate table ${sql.ref(tablename)} cascade`.execute(kysely);
|
||||
await sql`truncate table ${sql.ref(tablename)} cascade`.execute(db.kysely);
|
||||
}
|
||||
|
||||
await kysely.destroy();
|
||||
await db[Symbol.asyncDispose]();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify';
|
|||
import { ReadOnlyPolicy } from '@nostrify/policies';
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
import { DittoPgStore } from '@/storages/DittoPgStore.ts';
|
||||
|
||||
// @ts-ignore Don't try to access the env from this worker.
|
||||
Deno.env = new Map<string, string>();
|
||||
|
|
@ -15,7 +15,7 @@ interface PolicyInit {
|
|||
path: string;
|
||||
/** Database URL to connect to. */
|
||||
databaseUrl: string;
|
||||
/** Admin pubkey to use for EventsDB checks. */
|
||||
/** Admin pubkey to use for DittoPgStore checks. */
|
||||
pubkey: string;
|
||||
}
|
||||
|
||||
|
|
@ -30,10 +30,10 @@ export class CustomPolicy implements NPolicy {
|
|||
async init({ path, databaseUrl, pubkey }: PolicyInit): Promise<void> {
|
||||
const Policy = (await import(path)).default;
|
||||
|
||||
const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 });
|
||||
const db = DittoDB.create(databaseUrl, { poolSize: 1 });
|
||||
|
||||
const store = new EventsDB({
|
||||
kysely,
|
||||
const store = new DittoPgStore({
|
||||
db,
|
||||
pubkey,
|
||||
timeout: 5_000,
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
|
||||
import { Storages } from '../packages/ditto/storages.ts';
|
||||
import { EventsDB } from '../packages/ditto/storages/EventsDB.ts';
|
||||
import { DittoPgStore } from '../packages/ditto/storages/DittoPgStore.ts';
|
||||
|
||||
const kysely = await Storages.kysely();
|
||||
|
||||
|
|
@ -11,7 +11,7 @@ const query = kysely
|
|||
|
||||
for await (const row of query.stream()) {
|
||||
const event: NostrEvent = { ...row, created_at: Number(row.created_at) };
|
||||
const ext = EventsDB.indexExtensions(event);
|
||||
const ext = DittoPgStore.indexExtensions(event);
|
||||
|
||||
try {
|
||||
await kysely
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue