diff --git a/deno.json b/deno.json index bbd2f8f7..1f554639 100644 --- a/deno.json +++ b/deno.json @@ -44,6 +44,7 @@ "@std/media-types": "jsr:@std/media-types@^0.224.1", "@std/streams": "jsr:@std/streams@^0.223.0", "comlink": "npm:comlink@^4.4.1", + "comlink-async-generator": "npm:comlink-async-generator@^0.0.1", "deno-safe-fetch/load": "https://gitlab.com/soapbox-pub/deno-safe-fetch/-/raw/v1.0.0/load.ts", "deno.json": "./deno.json", "entities": "npm:entities@^4.5.0", diff --git a/deno.lock b/deno.lock index 1687f132..8565f781 100644 --- a/deno.lock +++ b/deno.lock @@ -44,6 +44,8 @@ "npm:@scure/bip32@^1.4.0": "npm:@scure/bip32@1.4.0", "npm:@scure/bip39@^1.3.0": "npm:@scure/bip39@1.3.0", "npm:@types/node": "npm:@types/node@18.16.19", + "npm:comlink-async-generator": "npm:comlink-async-generator@0.0.1", + "npm:comlink-async-generator@^0.0.1": "npm:comlink-async-generator@0.0.1", "npm:comlink@^4.4.1": "npm:comlink@4.4.1", "npm:entities@^4.5.0": "npm:entities@4.5.0", "npm:fast-stable-stringify@^1.0.0": "npm:fast-stable-stringify@1.0.0", @@ -446,6 +448,12 @@ "delayed-stream": "delayed-stream@1.0.0" } }, + "comlink-async-generator@0.0.1": { + "integrity": "sha512-RjOPv6Tb7cL9FiIgwanUJuFG9aW4myAFyyzxZoEkEegeDQrZqr92d1Njv2WIgi7nbGpTiyy5GdNTUubDaNgZ6A==", + "dependencies": { + "comlink": "comlink@4.4.1" + } + }, "comlink@4.4.1": { "integrity": "sha512-+1dlx0aY5Jo1vHy/tSsIGpSkN4tS9rZSW8FIhG0JH/crs9wwweswIo/POr451r7bZww3hFbPAKnTpimzL/mm4Q==", "dependencies": {} @@ -1807,6 +1815,7 @@ "npm:@isaacs/ttlcache@^1.4.1", "npm:@noble/secp256k1@^2.0.0", "npm:@scure/base@^1.1.6", + "npm:comlink-async-generator@^0.0.1", "npm:comlink@^4.4.1", "npm:entities@^4.5.0", "npm:fast-stable-stringify@^1.0.0", diff --git a/src/workers/sqlite.ts b/src/workers/sqlite.ts index 37c33b43..154ec556 100644 --- a/src/workers/sqlite.ts +++ b/src/workers/sqlite.ts @@ -1,8 +1,11 @@ import * as Comlink from 'comlink'; +import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; import { CompiledQuery, QueryResult } from 'kysely'; import type { SqliteWorker as _SqliteWorker } from './sqlite.worker.ts'; +Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); + class SqliteWorker { #worker: Worker; #client: ReturnType>; @@ -33,8 +36,12 @@ class SqliteWorker { return this.#client.executeQuery(query) as Promise>; } - streamQuery(): AsyncIterableIterator { - throw new Error('Streaming queries are not supported in the web worker'); + async *streamQuery(query: CompiledQuery): AsyncIterableIterator> { + await this.#ready; + + for await (const result of await this.#client.streamQuery(query)) { + yield result as QueryResult; + } } destroy(): Promise { diff --git a/src/workers/sqlite.worker.ts b/src/workers/sqlite.worker.ts index 68c70d6c..23839dbd 100644 --- a/src/workers/sqlite.worker.ts +++ b/src/workers/sqlite.worker.ts @@ -2,6 +2,7 @@ import { Database as SQLite } from '@db/sqlite'; import * as Comlink from 'comlink'; import { CompiledQuery, QueryResult } from 'kysely'; +import { asyncGeneratorTransferHandler } from 'comlink-async-generator'; import '@/sentry.ts'; @@ -20,11 +21,22 @@ export const SqliteWorker = { insertId: BigInt(db!.lastInsertRowId), }; }, + async *streamQuery({ sql, parameters }: CompiledQuery): AsyncIterableIterator> { + if (!db) throw new Error('Database not open'); + + const stmt = db.prepare(sql).bind(...parameters as any[]); + for (const row of stmt) { + yield { + rows: [row], + }; + } + }, destroy() { db?.close(); }, }; +Comlink.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler); Comlink.expose(SqliteWorker); self.postMessage(['ready']);