mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Rewrite sqlite worker to use threads.js worker pool
This commit is contained in:
parent
709675754c
commit
9000d4fc0a
5 changed files with 25 additions and 43 deletions
|
|
@ -1,5 +1,5 @@
|
|||
import { Conf } from '@/config.ts';
|
||||
import '@/cron.ts';
|
||||
// import '@/cron.ts';
|
||||
import { type User } from '@/db/users.ts';
|
||||
import {
|
||||
type Context,
|
||||
|
|
|
|||
11
src/db.ts
11
src/db.ts
|
|
@ -2,8 +2,7 @@ import fs from 'node:fs/promises';
|
|||
import path from 'node:path';
|
||||
|
||||
import { FileMigrationProvider, Kysely, Migrator, PolySqliteDialect } from '@/deps.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { getPragma, setPragma } from '@/pragma.ts';
|
||||
import { getPragma } from '@/pragma.ts';
|
||||
import SqliteWorker from '@/workers/sqlite.ts';
|
||||
|
||||
interface DittoDB {
|
||||
|
|
@ -58,7 +57,6 @@ interface UnattachedMediaRow {
|
|||
}
|
||||
|
||||
const sqliteWorker = new SqliteWorker();
|
||||
await sqliteWorker.open(Conf.dbPath);
|
||||
|
||||
const db = new Kysely<DittoDB>({
|
||||
dialect: new PolySqliteDialect({
|
||||
|
|
@ -66,13 +64,6 @@ const db = new Kysely<DittoDB>({
|
|||
}),
|
||||
});
|
||||
|
||||
// Set PRAGMA values.
|
||||
await Promise.all([
|
||||
setPragma(db, 'synchronous', 'normal'),
|
||||
setPragma(db, 'temp_store', 'memory'),
|
||||
setPragma(db, 'mmap_size', Conf.sqlite.mmapSize),
|
||||
]);
|
||||
|
||||
// Log out PRAGMA values for debugging.
|
||||
['journal_mode', 'synchronous', 'temp_store', 'mmap_size'].forEach(async (pragma) => {
|
||||
const value = await getPragma(db, pragma);
|
||||
|
|
|
|||
|
|
@ -79,5 +79,7 @@ export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a1
|
|||
export * as Sentry from 'https://deno.land/x/sentry@7.78.0/index.js';
|
||||
export { sentry as sentryMiddleware } from 'npm:@hono/sentry@^1.0.0';
|
||||
export * as Comlink from 'npm:comlink@^4.4.1';
|
||||
export { Pool, spawn } from 'npm:threads@^1.7.0';
|
||||
export { expose } from 'npm:threads@^1.7.0/worker';
|
||||
|
||||
export type * as TypeFest from 'npm:type-fest@^4.3.0';
|
||||
|
|
|
|||
|
|
@ -1,40 +1,27 @@
|
|||
import { Comlink } from '@/deps.ts';
|
||||
import { Pool, spawn } from '@/deps.ts';
|
||||
|
||||
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
|
||||
import type { CompiledQuery, QueryResult } from '@/deps.ts';
|
||||
|
||||
class SqliteWorker {
|
||||
#worker: Worker;
|
||||
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
|
||||
#ready: Promise<void>;
|
||||
#pool: Pool<Awaited<ReturnType<typeof spawn<typeof _SqliteWorker>>>>;
|
||||
|
||||
constructor() {
|
||||
this.#worker = new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' });
|
||||
this.#client = Comlink.wrap<typeof _SqliteWorker>(this.#worker);
|
||||
|
||||
this.#ready = new Promise<void>((resolve) => {
|
||||
const handleEvent = (event: MessageEvent) => {
|
||||
if (event.data[0] === 'ready') {
|
||||
this.#worker.removeEventListener('message', handleEvent);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
this.#worker.addEventListener('message', handleEvent);
|
||||
});
|
||||
}
|
||||
|
||||
async open(path: string): Promise<void> {
|
||||
await this.#ready;
|
||||
return this.#client.open(path);
|
||||
this.#pool = Pool(
|
||||
() =>
|
||||
spawn<typeof _SqliteWorker>(
|
||||
new Worker(new URL('./sqlite.worker.ts', import.meta.url).href, { type: 'module' }),
|
||||
),
|
||||
{ size: 4 },
|
||||
);
|
||||
}
|
||||
|
||||
async executeQuery<R>(query: CompiledQuery): Promise<QueryResult<R>> {
|
||||
await this.#ready;
|
||||
return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
|
||||
return await this.#pool.queue((worker) => worker.executeQuery(query) as Promise<QueryResult<R>>);
|
||||
}
|
||||
|
||||
destroy(): Promise<void> {
|
||||
return this.#client.destroy();
|
||||
return this.#pool.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,13 +1,17 @@
|
|||
/// <reference lib="webworker" />
|
||||
|
||||
import { Comlink, type CompiledQuery, DenoSqlite3, type QueryResult } from '@/deps.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { type CompiledQuery, DenoSqlite3, expose, type QueryResult } from '@/deps.ts';
|
||||
|
||||
let db: DenoSqlite3 | undefined;
|
||||
const db: DenoSqlite3 = new DenoSqlite3(Conf.dbPath);
|
||||
|
||||
db.exec(`
|
||||
PRAGMA synchronous = normal;
|
||||
PRAGMA temp_store = memory;
|
||||
PRAGMA mmap_size = ${Conf.sqlite.mmapSize};
|
||||
`);
|
||||
|
||||
export const SqliteWorker = {
|
||||
open(path: string): void {
|
||||
db = new DenoSqlite3(path);
|
||||
},
|
||||
executeQuery<R>({ sql, parameters }: CompiledQuery): QueryResult<R> {
|
||||
if (!db) throw new Error('Database not open');
|
||||
return {
|
||||
|
|
@ -21,6 +25,4 @@ export const SqliteWorker = {
|
|||
},
|
||||
};
|
||||
|
||||
Comlink.expose(SqliteWorker);
|
||||
|
||||
self.postMessage(['ready']);
|
||||
expose(SqliteWorker);
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue