mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Perform listen/notify of Nostr events to the pipeline
This commit is contained in:
parent
92a1d88923
commit
8155fae89a
7 changed files with 87 additions and 0 deletions
|
|
@ -249,6 +249,13 @@ class Conf {
|
|||
.split(/[, ]+/g)
|
||||
.map(Number);
|
||||
}
|
||||
/**
|
||||
* Whether Ditto should subscribe to Nostr events from the Postgres database itself.
|
||||
* This would make Nostr events inserted directly into Postgres available to the streaming API and relay.
|
||||
*/
|
||||
static get notifyEnabled(): boolean {
|
||||
return optionalBooleanSchema.parse(Deno.env.get('NOTIFY_ENABLED')) ?? false;
|
||||
}
|
||||
/** Whether to enable Ditto cron jobs. */
|
||||
static get cronEnabled(): boolean {
|
||||
return optionalBooleanSchema.parse(Deno.env.get('CRON_ENABLED')) ?? true;
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
import { DittoTables } from '@/db/DittoTables.ts';
|
||||
|
|
@ -6,6 +7,7 @@ export interface DittoDatabase {
|
|||
readonly kysely: Kysely<DittoTables>;
|
||||
readonly poolSize: number;
|
||||
readonly availableConnections: number;
|
||||
readonly listenNostr: (onEvent: (event: NostrEvent) => void) => void;
|
||||
}
|
||||
|
||||
export interface DittoDatabaseOpts {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { PGlite } from '@electric-sql/pglite';
|
||||
import { pg_trgm } from '@electric-sql/pglite/contrib/pg_trgm';
|
||||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { PgliteDialect } from '@soapbox/kysely-pglite';
|
||||
import { Kysely } from 'kysely';
|
||||
|
||||
|
|
@ -26,10 +27,17 @@ export class DittoPglite {
|
|||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
const listenNostr = (onEvent: (event: NostrEvent) => void): void => {
|
||||
pglite.listen('nostr_event', (payload) => {
|
||||
onEvent(JSON.parse(payload));
|
||||
});
|
||||
};
|
||||
|
||||
return {
|
||||
kysely,
|
||||
poolSize: 1,
|
||||
availableConnections: 1,
|
||||
listenNostr,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import {
|
||||
BinaryOperationNode,
|
||||
FunctionNode,
|
||||
|
|
@ -40,6 +41,12 @@ export class DittoPostgres {
|
|||
log: KyselyLogger,
|
||||
});
|
||||
|
||||
const listenNostr = (onEvent: (event: NostrEvent) => void): void => {
|
||||
pg.listen('nostr_event', (payload) => {
|
||||
onEvent(JSON.parse(payload));
|
||||
});
|
||||
};
|
||||
|
||||
return {
|
||||
kysely,
|
||||
get poolSize() {
|
||||
|
|
@ -48,6 +55,7 @@ export class DittoPostgres {
|
|||
get availableConnections() {
|
||||
return pg.connections.idle;
|
||||
},
|
||||
listenNostr,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
37
src/db/migrations/039_pg_notify.ts
Normal file
37
src/db/migrations/039_pg_notify.ts
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await sql`
|
||||
CREATE OR REPLACE FUNCTION notify_nostr_event()
|
||||
RETURNS TRIGGER AS $$
|
||||
DECLARE
|
||||
payload JSON;
|
||||
BEGIN
|
||||
payload := json_build_object(
|
||||
'id', NEW.id,
|
||||
'kind', NEW.kind,
|
||||
'pubkey', NEW.pubkey,
|
||||
'content', NEW.content,
|
||||
'tags', NEW.tags,
|
||||
'created_at', NEW.created_at,
|
||||
'sig', NEW.sig
|
||||
);
|
||||
|
||||
PERFORM pg_notify('nostr_event', payload::text);
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
`.execute(db);
|
||||
|
||||
await sql`
|
||||
CREATE TRIGGER nostr_event_trigger
|
||||
AFTER INSERT OR UPDATE ON nostr_events
|
||||
FOR EACH ROW EXECUTE FUNCTION notify_nostr_event()
|
||||
`.execute(db);
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await sql`DROP TRIGGER nostr_event_trigger ON nostr_events`.execute(db);
|
||||
await sql`DROP FUNCTION notify_nostr_event()`.execute(db);
|
||||
}
|
||||
20
src/notify.ts
Normal file
20
src/notify.ts
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
import { Semaphore } from '@lambdalisue/async';
|
||||
|
||||
import * as pipeline from '@/pipeline.ts';
|
||||
import { Storages } from '@/storages.ts';
|
||||
|
||||
const sem = new Semaphore(1);
|
||||
|
||||
export async function startNotify(): Promise<void> {
|
||||
const { listenNostr } = await Storages.database();
|
||||
|
||||
listenNostr((event) => {
|
||||
sem.lock(async () => {
|
||||
try {
|
||||
await pipeline.handleEvent(event, AbortSignal.timeout(5000));
|
||||
} catch (e) {
|
||||
console.warn(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -3,11 +3,16 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import { cron } from '@/cron.ts';
|
||||
import { startFirehose } from '@/firehose.ts';
|
||||
import { startNotify } from '@/notify.ts';
|
||||
|
||||
if (Conf.firehoseEnabled) {
|
||||
startFirehose();
|
||||
}
|
||||
|
||||
if (Conf.notifyEnabled) {
|
||||
startNotify();
|
||||
}
|
||||
|
||||
if (Conf.cronEnabled) {
|
||||
cron();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue