Merged
2 changes: 1 addition & 1 deletion api/health.js
@@ -330,7 +330,7 @@ const SEED_META = {
vpdTrackerHistorical: { key: 'seed-meta:health:vpd-tracker', maxStaleMin: 2880 }, // shares seed-meta key with vpdTrackerRealtime (same run)
resilienceStaticIndex: { key: 'seed-meta:resilience:static', maxStaleMin: 576000 }, // annual October snapshot; 400d threshold matches TTL and preserves prior-year data on source outages
resilienceStaticFao: { key: 'seed-meta:resilience:static', maxStaleMin: 576000 }, // same seeder + same heartbeat as resilienceStaticIndex; required so EMPTY_DATA_OK + missing data degrades to STALE_SEED instead of silent OK
resilienceRanking: { key: 'seed-meta:resilience:ranking', maxStaleMin: 720 }, // on-demand RPC cache (6h TTL); 12h threshold catches stale rankings without paging on cold start
resilienceRanking: { key: 'seed-meta:resilience:ranking', maxStaleMin: 720 }, // RPC cache (12h TTL, refreshed every 6h by seed-resilience-scores cron via refreshRankingAggregate); 12h staleness threshold = 2 missed cron ticks
resilienceIntervals: { key: 'seed-meta:resilience:intervals', maxStaleMin: 20160 }, // weekly cron; 20160min = 14d = 2x interval
energyExposure: { key: 'seed-meta:economic:owid-energy-mix', maxStaleMin: 50400 }, // monthly cron on 1st; 50400min = 35d = TTL matches cron cadence + 5d buffer
energyMixAll: { key: 'seed-meta:economic:owid-energy-mix', maxStaleMin: 50400 }, // same seed run as energyExposure; shares seed-meta key
197 changes: 96 additions & 101 deletions scripts/seed-resilience-scores.mjs
@@ -21,7 +21,10 @@ const SEED_UA = 'Mozilla/5.0 (compatible; WorldMonitor-Seed/1.0)';

export const RESILIENCE_SCORE_CACHE_PREFIX = 'resilience:score:v9:';
export const RESILIENCE_RANKING_CACHE_KEY = 'resilience:ranking:v9';
export const RESILIENCE_RANKING_CACHE_TTL_SECONDS = 6 * 60 * 60;
// Must match the server-side RESILIENCE_RANKING_CACHE_TTL_SECONDS. Extended
// to 12h (2x the cron interval) so a missed/slow cron can't create an
// EMPTY_ON_DEMAND gap before the next successful rebuild.
export const RESILIENCE_RANKING_CACHE_TTL_SECONDS = 12 * 60 * 60;
export const RESILIENCE_STATIC_INDEX_KEY = 'resilience:static:index:v1';

const INTERVAL_KEY_PREFIX = 'resilience:intervals:v1:';
@@ -162,9 +165,16 @@ async function seedResilienceScores() {
if (missing > 0) {
console.log(`[resilience-scores] Warming ${missing} missing via ranking endpoint...`);
try {
// ?refresh=1 MUST be set here. The ranking aggregate (12h TTL) routinely
// outlives the per-country score keys (6h TTL), so in the post-6h /
// pre-12h window the handler's cache-hit early-return would fire and
// skip the whole warm path — scores would stay missing, coverage would
// degrade, and only the per-country laggard fallback (or nothing, if
// WM_KEY is absent) would recover. Forcing a recompute routes the call
// through warmMissingResilienceScores and its chunked pipeline SET.
const headers = { 'User-Agent': SEED_UA, 'Accept': 'application/json' };
if (WM_KEY) headers['X-WorldMonitor-Key'] = WM_KEY;
const resp = await fetch(`${API_BASE}/api/resilience/v1/get-resilience-ranking`, {
const resp = await fetch(`${API_BASE}/api/resilience/v1/get-resilience-ranking?refresh=1`, {
headers,
signal: AbortSignal.timeout(60_000),
});
@@ -216,120 +226,103 @@ async function seedResilienceScores() {
console.log(`[resilience-scores] Laggards warmed: ${laggardsWarmed}/${stillMissing.length}`);
}

// The ranking cache (resilience:ranking:v9) needs to reflect the
// freshly-warmed per-country scores. Two failure modes have to be handled:
//
// 1. Laggards were warmed individually after the bulk RPC. The ranking
// cache (written earlier) froze those countries as coverage-0
// greyedOut entries. Rebuild needed.
//
// 2. The bulk RPC's handler hit a read-after-write race: it called
// warmMissingResilienceScores() (writing 222 per-country keys), then
// its own re-read of those same keys returned an empty Map (Upstash
// pipeline visibility lag in the same Vercel invocation). Result:
// cachedScores.size = 0, every item built with `undefined` payload =
// coverage 0 = all 222 in greyedOut, coverage gate (cachedScores.size
// / countryCodes.length) = 0% < 75% → handler skips the SET → ranking
// cache stays null.
//
// stillMissing is computed from the seeder's OWN pipeline GET (which
// sees the writes), so it correctly reports 0 laggards. The original
// `if (laggardsWarmed > 0)` gate would skip the rebuild — and we'd
// end up with all per-country scores cached but no ranking key.
//
// Fix: rebuild whenever (a) we warmed laggards OR (b) the ranking key is
// null in Redis after the bulk call. Path (b) catches the race; the
// second RPC call sees warm per-country scores in cache and the handler's
// re-read succeeds.
// Inline GET so we can distinguish "key absent" (rebuild needed) from
// "GET failed" (rebuild as a precaution but log it for incident triage).
// The shared redisGetJson() collapses both into null, which would silently
// mask transient Upstash hiccups in the rebuild trigger reason.
let rankingExists = null;
let rankingProbeFailed = false;
try {
const probeResp = await fetch(`${url}/get/${encodeURIComponent(RESILIENCE_RANKING_CACHE_KEY)}`, {
headers: { Authorization: `Bearer ${token}` },
signal: AbortSignal.timeout(5_000),
});
if (!probeResp.ok) {
rankingProbeFailed = true;
console.warn(`[resilience-scores] Ranking probe HTTP ${probeResp.status}; rebuilding as a precaution`);
} else {
const data = await probeResp.json();
rankingExists = data?.result || null;
}
} catch (err) {
rankingProbeFailed = true;
console.warn(`[resilience-scores] Ranking probe failed (${err.message}); rebuilding as a precaution`);
}
if (laggardsWarmed > 0 || rankingExists == null) {
const reason = laggardsWarmed > 0
? `${laggardsWarmed} laggard warms`
: (rankingProbeFailed ? 'ranking probe failed (precautionary)' : 'bulk-call race left ranking:v9 null');
try {
if (laggardsWarmed > 0) {
await redisPipeline(url, token, [['DEL', RESILIENCE_RANKING_CACHE_KEY]]);
}
const rebuildHeaders = { 'User-Agent': SEED_UA, 'Accept': 'application/json' };
if (WM_KEY) rebuildHeaders['X-WorldMonitor-Key'] = WM_KEY;
const rebuildResp = await fetch(`${API_BASE}/api/resilience/v1/get-resilience-ranking`, {
headers: rebuildHeaders,
signal: AbortSignal.timeout(60_000),
});
if (rebuildResp.ok) {
const rebuilt = await rebuildResp.json();
const total = (rebuilt.items?.length ?? 0) + (rebuilt.greyedOut?.length ?? 0);
console.log(`[resilience-scores] Rebuilt ${RESILIENCE_RANKING_CACHE_KEY} with ${total} countries (${reason})`);
} else {
console.warn(`[resilience-scores] Rebuild ranking HTTP ${rebuildResp.status} — ranking cache is null until next RPC call`);
}
} catch (err) {
console.warn(`[resilience-scores] Failed to rebuild ranking cache: ${err.message}`);
}
}

const finalResults = await redisPipeline(url, token, getCommands);
const finalWarmed = countCachedFromPipeline(finalResults);
console.log(`[resilience-scores] Final: ${finalWarmed}/${countryCodes.length} cached`);

const intervalsWritten = await computeAndWriteIntervals(url, token, countryCodes, finalResults);
return { skipped: false, recordCount: finalWarmed, total: countryCodes.length, intervalsWritten };
const rankingPresent = await refreshRankingAggregate({ url, token, laggardsWarmed });
return { skipped: false, recordCount: finalWarmed, total: countryCodes.length, intervalsWritten, rankingPresent };
}

const intervalsWritten = await computeAndWriteIntervals(url, token, countryCodes, preResults);
return { skipped: false, recordCount: preWarmed, total: countryCodes.length, intervalsWritten };
// Refresh the ranking aggregate on every cron, even when per-country
// scores are still warm from the previous tick. Ranking has a 12h TTL vs
// a 6h cron cadence — skipping the refresh when the key is still alive
// would let it drift toward expiry without a rebuild, and a single missed
// cron would then produce an EMPTY_ON_DEMAND gap before the next one runs.
const rankingPresent = await refreshRankingAggregate({ url, token, laggardsWarmed: 0 });
return { skipped: false, recordCount: preWarmed, total: countryCodes.length, intervalsWritten, rankingPresent };
}
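The TTL-versus-cron timing argument in that comment can be made concrete with a toy expiry model (illustrative only, not code from this PR):

```javascript
// Toy model: seconds during which the ranking key is absent between one
// successful write and the next. All values are illustrative.
const HOUR = 60 * 60;
function gapSeconds(ttlSeconds, writeAt, nextWriteAt) {
  const expiry = writeAt + ttlSeconds;
  return Math.max(0, nextWriteAt - expiry);
}

// TTL == cron interval (6h): one cron slipping to t+7h leaves a 1h hole.
gapSeconds(6 * HOUR, 0, 7 * HOUR); // 3600
// TTL == 2x interval (12h): the same slip is fully absorbed.
gapSeconds(12 * HOUR, 0, 7 * HOUR); // 0
```

With the 12h TTL, even an entirely missed tick (next write at t+12h) only brings the key to the edge of expiry rather than past it.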

// Write seed-meta:resilience:ranking so api/health.js can track data freshness.
// Without this, the meta key is only written by the get-resilience-ranking RPC
// handler when a user hits it, and goes silently stale during quiet Pro usage —
// firing a misleading "7× stale" alarm in the health endpoint even while the
// underlying scores are fresh. Non-fatal on Redis failure; seed itself still
// completed successfully.
async function writeRankingSeedMeta(recordCount) {
// Trigger a ranking rebuild via the public endpoint EVERY cron, regardless of
// whether resilience:ranking:v9 is still live at probe time. Short-circuiting
// on "key present" left a timing hole: if the key was written late in a prior
// run and the next cron fires early, the key is still alive at probe time →
// rebuild skipped → key expires a short while later and stays absent until a
// cron eventually runs when it's missing. One cheap HTTP per cron keeps both
// the ranking AND its sibling seed-meta rolling forward, and self-heals the
// partial-pipeline case where ranking was written but meta wasn't — handler
// retries the atomic pair on every cron.
//
// Returns whether the ranking key is present in Redis after the rebuild
// attempt (observability only — no caller gates on this).
async function refreshRankingAggregate({ url, token, laggardsWarmed }) {
const reason = laggardsWarmed > 0 ? `${laggardsWarmed} laggard warms` : 'scheduled cron refresh';
try {
const { url, token } = getRedisCredentials();
const meta = { fetchedAt: Date.now(), recordCount };
const resp = await fetch(url, {
method: 'POST',
headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' },
body: JSON.stringify(['SET', 'seed-meta:resilience:ranking', JSON.stringify(meta), 'EX', 86400 * 7]),
signal: AbortSignal.timeout(5_000),
// ?refresh=1 tells the handler to skip its cache-hit early-return and
// recompute-then-SET atomically. Avoids the earlier "DEL then rebuild"
// flow where a failed rebuild would leave the ranking absent instead of
// stale-but-present.
const rebuildHeaders = { 'User-Agent': SEED_UA, 'Accept': 'application/json' };
if (WM_KEY) rebuildHeaders['X-WorldMonitor-Key'] = WM_KEY;
const rebuildResp = await fetch(`${API_BASE}/api/resilience/v1/get-resilience-ranking?refresh=1`, {
headers: rebuildHeaders,
signal: AbortSignal.timeout(60_000),
});
// fetch() doesn't throw on non-2xx — we must check resp.ok explicitly.
// Otherwise a 401/429/500 from Upstash silently looks like success, the
// seed-meta stays stale, and /api/health keeps alerting without ops
// knowing the write ever failed.
if (!resp.ok) {
const body = await resp.text().catch(() => '<unreadable>');
console.warn(`[resilience-scores] seed-meta:resilience:ranking write failed: HTTP ${resp.status} — ${body.slice(0, 200)}`);
if (rebuildResp.ok) {
const rebuilt = await rebuildResp.json();
const total = (rebuilt.items?.length ?? 0) + (rebuilt.greyedOut?.length ?? 0);
console.log(`[resilience-scores] Refreshed ${RESILIENCE_RANKING_CACHE_KEY} with ${total} countries (${reason})`);
} else {
console.warn(`[resilience-scores] Refresh ranking HTTP ${rebuildResp.status} — ranking cache stays at its prior state until next cron`);
}
} catch (err) {
console.warn('[resilience-scores] seed-meta:resilience:ranking write failed:', err?.message || err);
console.warn(`[resilience-scores] Failed to refresh ranking cache: ${err.message}`);
}

// Verify BOTH the ranking data key AND the seed-meta key. Upstash REST
// pipeline is non-transactional: the handler's atomic SET could land the
// ranking but miss the meta, leaving /api/health reading stale meta over a
// fresh ranking. If the meta didn't land within ~5 minutes, log a warning
// so ops can grep for it — next cron will retry (ranking SET is
// idempotent).
const [rankingLen, metaFresh] = await Promise.all([
fetch(`${url}/strlen/${encodeURIComponent(RESILIENCE_RANKING_CACHE_KEY)}`, {
headers: { Authorization: `Bearer ${token}` },
signal: AbortSignal.timeout(5_000),
}).then((r) => r.ok ? r.json() : null).then((d) => Number(d?.result || 0)).catch(() => 0),
fetch(`${url}/get/seed-meta:resilience:ranking`, {
headers: { Authorization: `Bearer ${token}` },
signal: AbortSignal.timeout(5_000),
}).then((r) => r.ok ? r.json() : null).then((d) => {
if (!d?.result) return false;
try {
const meta = JSON.parse(d.result);
return typeof meta?.fetchedAt === 'number' && (Date.now() - meta.fetchedAt) < 5 * 60 * 1000;
} catch { return false; }
}).catch(() => false),
]);
const rankingPresent = rankingLen > 0;
if (rankingPresent && !metaFresh) {
console.warn(`[resilience-scores] Partial publish: ranking:v9 present but seed-meta not fresh — next cron will retry (handler SET is idempotent)`);
}
return rankingPresent;
}

// The seeder does NOT write seed-meta:resilience:ranking. Previously it did,
// as a "heartbeat" when Pro traffic was quiet — but it could only attest to
// "recordCount of per-country scores", not to whether `resilience:ranking:v9`
// was actually published this cron. The ranking handler gates its SET on a
// 75% coverage threshold and skips both the ranking and its meta when the
// gate fails; a stale-but-present ranking key combined with a fresh seeder
// meta write was exactly the "meta says fresh, data is stale" failure mode
// this PR exists to eliminate. The handler is now the sole writer of meta,
// and it writes both keys atomically via the same pipeline only when coverage
// passes. refreshRankingAggregate() triggers the handler every cron so meta
// never goes silently stale during quiet Pro usage — which was the original
// reason the seeder meta write existed.
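A sketch of the handler-side publish this comment describes. The pipeline runner and argument shapes are assumed from context, not the PR's actual helpers; Upstash pipelines are non-transactional, which is why the seeder's post-publish verification of both keys still exists.

```javascript
// Illustrative handler-side publish: both SETs ride one pipeline call, so the
// meta key is only ever attempted alongside the ranking payload it attests to.
// runPipeline stands in for the real Redis REST helper.
async function publishRankingPair(runPipeline, ranking, ttlSeconds) {
  const meta = JSON.stringify({ fetchedAt: Date.now(), recordCount: ranking.items.length });
  return runPipeline([
    ['SET', 'resilience:ranking:v9', JSON.stringify(ranking), 'EX', String(ttlSeconds)],
    ['SET', 'seed-meta:resilience:ranking', meta, 'EX', String(86400 * 7)],
  ]);
}
```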

async function main() {
const startedAt = Date.now();
const result = await seedResilienceScores();
@@ -339,8 +332,10 @@ async function main() {
...(result.reason != null && { reason: result.reason }),
...(result.intervalsWritten != null && { intervalsWritten: result.intervalsWritten }),
});
if (!result.skipped && (result.recordCount ?? 0) > 0) {
await writeRankingSeedMeta(result.recordCount);
if (!result.skipped && (result.recordCount ?? 0) > 0 && !result.rankingPresent) {
// Observability only — seeder never writes seed-meta. Health will flag the
// stale meta on its own if this persists across multiple cron ticks.
console.warn('[resilience-scores] resilience:ranking:v9 absent after rebuild attempt; handler-side coverage gate likely tripped. Next cron will retry.');
}
}

50 changes: 40 additions & 10 deletions server/worldmonitor/resilience/v1/_shared.ts
@@ -41,7 +41,13 @@ export const RESILIENCE_SCHEMA_V2_ENABLED =
(process.env.RESILIENCE_SCHEMA_V2_ENABLED ?? 'true').toLowerCase() === 'true';

export const RESILIENCE_SCORE_CACHE_TTL_SECONDS = 6 * 60 * 60;
export const RESILIENCE_RANKING_CACHE_TTL_SECONDS = 6 * 60 * 60;
// Ranking TTL must exceed the cron interval (6h) by enough to tolerate one
// missed or slow cron tick. With TTL == cron_interval, a key written near the
// end of one run could expire before the next run's rebuild landed, leaving
// a multi-hour gap whenever a cron fired late or failed. 12h gives a full
// cron cycle of headroom; ensureRankingPresent() still refreshes on every
// cron, so under normal operation the key stays well above TTL=0.
export const RESILIENCE_RANKING_CACHE_TTL_SECONDS = 12 * 60 * 60;
export const RESILIENCE_SCORE_CACHE_PREFIX = 'resilience:score:v9:';
export const RESILIENCE_HISTORY_KEY_PREFIX = 'resilience:history:v4:';
export const RESILIENCE_RANKING_CACHE_KEY = 'resilience:ranking:v9';
@@ -479,21 +485,45 @@ export async function warmMissingResilienceScores(
// the prefixed namespace via setCachedJson/cachedFetchJson; writing raw here
// would (a) make preview warms invisible to subsequent preview reads and
// (b) leak preview writes into the production-visible unprefixed namespace.
const setCommands = scores.map(({ cc, score }) => [
//
// Chunk size: a single 222-SET pipeline pushes ~600KB of body and routinely
// exceeds REDIS_PIPELINE_TIMEOUT_MS (5s) on Vercel Edge → the runRedisPipeline
// call returns `[]`, the persistence guard correctly returns an empty map,
// and ranking publish gets dropped even though Upstash usually finishes the
// writes a moment later. Splitting into ~30-command batches keeps each
// pipeline body small enough to land well under the timeout while still
// making one round-trip per batch.
const SET_BATCH = 30;
const allSetCommands = scores.map(({ cc, score }) => [
'SET',
scoreCacheKey(cc),
JSON.stringify(score),
'EX',
String(RESILIENCE_SCORE_CACHE_TTL_SECONDS),
]);
const persistResults = await runRedisPipeline(setCommands);
// runRedisPipeline returns [] on transport/HTTP failure. Without a
// per-command OK signal we have no proof anything persisted — return an
// empty map so the ranking coverage gate can't false-positive on a broken
// write path.
if (persistResults.length !== scores.length) {
console.warn(`[resilience] warm pipeline returned ${persistResults.length}/${scores.length} results — treating all as unpersisted`);
return warmed;
// Fire all batches concurrently. Serial awaits would add 7 extra Upstash
// round-trips for a 222-country warm (~100-500ms each on Edge). Each batch
// is independent, so Promise.all collapses them into a single wall-clock
// window bounded by the slowest batch. Failed batches still pad with empty
// entries to preserve per-command index alignment downstream.
const batches: Array<Array<Array<string>>> = [];
for (let i = 0; i < allSetCommands.length; i += SET_BATCH) {
batches.push(allSetCommands.slice(i, i + SET_BATCH));
}
const batchOutcomes = await Promise.all(batches.map((batch) => runRedisPipeline(batch)));
const persistResults: Array<{ result?: unknown }> = [];
for (let b = 0; b < batches.length; b++) {
const batch = batches[b]!;
const batchResults = batchOutcomes[b]!;
if (batchResults.length !== batch.length) {
// runRedisPipeline returns [] on transport/HTTP failure. Pad with
// empty entries so the per-command index alignment downstream stays
// correct — those entries will fail the OK check and be excluded
// from `warmed`, which is the safe behavior (no proof = no claim).
for (let j = 0; j < batch.length; j++) persistResults.push({});
} else {
for (const result of batchResults) persistResults.push(result);
}
}
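The batch-and-pad pattern above reduces to a small standalone sketch (helper names are illustrative, not the PR's actual code):

```javascript
// Split commands into fixed-size batches, run them concurrently, and pad any
// failed batch (runner returned []) with empty objects so index alignment
// against the input command list is preserved downstream.
function chunk(arr, size) {
  const out = [];
  for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size));
  return out;
}

async function runBatched(commands, size, run) {
  const batches = chunk(commands, size);
  const outcomes = await Promise.all(batches.map((b) => run(b)));
  return outcomes.flatMap((res, i) =>
    res.length === batches[i].length ? res : batches[i].map(() => ({}))
  );
}
```

A padded entry has no `result` field, so it fails any downstream OK check, which is the "no proof = no claim" behavior the comment above describes.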
Comment on lines +510 to 527

P2 Sequential batch awaits add round-trip latency

The batches are awaited one at a time. For 222 countries (⌈222/30⌉ = 8 batches), this serializes 7 extra Upstash round-trips (~100–500 ms each on Vercel Edge) that could run in parallel. Promise.all would reduce warm time to a single wave, though you'd need to flatten results in batch-offset order and keep the same alignment-padding logic.

Not blocking — the sequential approach is safer for backpressure and the total latency is still well within the function timeout.


let persistFailures = 0;