Skip to content
2 changes: 1 addition & 1 deletion apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "hip-3-pusher"
version = "0.2.3"
version = "0.2.4"
description = "Hyperliquid HIP-3 market oracle pusher"
readme = "README.md"
requires-python = "==3.13.*"
Expand Down
4 changes: 4 additions & 0 deletions apps/hip-3-pusher/src/pusher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Literal

STALE_TIMEOUT_SECONDS = 5
USER_LIMIT_INTERVAL_SECONDS = 1800
HYPERLIQUID_WS_PING_INTERVAL_SECONDS = 20


class KMSConfig(BaseModel):
Expand Down Expand Up @@ -37,6 +39,8 @@ class HyperliquidConfig(BaseModel):
publish_interval: float
publish_timeout: float
enable_publish: bool
user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS
ws_ping_interval: int = HYPERLIQUID_WS_PING_INTERVAL_SECONDS

@model_validator(mode="after")
def set_default_urls(self):
Expand Down
13 changes: 7 additions & 6 deletions apps/hip-3-pusher/src/pusher/hermes_listener.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import asyncio
import json
from loguru import logger
import time
import websockets
from tenacity import retry, retry_if_exception_type, wait_exponential
from tenacity import retry, retry_if_exception_type, wait_fixed

from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnectionError
Expand Down Expand Up @@ -37,11 +36,12 @@ async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))

