mirror of
https://gitlab.com/soapbox-pub/ditto.git
synced 2025-12-06 03:19:46 +00:00
SimpleLRU: respect AbortSignal
This commit is contained in:
parent
a597eae674
commit
16f3a13364
2 changed files with 35 additions and 30 deletions
|
|
@ -4,7 +4,7 @@ import { assertEquals, assertRejects } from '@std/assert';
|
|||
Deno.test("SimpleLRU doesn't repeat failed calls", async () => {
|
||||
let calls = 0;
|
||||
|
||||
const cache = new SimpleLRU(
|
||||
using cache = new SimpleLRU(
|
||||
// deno-lint-ignore require-await
|
||||
async () => {
|
||||
calls++;
|
||||
|
|
|
|||
|
|
@ -3,50 +3,55 @@
|
|||
import { LRUCache } from 'lru-cache';
|
||||
import { type Gauge } from 'prom-client';
|
||||
|
||||
type FetchFn<K extends {}, V extends {}, O extends {}> = (key: K, opts: O) => Promise<V>;
|
||||
|
||||
interface FetchFnOpts {
|
||||
signal?: AbortSignal | null;
|
||||
}
|
||||
type FetchFn<K extends {}, V extends {}> = (key: K, opts: { signal?: AbortSignal }) => Promise<V>;
|
||||
|
||||
type SimpleLRUOpts<K extends {}, V extends {}> = LRUCache.Options<K, V, void> & {
|
||||
gauge?: Gauge;
|
||||
errorRefresh?: number;
|
||||
};
|
||||
|
||||
export class SimpleLRU<
|
||||
K extends {},
|
||||
V extends {},
|
||||
O extends {} = FetchFnOpts,
|
||||
> {
|
||||
protected cache: LRUCache<K, V, void>;
|
||||
protected cache: LRUCache<K, Promise<V>, void>;
|
||||
private tids = new Set<number>();
|
||||
|
||||
constructor(fetchFn: FetchFn<K, V, { signal: AbortSignal }>, private opts: SimpleLRUOpts<K, V>) {
|
||||
this.cache = new LRUCache({
|
||||
async fetchMethod(key, _staleValue, { signal }) {
|
||||
try {
|
||||
return await fetchFn(key, { signal: signal as unknown as AbortSignal });
|
||||
} catch {
|
||||
return null as unknown as V;
|
||||
}
|
||||
},
|
||||
...opts,
|
||||
});
|
||||
constructor(private fetchFn: FetchFn<K, V>, private opts: SimpleLRUOpts<K, Promise<V>>) {
|
||||
this.cache = new LRUCache({ ...opts });
|
||||
}
|
||||
|
||||
async fetch(key: K, opts?: O): Promise<V> {
|
||||
const result = await this.cache.fetch(key, opts);
|
||||
|
||||
this.opts.gauge?.set(this.cache.size);
|
||||
|
||||
if (result === undefined || result === null) {
|
||||
throw new Error('SimpleLRU: fetch failed');
|
||||
async fetch(key: K, opts?: { signal?: AbortSignal }): Promise<V> {
|
||||
if (opts?.signal?.aborted) {
|
||||
throw new DOMException('The signal has been aborted', 'AbortError');
|
||||
}
|
||||
|
||||
return result;
|
||||
const cached = await this.cache.get(key);
|
||||
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
|
||||
const promise = this.fetchFn(key, { signal: opts?.signal });
|
||||
|
||||
this.cache.set(key, promise);
|
||||
|
||||
promise.then(() => {
|
||||
this.opts.gauge?.set(this.cache.size);
|
||||
}).catch(() => {
|
||||
const tid = setTimeout(() => {
|
||||
this.cache.delete(key);
|
||||
this.tids.delete(tid);
|
||||
}, this.opts.errorRefresh ?? 10_000);
|
||||
this.tids.add(tid);
|
||||
});
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
put(key: K, value: V): Promise<void> {
|
||||
this.cache.set(key, value);
|
||||
return Promise.resolve();
|
||||
[Symbol.dispose](): void {
|
||||
for (const tid of this.tids) {
|
||||
clearTimeout(tid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue