Merge remote-tracking branch 'origin/main' into policy-signal

This commit is contained in:
Alex Gleason 2024-09-23 18:06:11 -05:00
commit fe7a8aa8a7
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
2 changed files with 10 additions and 7 deletions

View file

@ -1,3 +1,4 @@
import { Stickynotes } from '@soapbox/stickynotes';
import TTLCache from '@isaacs/ttlcache'; import TTLCache from '@isaacs/ttlcache';
import { import {
NostrClientCLOSE, NostrClientCLOSE,
@ -29,6 +30,8 @@ const limiter = new TTLCache<string, number>();
/** Connections for metrics purposes. */ /** Connections for metrics purposes. */
const connections = new Set<WebSocket>(); const connections = new Set<WebSocket>();
const console = new Stickynotes('ditto:relay');
/** Set up the Websocket connection. */ /** Set up the Websocket connection. */
function connectStream(socket: WebSocket, ip: string | undefined) { function connectStream(socket: WebSocket, ip: string | undefined) {
const controllers = new Map<string, AbortController>(); const controllers = new Map<string, AbortController>();

View file

@ -1,5 +1,5 @@
import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify'; import { NKinds, NostrEvent, NSchema as n } from '@nostrify/nostrify';
import Debug from '@soapbox/stickynotes/debug'; import { Stickynotes } from '@soapbox/stickynotes';
import ISO6391 from 'iso-639-1'; import ISO6391 from 'iso-639-1';
import { Kysely, sql } from 'kysely'; import { Kysely, sql } from 'kysely';
import lande from 'lande'; import lande from 'lande';
@ -23,7 +23,7 @@ import { purifyEvent } from '@/utils/purify.ts';
import { updateStats } from '@/utils/stats.ts'; import { updateStats } from '@/utils/stats.ts';
import { getTagSet } from '@/utils/tags.ts'; import { getTagSet } from '@/utils/tags.ts';
const debug = Debug('ditto:pipeline'); const console = new Stickynotes('ditto:pipeline');
/** /**
* Common pipeline function to process (and maybe store) events. * Common pipeline function to process (and maybe store) events.
@ -41,7 +41,7 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
if (encounterEvent(event)) return; if (encounterEvent(event)) return;
if (await existsInDB(event)) return; if (await existsInDB(event)) return;
debug(`NostrEvent<${event.kind}> ${event.id}`); console.info(`NostrEvent<${event.kind}> ${event.id}`);
pipelineEventsCounter.inc({ kind: event.kind }); pipelineEventsCounter.inc({ kind: event.kind });
if (isProtectedEvent(event)) { if (isProtectedEvent(event)) {
@ -76,18 +76,18 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
} }
async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<void> { async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<void> {
const debug = Debug('ditto:policy'); const console = new Stickynotes('ditto:policy');
try { try {
const result = await policyWorker.call(event, signal); const result = await policyWorker.call(event, signal);
policyEventsCounter.inc({ ok: String(result[2]) }); policyEventsCounter.inc({ ok: String(result[2]) });
debug(JSON.stringify(result)); console.log(JSON.stringify(result));
RelayError.assert(result); RelayError.assert(result);
} catch (e) { } catch (e) {
if (e instanceof RelayError) { if (e instanceof RelayError) {
throw e; throw e;
} else { } else {
console.error('POLICY ERROR:', e); console.error(e);
throw new RelayError('blocked', 'policy error'); throw new RelayError('blocked', 'policy error');
} }
} }
@ -136,7 +136,7 @@ async function storeEvent(event: DittoEvent, signal?: AbortSignal): Promise<unde
const store = await Storages.db(); const store = await Storages.db();
await store.transaction(async (store, kysely) => { await store.transaction(async (store, kysely) => {
await updateStats({ event, store, kysely }); await updateStats({ event, store, kysely }).catch((e) => console.error(e));
await store.event(event, { signal }); await store.event(event, { signal });
}); });
} }