Merge branch 'notify' into 'main'

Perform listen/notify of Nostr events to the pipeline

Closes #219

See merge request soapbox-pub/ditto!562
This commit is contained in:
Alex Gleason 2024-10-20 02:18:22 +00:00
commit 9ad9adcff5
8 changed files with 81 additions and 8 deletions

View file

@ -249,6 +249,13 @@ class Conf {
.split(/[, ]+/g) .split(/[, ]+/g)
.map(Number); .map(Number);
} }
/**
* Whether Ditto should subscribe to Nostr events from the Postgres database itself.
* This would make Nostr events inserted directly into Postgres available to the streaming API and relay.
*/
static get notifyEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('NOTIFY_ENABLED')) ?? false;
}
/** Whether to enable Ditto cron jobs. */ /** Whether to enable Ditto cron jobs. */
static get cronEnabled(): boolean { static get cronEnabled(): boolean {
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true; return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;

View file

@ -6,6 +6,7 @@ export interface DittoDatabase {
readonly kysely: Kysely<DittoTables>; readonly kysely: Kysely<DittoTables>;
readonly poolSize: number; readonly poolSize: number;
readonly availableConnections: number; readonly availableConnections: number;
listen(channel: string, callback: (payload: string) => void): void;
} }
export interface DittoDatabaseOpts { export interface DittoDatabaseOpts {

View file

@ -26,10 +26,15 @@ export class DittoPglite {
log: KyselyLogger, log: KyselyLogger,
}); });
const listen = (channel: string, callback: (payload: string) => void): void => {
pglite.listen(channel, callback);
};
return { return {
kysely, kysely,
poolSize: 1, poolSize: 1,
availableConnections: 1, availableConnections: 1,
listen,
}; };
} }
} }

View file

@ -40,6 +40,10 @@ export class DittoPostgres {
log: KyselyLogger, log: KyselyLogger,
}); });
const listen = (channel: string, callback: (payload: string) => void): void => {
pg.listen(channel, callback);
};
return { return {
kysely, kysely,
get poolSize() { get poolSize() {
@ -48,6 +52,7 @@ export class DittoPostgres {
get availableConnections() { get availableConnections() {
return pg.connections.idle; return pg.connections.idle;
}, },
listen,
}; };
} }
} }

View file

@ -0,0 +1,37 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`
CREATE OR REPLACE FUNCTION notify_nostr_event()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
BEGIN
payload := json_build_object(
'id', NEW.id,
'kind', NEW.kind,
'pubkey', NEW.pubkey,
'content', NEW.content,
'tags', NEW.tags,
'created_at', NEW.created_at,
'sig', NEW.sig
);
PERFORM pg_notify('nostr_event', payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
`.execute(db);
await sql`
CREATE TRIGGER nostr_event_trigger
AFTER INSERT OR UPDATE ON nostr_events
FOR EACH ROW EXECUTE FUNCTION notify_nostr_event()
`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP TRIGGER nostr_event_trigger ON nostr_events`.execute(db);
await sql`DROP FUNCTION notify_nostr_event()`.execute(db);
}

21
src/notify.ts Normal file
View file

@ -0,0 +1,21 @@
import { Semaphore } from '@lambdalisue/async';
import * as pipeline from '@/pipeline.ts';
import { Storages } from '@/storages.ts';
const sem = new Semaphore(1);
export async function startNotify(): Promise<void> {
const { listen } = await Storages.database();
listen('nostr_event', (payload) => {
sem.lock(async () => {
try {
const event = JSON.parse(payload);
await pipeline.handleEvent(event, AbortSignal.timeout(5000));
} catch (e) {
console.warn(e);
}
});
});
}

View file

@ -40,7 +40,6 @@ async function handleEvent(event: DittoEvent, signal: AbortSignal): Promise<void
} }
if (!(await verifyEventWorker(event))) return; if (!(await verifyEventWorker(event))) return;
if (encounterEvent(event)) return; if (encounterEvent(event)) return;
if (await existsInDB(event)) return;
console.info(`NostrEvent<${event.kind}> ${event.id}`); console.info(`NostrEvent<${event.kind}> ${event.id}`);
pipelineEventsCounter.inc({ kind: event.kind }); pipelineEventsCounter.inc({ kind: event.kind });
@ -111,13 +110,6 @@ function encounterEvent(event: NostrEvent): boolean {
return encountered; return encountered;
} }
/** Check if the event already exists in the database. */
async function existsInDB(event: DittoEvent): Promise<boolean> {
const store = await Storages.db();
const events = await store.query([{ ids: [event.id], limit: 1 }]);
return events.length > 0;
}
/** Check whether the event has a NIP-70 `-` tag. */ /** Check whether the event has a NIP-70 `-` tag. */
function isProtectedEvent(event: NostrEvent): boolean { function isProtectedEvent(event: NostrEvent): boolean {
return event.tags.some(([name]) => name === '-'); return event.tags.some(([name]) => name === '-');

View file

@ -3,11 +3,16 @@
import { Conf } from '@/config.ts'; import { Conf } from '@/config.ts';
import { cron } from '@/cron.ts'; import { cron } from '@/cron.ts';
import { startFirehose } from '@/firehose.ts'; import { startFirehose } from '@/firehose.ts';
import { startNotify } from '@/notify.ts';
if (Conf.firehoseEnabled) { if (Conf.firehoseEnabled) {
startFirehose(); startFirehose();
} }
if (Conf.notifyEnabled) {
startNotify();
}
if (Conf.cronEnabled) { if (Conf.cronEnabled) {
cron(); cron();
} }