Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 34 additions & 59 deletions api/mcp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ToolDef = CacheToolDef | RpcToolDef;
const TOOL_REGISTRY: ToolDef[] = [
{
name: 'get_market_data',
description: 'Real-time equity quotes, commodity prices (including gold futures GC=F), crypto prices, forex FX rates (USD/EUR, USD/JPY etc.), sector performance, ETF flows, and Gulf market quotes from WorldMonitor\'s curated bootstrap cache.',
description: 'Real-time equity quotes, commodity prices (including gold futures GC=F), crypto prices, forex FX rates (USD/EUR, USD/JPY etc.), sector performance, ETF flows, Gulf market quotes, crypto sector performance, stablecoin market data, and wholesale FX rates from WorldMonitor\'s curated bootstrap cache.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: [
'market:stocks-bootstrap:v1',
Expand All @@ -86,6 +86,9 @@ const TOOL_REGISTRY: ToolDef[] = [
'market:etf-flows:v1',
'market:gulf-quotes:v1',
'market:fear-greed:v1',
'market:crypto-sectors:v1',
'market:stablecoins:v1',
'shared:fx-rates:v1',
],
_seedMetaKey: 'seed-meta:market:stocks',
_maxStaleMin: 30,
Expand Down Expand Up @@ -154,7 +157,7 @@ const TOOL_REGISTRY: ToolDef[] = [
},
{
name: 'get_economic_data',
description: 'Macro economic indicators: Fed Funds rate (FRED), economic calendar events, fuel prices, ECB FX rates, EU yield curve, earnings calendar, COT positioning, energy storage data, BIS household debt service ratio (DSR, quarterly, leading indicator of household financial stress across ~40 advanced economies), and BIS residential + commercial property price indices (real, quarterly).',
description: 'Macro economic indicators: Fed Funds rate (FRED), economic calendar events, fuel prices, ECB FX rates, EU yield curve, earnings calendar, COT positioning, energy storage, IMF WEO macro (inflation, GDP, debt, 200+ countries), national debt-to-GDP timeseries, Big Mac PPP index, FAO Food Price Index, and Eurostat EU statistics from WorldMonitor\'s seed cache.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: [
'economic:fred:v1:FEDFUNDS:0',
Expand All @@ -165,65 +168,14 @@ const TOOL_REGISTRY: ToolDef[] = [
'economic:spending:v1',
'market:earnings-calendar:v1',
'market:cot:v1',
'economic:bis:dsr:v1',
'economic:bis:property-residential:v1',
'economic:bis:property-commercial:v1',
'economic:imf:macro:v2',
'economic:national-debt:v1',
'economic:bigmac:v1',
'economic:fao-ffpi:v1',
'economic:eurostat-country-data:v1',
],
_seedMetaKey: 'seed-meta:economic:econ-calendar',
_maxStaleMin: 1440,
_freshnessChecks: [
{ key: 'seed-meta:economic:econ-calendar', maxStaleMin: 1440 },
// Per-dataset BIS seed-meta keys — the aggregate
// `seed-meta:economic:bis-extended` would report "fresh" even if only
// one of the three datasets (DSR / SPP / CPP) is current, matching the
// false-freshness bug already fixed for /api/health and resilience.
{ key: 'seed-meta:economic:bis-dsr', maxStaleMin: 1440 }, // 12h cron × 2
{ key: 'seed-meta:economic:bis-property-residential', maxStaleMin: 1440 },
{ key: 'seed-meta:economic:bis-property-commercial', maxStaleMin: 1440 },
],
},
{
name: 'get_country_macro',
description: 'Per-country macroeconomic indicators from IMF WEO (~210 countries, monthly cadence). Bundles fiscal/external balance (inflation, current account, gov revenue/expenditure/primary balance, CPI), growth & per-capita (real GDP growth, GDP/capita USD & PPP, savings & investment rates, savings-investment gap), labor & demographics (unemployment, population), and external trade (current account USD, import/export volume % changes). Latest available year per series. Use for country-level economic screening, peer benchmarking, and stagflation/imbalance flags. NOTE: export/import LEVELS in USD (exportsUsd, importsUsd, tradeBalanceUsd) are returned as null — WEO retracted broad coverage for BX/BM indicators in 2026-04; use currentAccountUsd or volume changes (import/exportVolumePctChg) instead.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: [
'economic:imf:macro:v2',
'economic:imf:growth:v1',
'economic:imf:labor:v1',
'economic:imf:external:v1',
],
_seedMetaKey: 'seed-meta:economic:imf-macro',
_maxStaleMin: 100800, // monthly WEO release; 70d = 2× interval (absorbs one missed run)
_freshnessChecks: [
{ key: 'seed-meta:economic:imf-macro', maxStaleMin: 100800 },
{ key: 'seed-meta:economic:imf-growth', maxStaleMin: 100800 },
{ key: 'seed-meta:economic:imf-labor', maxStaleMin: 100800 },
{ key: 'seed-meta:economic:imf-external', maxStaleMin: 100800 },
],
},
{
name: 'get_eu_housing_cycle',
description: 'Eurostat annual house price index (prc_hpi_a, base 2015=100) for all 27 EU members plus EA20 and EU27_2020 aggregates. Each country entry includes the latest value, prior value, date, unit, and a 10-year sparkline series. Complements BIS WS_SPP with broader EU coverage for the Housing cycle tile.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: ['economic:eurostat:house-prices:v1'],
_seedMetaKey: 'seed-meta:economic:eurostat-house-prices',
_maxStaleMin: 60 * 24 * 50, // weekly cron, annual data
},
{
name: 'get_eu_quarterly_gov_debt',
description: 'Eurostat quarterly general government gross debt (gov_10q_ggdebt, %GDP) for all 27 EU members plus EA20 and EU27_2020 aggregates. Each country entry includes latest value, prior value, quarter label, and an 8-quarter sparkline series. Provides fresher debt-trajectory signal than annual IMF GGXWDG_NGDP for EU panels.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: ['economic:eurostat:gov-debt-q:v1'],
_seedMetaKey: 'seed-meta:economic:eurostat-gov-debt-q',
_maxStaleMin: 60 * 24 * 14, // quarterly data, 2-day cron
},
{
name: 'get_eu_industrial_production',
description: 'Eurostat monthly industrial production index (sts_inpr_m, NACE B-D industry excl. construction, SCA, base 2021=100) for all 27 EU members plus EA20 and EU27_2020 aggregates. Each country entry includes latest value, prior value, month label, and a 12-month sparkline series. Leading indicator of real-economy activity used by the "Real economy pulse" sparkline.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: ['economic:eurostat:industrial-production:v1'],
_seedMetaKey: 'seed-meta:economic:eurostat-industrial-production',
_maxStaleMin: 60 * 24 * 5, // monthly data, daily cron
},
{
name: 'get_prediction_markets',
Expand Down Expand Up @@ -268,12 +220,17 @@ const TOOL_REGISTRY: ToolDef[] = [
},
{
name: 'get_supply_chain_data',
description: 'Dry bulk shipping stress index, customs revenue flows, and COMTRADE bilateral trade data. Tracks global supply chain pressure and trade disruptions.',
description: 'Dry bulk shipping stress index, customs revenue flows, COMTRADE bilateral trade data, Hormuz tracker, port chokepoint reference data, active disruptions, energy crisis policies, and energy intelligence feeds. Tracks global supply chain pressure and trade disruptions.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: [
'supply_chain:shipping_stress:v1',
'trade:customs-revenue:v1',
'comtrade:flows:v1',
'supply_chain:hormuz_tracker:v1',
'portwatch:chokepoints:ref:v1',
'portwatch:disruptions:active:v1',
'energy:crisis-policies:v1',
'energy:intelligence:feed:v1',
],
_seedMetaKey: 'seed-meta:trade:customs-revenue',
_maxStaleMin: 2880,
Expand Down Expand Up @@ -323,6 +280,24 @@ const TOOL_REGISTRY: ToolDef[] = [
_maxStaleMin: 30,
},

// -------------------------------------------------------------------------
// Resilience recovery — cache read (IMF WEO-derived resilience indicators)
// -------------------------------------------------------------------------
{
name: 'get_resilience_recovery',
description: 'IMF WEO-derived resilience and recovery indicators: fiscal space (revenue vs. spending headroom), reserve adequacy (external reserves vs. imports), external debt sustainability, import concentration (HHI), and strategic fuel stock levels. Covers 200+ countries with monthly/quarterly cadence.',
inputSchema: { type: 'object', properties: {}, required: [] },
_cacheKeys: [
'resilience:recovery:fiscal-space:v1',
'resilience:recovery:reserve-adequacy:v1',
'resilience:recovery:external-debt:v1',
'resilience:recovery:import-hhi:v1',
'resilience:recovery:fuel-stocks:v1',
],
_seedMetaKey: 'seed-meta:resilience:recovery:fiscal-space',
_maxStaleMin: 43200,
},

// -------------------------------------------------------------------------
// AI inference tools — call LLM endpoints, not cached Redis reads
// -------------------------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions convex/alertRules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ function validateQuietHoursArgs(args: {
if (args.quietHoursEnd !== undefined && (args.quietHoursEnd < 0 || args.quietHoursEnd > 23 || !Number.isInteger(args.quietHoursEnd))) {
throw new ConvexError("quietHoursEnd must be an integer 0–23");
}
if (args.quietHoursStart !== undefined && args.quietHoursEnd !== undefined && args.quietHoursStart === args.quietHoursEnd) {
throw new ConvexError("quietHoursStart and quietHoursEnd cannot be equal — setting the same value for both means quiet hours are always active; use the enabled flag instead");
}
if (args.quietHoursTimezone !== undefined) {
try {
Intl.DateTimeFormat(undefined, { timeZone: args.quietHoursTimezone });
Expand Down
105 changes: 43 additions & 62 deletions scripts/notification-relay.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,34 @@ function isPrivateIP(ip) {

// ── Quiet hours ───────────────────────────────────────────────────────────────

const { toLocalHour, isInQuietHours } = require('./lib/quiet-hours.cjs');
function toLocalHour(nowMs, timezone) {
try {
const parts = new Intl.DateTimeFormat('en-US', {
timeZone: timezone,
hour: 'numeric',
hour12: false,
}).formatToParts(new Date(nowMs));
const h = parts.find(p => p.type === 'hour');
return h ? parseInt(h.value, 10) : -1;
} catch {
return -1;
}
}

function isInQuietHours(rule) {
if (!rule.quietHoursEnabled) return false;
const start = rule.quietHoursStart ?? 22;
const end = rule.quietHoursEnd ?? 7;
// start === end means quiet hours are not meaningful — treat as disabled
if (start === end) return false;
const tz = rule.quietHoursTimezone ?? 'UTC';
const localHour = toLocalHour(Date.now(), tz);
if (localHour === -1) return false;
// spans midnight when start > end (e.g. 23:00-07:00)
return start < end
? localHour >= start && localHour < end
: localHour >= start || localHour < end;
}

// Returns 'deliver' | 'suppress' | 'hold'
function resolveQuietAction(rule, severity) {
Expand Down Expand Up @@ -256,7 +283,7 @@ async function processFlushQuietHeld(event) {

// ── Delivery: Telegram ────────────────────────────────────────────────────────

async function sendTelegram(userId, chatId, text) {
async function sendTelegram(userId, chatId, text, _retryCount = 0) {
if (!TELEGRAM_BOT_TOKEN) {
console.warn('[relay] Telegram: TELEGRAM_BOT_TOKEN not set — skipping');
return false;
Expand All @@ -277,10 +304,14 @@ async function sendTelegram(userId, chatId, text) {
return false;
}
if (res.status === 429) {
if (_retryCount >= 1) {
console.warn(`[relay] Telegram 429 retry exhausted for ${userId} — giving up`);
return false;
}
const body = await res.json().catch(() => ({}));
const wait = ((body.parameters?.retry_after ?? 5) + 1) * 1000;
await new Promise(r => setTimeout(r, wait));
return sendTelegram(userId, chatId, text); // single retry
return sendTelegram(userId, chatId, text, _retryCount + 1); // single retry with counter
}
if (res.status === 401) {
console.error('[relay] Telegram 401 Unauthorized — TELEGRAM_BOT_TOKEN is invalid or belongs to a different bot; correct the Railway env var to restore Telegram delivery');
Expand Down Expand Up @@ -445,13 +476,6 @@ async function sendWebhook(userId, webhookEnvelope, event) {
return false;
}

// Envelope version stays at '1'. Payload gained optional `corroborationCount`
// on rss_alert (PR #3069) — this is an additive field, backwards-compatible
// for consumers that don't enforce `additionalProperties: false`. Bumping
// version here would have broken parity with the other webhook producers
// (scripts/proactive-intelligence.mjs, scripts/seed-digest-notifications.mjs)
// which still emit v1, causing the same endpoint to receive mixed envelope
// versions per event type.
const payload = JSON.stringify({
version: '1',
eventType: event.eventType,
Expand Down Expand Up @@ -556,62 +580,21 @@ async function processWelcome(event) {

const IMPORTANCE_SCORE_LIVE = process.env.IMPORTANCE_SCORE_LIVE === '1';
const IMPORTANCE_SCORE_MIN = Number(process.env.IMPORTANCE_SCORE_MIN ?? 40);
// v2 key: JSON-encoded members, used after the stale-score fix (PR #TBD).
// The old v1 key (compact string format) is retained by consumers for
// backward-compat reading but is no longer written. See
// docs/internal/scoringDiagnostic.md §5 and §9 Step 4.
const SHADOW_SCORE_LOG_KEY = 'shadow:score-log:v2';
const SHADOW_SCORE_LOG_KEY = 'shadow:score-log:v1';
const SHADOW_LOG_TTL = 7 * 24 * 3600; // 7 days

async function shadowLogScore(event) {
const importanceScore = event.payload?.importanceScore ?? 0;
if (!UPSTASH_URL || !UPSTASH_TOKEN || importanceScore === 0) return;
const now = Date.now();
const record = {
ts: now,
importanceScore,
severity: event.severity ?? 'high',
eventType: event.eventType,
title: String(event.payload?.title ?? '').slice(0, 160),
source: event.payload?.source ?? '',
publishedAt: event.payload?.publishedAt ?? null,
corroborationCount: event.payload?.corroborationCount ?? null,
variant: event.variant ?? '',
};
const member = JSON.stringify(record);
// Use timestamp as the sorted-set score so entries are time-sortable for analysis.
// Member encodes importanceScore + context for review.
const member = `${now}:score=${importanceScore}:${event.eventType}:${String(event.payload?.title ?? '').slice(0, 60)}`;
const cutoff = String(now - SHADOW_LOG_TTL * 1000); // prune entries older than 7 days
// One pipelined HTTP request: ZADD + ZREMRANGEBYSCORE prune + 30-day
// belt-and-suspenders EXPIRE. Saves ~50% round-trips vs sequential calls
// and bounds growth even if writes stop and the rolling prune stalls.
try {
const res = await fetch(`${UPSTASH_URL}/pipeline`, {
method: 'POST',
headers: {
Authorization: `Bearer ${UPSTASH_TOKEN}`,
'Content-Type': 'application/json',
'User-Agent': 'worldmonitor-relay/1.0',
},
body: JSON.stringify([
['ZADD', SHADOW_SCORE_LOG_KEY, String(now), member],
['ZREMRANGEBYSCORE', SHADOW_SCORE_LOG_KEY, '-inf', cutoff],
['EXPIRE', SHADOW_SCORE_LOG_KEY, '2592000'],
]),
});
// Surface HTTP failures and per-command errors. Activation depends on v2
// filling with clean data; a silent write-failure would leave operators
// staring at an empty ZSET with no signal.
if (!res.ok) {
console.warn(`[relay] shadow-log pipeline HTTP ${res.status}`);
return;
}
const body = await res.json().catch(() => null);
if (Array.isArray(body)) {
const failures = body.map((cmd, i) => (cmd?.error ? `cmd[${i}] ${cmd.error}` : null)).filter(Boolean);
if (failures.length > 0) console.warn(`[relay] shadow-log pipeline partial failure: ${failures.join('; ')}`);
}
} catch (err) {
console.warn(`[relay] shadow-log pipeline threw: ${err?.message ?? err}`);
}
await upstashRest('ZADD', SHADOW_SCORE_LOG_KEY, String(now), member);
await upstashRest('ZREMRANGEBYSCORE', SHADOW_SCORE_LOG_KEY, '-inf', cutoff);
} catch {}
}

// ── AI impact analysis ───────────────────────────────────────────────────────
Expand Down Expand Up @@ -682,10 +665,8 @@ async function processEvent(event) {
if (event.eventType === 'flush_quiet_held') { await processFlushQuietHeld(event); return; }
console.log(`[relay] Processing event: ${event.eventType} (${event.severity ?? 'high'})`);

// Shadow log importanceScore for comparison. Gate at caller: only rss_alert
// events carry importanceScore; for everything else shadowLogScore would
// short-circuit, but we still pay the promise/microtask cost unless gated here.
if (event.eventType === 'rss_alert') shadowLogScore(event).catch(() => {});
// Shadow log importanceScore for comparison (always runs when score is present)
shadowLogScore(event).catch(() => {});

// Score gate — only for rss_alert; other event types (oref_siren, conflict_escalation,
// notam_closure, etc.) never attach importanceScore so they must never be gated here.
Expand Down
14 changes: 12 additions & 2 deletions src/app/country-intel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,22 @@ export class CountryIntelManager implements AppModule {
const ourPos = CountryIntelManager.firstMentionPosition(t, searchTerms);
const otherPos = CountryIntelManager.firstMentionPosition(t, otherCountryTerms);
return ourPos !== Infinity && (otherPos === Infinity || ourPos <= otherPos);
}).sort((a, b) => {
});
const seen = new Set<string>();
const deduped: typeof filteredNews = [];
for (const n of filteredNews) {
const normalized = n.title.toLowerCase().replace(/[^a-z0-9\s]/g, '').replace(/\s+/g, ' ').trim();
if (normalized.length > 0 && !seen.has(normalized)) {
seen.add(normalized);
deduped.push(n);
}
}
deduped.sort((a, b) => {
const severityDelta = this.newsSeverityRank(b) - this.newsSeverityRank(a);
if (severityDelta !== 0) return severityDelta;
return new Date(b.pubDate).getTime() - new Date(a.pubDate).getTime();
});
this.ctx.countryBriefPage.updateNews(filteredNews.slice(0, 10));
this.ctx.countryBriefPage.updateNews(deduped.slice(0, 10));
Comment on lines +269 to +283
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Deduplication may discard higher-quality duplicates

The deduplication iterates filteredNews in source order and keeps the first occurrence, but the sort by severity and date runs after. If a higher-severity or more-recent duplicate appears later in filteredNews than a lower-quality duplicate, the better item is dropped before the sort ever runs.

Sorting before deduplicating ensures the "best" instance is always the one that survives:

const sorted = filteredNews.sort((a, b) => {
  const severityDelta = this.newsSeverityRank(b) - this.newsSeverityRank(a);
  if (severityDelta !== 0) return severityDelta;
  return new Date(b.pubDate).getTime() - new Date(a.pubDate).getTime();
});
const seen = new Set<string>();
const deduped: typeof sorted = [];
for (const n of sorted) {
  const normalized = n.title.toLowerCase().replace(/[^a-z0-9\s]/g, '').replace(/\s+/g, ' ').trim();
  if (normalized.length > 0 && !seen.has(normalized)) {
    seen.add(normalized);
    deduped.push(n);
  }
}
this.ctx.countryBriefPage.updateNews(deduped.slice(0, 10));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 415ac90. Deduplication loop now runs AFTER the sort, so the highest-severity/freshest item wins when duplicates exist. The deduped array is now sorted first (by severity then recency), then the deduplication loop eliminates lower-quality duplicates, ensuring the best item per topic survives.


this.ctx.countryBriefPage.updateInfrastructure(code);

Expand Down
Loading
Loading