Skip to content

Commit db846de

Browse files
feat: add Ethereum chain support
- ETH portfolio fetcher with Alchemy API + web3.py fallback - Uniswap V3 protocol (top pools by TVL, fee-based APR) - Aave V3 protocol (lending reserves, USD-denominated TVL) - Token metadata for ETH ecosystem (WETH, USDC, USDT, DAI) - Configurable subgraph URLs via env vars - Integration tests for ETH protocols and portfolio
1 parent 427015e commit db846de

File tree

10 files changed

+893
-1
lines changed

10 files changed

+893
-1
lines changed

.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ export AWS_REGION=''
33
export AWS_ACCESS_KEY_ID=''
44
export AWS_SECRET_ACCESS_KEY=''
55
export SOLANA_RPC_URL=''
6+
export ETH_RPC_URL='' # Ethereum RPC (Alchemy/Infura recommended)
7+
8+
# The Graph subgraph URLs (optional — defaults are provided)
9+
# Production usage requires a Graph API key: https://thegraph.com/studio/
10+
export UNISWAP_V3_SUBGRAPH_URL=''
11+
export AAVE_V3_SUBGRAPH_URL=''
612
export OPENROUTER_API_KEY=''
713
export GEMINI_API_KEY=''
814
export DD_API_KEY=''