@retry(
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(Exception),
wait=wait_fixed(1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it stop retrying after some max retries? Probably a good idea to reraise and crash the app when the retries are exhausted
(Same for the other decorators)

reraise=True,
)
async def subscribe_single(self, url):
logger.info("Starting Hermes listener loop: {}", url)
return await self.subscribe_single_inner(url)

async def subscribe_single_inner(self, url):
Expand All @@ -58,8 +58,10 @@ async def subscribe_single_inner(self, url):
data = json.loads(message)
self.parse_hermes_message(data)
except asyncio.TimeoutError:
logger.warning("HermesListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
except websockets.ConnectionClosed:
logger.warning("HermesListener: Connection closed, reconnecting...")
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {}", e)
Expand All @@ -83,7 +85,6 @@ def parse_hermes_message(self, data):
expo = price_object["expo"]
publish_time = price_object["publish_time"]
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
now = time.time()
self.hermes_state.put(id, PriceUpdate(price, now))
self.hermes_state.put(id, PriceUpdate(price, publish_time))
except Exception as e:
logger.error("parse_hermes_message error: {}", e)
66 changes: 50 additions & 16 deletions apps/hip-3-pusher/src/pusher/hyperliquid_listener.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import json
from enum import StrEnum

import websockets
from loguru import logger
from tenacity import retry, retry_if_exception_type, wait_exponential
from tenacity import retry, retry_if_exception_type, wait_fixed
import time

from pusher.config import Config, STALE_TIMEOUT_SECONDS
Expand All @@ -14,6 +16,15 @@
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws"
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws"

class HLChannel(StrEnum):
CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx"
CHANNEL_ALL_MIDS = "allMids"
CHANNEL_SUBSCRIPTION_RESPONSE = "subscriptionResponse"
CHANNEL_PONG = "pong"
CHANNEL_ERROR = "error"

DATA_CHANNELS = [HLChannel.CHANNEL_ACTIVE_ASSET_CTX, HLChannel.CHANNEL_ALL_MIDS]


class HyperliquidListener:
"""
Expand All @@ -27,6 +38,7 @@ def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_st
self.hl_oracle_state = hl_oracle_state
self.hl_mark_state = hl_mark_state
self.hl_mid_state = hl_mid_state
self.ws_ping_interval = config.hyperliquid.ws_ping_interval

def get_subscribe_request(self, asset):
return {
Expand All @@ -38,11 +50,12 @@ async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))

@retry(
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(Exception),
wait=wait_fixed(1),
reraise=True,
)
async def subscribe_single(self, url):
logger.info("Starting Hyperliquid listener loop: {}", url)
return await self.subscribe_single_inner(url)

async def subscribe_single_inner(self, url):
Expand All @@ -59,48 +72,69 @@ async def subscribe_single_inner(self, url):
await ws.send(json.dumps(subscribe_all_mids_request))
logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url)

now = time.time()
channel_last_message_timestamp = {channel: now for channel in HLChannel}
last_ping_timestamp = now

# listen for updates
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
data = json.loads(message)
channel = data.get("channel", None)
now = time.time()
if not channel:
logger.error("No channel in message: {}", data)
elif channel == "subscriptionResponse":
logger.debug("Received subscription response: {}", data)
elif channel == "error":
elif channel == HLChannel.CHANNEL_SUBSCRIPTION_RESPONSE:
logger.info("Received subscription response: {}", data)
elif channel == HLChannel.CHANNEL_ERROR:
logger.error("Received Hyperliquid error response: {}", data)
elif channel == "activeAssetCtx":
self.parse_hyperliquid_active_asset_ctx_update(data)
elif channel == "allMids":
self.parse_hyperliquid_all_mids_update(data)
elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX:
self.parse_hyperliquid_active_asset_ctx_update(data, now)
channel_last_message_timestamp[channel] = now
elif channel == HLChannel.CHANNEL_ALL_MIDS:
self.parse_hyperliquid_all_mids_update(data, now)
channel_last_message_timestamp[channel] = now
elif channel == HLChannel.CHANNEL_PONG:
logger.debug("Received pong")
else:
logger.error("Received unknown channel: {}", channel)

# check for stale channels
for channel in DATA_CHANNELS:
if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS:
logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS)
raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")

# ping if we need to
if now - last_ping_timestamp > self.ws_ping_interval:
await ws.send(json.dumps({"method": "ping"}))
last_ping_timestamp = now
except asyncio.TimeoutError:
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
except websockets.ConnectionClosed:
logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
except websockets.ConnectionClosed as e:
rc, rr = e.rcvd.code if e.rcvd else None, e.rcvd.reason if e.rcvd else None
logger.warning("HyperliquidListener: Websocket connection closed (code={} reason={}); reconnecting...", rc, rr)
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {} error: {}", message, e)
except Exception as e:
logger.error("Unexpected exception: {}", e)

def parse_hyperliquid_active_asset_ctx_update(self, message):
def parse_hyperliquid_active_asset_ctx_update(self, message, now):
try:
ctx = message["data"]["ctx"]
symbol = message["data"]["coin"]
now = time.time()
self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now))
self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now))
logger.debug("activeAssetCtx symbol: {} oraclePx: {} markPx: {}", symbol, ctx["oraclePx"], ctx["markPx"])
except Exception as e:
logger.error("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, e)

def parse_hyperliquid_all_mids_update(self, message):
def parse_hyperliquid_all_mids_update(self, message, now):
try:
mids = message["data"]["mids"]
now = time.time()
for mid in mids:
self.hl_mid_state.put(mid, PriceUpdate(mids[mid], now))
logger.debug("allMids: {}", mids)
Expand Down
16 changes: 9 additions & 7 deletions apps/hip-3-pusher/src/pusher/lazer_listener.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import asyncio
import json
from loguru import logger
import time
import websockets
from tenacity import retry, retry_if_exception_type, wait_exponential
from tenacity import retry, retry_if_exception_type, wait_fixed

from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnectionError
Expand Down Expand Up @@ -41,11 +40,12 @@ async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))

@retry(
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(Exception),
wait=wait_fixed(1),
reraise=True,
)
async def subscribe_single(self, router_url):
logger.info("Starting Lazer listener loop: {}", router_url)
return await self.subscribe_single_inner(router_url)

async def subscribe_single_inner(self, router_url):
Expand All @@ -66,8 +66,10 @@ async def subscribe_single_inner(self, router_url):
data = json.loads(message)
self.parse_lazer_message(data)
except asyncio.TimeoutError:
logger.warning("LazerListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
except websockets.ConnectionClosed:
logger.warning("LazerListener: Connection closed, reconnecting...")
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {}", e)
Expand All @@ -85,14 +87,14 @@ def parse_lazer_message(self, data):
if data.get("type", "") != "streamUpdated":
return
price_feeds = data["parsed"]["priceFeeds"]
logger.debug("price_feeds: {}", price_feeds)
now = time.time()
timestamp = int(data["parsed"]["timestampUs"]) / 1_000_000.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name the variable something like timestamp_ms (or whatever the resolution is)

logger.debug("price_feeds: {} timestamp: {}", price_feeds, timestamp)
for feed_update in price_feeds:
feed_id = feed_update.get("priceFeedId", None)
price = feed_update.get("price", None)
if feed_id is None or price is None:
continue
else:
self.lazer_state.put(feed_id, PriceUpdate(price, now))
self.lazer_state.put(feed_id, PriceUpdate(price, timestamp))
except Exception as e:
logger.error("parse_lazer_message error: {}", e)
3 changes: 3 additions & 0 deletions apps/hip-3-pusher/src/pusher/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pusher.price_state import PriceState
from pusher.publisher import Publisher
from pusher.metrics import Metrics
from pusher.user_limit_listener import UserLimitListener


def load_config():
Expand Down Expand Up @@ -52,13 +53,15 @@ async def main():
lazer_listener = LazerListener(config, price_state.lazer_state)
hermes_listener = HermesListener(config, price_state.hermes_state)
seda_listener = SedaListener(config, price_state.seda_state)
user_limit_listener = UserLimitListener(config, metrics, publisher.user_limit_address)

await asyncio.gather(
publisher.run(),
hyperliquid_listener.subscribe_all(),
lazer_listener.subscribe_all(),
hermes_listener.subscribe_all(),
seda_listener.run(),
user_limit_listener.run(),
Comment on lines 58 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the expected behavior if any of these tasks exit/raise? I guess the app crashes and we let k8s restart it?

)
logger.info("Exiting hip-3-pusher..")

Expand Down
5 changes: 5 additions & 0 deletions apps/hip-3-pusher/src/pusher/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def _init_metrics(self):
name="hip_3_relayer_price_config",
description="Price source config",
)
# labels: dex, user
self.user_request_balance = self.meter.create_gauge(
name="hip_3_relayer_user_request_balance",
description="Number of update requests left before rate limit",
)

def set_price_configs(self, dex: str, price_config: PriceConfig):
self._set_price_config_type(dex, price_config.oracle, "oracle")
Expand Down
17 changes: 10 additions & 7 deletions apps/hip-3-pusher/src/pusher/price_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], oracle_
pxs = {}
for symbol in symbol_configs:
for source_config in symbol_configs[symbol]:
# find first valid price in the waterfall
px = self.get_price(source_config, oracle_update)
if px is not None:
pxs[f"{self.market_name}:{symbol}"] = str(px)
break
try:
# find first valid price in the waterfall
px = self.get_price(source_config, oracle_update)
if px is not None:
pxs[f"{self.market_name}:{symbol}"] = str(px)
break
except Exception as e:
logger.exception("get_price exception for symbol: {} source_config: {} error: {}", symbol, source_config, e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For printing out expections use repr(e) to maintain useful type context in the output

return pxs

def get_price(self, price_source_config: PriceSourceConfig, oracle_update: OracleUpdate):
Expand Down Expand Up @@ -125,10 +128,10 @@ def get_price_from_pair_source(self, base_source: PriceSource, quote_source: Pri
if base_price is None:
return None
quote_price = self.get_price_from_single_source(quote_source)
if quote_price is None:
if quote_price is None or float(quote_price) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What edge case are we covering here?
Looks like we're casting the string quote price to a float and comparing it to integer zero. Also, you may get caught out by float representation equality issues here.

return None

return str(round(float(base_price) / float(quote_price), 2))
return str(float(base_price) / float(quote_price))

def get_price_from_oracle_mid_average(self, symbol: str, oracle_update: OracleUpdate):
oracle_price = oracle_update.oracle.get(symbol)
Expand Down
Loading