Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions app/market/cache_refresh.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
"""
Background refresh: fetch from CMC, Twelve Data, Pyth and upsert into MongoDB.
Designed to run every 1 minute (token_detail + pyth); historical can run every 1 min for short ranges.
"""
Expand Down Expand Up @@ -127,7 +127,9 @@ def refresh_token_detail() -> None:


def refresh_token_historical() -> None:
"""Fetch historical chart data from Twelve Data and upsert into token_historical."""
"""Fetch historical chart data from Twelve Data and upsert into token_historical.
Uses ThreadPoolExecutor for parallel fetching while respecting rate limits.
"""
coll = token_historical_collection()
if coll is None:
return
Expand All @@ -136,31 +138,56 @@ def refresh_token_historical() -> None:
return
try:
from .market import MarketData
from .token_analysis import get_historical_chart_data
market_data = MarketData(twelve_data_api_key=twelve_key)
except Exception as e:
print(f"[CacheRefresh] MarketData init failed: {e}")
return

symbols = _tracked_symbols()
updated = _now_iso()
for symbol in _tracked_symbols():
for range_key in HISTORICAL_RANGES:

# Use parallel fetcher if available, fall back to serial
try:
from .cache_refresh_parallel import fetch_historical_parallel
parallel_results = fetch_historical_parallel(symbols, HISTORICAL_RANGES, market_data)
for result in parallel_results:
doc = {
"symbol": result["symbol"],
"range": result.get("range", "1d"),
"data": result.get("data", []),
"updated_at": updated,
}
try:
result = get_historical_chart_data(symbol, range_key, market_data)
doc = {
"symbol": symbol,
"range": result.get("range", range_key),
"data": result.get("data", []),
"updated_at": updated,
}
coll.update_one(
{"symbol": symbol, "range": range_key},
{"symbol": result["symbol"], "range": result.get("range", "1d")},
{"$set": doc},
upsert=True,
)
except Exception as e:
print(f"[CacheRefresh] historical {symbol} {range_key}: {e}")
time.sleep(8) # Twelve Data free tier: 8 calls/min — space out requests
print(f"[CacheRefresh] token_historical: refreshed {len(_tracked_symbols())} symbols x {len(HISTORICAL_RANGES)} ranges")
print(f"[CacheRefresh] upsert historical {result['symbol']} {result.get('range','1d')}: {e}")
print(f"[CacheRefresh] token_historical: refreshed {len(parallel_results)} symbol×range combos (parallel)")
except ImportError:
# Fallback to original serial implementation
for symbol in symbols:
for range_key in HISTORICAL_RANGES:
try:
from .token_analysis import get_historical_chart_data
result = get_historical_chart_data(symbol, range_key, market_data)
doc = {
"symbol": symbol,
"range": result.get("range", range_key),
"data": result.get("data", []),
"updated_at": updated,
}
coll.update_one(
{"symbol": symbol, "range": range_key},
{"$set": doc},
upsert=True,
)
except Exception as e:
print(f"[CacheRefresh] historical {symbol} {range_key}: {e}")
time.sleep(8)
print(f"[CacheRefresh] token_historical: refreshed {len(symbols)} symbols x {len(HISTORICAL_RANGES)} ranges")


def refresh_pyth_prices() -> None:
Expand Down Expand Up @@ -230,3 +257,6 @@ def run_refresh_historical() -> None:
from datetime import datetime
print(f"[CacheRefresh] historical run at {datetime.now().strftime('%H:%M:%S')} (every 5 min)")
refresh_token_historical()



40 changes: 40 additions & 0 deletions app/market/cache_refresh_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Concurrent historical chart data fetcher for cache_refresh.
Uses ThreadPoolExecutor to parallelize Twelve Data API calls
while respecting rate limits (8s between calls on free tier).
"""
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

from .market import MarketData


def fetch_historical_parallel(symbols, ranges, market_data, max_workers=4, rate_limit_delay=8.0):
"""Fetch historical chart data for multiple symbols and ranges in parallel."""
results = []

def fetch_one(symbol, range_key):
try:
from .token_analysis import get_historical_chart_data
result = get_historical_chart_data(symbol, range_key, market_data)
return {"symbol": symbol, "range": result.get("range", range_key), "data": result.get("data", [])}
except Exception as e:
print(f"[CacheRefreshParallel] historical {symbol} {range_key}: {e}")
return None

tasks = [(symbol, rk) for symbol in symbols for rk in ranges]

with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_task = {
executor.submit(fetch_one, symbol, range_key): (symbol, range_key)
for symbol, range_key in tasks
}
for future in as_completed(future_to_task):
result = future.result()
if result is not None:
results.append(result)
time.sleep(rate_limit_delay / max(len(symbols), 1))

return results