agent/tools.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def retrieve_solana_pools(
4444

4545
# Create a query to filter pools
4646
query = PoolQuery(
47-
chain=Chain.SOLANA, # Currently only supporting Solana
47+
chain=Chain.SOLANA,
4848
tokens=tokens or [],
4949
user_tokens=user_tokens,
5050
)
@@ -56,9 +56,37 @@ async def retrieve_solana_pools(
5656
return pools
5757

5858

59+
@tool
60+
@track_tool_usage("retrieve_ethereum_pools")
61+
async def retrieve_ethereum_pools(
62+
tokens: List[str] = None,
63+
config: RunnableConfig = None,
64+
) -> List[Pool]:
65+
"""
66+
Retrieves Ethereum pools matching the specified criteria that the user can invest in.
67+
Includes Uniswap V3 AMM pools and Aave V3 lending pools.
68+
"""
69+
configurable = config["configurable"]
70+
user_tokens: List[WalletTokenHolding] = configurable["tokens"]
71+
protocol_registry: ProtocolRegistry = configurable["protocol_registry"]
72+
73+
query = PoolQuery(
74+
chain=Chain.ETHEREUM,
75+
tokens=tokens or [],
76+
user_tokens=user_tokens,
77+
)
78+
79+
pools = await protocol_registry.get_pools(query)
80+
if len(pools) == 0:
81+
return "No Ethereum pools found."
82+
83+
return pools
84+
85+
5986
def create_investor_agent_toolkit() -> List[BaseTool]:
6087
return [
6188
retrieve_solana_pools,
89+
retrieve_ethereum_pools,
6290
]
6391

6492

main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
from onchain.pools.solana.orca_protocol import OrcaProtocol
2020
from onchain.pools.solana.save_protocol import SaveProtocol
2121
from onchain.pools.solana.kamino_protocol import KaminoProtocol
22+
from onchain.pools.ethereum.uniswap_v3_protocol import UniswapV3Protocol
23+
from onchain.pools.ethereum.aave_protocol import AaveProtocol
2224

2325
# Define protocols enabled
2426
protocols = [
2527
OrcaProtocol.PROTOCOL_NAME,
2628
SaveProtocol.PROTOCOL_NAME,
2729
KaminoProtocol.PROTOCOL_NAME,
30+
UniswapV3Protocol.PROTOCOL_NAME,
31+
AaveProtocol.PROTOCOL_NAME,
2832
]
2933

3034
# Create the FastAPI app

onchain/pools/ethereum/__init__.py

Whitespace-only changes.
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
"""Aave V3 protocol implementation for Ethereum lending pool discovery."""
2+
3+
from typing import List, Optional, Dict, Any
4+
import logging
5+
import os
6+
7+
import aiohttp
8+
9+
from api.api_types import Pool, Token, Chain, PoolType
10+
from onchain.pools.protocol import Protocol
11+
from onchain.tokens.metadata import TokenMetadataRepo
12+
13+
logger = logging.getLogger(__name__)
14+
15+
# Aave V3 Ethereum subgraph — configurable via env var.
16+
AAVE_V3_SUBGRAPH_URL = os.environ.get(
17+
"AAVE_V3_SUBGRAPH_URL",
18+
"https://gateway.thegraph.com/api/subgraphs/id/Cd2gEDVeqnjBn1hSeqFMitw8Q1iiyV9FYUZkLNRcL87g",
19+
)
20+
21+
# Stablecoins for classification
22+
STABLECOIN_SYMBOLS = {"USDC", "USDT", "DAI", "BUSD", "TUSD", "FRAX", "LUSD", "GUSD", "sUSD"}
23+
24+
25+
class AaveProtocol(Protocol):
26+
"""
27+
Aave V3 protocol — fetches Ethereum lending reserves.
28+
29+
Returns each reserve as a lending Pool with supply APR.
30+
Uses The Graph subgraph for on-chain data.
31+
"""
32+
33+
PROTOCOL_NAME = "aave-v3"
34+
35+
_session: Optional[aiohttp.ClientSession] = None
36+
37+
@property
38+
def name(self) -> str:
39+
return self.PROTOCOL_NAME
40+
41+
@property
42+
async def session(self) -> aiohttp.ClientSession:
43+
if self._session is None:
44+
self._session = aiohttp.ClientSession(
45+
timeout=aiohttp.ClientTimeout(total=30)
46+
)
47+
return self._session
48+
49+
async def close(self):
50+
if self._session:
51+
await self._session.close()
52+
self._session = None
53+
54+
async def get_pools(self, token_metadata_repo: TokenMetadataRepo) -> List[Pool]:
55+
"""Fetch Aave V3 reserves from the subgraph."""
56+
query = """
57+
{
58+
reserves(
59+
first: 50,
60+
where: { isActive: true }
61+
) {
62+
id
63+
symbol
64+
name
65+
decimals
66+
underlyingAsset
67+
liquidityRate
68+
totalATokenSupply
69+
totalCurrentVariableDebt
70+
availableLiquidity
71+
price {
72+
priceInEth
73+
}
74+
}
75+
}
76+
"""
77+
78+
try:
79+
session = await self.session
80+
async with session.post(
81+
AAVE_V3_SUBGRAPH_URL,
82+
json={"query": query},
83+
) as response:
84+
if response.status in (401, 403):
85+
logger.error(
86+
f"Aave V3 subgraph auth failed ({response.status}). "
87+
f"Set AAVE_V3_SUBGRAPH_URL with a valid Graph API key."
88+
)
89+
return []
90+
if response.status != 200:
91+
logger.error(f"Aave V3 subgraph returned {response.status}")
92+
return []
93+
data = await response.json()
94+
except Exception as e:
95+
logger.error(f"Error fetching Aave V3 reserves: {e}")
96+
return []
97+
98+
reserves = data.get("data", {}).get("reserves", [])
99+
100+
# Resolve ETH/USD price for TVL conversion.
101+
# Use the WETH token metadata from DexScreener (already cached).
102+
eth_usd_price = 0.0
103+
eth_meta = await token_metadata_repo.get_token_metadata(
104+
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "ethereum"
105+
)
106+
if eth_meta and eth_meta.price:
107+
eth_usd_price = float(eth_meta.price)
108+
109+
return self._convert_to_pools(reserves, eth_usd_price)
110+
111+
def _convert_to_pools(
112+
self, reserves: List[Dict[str, Any]], eth_usd_price: float
113+
) -> List[Pool]:
114+
result: List[Pool] = []
115+
116+
for reserve in reserves:
117+
symbol = reserve.get("symbol", "")
118+
name = reserve.get("name", "")
119+
address = reserve.get("underlyingAsset", "")
120+
decimals = int(reserve.get("decimals", 18))
121+
122+
tokens = [
123+
Token(address=address, name=name, symbol=symbol),
124+
]
125+
126+
# Aave liquidityRate is in RAY units (1e27), convert to APR percentage
127+
liquidity_rate = int(reserve.get("liquidityRate", 0))
128+
supply_apr = (liquidity_rate / 1e27) * 100
129+
130+
# Calculate TVL in USD:
131+
# totalATokenSupply is in token-native units (needs /10^decimals)
132+
# priceInEth is the token price denominated in ETH (wei-scaled, 1e18)
133+
# Multiply by eth_usd_price to get USD
134+
total_supply_raw = int(reserve.get("totalATokenSupply", 0))
135+
total_supply = total_supply_raw / (10**decimals)
136+
137+
price_in_eth_raw = reserve.get("price", {}).get("priceInEth", "0")
138+
price_in_eth = int(price_in_eth_raw) / 1e18
139+
140+
if eth_usd_price > 0 and price_in_eth > 0:
141+
tvl_usd = str(round(total_supply * price_in_eth * eth_usd_price, 2))
142+
else:
143+
tvl_usd = str(round(total_supply, 2))
144+
145+
is_stablecoin = symbol.upper() in STABLECOIN_SYMBOLS
146+
147+
pool = Pool(
148+
id=reserve.get("id", ""),
149+
chain=Chain.ETHEREUM,
150+
protocol="Aave V3",
151+
tokens=tokens,
152+
type=PoolType.LENDING,
153+
TVL=tvl_usd,
154+
APRLastDay=round(supply_apr, 2),
155+
APRLastWeek=round(supply_apr, 2),
156+
APRLastMonth=round(supply_apr, 2),
157+
isStableCoin=is_stablecoin,
158+
impermanentLossRisk=False, # Lending has no IL
159+
)
160+
result.append(pool)
161+
162+
return result
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import unittest
2+
import os
3+
import boto3
4+
import asyncio
5+
6+
from onchain.pools.ethereum.uniswap_v3_protocol import UniswapV3Protocol
7+
from onchain.pools.ethereum.aave_protocol import AaveProtocol
8+
from onchain.portfolio.ethereum_portfolio import EthereumPortfolioFetcher
9+
from onchain.tokens.metadata import TokenMetadataRepo
10+
from api.api_types import Chain, PoolType
11+
import dotenv
12+
13+
dotenv.load_dotenv()
14+
15+
16+
class TestEthereumProtocols(unittest.TestCase):
17+
def setUp(self):
18+
dynamodb = boto3.resource(
19+
"dynamodb",
20+
region_name=os.environ.get("AWS_REGION"),
21+
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
22+
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
23+
)
24+
tokens_table = dynamodb.Table("token_metadata_v2")
25+
self.token_metadata_repo = TokenMetadataRepo(tokens_table)
26+
27+
def test_uniswap_v3(self):
28+
uniswap = UniswapV3Protocol()
29+
pools = asyncio.run(uniswap.get_pools(self.token_metadata_repo))
30+
31+
self.assertGreater(len(pools), 5)
32+
print(f"Uniswap V3 pools: {len(pools)}")
33+
34+
# Verify pool structure
35+
for pool in pools[:3]:
36+
self.assertEqual(pool.chain, Chain.ETHEREUM)
37+
self.assertEqual(pool.type, PoolType.AMM)
38+
self.assertIn("Uniswap V3", pool.protocol)
39+
self.assertEqual(len(pool.tokens), 2)
40+
self.assertIsNotNone(pool.TVL)
41+
self.assertGreaterEqual(pool.APRLastDay, 0)
42+
43+
def test_uniswap_v3_stablecoin_detection(self):
44+
uniswap = UniswapV3Protocol()
45+
pools = asyncio.run(uniswap.get_pools(self.token_metadata_repo))
46+
47+
# Non-stablecoin pools should have IL risk
48+
for pool in pools:
49+
if not pool.isStableCoin:
50+
self.assertTrue(pool.impermanentLossRisk)
51+
else:
52+
self.assertFalse(pool.impermanentLossRisk)
53+
54+
def test_aave_v3(self):
55+
aave = AaveProtocol()
56+
pools = asyncio.run(aave.get_pools(self.token_metadata_repo))
57+
58+
self.assertGreater(len(pools), 5)
59+
print(f"Aave V3 reserves: {len(pools)}")
60+
61+
# Verify pool structure
62+
for pool in pools[:3]:
63+
self.assertEqual(pool.chain, Chain.ETHEREUM)
64+
self.assertEqual(pool.type, PoolType.LENDING)
65+
self.assertEqual(pool.protocol, "Aave V3")
66+
self.assertEqual(len(pool.tokens), 1)
67+
self.assertFalse(pool.impermanentLossRisk)
68+
self.assertGreaterEqual(pool.APRLastDay, 0)
69+
70+
def test_aave_v3_tvl_is_usd(self):
71+
"""TVL should be in USD (not raw token units)."""
72+
aave = AaveProtocol()
73+
pools = asyncio.run(aave.get_pools(self.token_metadata_repo))
74+
75+
for pool in pools:
76+
tvl = float(pool.TVL)
77+
# Any active Aave reserve should have meaningful TVL in USD
78+
if tvl > 0:
79+
self.assertGreater(tvl, 100, f"{pool.tokens[0].symbol} TVL too low: {tvl}")
80+
81+
82+
class TestEthereumPortfolio(unittest.TestCase):
83+
def setUp(self):
84+
dynamodb = boto3.resource(
85+
"dynamodb",
86+
region_name=os.environ.get("AWS_REGION"),
87+
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
88+
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
89+
)
90+
tokens_table = dynamodb.Table("token_metadata_v2")
91+
self.token_metadata_repo = TokenMetadataRepo(tokens_table)
92+
self.portfolio_fetcher = EthereumPortfolioFetcher(self.token_metadata_repo)
93+
94+
def tearDown(self):
95+
asyncio.run(self.portfolio_fetcher.close())
96+
97+
def test_get_portfolio(self):
98+
# Vitalik's public wallet
99+
portfolio = asyncio.run(
100+
self.portfolio_fetcher.get_portfolio(
101+
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
102+
)
103+
)
104+
print(portfolio)
105+
106+
self.assertGreater(len(portfolio.holdings), 0)
107+
# Vitalik holds ETH
108+
eth_holdings = [h for h in portfolio.holdings if h.symbol == "ETH"]
109+
self.assertEqual(len(eth_holdings), 1)
110+
self.assertGreater(eth_holdings[0].amount, 0)
111+
112+
def test_empty_wallet(self):
113+
# Zero address should have no holdings
114+
portfolio = asyncio.run(
115+
self.portfolio_fetcher.get_portfolio(
116+
"0x0000000000000000000000000000000000000001"
117+
)
118+
)
119+
self.assertEqual(len(portfolio.holdings), 0)
120+
121+
def test_invalid_address(self):
122+
portfolio = asyncio.run(
123+
self.portfolio_fetcher.get_portfolio("not-a-valid-address")
124+
)
125+
self.assertEqual(len(portfolio.holdings), 0)
126+
self.assertEqual(portfolio.total_value_usd, 0)

0 commit comments

Comments
 (0)