Merge branch 'pglite-policy' into 'main'

Fix PGlite in local development

See merge request soapbox-pub/ditto!520
This commit is contained in:
Alex Gleason 2024-09-25 20:15:46 +00:00
commit 68ab9f638c
6 changed files with 119 additions and 52 deletions

View file

@ -6,16 +6,23 @@ import { Kysely } from 'kysely';
import { DittoDatabase, DittoDatabaseOpts } from '@/db/DittoDatabase.ts';
import { DittoTables } from '@/db/DittoTables.ts';
import { KyselyLogger } from '@/db/KyselyLogger.ts';
import { isWorker } from '@/utils/worker.ts';
export class DittoPglite {
static create(databaseUrl: string, opts?: DittoDatabaseOpts): DittoDatabase {
const url = new URL(databaseUrl);
if (url.protocol === 'file:' && isWorker()) {
throw new Error('PGlite is not supported in worker threads.');
}
const pglite = new PGlite(databaseUrl, {
extensions: { pg_trgm },
debug: opts?.debug,
});
const kysely = new Kysely<DittoTables>({
dialect: new PgliteDialect({
database: new PGlite(databaseUrl, {
extensions: { pg_trgm },
debug: opts?.debug,
}),
}),
dialect: new PgliteDialect({ database: pglite }),
log: KyselyLogger,
});

View file

@ -81,7 +81,7 @@ async function policyFilter(event: NostrEvent, signal: AbortSignal): Promise<voi
try {
const result = await policyWorker.call(event, signal);
policyEventsCounter.inc({ ok: String(result[2]) });
console.log(JSON.stringify(result));
console.debug(JSON.stringify(result));
RelayError.assert(result);
} catch (e) {
if (e instanceof RelayError) {

30
src/utils/worker.test.ts Normal file
View file

@ -0,0 +1,30 @@
import { assertEquals } from '@std/assert';
import { isWorker } from '@/utils/worker.ts';
Deno.test('isWorker from the main thread returns false', () => {
assertEquals(isWorker(), false);
});
Deno.test('isWorker from a worker thread returns true', async () => {
const script = `
import { isWorker } from '@/utils/worker.ts';
postMessage(isWorker());
self.close();
`;
const worker = new Worker(
URL.createObjectURL(new Blob([script], { type: 'application/javascript' })),
{ type: 'module' },
);
const result = await new Promise<boolean>((resolve) => {
worker.onmessage = (e) => {
resolve(e.data);
};
});
worker.terminate();
assertEquals(result, true);
});

9
src/utils/worker.ts Normal file
View file

@ -0,0 +1,9 @@
/**
* Detect if this thread is running in a Worker context.
*
* https://stackoverflow.com/a/18002694
*/
export function isWorker(): boolean {
// @ts-ignore This is fine.
return typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope;
}

View file

@ -1,3 +1,4 @@
import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify';
import { Stickynotes } from '@soapbox/stickynotes';
import * as Comlink from 'comlink';
@ -8,35 +9,67 @@ import '@/workers/handlers/abortsignal.ts';
const console = new Stickynotes('ditto:policy');
export const policyWorker = Comlink.wrap<CustomPolicy>(
new Worker(
new URL('./policy.worker.ts', import.meta.url),
{
type: 'module',
deno: {
permissions: {
read: [Conf.denoDir, Conf.policy, Conf.dataDir],
write: [Conf.dataDir],
net: 'inherit',
env: false,
},
},
},
),
);
class PolicyWorker implements NPolicy {
private worker: Comlink.Remote<CustomPolicy>;
private ready: Promise<void>;
private enabled = true;
try {
await policyWorker.init({
path: Conf.policy,
cwd: Deno.cwd(),
databaseUrl: Conf.databaseUrl,
adminPubkey: Conf.pubkey,
});
console.debug(`Using custom policy: ${Conf.policy}`);
} catch (e: any) {
if (e.message.includes('Module not found')) {
console.debug('Custom policy not found <https://docs.soapbox.pub/ditto/policies/>');
} else {
throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`, e);
constructor() {
this.worker = Comlink.wrap<CustomPolicy>(
new Worker(
new URL('./policy.worker.ts', import.meta.url),
{
type: 'module',
deno: {
permissions: {
read: [Conf.denoDir, Conf.policy, Conf.dataDir],
write: [Conf.dataDir],
net: 'inherit',
env: false,
},
},
},
),
);
this.ready = this.init();
}
async call(event: NostrEvent, signal?: AbortSignal): Promise<NostrRelayOK> {
await this.ready;
if (!this.enabled) {
return ['OK', event.id, true, ''];
}
return this.worker.call(event, signal);
}
private async init(): Promise<void> {
try {
await this.worker.init({
path: Conf.policy,
databaseUrl: Conf.databaseUrl,
adminPubkey: Conf.pubkey,
});
console.warn(`Using custom policy: ${Conf.policy}`);
} catch (e) {
if (e instanceof Error && e.message.includes('Module not found')) {
console.warn('Custom policy not found <https://docs.soapbox.pub/ditto/policies/>');
this.enabled = false;
return;
}
if (e instanceof Error && e.message.includes('PGlite is not supported in worker threads')) {
console.warn('Custom policies are not supported with PGlite. The policy is disabled.');
this.enabled = false;
return;
}
throw new Error(`DITTO_POLICY (error importing policy): ${Conf.policy}`);
}
}
}
export const policyWorker = new PolicyWorker();

View file

@ -1,6 +1,6 @@
import 'deno-safe-fetch/load';
import { NostrEvent, NostrRelayOK, NPolicy } from '@nostrify/nostrify';
import { NoOpPolicy, ReadOnlyPolicy } from '@nostrify/policies';
import { ReadOnlyPolicy } from '@nostrify/policies';
import * as Comlink from 'comlink';
import { DittoDB } from '@/db/DittoDB.ts';
@ -15,8 +15,6 @@ Deno.env = new Map<string, string>();
interface PolicyInit {
/** Path to the policy module (https, jsr, file, etc) */
path: string;
/** Current working directory. */
cwd: string;
/** Database URL to connect to. */
databaseUrl: string;
/** Admin pubkey to use for EventsDB checks. */
@ -31,10 +29,8 @@ export class CustomPolicy implements NPolicy {
return this.policy.call(event, signal);
}
async init({ path, cwd, databaseUrl, adminPubkey }: PolicyInit): Promise<void> {
// HACK: PGlite uses `path.resolve`, which requires read permission on Deno (which we don't want to give).
// We can work around this getting the cwd from the caller and overwriting `Deno.cwd`.
Deno.cwd = () => cwd;
async init({ path, databaseUrl, adminPubkey }: PolicyInit): Promise<void> {
const Policy = (await import(path)).default;
const { kysely } = DittoDB.create(databaseUrl, { poolSize: 1 });
@ -44,15 +40,7 @@ export class CustomPolicy implements NPolicy {
timeout: 1_000,
});
try {
const Policy = (await import(path)).default;
this.policy = new Policy({ store });
} catch (e: any) {
if (e.message.includes('Module not found')) {
this.policy = new NoOpPolicy();
}
throw e;
}
this.policy = new Policy({ store });
}
}