From 350671db4758890116fa6709be90189231958bdb Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 25 Sep 2024 14:31:01 -0500 Subject: [PATCH 1/3] DittoPglite: prevent starting PGlite instances in worker threads --- src/db/adapters/DittoPglite.ts | 19 +++++++++++++------ src/utils/worker.test.ts | 30 ++++++++++++++++++++++++++++++ src/utils/worker.ts | 9 +++++++++ 3 files changed, 52 insertions(+), 6 deletions(-) create mode 100644 src/utils/worker.test.ts create mode 100644 src/utils/worker.ts diff --git a/src/db/adapters/DittoPglite.ts b/src/db/adapters/DittoPglite.ts index 2455fc37..6e4ae488 100644 --- a/src/db/adapters/DittoPglite.ts +++ b/src/db/adapters/DittoPglite.ts @@ -6,16 +6,23 @@ import { Kysely } from 'kysely'; import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts'; import { DittoTables } from '@/db/DittoTables.ts'; import { KyselyLogger } from '@/db/KyselyLogger.ts'; +import { isWorker } from '@/utils/worker.ts'; export class DittoPglite { static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase { + const url = new URL(databaseUrl); + + if (url.protocol === 'file:' && isWorker()) { + throw new Error('PGlite is not supported in worker threads.'); + } + + const pglite = new PGlite(databaseUrl, { + extensions: { pg_trgm }, + debug: opts?.debug, + }); + const kysely = new Kysely({ - dialect: new PgliteDialect({ - database: new PGlite(databaseUrl, { - extensions: { pg_trgm }, - debug: opts?.debug, - }), - }), + dialect: new PgliteDialect({ database: pglite }), log: KyselyLogger, }); diff --git a/src/utils/worker.test.ts b/src/utils/worker.test.ts new file mode 100644 index 00000000..89845e2b --- /dev/null +++ b/src/utils/worker.test.ts @@ -0,0 +1,30 @@ +import { assertEquals } from '@std/assert'; + +import { isWorker } from '@/utils/worker.ts'; + +Deno.test('isWorker from the main thread returns false', () => { + assertEquals(isWorker(), false); +}); + +Deno.test('isWorker from a worker thread returns true', async () => { + const script = ` + import { isWorker } from '@/utils/worker.ts'; + postMessage(isWorker()); + self.close(); + `; + + const worker = new Worker( + URL.createObjectURL(new Blob([script], { type: 'application/javascript' })), + { type: 'module' }, + ); + + const result = await new Promise((resolve) => { + worker.onmessage = (e) => { + resolve(e.data); + }; + }); + + worker.terminate(); + + assertEquals(result, true); +}); diff --git a/src/utils/worker.ts b/src/utils/worker.ts new file mode 100644 index 00000000..ce94caee --- /dev/null +++ b/src/utils/worker.ts @@ -0,0 +1,9 @@ +/** + * Detect if this thread is running in a Worker context. + * + * https://stackoverflow.com/a/18002694 + */ +export function isWorker(): boolean { + // @ts-ignore This is fine. + return typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope; +} From f50b41f22b4233e8f472a058d38badb874fff342 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 25 Sep 2024 15:01:40 -0500 Subject: [PATCH 2/3] Refactor PolicyWorker error handling --- src/pipeline.ts | 2 +- src/workers/policy.ts | 91 ++++++++++++++++++++++++------------ src/workers/policy.worker.ts | 20 ++------ 3 files changed, 67 insertions(+), 46 deletions(-) diff --git a/src/pipeline.ts b/src/pipeline.ts index 6aa4b0c0..a00456a9 100644 --- a/src/pipeline.ts +++ b/src/pipeline.ts @@ -81,7 +81,7 @@ async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise( - new Worker( - new URL('./policy.worker.ts', import.meta.url), - { - type: 'module', - deno: { - permissions: { - read: [Conf.denoDir, Conf.policy, Conf.dataDir], - write: [Conf.dataDir], - net: 'inherit', - env: false, - }, - }, - }, - ), -); +class PolicyWorker implements NPolicy { + private worker: Comlink.Remote; + private ready: Promise; + private enabled = true; -try { - await policyWorker.init({ - path: Conf.policy, - cwd: Deno.cwd(), - databaseUrl: Conf.databaseUrl, - adminPubkey: Conf.pubkey, - }); - console.debug(`Using custom policy: ${Conf.policy}`); -} catch (e: any) { - if (e.message.includes('Module not found')) { - console.debug('Custom policy not found '); - } else { - throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`, e); + constructor() { + this.worker = Comlink.wrap( + new Worker( + new URL('./policy.worker.ts', import.meta.url), + { + type: 'module', + deno: { + permissions: { + read: [Conf.denoDir, Conf.policy, Conf.dataDir], + write: [Conf.dataDir], + net: 'inherit', + env: false, + }, + }, + }, + ), + ); + + this.ready = this.init(); + } + + async call(event: NostrEvent, signal?: AbortSignal): Promise { + await this.ready; + + if (!this.enabled) { + return ['OK', event.id, true, '']; + } + + return this.worker.call(event, signal); + } + + private async init(): Promise { + try { + await this.worker.init({ + path: Conf.policy, + databaseUrl: Conf.databaseUrl, + adminPubkey: Conf.pubkey, + }); + + console.warn(`Using custom policy: ${Conf.policy}`); + } catch (e) { + if (e instanceof Error && e.message.includes('Module not found')) { + console.warn('Custom policy not found '); + this.enabled = false; + return; + } + + if (e instanceof Error && e.message.includes('PGlite is not supported in worker threads')) { + console.warn('Custom policies are not supported with PGlite. The policy is disabled.'); + this.enabled = false; + return; + } + + throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`, e); + } } } + +export const policyWorker = new PolicyWorker(); diff --git a/src/workers/policy.worker.ts b/src/workers/policy.worker.ts index ae6ef8b1..6b256a21 100644 --- a/src/workers/policy.worker.ts +++ b/src/workers/policy.worker.ts @@ -1,6 +1,6 @@ import 'deno-safe-fetch/load'; import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify'; -import { NoOpPolicy, ReadOnlyPolicy } from '@nostrify/policies'; +import { ReadOnlyPolicy } from '@nostrify/policies'; import * as Comlink from 'comlink'; import { DittoDB } from '@/db/DittoDB.ts'; @@ -15,8 +15,6 @@ Deno.env = new Map(); interface PolicyInit { /** Path to the policy module (https, jsr, file, etc) */ path: string; - /** Current working directory. */ - cwd: string; /** Database URL to connect to. */ databaseUrl: string; /** Admin pubkey to use for EventsDB checks. */ @@ -31,10 +29,8 @@ export class CustomPolicy implements NPolicy { return this.policy.call(event, signal); } - async init({ path, cwd, databaseUrl, adminPubkey }: PolicyInit): Promise { - // HACK: PGlite uses `path.resolve`, which requires read permission on Deno (which we don't want to give). - // We can work around this getting the cwd from the caller and overwriting `Deno.cwd`. - Deno.cwd = () => cwd; + async init({ path, databaseUrl, adminPubkey }: PolicyInit): Promise { + const Policy = (await import(path)).default; const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 }); @@ -44,15 +40,7 @@ export class CustomPolicy implements NPolicy { timeout: 1_000, }); - try { - const Policy = (await import(path)).default; - this.policy = new Policy({ store }); - } catch (e: any) { - if (e.message.includes('Module not found')) { - this.policy = new NoOpPolicy(); - } - throw e; - } + this.policy = new Policy({ store }); } } From ff658cf4968975adef3341aafad5191aa53917fc Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Wed, 25 Sep 2024 15:10:12 -0500 Subject: [PATCH 3/3] policy: fix Error constructor --- src/workers/policy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workers/policy.ts b/src/workers/policy.ts index ef7e6db7..941b5cf1 100644 --- a/src/workers/policy.ts +++ b/src/workers/policy.ts @@ -67,7 +67,7 @@ class PolicyWorker implements NPolicy { return; } - throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`, e); + throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`); } } }