Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/_shared/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ function prefixKey(key: string): string {
return `${cachedPrefix}${key}`;
}

/**
 * Test-only escape hatch: invalidate the memoized key prefix so a test that
 * mutates `process.env.VERCEL_ENV` / `VERCEL_GIT_COMMIT_SHA` sees the new
 * value on the next prefix read.
 *
 * No production caller should ever invoke this — the prefix is intentionally
 * computed once per process.
 */
export function __resetKeyPrefixCacheForTests(): void {
  cachedPrefix = undefined;
}

/**
* Like getCachedJson but throws on Redis/network failures instead of returning null.
* Always uses the raw (unprefixed) key — callers that write via seed scripts (which bypass
Expand Down
205 changes: 141 additions & 64 deletions server/worldmonitor/resilience/v1/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,68 @@ async function appendHistory(countryCode: string, overallScore: number): Promise
]);
}

// Pure score computation: no read-through caching and no Redis writes other
// than appendHistory, which is part of the score's semantics. Kept separate
// from `ensureResilienceScoreCached` so the ranking warm path can persist
// results itself with explicit write verification via a pipeline (see
// `warmMissingResilienceScores`) rather than relying on `cachedFetchJson`,
// whose writes log and swallow failures.
async function buildResilienceScore(
  normalizedCountryCode: string,
  reader?: ResilienceSeedReader,
): Promise<GetResilienceScoreResponse> {
  // Data version: date of the static-meta fetch when present, else today.
  const meta = await getCachedJson(RESILIENCE_STATIC_META_KEY, true) as { fetchedAt?: number } | null;
  const dataVersion = meta?.fetchedAt
    ? new Date(meta.fetchedAt).toISOString().slice(0, 10)
    : todayIsoDate();

  const dimensionScores = await scoreAllDimensions(normalizedCountryCode, reader);
  const dimensions = buildDimensionList(dimensionScores);
  const domains = buildDomainList(dimensions);
  const pillars = buildPillarList(domains, true);

  // A 'mixed' dimension contributes to both the baseline and stress views.
  const typeOf = (dim: ResilienceDimension) =>
    RESILIENCE_DIMENSION_TYPES[dim.id as ResilienceDimensionId];
  const baselineDims = dimensions.filter((dim) => {
    const dimType = typeOf(dim);
    return dimType === 'baseline' || dimType === 'mixed';
  });
  const stressDims = dimensions.filter((dim) => {
    const dimType = typeOf(dim);
    return dimType === 'stress' || dimType === 'mixed';
  });

  const baselineScore = round(coverageWeightedMean(baselineDims));
  const stressScore = round(coverageWeightedMean(stressDims));
  // Headroom below a perfect stress score, clamped into [0, 0.5].
  const stressFactor = round(Math.max(0, Math.min(1 - stressScore / 100, 0.5)), 4);

  // Overall score is the weight-blended domain score.
  let weightedTotal = 0;
  for (const domain of domains) {
    weightedTotal += domain.score * domain.weight;
  }
  const overallScore = round(weightedTotal);

  // Share of total dimension weight that was imputed rather than observed
  // (0 when neither kind of weight is present).
  let imputedWeight = 0;
  let observedWeight = 0;
  for (const dim of dimensions) {
    imputedWeight += dim.imputedWeight ?? 0;
    observedWeight += dim.observedWeight ?? 0;
  }
  const imputationShare = imputedWeight + observedWeight > 0
    ? round(imputedWeight / (imputedWeight + observedWeight), 4)
    : 0;

  // Prior history excludes any point already recorded for today; today's
  // score is appended to the in-memory series and persisted below.
  const priorHistory = (await readHistory(normalizedCountryCode))
    .filter((point) => point.date !== todayIsoDate());
  const scoreSeries = priorHistory.map((point) => point.score).concat(overallScore);
  const oldestScore = priorHistory[0]?.score;

  await appendHistory(normalizedCountryCode, overallScore);

  return {
    countryCode: normalizedCountryCode,
    overallScore,
    baselineScore,
    stressScore,
    stressFactor,
    level: classifyResilienceLevel(overallScore),
    domains,
    trend: detectTrend(scoreSeries),
    change30d: oldestScore == null ? 0 : round(overallScore - oldestScore),
    lowConfidence: computeLowConfidence(dimensions, imputationShare),
    imputationShare,
    dataVersion,
    pillars,
    schemaVersion: '2.0',
  };
}

export async function ensureResilienceScoreCached(countryCode: string, reader?: ResilienceSeedReader): Promise<GetResilienceScoreResponse> {
const normalizedCountryCode = normalizeCountryCode(countryCode);
if (!normalizedCountryCode) {
Expand Down Expand Up @@ -235,59 +297,7 @@ export async function ensureResilienceScoreCached(countryCode: string, reader?:
let cached = await cachedFetchJson<GetResilienceScoreResponse>(
scoreCacheKey(normalizedCountryCode),
RESILIENCE_SCORE_CACHE_TTL_SECONDS,
async () => {
const staticMeta = await getCachedJson(RESILIENCE_STATIC_META_KEY, true) as { fetchedAt?: number } | null;
const dataVersion = staticMeta?.fetchedAt
? new Date(staticMeta.fetchedAt).toISOString().slice(0, 10)
: todayIsoDate();

const scoreMap = await scoreAllDimensions(normalizedCountryCode, reader);
const dimensions = buildDimensionList(scoreMap);
const domains = buildDomainList(dimensions);
const pillars = buildPillarList(domains, true);

const baselineDims: ResilienceDimension[] = [];
const stressDims: ResilienceDimension[] = [];
for (const dim of dimensions) {
const dimType = RESILIENCE_DIMENSION_TYPES[dim.id as ResilienceDimensionId];
if (dimType === 'baseline' || dimType === 'mixed') baselineDims.push(dim);
if (dimType === 'stress' || dimType === 'mixed') stressDims.push(dim);
}
const baselineScore = round(coverageWeightedMean(baselineDims));
const stressScore = round(coverageWeightedMean(stressDims));
const stressFactor = round(Math.max(0, Math.min(1 - stressScore / 100, 0.5)), 4);
const overallScore = round(domains.reduce((sum, d) => sum + d.score * d.weight, 0));

const totalImputed = dimensions.reduce((sum, d) => sum + (d.imputedWeight ?? 0), 0);
const totalObserved = dimensions.reduce((sum, d) => sum + (d.observedWeight ?? 0), 0);
const imputationShare = (totalImputed + totalObserved) > 0
? round(totalImputed / (totalImputed + totalObserved), 4)
: 0;

const history = (await readHistory(normalizedCountryCode))
.filter((point) => point.date !== todayIsoDate());
const scoreSeries = [...history.map((point) => point.score), overallScore];
const oldestScore = history[0]?.score;

await appendHistory(normalizedCountryCode, overallScore);

return {
countryCode: normalizedCountryCode,
overallScore,
baselineScore,
stressScore,
stressFactor,
level: classifyResilienceLevel(overallScore),
domains,
trend: detectTrend(scoreSeries),
change30d: oldestScore == null ? 0 : round(overallScore - oldestScore),
lowConfidence: computeLowConfidence(dimensions, imputationShare),
imputationShare,
dataVersion,
pillars,
schemaVersion: '2.0',
};
},
() => buildResilienceScore(normalizedCountryCode, reader),
300,
) ?? {
countryCode: normalizedCountryCode,
Expand Down Expand Up @@ -410,26 +420,93 @@ export function sortRankingItems(items: ResilienceRankingItem[]): ResilienceRank
});
}

export async function warmMissingResilienceScores(countryCodes: string[]): Promise<void> {
// Warms the resilience score cache for the given countries and returns a map
// of country-code → score for ONLY the scores whose writes actually landed in
// Redis. Two subtle requirements:
//
// 1. Avoid the Upstash REST write→re-read visibility lag. A /pipeline GET of
// freshly-SET keys in the same Vercel invocation can return null even
// when every SET succeeded — the pre-existing post-warm re-read tripped
// this and silently dropped the ranking publish. See
// `feedback_upstash_write_reread_race_in_handler.md`.
// 2. Still detect actual write failures. `cachedFetchJson`'s underlying
// `setCachedJson` only logs and swallows on error, which would make a
// transient /set failure look like a successful warm and publish a
// ranking aggregate over missing per-country keys.
//
// The pipeline SET response is the authoritative persistence signal: it's
// synchronous with the write, so "result: OK" per command means the key is
// actually stored. We compute scores in memory (no caching), persist in one
// pipeline, and only include countries whose SET returned OK in the returned
// map. Callers should merge the map directly into their local `cachedScores`
// — no post-warm Redis re-read.
export async function warmMissingResilienceScores(
countryCodes: string[],
): Promise<Map<string, GetResilienceScoreResponse>> {
const uniqueCodes = [...new Set(countryCodes.map((countryCode) => normalizeCountryCode(countryCode)).filter(Boolean))];
const warmed = new Map<string, GetResilienceScoreResponse>();
if (uniqueCodes.length === 0) return warmed;

// Share one memoized reader across all countries so global Redis keys (conflict events,
// sanctions, unrest, etc.) are fetched only once instead of once per country.
const sharedReader = createMemoizedSeedReader();
const results = await Promise.allSettled(
uniqueCodes.map((countryCode) => ensureResilienceScoreCached(countryCode, sharedReader)),
const computed = await Promise.allSettled(
uniqueCodes.map(async (cc) => ({ cc, score: await buildResilienceScore(cc, sharedReader) })),
);
const failures: Array<{ countryCode: string; reason: string }> = [];
for (let i = 0; i < results.length; i++) {
const result = results[i];
if (result?.status === 'rejected') {
failures.push({

const scores: Array<{ cc: string; score: GetResilienceScoreResponse }> = [];
const computeFailures: Array<{ countryCode: string; reason: string }> = [];
for (let i = 0; i < computed.length; i++) {
const result = computed[i]!;
if (result.status === 'fulfilled') {
scores.push(result.value);
} else {
computeFailures.push({
countryCode: uniqueCodes[i]!,
reason: result.reason instanceof Error ? result.reason.message : String(result.reason),
});
}
}
if (failures.length > 0) {
const sample = failures.slice(0, 10).map((f) => `${f.countryCode}(${f.reason})`).join(', ');
console.warn(`[resilience] warm failed for ${failures.length}/${uniqueCodes.length} countries: ${sample}${failures.length > 10 ? '...' : ''}`);
if (computeFailures.length > 0) {
const sample = computeFailures.slice(0, 10).map((f) => `${f.countryCode}(${f.reason})`).join(', ');
console.warn(`[resilience] warm compute failed for ${computeFailures.length}/${uniqueCodes.length} countries: ${sample}${computeFailures.length > 10 ? '...' : ''}`);
}
if (scores.length === 0) return warmed;

// Default `raw=false` so runRedisPipeline applies the env-based key prefix
// (`preview:<sha>:` on preview/dev, empty in production). The normal score
// reads (`getCachedResilienceScores`, `ensureResilienceScoreCached`) look in
// the prefixed namespace via setCachedJson/cachedFetchJson; writing raw here
// would (a) make preview warms invisible to subsequent preview reads and
// (b) leak preview writes into the production-visible unprefixed namespace.
const setCommands = scores.map(({ cc, score }) => [
'SET',
scoreCacheKey(cc),
JSON.stringify(score),
'EX',
String(RESILIENCE_SCORE_CACHE_TTL_SECONDS),
]);
const persistResults = await runRedisPipeline(setCommands);
// runRedisPipeline returns [] on transport/HTTP failure. Without a
// per-command OK signal we have no proof anything persisted — return an
// empty map so the ranking coverage gate can't false-positive on a broken
// write path.
if (persistResults.length !== scores.length) {
console.warn(`[resilience] warm pipeline returned ${persistResults.length}/${scores.length} results — treating all as unpersisted`);
return warmed;
}

let persistFailures = 0;
for (let i = 0; i < scores.length; i++) {
const { cc, score } = scores[i]!;
if (persistResults[i]?.result === 'OK') {
warmed.set(cc, score);
} else {
persistFailures++;
}
}
if (persistFailures > 0) {
console.warn(`[resilience] warm persisted ${warmed.size}/${scores.length} scores (${persistFailures} SETs did not return OK)`);
}
return warmed;
}
12 changes: 9 additions & 3 deletions server/worldmonitor/resilience/v1/get-resilience-ranking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ export const getResilienceRanking: ResilienceServiceHandler['getResilienceRankin
const countryCodes = await listScorableCountries();
if (countryCodes.length === 0) return { items: [], greyedOut: [] };

let cachedScores = await getCachedResilienceScores(countryCodes);
const cachedScores = await getCachedResilienceScores(countryCodes);
const missing = countryCodes.filter((countryCode) => !cachedScores.has(countryCode));
if (missing.length > 0) {
try {
await warmMissingResilienceScores(missing.slice(0, SYNC_WARM_LIMIT));
cachedScores = await getCachedResilienceScores(countryCodes);
// Merge warm results into cachedScores directly rather than re-reading
// from Redis. Upstash REST writes (/set) aren't always visible to an
// immediately-following /pipeline GET in the same Vercel invocation,
// which collapsed coverage to 0/N and silently dropped the ranking
// publish. The warmer already holds every score in memory — trust it.
// See `feedback_upstash_write_reread_race_in_handler.md`.
const warmed = await warmMissingResilienceScores(missing.slice(0, SYNC_WARM_LIMIT));
for (const [countryCode, score] of warmed) cachedScores.set(countryCode, score);
} catch (err) {
console.warn('[resilience] ranking warmup failed:', err);
}
Expand Down
4 changes: 2 additions & 2 deletions tests/helpers/fake-upstash-redis.mts
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ export function createRedisFetch(fixtures: Record<string, unknown>): FakeRedisSt
return { fetchImpl, redis, sortedSets, expires };
}

export function installRedis(fixtures: Record<string, unknown>): FakeRedisState {
export function installRedis(fixtures: Record<string, unknown>, opts: { keepVercelEnv?: boolean } = {}): FakeRedisState {
process.env.UPSTASH_REDIS_REST_URL = 'https://redis.example';
process.env.UPSTASH_REDIS_REST_TOKEN = 'token';
delete process.env.VERCEL_ENV;
if (!opts.keepVercelEnv) delete process.env.VERCEL_ENV;
const state = createRedisFetch(fixtures);
globalThis.fetch = state.fetchImpl;
return state;
Expand Down
Loading
Loading