diff --git a/http/index.mjs b/http/index.mjs index 1e38c3d4..172b1d1a 100644 --- a/http/index.mjs +++ b/http/index.mjs @@ -1,6 +1,6 @@ import { createServer } from 'node:http'; import cluster from 'node:cluster'; -import os from 'node:os'; +import { availableParallelism } from 'node:os'; import 'dotenv/config'; @@ -9,7 +9,7 @@ import getEnv from './env-binding.mjs'; import cacheMachine from '../utils/cache-machine.mjs'; const port = process.env.PORT ?? 8788; -const workerCount = parseInt(process.env.WORKERS ?? String(os.cpus().length - 1)); +const workerCount = parseInt(process.env.WORKERS ?? String(Math.max(availableParallelism() - 1, 1))); /*process.on('uncaughtException', (error) => { console.error('Uncaught Exception', error.stack); @@ -118,7 +118,7 @@ if (cluster.isPrimary && workerCount > 0) { } if (response) { const worker = cluster.workers[id]; - if (worker?.isConnected()) { + if (worker && worker.isConnected() && !worker.isDead()) { cluster.workers[id].send(response); } } diff --git a/index.mjs b/index.mjs index ec9feb27..097a2c75 100644 --- a/index.mjs +++ b/index.mjs @@ -177,10 +177,6 @@ async function graphqlHandler(request, env, ctx) { if (env.SKIP_CACHE !== 'true' && ttl > 0) { key = key ?? await cacheMachine.createKey(env, query, variables, specialCache); ctx.waitUntil(cacheMachine.put(env, body, {key})); - // using waitUntil doesn't hold up returning a response but keeps the worker alive as long as needed - /*const safePut = await cacheMachine.safePut(env, body, {key}); - console.log(safePut); - ctx.waitUntil(safePut.fetchRequest);*/ } return response; diff --git a/plugins/plugin-twitch.mjs b/plugins/plugin-twitch.mjs index 68fcdb00..0d54ec2f 100644 --- a/plugins/plugin-twitch.mjs +++ b/plugins/plugin-twitch.mjs @@ -10,6 +10,12 @@ export async function getTwitchResponse(env) { 'Client-ID': env.TWITCH_CLIENT_ID, }, }); + + if (response.status !== 200) { + response.body.cancel(); + throw new Error(`Invalid response code ${response.status}`); + } + const twitchJson = JSON.stringify(await response.json(), null, 2); return new Response(twitchJson, { diff --git a/utils/cache-machine.mjs b/utils/cache-machine.mjs index 93d0a835..69a56331 100644 --- a/utils/cache-machine.mjs +++ b/utils/cache-machine.mjs @@ -6,16 +6,24 @@ const cacheUrl = 'https://cache.tarkov.dev' let cacheFailCount = 0; let cachePaused = false; -function pauseCache() { +function cacheIsPaused() { + if (!cachePaused) { + return false; + } + const coolDownExpired = new Date() - cachePaused > 60000; + if (coolDownExpired) { + cachePaused = false; + return false; + } + return true; +} + +function cacheRequestFail() { cacheFailCount++; - if (cacheFailCount <= 4) { + if (cacheFailCount <= 4 || cacheIsPaused()) { return; } - cachePaused = true; - setTimeout(() => { - cachePaused = false; - cacheFailCount = 0; - }, 60000); + cachePaused = new Date(); } // Helper function to create a hash from a string @@ -52,7 +60,7 @@ const cacheMachine = { console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache check'); return false; } - if (cachePaused) { + if (cacheIsPaused()) { console.warn('Cache paused; skipping cache check'); return false; } @@ -73,6 +81,7 @@ const cacheMachine = { }, }); cacheFailCount = 0; + response.body.cancel(); if (response.status === 200) { return await response.json(); } else if (response.status !== 404) { @@ -83,7 +92,7 @@ const cacheMachine = { } catch (error) { if (error.message === 'The operation was aborted due to timeout') { console.warn('Checking cache timed out'); - pauseCache(); + cacheRequestFail(); return false; } console.error('checkCache error: ' + error.message); @@ -100,7 +109,7 @@ const cacheMachine = { console.warn('env.CACHE_BASIC_AUTH is not set; skipping cache put'); return false; } - if (cachePaused) { + if (cacheIsPaused()) { console.warn('Cache paused; skipping cache update'); return false; } @@ -130,6 +139,7 @@ const cacheMachine = { // Log non-200 responses if (response.status !== 200) { + response.body.cancel(); console.error(`failed to write to cache: ${response.status}`); return false } @@ -138,7 +148,7 @@ const cacheMachine = { } catch (error) { if (error.message === 'The operation was aborted due to timeout') { console.warn('Updating cache timed out'); - pauseCache(); + cacheRequestFail(); return false; } console.error('updateCache error: ' + error.message); diff --git a/utils/worker-kv.mjs b/utils/worker-kv.mjs index ebdf350d..b881c4f5 100644 --- a/utils/worker-kv.mjs +++ b/utils/worker-kv.mjs @@ -70,41 +70,39 @@ class WorkerKV { //console.log(`${requestKv} loading`); } this.loading[gameMode] = true; - this.loadingPromises[gameMode][requestId] = new Promise((resolve, reject) => { - const startLoad = new Date(); - this.dataSource.env.DATA_CACHE.getWithMetadata(requestKv, 'text').then(response => { - console.log(`${requestKv} load: ${new Date() - startLoad} ms`); - const metadata = response.metadata; - let responseValue = response.value; - if (metadata && metadata.compression) { - return reject(new Error(`${metadata.compression} compression is not supported`)); - } - const parsedValue = JSON.parse(responseValue); - if (!parsedValue && requestKv !== this.kvName) { - console.warn(`${requestKv} data not found; falling back to ${this.kvName}`); - this.loading[gameMode] = false; - delete this.loadingPromises[gameMode][requestId]; - return resolve(this.getCache(context, info, true)); - } - this.cache[gameMode] = parsedValue; - let newDataExpires = false; - if (this.cache[gameMode]?.expiration) { - newDataExpires = new Date(this.cache[gameMode].expiration).valueOf(); - } - if (newDataExpires && this.dataExpires === newDataExpires) { - console.log(`${requestKv} is still stale after re-load`); - } - this.lastRefresh[gameMode] = new Date(); - this.dataExpires[gameMode] = newDataExpires; - this.dataSource.setKvLoadedForRequest(requestKv, requestId); + const startLoad = new Date(); + this.loadingPromises[gameMode][requestId] = this.dataSource.env.DATA_CACHE.getWithMetadata(requestKv, 'text').then(response => { + console.log(`${requestKv} load: ${new Date() - startLoad} ms`); + const metadata = response.metadata; + let responseValue = response.value; + if (metadata && metadata.compression) { + return Promise.reject(new Error(`${metadata.compression} compression is not supported`)); + } + const parsedValue = JSON.parse(responseValue); + if (!parsedValue && requestKv !== this.kvName) { + console.warn(`${requestKv} data not found; falling back to ${this.kvName}`); this.loading[gameMode] = false; delete this.loadingPromises[gameMode][requestId]; - this.postLoad({cache: this.cache[gameMode], gameMode}); - resolve({cache: this.cache[gameMode], gameMode}); - }).catch(error => { - this.loading[gameMode] = false; - reject(error); - }); + return this.getCache(context, info, true); + } + this.cache[gameMode] = parsedValue; + let newDataExpires = false; + if (this.cache[gameMode]?.expiration) { + newDataExpires = new Date(this.cache[gameMode].expiration).valueOf(); + } + if (newDataExpires && this.dataExpires === newDataExpires) { + console.log(`${requestKv} is still stale after re-load`); + } + this.lastRefresh[gameMode] = new Date(); + this.dataExpires[gameMode] = newDataExpires; + this.dataSource.setKvLoadedForRequest(requestKv, requestId); + this.loading[gameMode] = false; + delete this.loadingPromises[gameMode][requestId]; + this.postLoad({cache: this.cache[gameMode], gameMode}); + return {cache: this.cache[gameMode], gameMode}; + }).catch(error => { + this.loading[gameMode] = false; + return Promise.reject(error); }); return this.loadingPromises[gameMode][requestId]; }