mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 11:29:46 +00:00
Remove fetchWorker
This commit is contained in:
parent
af262b5d52
commit
838f773b84
12 changed files with 17 additions and 214 deletions
|
|
@ -1,6 +1,7 @@
|
|||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
|
||||
import { AppMiddleware } from '@/app.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
import { DeepLTranslator } from '@/translators/DeepLTranslator.ts';
|
||||
import { LibreTranslateTranslator } from '@/translators/LibreTranslateTranslator.ts';
|
||||
|
||||
|
|
@ -10,7 +11,7 @@ export const translatorMiddleware: AppMiddleware = async (c, next) => {
|
|||
case 'deepl': {
|
||||
const { deeplApiKey: apiKey, deeplBaseUrl: baseUrl } = Conf;
|
||||
if (apiKey) {
|
||||
c.set('translator', new DeepLTranslator({ baseUrl, apiKey, fetch: fetchWorker }));
|
||||
c.set('translator', new DeepLTranslator({ baseUrl, apiKey, fetch: safeFetch }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
@ -18,7 +19,7 @@ export const translatorMiddleware: AppMiddleware = async (c, next) => {
|
|||
case 'libretranslate': {
|
||||
const { libretranslateApiKey: apiKey, libretranslateBaseUrl: baseUrl } = Conf;
|
||||
if (apiKey) {
|
||||
c.set('translator', new LibreTranslateTranslator({ baseUrl, apiKey, fetch: fetchWorker }));
|
||||
c.set('translator', new LibreTranslateTranslator({ baseUrl, apiKey, fetch: safeFetch }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
import { BlossomUploader, NostrBuildUploader } from '@nostrify/nostrify/uploaders';
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
|
||||
import { AppMiddleware } from '@/app.ts';
|
||||
import { Conf } from '@/config.ts';
|
||||
import { DenoUploader } from '@/uploaders/DenoUploader.ts';
|
||||
import { IPFSUploader } from '@/uploaders/IPFSUploader.ts';
|
||||
import { S3Uploader } from '@/uploaders/S3Uploader.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
/** Set an uploader for the user. */
|
||||
export const uploaderMiddleware: AppMiddleware = async (c, next) => {
|
||||
|
|
@ -29,17 +29,17 @@ export const uploaderMiddleware: AppMiddleware = async (c, next) => {
|
|||
);
|
||||
break;
|
||||
case 'ipfs':
|
||||
c.set('uploader', new IPFSUploader({ baseUrl: Conf.mediaDomain, apiUrl: Conf.ipfs.apiUrl, fetch: fetchWorker }));
|
||||
c.set('uploader', new IPFSUploader({ baseUrl: Conf.mediaDomain, apiUrl: Conf.ipfs.apiUrl, fetch: safeFetch }));
|
||||
break;
|
||||
case 'local':
|
||||
c.set('uploader', new DenoUploader({ baseUrl: Conf.mediaDomain, dir: Conf.uploadsDir }));
|
||||
break;
|
||||
case 'nostrbuild':
|
||||
c.set('uploader', new NostrBuildUploader({ endpoint: Conf.nostrbuildEndpoint, signer, fetch: fetchWorker }));
|
||||
c.set('uploader', new NostrBuildUploader({ endpoint: Conf.nostrbuildEndpoint, signer, fetch: safeFetch }));
|
||||
break;
|
||||
case 'blossom':
|
||||
if (signer) {
|
||||
c.set('uploader', new BlossomUploader({ servers: Conf.blossomServers, signer, fetch: fetchWorker }));
|
||||
c.set('uploader', new BlossomUploader({ servers: Conf.blossomServers, signer, fetch: safeFetch }));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
import { DOMParser } from '@b-fuze/deno-dom';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
import tldts from 'tldts';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
import { cachedFaviconsSizeGauge } from '@/metrics.ts';
|
||||
import { SimpleLRU } from '@/utils/SimpleLRU.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
const faviconCache = new SimpleLRU<string, URL>(
|
||||
async (domain, { signal }) => {
|
||||
|
|
@ -17,7 +17,7 @@ const faviconCache = new SimpleLRU<string, URL>(
|
|||
}
|
||||
|
||||
const rootUrl = new URL('/', `https://${domain}/`);
|
||||
const response = await fetchWorker(rootUrl, { signal });
|
||||
const response = await safeFetch(rootUrl, { signal });
|
||||
const html = await response.text();
|
||||
|
||||
const doc = new DOMParser().parseFromString(html, 'text/html');
|
||||
|
|
|
|||
|
|
@ -1,19 +1,19 @@
|
|||
import { NostrEvent } from '@nostrify/nostrify';
|
||||
import { LNURL, LNURLDetails } from '@nostrify/nostrify/ln';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
import { JsonValue } from '@std/json';
|
||||
|
||||
import { cachedLnurlsSizeGauge } from '@/metrics.ts';
|
||||
import { SimpleLRU } from '@/utils/SimpleLRU.ts';
|
||||
import { errorJson } from '@/utils/log.ts';
|
||||
import { Time } from '@/utils/time.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
const lnurlCache = new SimpleLRU<string, LNURLDetails>(
|
||||
async (lnurl, { signal }) => {
|
||||
logi({ level: 'info', ns: 'ditto.lnurl', lnurl, state: 'started' });
|
||||
try {
|
||||
const details = await LNURL.lookup(lnurl, { fetch: fetchWorker, signal });
|
||||
const details = await LNURL.lookup(lnurl, { fetch: safeFetch, signal });
|
||||
logi({ level: 'info', ns: 'ditto.lnurl', lnurl, state: 'found', details: details as unknown as JsonValue });
|
||||
return details;
|
||||
} catch (e) {
|
||||
|
|
@ -62,7 +62,7 @@ async function getInvoice(params: CallbackParams, signal?: AbortSignal): Promise
|
|||
const { pr } = await LNURL.callback(
|
||||
details.callback,
|
||||
params,
|
||||
{ fetch: fetchWorker, signal },
|
||||
{ fetch: safeFetch, signal },
|
||||
);
|
||||
|
||||
return pr;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { nip19 } from 'nostr-tools';
|
||||
import { NIP05, NStore } from '@nostrify/nostrify';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
import tldts from 'tldts';
|
||||
|
||||
import { Conf } from '@/config.ts';
|
||||
|
|
@ -9,7 +10,6 @@ import { Storages } from '@/storages.ts';
|
|||
import { errorJson } from '@/utils/log.ts';
|
||||
import { SimpleLRU } from '@/utils/SimpleLRU.ts';
|
||||
import { Nip05, parseNip05 } from '@/utils.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
const nip05Cache = new SimpleLRU<string, nip19.ProfilePointer>(
|
||||
async (nip05, { signal }) => {
|
||||
|
|
@ -34,7 +34,7 @@ const nip05Cache = new SimpleLRU<string, nip19.ProfilePointer>(
|
|||
throw new Error(`Not found: ${nip05}`);
|
||||
}
|
||||
} else {
|
||||
const result = await NIP05.lookup(nip05, { fetch: fetchWorker, signal });
|
||||
const result = await NIP05.lookup(nip05, { fetch: safeFetch, signal });
|
||||
logi({ level: 'info', ns: 'ditto.nip05', nip05, state: 'found', pubkey: result.pubkey });
|
||||
return result;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import TTLCache from '@isaacs/ttlcache';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
import DOMPurify from 'isomorphic-dompurify';
|
||||
import { unfurl } from 'unfurl.js';
|
||||
|
||||
|
|
@ -7,13 +8,12 @@ import { Conf } from '@/config.ts';
|
|||
import { PreviewCard } from '@/entities/PreviewCard.ts';
|
||||
import { cachedLinkPreviewSizeGauge } from '@/metrics.ts';
|
||||
import { errorJson } from '@/utils/log.ts';
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
async function unfurlCard(url: string, signal: AbortSignal): Promise<PreviewCard | null> {
|
||||
try {
|
||||
const result = await unfurl(url, {
|
||||
fetch: (url) =>
|
||||
fetchWorker(url, {
|
||||
safeFetch(url, {
|
||||
headers: {
|
||||
'Accept': 'text/html, application/xhtml+xml',
|
||||
'User-Agent': Conf.fetchUserAgent,
|
||||
|
|
|
|||
|
|
@ -1,29 +0,0 @@
|
|||
import { assertEquals, assertRejects } from '@std/assert';
|
||||
|
||||
import { fetchWorker } from '@/workers/fetch.ts';
|
||||
|
||||
Deno.test({
|
||||
name: 'fetchWorker',
|
||||
async fn() {
|
||||
const response = await fetchWorker('https://httpbingo.org/get');
|
||||
const json = await response.json();
|
||||
assertEquals(json.headers.Host, ['httpbingo.org']);
|
||||
},
|
||||
sanitizeResources: false,
|
||||
});
|
||||
|
||||
Deno.test({
|
||||
name: 'fetchWorker with AbortSignal',
|
||||
async fn() {
|
||||
const controller = new AbortController();
|
||||
const signal = controller.signal;
|
||||
|
||||
setTimeout(() => controller.abort(), 100);
|
||||
assertRejects(() => fetchWorker('https://httpbingo.org/delay/10', { signal }));
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
signal.addEventListener('abort', () => resolve(), { once: true });
|
||||
});
|
||||
},
|
||||
sanitizeResources: false,
|
||||
});
|
||||
|
|
@ -1,86 +0,0 @@
|
|||
import * as Comlink from 'comlink';
|
||||
|
||||
import { FetchWorker } from './fetch.worker.ts';
|
||||
import './handlers/abortsignal.ts';
|
||||
|
||||
import { fetchResponsesCounter } from '@/metrics.ts';
|
||||
|
||||
const worker = new Worker(new URL('./fetch.worker.ts', import.meta.url), { type: 'module', name: 'fetchWorker' });
|
||||
const client = Comlink.wrap<typeof FetchWorker>(worker);
|
||||
|
||||
// Wait for the worker to be ready before we start using it.
|
||||
const ready = new Promise<void>((resolve) => {
|
||||
const handleEvent = () => {
|
||||
self.removeEventListener('message', handleEvent);
|
||||
resolve();
|
||||
};
|
||||
worker.addEventListener('message', handleEvent);
|
||||
});
|
||||
|
||||
/**
|
||||
* Fetch implementation with a Web Worker.
|
||||
* Calling this performs the fetch in a separate CPU thread so it doesn't block the main thread.
|
||||
*/
|
||||
const fetchWorker: typeof fetch = async (...args) => {
|
||||
await ready;
|
||||
|
||||
const [url, init] = serializeFetchArgs(args);
|
||||
const { body, signal, ...rest } = init;
|
||||
|
||||
const result = await client.fetch(url, { ...rest, body: await prepareBodyForWorker(body) }, signal);
|
||||
const response = new Response(...result);
|
||||
|
||||
const { method } = init;
|
||||
const { status } = response;
|
||||
fetchResponsesCounter.inc({ method, status });
|
||||
|
||||
return response;
|
||||
};
|
||||
|
||||
/** Take arguments to `fetch`, and turn them into something we can send over Comlink. */
|
||||
function serializeFetchArgs(args: Parameters<typeof fetch>): [string, RequestInit] {
|
||||
const request = normalizeRequest(args);
|
||||
const init = requestToInit(request);
|
||||
return [request.url, init];
|
||||
}
|
||||
|
||||
/** Get a `Request` object from arguments to `fetch`. */
|
||||
function normalizeRequest(args: Parameters<typeof fetch>): Request {
|
||||
return new Request(...args);
|
||||
}
|
||||
|
||||
/** Get the body as a type we can transfer over Web Workers. */
|
||||
async function prepareBodyForWorker(
|
||||
body: BodyInit | undefined | null,
|
||||
): Promise<ArrayBuffer | Blob | string | undefined | null> {
|
||||
if (!body || typeof body === 'string' || body instanceof ArrayBuffer || body instanceof Blob) {
|
||||
return body;
|
||||
} else {
|
||||
const response = new Response(body);
|
||||
return await response.arrayBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a `Request` object into its serialized `RequestInit` format.
|
||||
* `RequestInit` is a subset of `Request`, just lacking helper methods like `json()`,
|
||||
* making it easier to serialize (exceptions: `body` and `signal`).
|
||||
*/
|
||||
function requestToInit(request: Request): RequestInit {
|
||||
return {
|
||||
method: request.method,
|
||||
headers: [...request.headers.entries()],
|
||||
body: request.body,
|
||||
referrer: request.referrer,
|
||||
referrerPolicy: request.referrerPolicy,
|
||||
mode: request.mode,
|
||||
credentials: request.credentials,
|
||||
cache: request.cache,
|
||||
redirect: request.redirect,
|
||||
integrity: request.integrity,
|
||||
keepalive: request.keepalive,
|
||||
signal: request.signal,
|
||||
};
|
||||
}
|
||||
|
||||
export { fetchWorker };
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
/// <reference lib="webworker" />
|
||||
|
||||
import { safeFetch } from '@soapbox/safe-fetch';
|
||||
import { logi } from '@soapbox/logi';
|
||||
import * as Comlink from 'comlink';
|
||||
|
||||
import '@/workers/handlers/abortsignal.ts';
|
||||
import '@/sentry.ts';
|
||||
|
||||
export const FetchWorker = {
|
||||
async fetch(
|
||||
url: string,
|
||||
init: Omit<RequestInit, 'signal'>,
|
||||
signal: AbortSignal | null | undefined,
|
||||
): Promise<[BodyInit, ResponseInit]> {
|
||||
logi({ level: 'debug', ns: 'ditto.fetch', method: init.method ?? 'GET', url });
|
||||
|
||||
const response = await safeFetch(url, { ...init, signal });
|
||||
|
||||
return [
|
||||
await response.arrayBuffer(),
|
||||
{
|
||||
status: response.status,
|
||||
statusText: response.statusText,
|
||||
headers: [...response.headers.entries()],
|
||||
},
|
||||
];
|
||||
},
|
||||
};
|
||||
|
||||
Comlink.expose(FetchWorker);
|
||||
|
||||
self.postMessage('ready');
|
||||
|
|
@ -1,46 +0,0 @@
|
|||
import * as Comlink from 'comlink';
|
||||
|
||||
const signalFinalizers = new FinalizationRegistry((port: MessagePort) => {
|
||||
port.postMessage(null);
|
||||
port.close();
|
||||
});
|
||||
|
||||
Comlink.transferHandlers.set('abortsignal', {
|
||||
canHandle(value) {
|
||||
return value instanceof AbortSignal || value?.constructor?.name === 'AbortSignal';
|
||||
},
|
||||
serialize(signal) {
|
||||
if (signal.aborted) {
|
||||
return [{ aborted: true }];
|
||||
}
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
signal.addEventListener(
|
||||
'abort',
|
||||
() => port1.postMessage({ reason: signal.reason }),
|
||||
{ once: true },
|
||||
);
|
||||
|
||||
signalFinalizers?.register(signal, port1);
|
||||
|
||||
return [{ aborted: false, port: port2 }, [port2]];
|
||||
},
|
||||
deserialize({ aborted, port }) {
|
||||
if (aborted || !port) {
|
||||
return AbortSignal.abort();
|
||||
}
|
||||
|
||||
const ctrl = new AbortController();
|
||||
|
||||
port.addEventListener('message', (ev) => {
|
||||
if (ev.data && 'reason' in ev.data) {
|
||||
ctrl.abort(ev.data.reason);
|
||||
}
|
||||
port.close();
|
||||
}, { once: true });
|
||||
|
||||
port.start();
|
||||
|
||||
return ctrl.signal;
|
||||
},
|
||||
} as Comlink.TransferHandler<AbortSignal, { aborted: boolean; port?: MessagePort }>);
|
||||
|
|
@ -5,8 +5,6 @@ import * as Comlink from 'comlink';
|
|||
import { Conf } from '@/config.ts';
|
||||
import type { CustomPolicy } from '@/workers/policy.worker.ts';
|
||||
|
||||
import '@/workers/handlers/abortsignal.ts';
|
||||
|
||||
class PolicyWorker implements NPolicy {
|
||||
private worker: Comlink.Remote<CustomPolicy>;
|
||||
private ready: Promise<void>;
|
||||
|
|
|
|||
|
|
@ -6,8 +6,6 @@ import * as Comlink from 'comlink';
|
|||
import { DittoDB } from '@/db/DittoDB.ts';
|
||||
import { EventsDB } from '@/storages/EventsDB.ts';
|
||||
|
||||
import '@/workers/handlers/abortsignal.ts';
|
||||
|
||||
// @ts-ignore Don't try to access the env from this worker.
|
||||
Deno.env = new Map<string, string>();
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue