Let SQLite transfer asyncIterables

This commit is contained in:
Alex Gleason 2024-08-02 16:45:41 -05:00
parent 9543049419
commit 3bec54ee76
No known key found for this signature in database
GPG key ID: 7211D1F99744FBB7
4 changed files with 31 additions and 2 deletions

View file

@ -44,6 +44,7 @@
"@std/media-types": "jsr:@std/media-types@^0.224.1", "@std/media-types": "jsr:@std/media-types@^0.224.1",
"@std/streams": "jsr:@std/streams@^0.223.0", "@std/streams": "jsr:@std/streams@^0.223.0",
"comlink": "npm:comlink@^4.4.1", "comlink": "npm:comlink@^4.4.1",
"comlink-async-generator": "npm:comlink-async-generator@^0.0.1",
"deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts",
"deno.json": "./deno.json", "deno.json": "./deno.json",
"entities": "npm:entities@^4.5.0", "entities": "npm:entities@^4.5.0",

9
deno.lock generated
View file

@ -44,6 +44,8 @@
"npm:@scure/bip32@^1.4.0": "npm:@scure/bip32@1.4.0", "npm:@scure/bip32@^1.4.0": "npm:@scure/bip32@1.4.0",
"npm:@scure/bip39@^1.3.0": "npm:@scure/bip39@1.3.0", "npm:@scure/bip39@^1.3.0": "npm:@scure/bip39@1.3.0",
"npm:@types/node": "npm:@types/node@18.16.19", "npm:@types/node": "npm:@types/node@18.16.19",
"npm:comlink-async-generator": "npm:comlink-async-generator@0.0.1",
"npm:comlink-async-generator@^0.0.1": "npm:comlink-async-generator@0.0.1",
"npm:comlink@^4.4.1": "npm:comlink@4.4.1", "npm:comlink@^4.4.1": "npm:comlink@4.4.1",
"npm:entities@^4.5.0": "npm:entities@4.5.0", "npm:entities@^4.5.0": "npm:entities@4.5.0",
"npm:fast-stable-stringify@^1.0.0": "npm:fast-stable-stringify@1.0.0", "npm:fast-stable-stringify@^1.0.0": "npm:fast-stable-stringify@1.0.0",
@ -446,6 +448,12 @@
"delayed-stream": "delayed-stream@1.0.0" "delayed-stream": "delayed-stream@1.0.0"
} }
}, },
"comlink-async-generator@0.0.1": {
"integrity": "sha512-RjOPv6Tb7cL9FiIgwanUJuFG9aW4myAFyyzxZoEkEegeDQrZqr92d1Njv2WIgi7nbGpTiyy5GdNTUubDaNgZ6A==",
"dependencies": {
"comlink": "comlink@4.4.1"
}
},
"comlink@4.4.1": { "comlink@4.4.1": {
"integrity": "sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==", "integrity": "sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==",
"dependencies": {} "dependencies": {}
@ -1807,6 +1815,7 @@
"npm:@isaacs/ttlcache@^1.4.1", "npm:@isaacs/ttlcache@^1.4.1",
"npm:@noble/secp256k1@^2.0.0", "npm:@noble/secp256k1@^2.0.0",
"npm:@scure/base@^1.1.6", "npm:@scure/base@^1.1.6",
"npm:comlink-async-generator@^0.0.1",
"npm:comlink@^4.4.1", "npm:comlink@^4.4.1",
"npm:entities@^4.5.0", "npm:entities@^4.5.0",
"npm:fast-stable-stringify@^1.0.0", "npm:fast-stable-stringify@^1.0.0",

View file

@ -1,8 +1,11 @@
import * as Comlink from 'comlink'; import * as Comlink from 'comlink';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import { CompiledQuery, QueryResult } from 'kysely'; import { CompiledQuery, QueryResult } from 'kysely';
import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts'; import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts';
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
class SqliteWorker { class SqliteWorker {
#worker: Worker; #worker: Worker;
#client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>; #client: ReturnType<typeof Comlink.wrap<typeof _SqliteWorker>>;
@ -33,8 +36,12 @@ class SqliteWorker {
return this.#client.executeQuery(query) as Promise<QueryResult<R>>; return this.#client.executeQuery(query) as Promise<QueryResult<R>>;
} }
streamQuery<R>(): AsyncIterableIterator<R> { async *streamQuery<R>(query: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
throw new Error('Streaming queries are not supported in the web worker'); await this.#ready;
for await (const result of await this.#client.streamQuery(query)) {
yield result as QueryResult<R>;
}
} }
destroy(): Promise<void> { destroy(): Promise<void> {

View file

@ -2,6 +2,7 @@
import { Database as SQLite } from '@db/sqlite'; import { Database as SQLite } from '@db/sqlite';
import * as Comlink from 'comlink'; import * as Comlink from 'comlink';
import { CompiledQuery, QueryResult } from 'kysely'; import { CompiledQuery, QueryResult } from 'kysely';
import { asyncGeneratorTransferHandler } from 'comlink-async-generator';
import '@/sentry.ts'; import '@/sentry.ts';
@ -20,11 +21,22 @@ export const SqliteWorker = {
insertId: BigInt(db!.lastInsertRowId), insertId: BigInt(db!.lastInsertRowId),
}; };
}, },
async *streamQuery<R>({ sql, parameters }: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
if (!db) throw new Error('Database not open');
const stmt = db.prepare(sql).bind(...parameters as any[]);
for (const row of stmt) {
yield {
rows: [row],
};
}
},
destroy() { destroy() {
db?.close(); db?.close();
}, },
}; };
Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);
Comlink.expose(SqliteWorker); Comlink.expose(SqliteWorker);
self.postMessage(['ready']); self.postMessage(['ready']);