From 72a777da52c9b1717e9b3b59be5ebc7ed04728fd Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 10:47:15 -0600 Subject: [PATCH 01/11] Check for staleness in each Hyperliquid websocket channel --- .../src/pusher/hermes_listener.py | 2 +- .../src/pusher/hyperliquid_listener.py | 34 +++++++++++++------ .../hip-3-pusher/src/pusher/lazer_listener.py | 2 +- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index 72f481c627..9d25939471 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -37,7 +37,7 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), + retry=retry_if_exception_type(Exception), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True, ) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index 60faf4c51a..2d46b74871 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -1,5 +1,7 @@ import asyncio import json +from enum import StrEnum + import websockets from loguru import logger from tenacity import retry, retry_if_exception_type, wait_exponential @@ -14,6 +16,10 @@ HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws" HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws" +class HLChannel(StrEnum): + CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx" + CHANNEL_ALL_MIDS = "allMids" + class HyperliquidListener: """ @@ -38,7 +44,7 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), + retry=retry_if_exception_type(Exception), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True, ) @@ -59,26 +65,36 @@ async def subscribe_single_inner(self, url): await ws.send(json.dumps(subscribe_all_mids_request)) logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url) + channel_last_message_timestamp = {channel: time.time() for channel in HLChannel} # listen for updates while True: try: message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS) data = json.loads(message) channel = data.get("channel", None) + now = time.time() if not channel: logger.error("No channel in message: {}", data) elif channel == "subscriptionResponse": logger.debug("Received subscription response: {}", data) elif channel == "error": logger.error("Received Hyperliquid error response: {}", data) - elif channel == "activeAssetCtx": - self.parse_hyperliquid_active_asset_ctx_update(data) - elif channel == "allMids": - self.parse_hyperliquid_all_mids_update(data) + elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX: + self.parse_hyperliquid_active_asset_ctx_update(data, now) + channel_last_message_timestamp[channel] = now + elif channel == HLChannel.CHANNEL_ALL_MIDS: + self.parse_hyperliquid_all_mids_update(data, now) + channel_last_message_timestamp[channel] = now else: logger.error("Received unknown channel: {}", channel) + + # check for stale channels + for channel in HLChannel: + if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS: + logger.error("Hyperliquid channel {} stale; restarting websocket listener", channel) + raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") except asyncio.TimeoutError: - raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") + raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") except websockets.ConnectionClosed: raise except json.JSONDecodeError as e: @@ -86,21 +102,19 @@ async def subscribe_single_inner(self, url): except Exception as e: logger.error("Unexpected exception: {}", e) - def parse_hyperliquid_active_asset_ctx_update(self, message): + def parse_hyperliquid_active_asset_ctx_update(self, message, now): try: ctx = message["data"]["ctx"] symbol = message["data"]["coin"] - now = time.time() self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now)) self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now)) logger.debug("activeAssetCtx symbol: {} oraclePx: {} markPx: {}", symbol, ctx["oraclePx"], ctx["markPx"]) except Exception as e: logger.error("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, e) - def parse_hyperliquid_all_mids_update(self, message): + def parse_hyperliquid_all_mids_update(self, message, now): try: mids = message["data"]["mids"] - now = time.time() for mid in mids: self.hl_mid_state.put(mid, PriceUpdate(mids[mid], now)) logger.debug("allMids: {}", mids) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 43aa18aa34..66eb9cb81a 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -41,7 +41,7 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), + retry=retry_if_exception_type(Exception), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True, ) From 1e24b697a640adea20938272c51a5f0b05537b50 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 10:50:37 -0600 Subject: [PATCH 02/11] version bump --- apps/hip-3-pusher/pyproject.toml | 2 +- apps/hip-3-pusher/uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index 0e06c775a5..e93c42c505 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "hip-3-pusher" -version = "0.2.3" +version = "0.2.4" description = "Hyperliquid HIP-3 market oracle pusher" readme = "README.md" requires-python = "==3.13.*" diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index 488cd0f4ec..d629b998bd 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -351,7 +351,7 @@ wheels = [ [[package]] name = "hip-3-pusher" -version = "0.2.3" +version = "0.2.4" source = { editable = "." } dependencies = [ { name = "boto3" }, From f7d7f131396b3526856ea94458371d3f78fe1a34 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 10:55:02 -0600 Subject: [PATCH 03/11] Add missing external perp error reason --- apps/hip-3-pusher/src/pusher/publisher.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index de954b3b3f..e864589f82 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -27,6 +27,8 @@ class PushErrorReason(StrEnum): INTERNAL_ERROR = "internal_error" # Invalid nonce, if the pusher account pushes multiple transactions with the same ms timestamp INVALID_NONCE = "invalid_nonce" + # Missing externalPerpPxs + MISSING_EXTERNAL_PERP_PXS = "missing_external_perp_pxs" # Some error string we haven't categorized yet UNKNOWN = "unknown" @@ -234,6 +236,8 @@ def _get_error_reason(self, response): return PushErrorReason.USER_LIMIT elif "Invalid nonce" in response: return PushErrorReason.INVALID_NONCE + elif "externalPerpPxs missing perp" in response: + return PushErrorReason.MISSING_EXTERNAL_PERP_PXS else: logger.warning("Unrecognized error response: {}", response) return PushErrorReason.UNKNOWN From 10c6f14cdfb0fd877739175fde8367ed00a8df8a Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 14:41:54 -0600 Subject: [PATCH 04/11] User rate limit listener, async-ify sdk calls --- apps/hip-3-pusher/src/pusher/config.py | 2 + .../src/pusher/hermes_listener.py | 3 ++ .../src/pusher/hyperliquid_listener.py | 5 ++- .../hip-3-pusher/src/pusher/lazer_listener.py | 3 ++ apps/hip-3-pusher/src/pusher/main.py | 3 ++ apps/hip-3-pusher/src/pusher/metrics.py | 5 +++ apps/hip-3-pusher/src/pusher/publisher.py | 42 ++++++++++++------- .../src/pusher/user_limit_listener.py | 36 ++++++++++++++++ 8 files changed, 83 insertions(+), 16 deletions(-) create mode 100644 apps/hip-3-pusher/src/pusher/user_limit_listener.py diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 74acea4e81..2bc23cea94 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -4,6 +4,7 @@ from typing import Literal STALE_TIMEOUT_SECONDS = 5 +USER_LIMIT_INTERVAL_SECONDS = 1800 class KMSConfig(BaseModel): @@ -37,6 +38,7 @@ class HyperliquidConfig(BaseModel): publish_interval: float publish_timeout: float enable_publish: bool + user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS @model_validator(mode="after") def set_default_urls(self): diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index 9d25939471..9527e02f33 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -42,6 +42,7 @@ async def subscribe_all(self): reraise=True, ) async def subscribe_single(self, url): + logger.info("Starting Hermes listener loop: {}", url) return await self.subscribe_single_inner(url) async def subscribe_single_inner(self, url): @@ -58,8 +59,10 @@ async def subscribe_single_inner(self, url): data = json.loads(message) self.parse_hermes_message(data) except asyncio.TimeoutError: + logger.warning("HermesListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting") except websockets.ConnectionClosed: + logger.warning("HermesListener: Connection closed, reconnecting...") raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index 2d46b74871..abf66c4b39 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -49,6 +49,7 @@ async def subscribe_all(self): reraise=True, ) async def subscribe_single(self, url): + logger.info("Starting Hyperliquid listener loop: {}", url) return await self.subscribe_single_inner(url) async def subscribe_single_inner(self, url): @@ -91,11 +92,13 @@ async def subscribe_single_inner(self, url): # check for stale channels for channel in HLChannel: if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS: - logger.error("Hyperliquid channel {} stale; restarting websocket listener", channel) + logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") except asyncio.TimeoutError: + logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") except websockets.ConnectionClosed: + logger.warning("HyperliquidListener: Connection closed, reconnecting...") raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {} error: {}", message, e) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 66eb9cb81a..3af86ae74b 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -46,6 +46,7 @@ async def subscribe_all(self): reraise=True, ) async def subscribe_single(self, router_url): + logger.info("Starting Lazer listener loop: {}", router_url) return await self.subscribe_single_inner(router_url) async def subscribe_single_inner(self, router_url): @@ -66,8 +67,10 @@ async def subscribe_single_inner(self, router_url): data = json.loads(message) self.parse_lazer_message(data) except asyncio.TimeoutError: + logger.warning("LazerListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting") except websockets.ConnectionClosed: + logger.warning("LazerListener: Connection closed, reconnecting...") raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/main.py b/apps/hip-3-pusher/src/pusher/main.py index 9cbbac1d30..3f7e6b00e9 100644 --- a/apps/hip-3-pusher/src/pusher/main.py +++ b/apps/hip-3-pusher/src/pusher/main.py @@ -13,6 +13,7 @@ from pusher.price_state import PriceState from pusher.publisher import Publisher from pusher.metrics import Metrics +from pusher.user_limit_listener import UserLimitListener def load_config(): @@ -52,6 +53,7 @@ async def main(): lazer_listener = LazerListener(config, price_state.lazer_state) hermes_listener = HermesListener(config, price_state.hermes_state) seda_listener = SedaListener(config, price_state.seda_state) + user_limit_listener = UserLimitListener(config, metrics, publisher.user_limit_address) await asyncio.gather( publisher.run(), @@ -59,6 +61,7 @@ async def main(): lazer_listener.subscribe_all(), hermes_listener.subscribe_all(), seda_listener.run(), + user_limit_listener.run(), ) logger.info("Exiting hip-3-pusher..") diff --git a/apps/hip-3-pusher/src/pusher/metrics.py b/apps/hip-3-pusher/src/pusher/metrics.py index c0c3ca3f75..3fe3fe10ab 100644 --- a/apps/hip-3-pusher/src/pusher/metrics.py +++ b/apps/hip-3-pusher/src/pusher/metrics.py @@ -48,6 +48,11 @@ def _init_metrics(self): name="hip_3_relayer_price_config", description="Price source config", ) + # labels: user + self.user_request_balance = self.meter.create_gauge( + name="hip_3_relayer_user_request_balance", + description="Number of update requests left before rate limit", + ) def set_price_configs(self, dex: str, price_config: PriceConfig): self._set_price_config_type(dex, price_config.oracle, "oracle") diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index e864589f82..3e4816605b 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -2,6 +2,7 @@ from enum import StrEnum import time +from hyperliquid.utils.types import Meta, SpotMeta from loguru import logger from pathlib import Path @@ -52,9 +53,12 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): oracle_pusher_key = Path(config.hyperliquid.oracle_pusher_key_path).read_text().strip() self.oracle_account: LocalAccount = Account.from_key(oracle_pusher_key) logger.info("oracle pusher local pubkey: {}", self.oracle_account.address) + self.user_limit_address = self.oracle_account.address self.publisher_exchanges = [Exchange(wallet=self.oracle_account, base_url=url, - timeout=config.hyperliquid.publish_timeout) + timeout=config.hyperliquid.publish_timeout, + meta=Meta(universe=[]), + spot_meta=SpotMeta(universe=[], tokens=[])) for url in self.push_urls] if config.kms.enable_kms: # TODO: Add KMS/multisig support @@ -63,11 +67,13 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): self.enable_kms = True self.kms_signer = KMSSigner(config, self.publisher_exchanges) + self.user_limit_address = self.kms_signer.address if config.multisig.enable_multisig: if not config.multisig.multisig_address: raise Exception("Multisig enabled but missing multisig address") self.multisig_address = config.multisig.multisig_address + self.user_limit_address = self.multisig_address else: self.multisig_address = None @@ -83,11 +89,11 @@ async def run(self): while True: await asyncio.sleep(self.publish_interval) try: - self.publish() + await self.publish() except Exception as e: logger.exception("Publisher.publish() exception: {}", repr(e)) - def publish(self): + async def publish(self): oracle_update = self.price_state.get_all_prices() logger.debug("oracle_update: {}", oracle_update) @@ -108,13 +114,13 @@ def publish(self): external_perp_pxs=external_perp_pxs, ) elif self.multisig_address: - push_response = self._send_multisig_update( + push_response = await self._send_multisig_update( oracle_pxs=oracle_pxs, all_mark_pxs=mark_pxs, external_perp_pxs=external_perp_pxs, ) else: - push_response = self._send_update( + push_response = await self._send_update( oracle_pxs=oracle_pxs, all_mark_pxs=mark_pxs, external_perp_pxs=external_perp_pxs, @@ -131,20 +137,23 @@ def publish(self): self._record_push_interval_metric() - def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): for exchange in self.publisher_exchanges: try: - return exchange.perp_deploy_set_oracle( - dex=self.market_name, - oracle_pxs=oracle_pxs, - all_mark_pxs=all_mark_pxs, - external_perp_pxs=external_perp_pxs, - ) + return await asyncio.to_thread(self._request_single, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs) except Exception as e: logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e)) raise PushError("all push endpoints failed") + def _request_single(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): + return exchange.perp_deploy_set_oracle( + dex=self.market_name, + oracle_pxs=oracle_pxs, + all_mark_pxs=all_mark_pxs, + external_perp_pxs=external_perp_pxs, + ) + def _handle_response(self, response, symbols: list[str]): logger.debug("oracle update response: {}", response) status = response.get("status") @@ -174,10 +183,10 @@ def _record_push_interval_metric(self): self.last_push_time = now logger.debug("Push interval: {}", push_interval) - def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): for exchange in self.publisher_exchanges: try: - return self._send_single_multisig_update( + return await self._send_single_multisig_update( exchange=exchange, oracle_pxs=oracle_pxs, all_mark_pxs=all_mark_pxs, @@ -188,7 +197,7 @@ def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): raise PushError("all push endpoints failed for multisig") - def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): timestamp = get_timestamp_ms() oracle_pxs_wire = sorted(list(oracle_pxs.items())) mark_pxs_wire = [sorted(list(mark_pxs.items())) for mark_pxs in all_mark_pxs] @@ -212,6 +221,9 @@ def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, exter payload_multi_sig_user=self.multisig_address, outer_signer=self.oracle_account.address, )] + return await asyncio.to_thread(self._request_multi_sig, exchange, action, signatures, timestamp) + + def _request_multi_sig(self, exchange, action, signatures, timestamp): return exchange.multi_sig(self.multisig_address, action, signatures, timestamp) def _update_attempts_total(self, status: str, error_reason: str | None, symbols: list[str]): diff --git a/apps/hip-3-pusher/src/pusher/user_limit_listener.py b/apps/hip-3-pusher/src/pusher/user_limit_listener.py new file mode 100644 index 0000000000..0cffdba3ef --- /dev/null +++ b/apps/hip-3-pusher/src/pusher/user_limit_listener.py @@ -0,0 +1,36 @@ +import asyncio + +from hyperliquid.utils.types import SpotMeta, Meta +from loguru import logger + +from hyperliquid.info import Info +from hyperliquid.utils import constants + +from pusher.config import Config +from pusher.metrics import Metrics + + +class UserLimitListener: + def __init__(self, config: Config, metrics: Metrics, address: str): + self.address = address.lower() + self.metrics = metrics + self.interval = config.hyperliquid.user_limit_interval + + base_url = constants.TESTNET_API_URL if config.hyperliquid.use_testnet else constants.MAINNET_API_URL + self.info = Info(base_url=base_url, skip_ws=True, meta=Meta(universe=[]), spot_meta=SpotMeta(universe=[], tokens=[])) + + async def run(self): + logger.info("Starting user limit listener url: {} address: {} interval: {}", self.info.base_url, self.address, self.interval) + while True: + try: + response = await asyncio.to_thread(self._request) + logger.debug("userRateLimit response: {}", response) + balance = response["nRequestsSurplus"] - response["nRequestsCap"] - response["nRequestsUsed"] + logger.debug("userRateLimit user: {} balance: {}", self.address, balance) + self.metrics.user_request_balance.set(balance, {"user": self.address}) + except Exception as e: + logger.error("userRateLimit query failed: {}", e) + await asyncio.sleep(self.interval) + + def _request(self): + return self.info.user_rate_limit(self.address) From 0529c6af518c709a3495c5baec399a9966b33cec Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 16:39:03 -0600 Subject: [PATCH 05/11] fixes --- apps/hip-3-pusher/src/pusher/config.py | 2 ++ .../src/pusher/hyperliquid_listener.py | 31 ++++++++++++++----- apps/hip-3-pusher/src/pusher/metrics.py | 2 +- .../src/pusher/user_limit_listener.py | 3 +- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 2bc23cea94..9bd49996fe 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -5,6 +5,7 @@ STALE_TIMEOUT_SECONDS = 5 USER_LIMIT_INTERVAL_SECONDS = 1800 +HYPERLIQUID_WS_PING_INTERVAL_SECONDS = 20 class KMSConfig(BaseModel): @@ -39,6 +40,7 @@ class HyperliquidConfig(BaseModel): publish_timeout: float enable_publish: bool user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS + ws_ping_interval: int = HYPERLIQUID_WS_PING_INTERVAL_SECONDS @model_validator(mode="after") def set_default_urls(self): diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index abf66c4b39..fa0cb9da55 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -19,6 +19,11 @@ class HLChannel(StrEnum): CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx" CHANNEL_ALL_MIDS = "allMids" + CHANNEL_SUBSCRIPTION_RESPONSE = "subscriptionResponse" + CHANNEL_PONG = "pong" + CHANNEL_ERROR = "error" + +DATA_CHANNELS = [HLChannel.CHANNEL_ACTIVE_ASSET_CTX, HLChannel.CHANNEL_ALL_MIDS] class HyperliquidListener: @@ -33,6 +38,7 @@ def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_st self.hl_oracle_state = hl_oracle_state self.hl_mark_state = hl_mark_state self.hl_mid_state = hl_mid_state + self.ws_ping_interval = config.hyperliquid.ws_ping_interval def get_subscribe_request(self, asset): return { @@ -66,7 +72,10 @@ async def subscribe_single_inner(self, url): await ws.send(json.dumps(subscribe_all_mids_request)) logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url) - channel_last_message_timestamp = {channel: time.time() for channel in HLChannel} + now = time.time() + channel_last_message_timestamp = {channel: now for channel in HLChannel} + last_ping_timestamp = now + # listen for updates while True: try: @@ -76,9 +85,9 @@ async def subscribe_single_inner(self, url): now = time.time() if not channel: logger.error("No channel in message: {}", data) - elif channel == "subscriptionResponse": - logger.debug("Received subscription response: {}", data) - elif channel == "error": + elif channel == HLChannel.CHANNEL_SUBSCRIPTION_RESPONSE: + logger.info("Received subscription response: {}", data) + elif channel == HLChannel.CHANNEL_ERROR: logger.error("Received Hyperliquid error response: {}", data) elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX: self.parse_hyperliquid_active_asset_ctx_update(data, now) @@ -86,19 +95,27 @@ async def subscribe_single_inner(self, url): elif channel == HLChannel.CHANNEL_ALL_MIDS: self.parse_hyperliquid_all_mids_update(data, now) channel_last_message_timestamp[channel] = now + elif channel == HLChannel.CHANNEL_PONG: + logger.debug("Received pong") else: logger.error("Received unknown channel: {}", channel) # check for stale channels - for channel in HLChannel: + for channel in DATA_CHANNELS: if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS: logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") + + # ping if we need to + if now - last_ping_timestamp > self.ws_ping_interval: + await ws.send(json.dumps({"method": "ping"})) + last_ping_timestamp = now except asyncio.TimeoutError: logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") - except websockets.ConnectionClosed: - logger.warning("HyperliquidListener: Connection closed, reconnecting...") + except websockets.ConnectionClosed as e: + rc, rr = e.rcvd.code if e.rcvd else None, e.rcvd.reason if e.rcvd else None + logger.warning("HyperliquidListener: Websocket connection closed (code={} reason={}); reconnecting...", rc, rr) raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {} error: {}", message, e) diff --git a/apps/hip-3-pusher/src/pusher/metrics.py b/apps/hip-3-pusher/src/pusher/metrics.py index 3fe3fe10ab..c73ee374c9 100644 --- a/apps/hip-3-pusher/src/pusher/metrics.py +++ b/apps/hip-3-pusher/src/pusher/metrics.py @@ -48,7 +48,7 @@ def _init_metrics(self): name="hip_3_relayer_price_config", description="Price source config", ) - # labels: user + # labels: dex, user self.user_request_balance = self.meter.create_gauge( name="hip_3_relayer_user_request_balance", description="Number of update requests left before rate limit", diff --git a/apps/hip-3-pusher/src/pusher/user_limit_listener.py b/apps/hip-3-pusher/src/pusher/user_limit_listener.py index 0cffdba3ef..18d8c24f20 100644 --- a/apps/hip-3-pusher/src/pusher/user_limit_listener.py +++ b/apps/hip-3-pusher/src/pusher/user_limit_listener.py @@ -15,6 +15,7 @@ def __init__(self, config: Config, metrics: Metrics, address: str): self.address = address.lower() self.metrics = metrics self.interval = config.hyperliquid.user_limit_interval + self.dex = config.hyperliquid.market_name base_url = constants.TESTNET_API_URL if config.hyperliquid.use_testnet else constants.MAINNET_API_URL self.info = Info(base_url=base_url, skip_ws=True, meta=Meta(universe=[]), spot_meta=SpotMeta(universe=[], tokens=[])) @@ -27,7 +28,7 @@ async def run(self): logger.debug("userRateLimit response: {}", response) balance = response["nRequestsSurplus"] - response["nRequestsCap"] - response["nRequestsUsed"] logger.debug("userRateLimit user: {} balance: {}", self.address, balance) - self.metrics.user_request_balance.set(balance, {"user": self.address}) + self.metrics.user_request_balance.set(balance, {"dex": self.dex, "user": self.address}) except Exception as e: logger.error("userRateLimit query failed: {}", e) await asyncio.sleep(self.interval) From d5d12a34ec232cc0e1ca14a163719076bf237a13 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 17:33:32 -0600 Subject: [PATCH 06/11] wait fixed 1s between all websocket connections --- apps/hip-3-pusher/src/pusher/hermes_listener.py | 4 ++-- apps/hip-3-pusher/src/pusher/hyperliquid_listener.py | 4 ++-- apps/hip-3-pusher/src/pusher/lazer_listener.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index 9527e02f33..f24454c35a 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -3,7 +3,7 @@ from loguru import logger import time import websockets -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError @@ -38,7 +38,7 @@ async def subscribe_all(self): @retry( retry=retry_if_exception_type(Exception), - wait=wait_exponential(multiplier=1, min=1, max=10), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, url): diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index fa0cb9da55..7c6eb5dc18 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -4,7 +4,7 @@ import websockets from loguru import logger -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed import time from pusher.config import Config, STALE_TIMEOUT_SECONDS @@ -51,7 +51,7 @@ async def subscribe_all(self): @retry( retry=retry_if_exception_type(Exception), - wait=wait_exponential(multiplier=1, min=1, max=10), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, url): diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 3af86ae74b..c2e0f23343 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -3,7 +3,7 @@ from loguru import logger import time import websockets -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError @@ -42,7 +42,7 @@ async def subscribe_all(self): @retry( retry=retry_if_exception_type(Exception), - wait=wait_exponential(multiplier=1, min=1, max=10), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, router_url): From 5f7f7544f3983f911f3e909e682c3c34b454d4f5 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Wed, 10 Dec 2025 22:50:46 -0600 Subject: [PATCH 07/11] populate balance metric more frequently --- .../src/pusher/user_limit_listener.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/user_limit_listener.py b/apps/hip-3-pusher/src/pusher/user_limit_listener.py index 18d8c24f20..c8398fa908 100644 --- a/apps/hip-3-pusher/src/pusher/user_limit_listener.py +++ b/apps/hip-3-pusher/src/pusher/user_limit_listener.py @@ -1,4 +1,5 @@ import asyncio +import time from hyperliquid.utils.types import SpotMeta, Meta from loguru import logger @@ -22,16 +23,27 @@ def __init__(self, config: Config, metrics: Metrics, address: str): async def run(self): logger.info("Starting user limit listener url: {} address: {} interval: {}", self.info.base_url, self.address, self.interval) + most_recent_timestamp = None + most_recent_balance = None + while True: try: - response = await asyncio.to_thread(self._request) - logger.debug("userRateLimit response: {}", response) - balance = response["nRequestsSurplus"] - response["nRequestsCap"] - response["nRequestsUsed"] - logger.debug("userRateLimit user: {} balance: {}", self.address, balance) - self.metrics.user_request_balance.set(balance, {"dex": self.dex, "user": self.address}) + now = time.time() + if not most_recent_timestamp or now - most_recent_timestamp > self.interval: + response = await asyncio.to_thread(self._request) + logger.debug("userRateLimit response: {}", response) + new_balance = response["nRequestsSurplus"] + response["nRequestsCap"] - response["nRequestsUsed"] + logger.debug("userRateLimit user: {} balance: {}", self.address, new_balance) + + most_recent_timestamp = now + most_recent_balance = new_balance + + self.metrics.user_request_balance.set(most_recent_balance, {"dex": self.dex, "user": self.address}) except Exception as e: logger.error("userRateLimit query failed: {}", e) - await asyncio.sleep(self.interval) + + # want to update every 60s to keep metric populated in Grafana + await asyncio.sleep(60) def _request(self): return self.info.user_rate_limit(self.address) From a30b42ba62bb2be313a3a860848988b5b8903751 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 11 Dec 2025 13:27:27 -0600 Subject: [PATCH 08/11] Use lazer/hermes message timestamps --- apps/hip-3-pusher/src/pusher/hermes_listener.py | 4 +--- apps/hip-3-pusher/src/pusher/lazer_listener.py | 7 +++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index f24454c35a..32fc1c1234 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -1,7 +1,6 @@ import asyncio import json from loguru import logger -import time import websockets from tenacity import retry, retry_if_exception_type, wait_fixed @@ -86,7 +85,6 @@ def parse_hermes_message(self, data): expo = price_object["expo"] publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) - now = time.time() - self.hermes_state.put(id, PriceUpdate(price, now)) + self.hermes_state.put(id, PriceUpdate(price, publish_time)) except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index c2e0f23343..298e93a4fe 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -1,7 +1,6 @@ import asyncio import json from loguru import logger -import time import websockets from tenacity import retry, retry_if_exception_type, wait_fixed @@ -88,14 +87,14 @@ def parse_lazer_message(self, data): if data.get("type", "") != "streamUpdated": return price_feeds = data["parsed"]["priceFeeds"] - logger.debug("price_feeds: {}", price_feeds) - now = time.time() + timestamp = int(data["parsed"]["timestampUs"]) / 1_000_000.0 + logger.debug("price_feeds: {} timestamp: {}", price_feeds, timestamp) for feed_update in price_feeds: feed_id = feed_update.get("priceFeedId", None) price = feed_update.get("price", None) if feed_id is None or price is None: continue else: - self.lazer_state.put(feed_id, PriceUpdate(price, now)) + self.lazer_state.put(feed_id, PriceUpdate(price, timestamp)) except Exception as e: logger.error("parse_lazer_message error: {}", e) From e690d574472c6c1c9710c0bb3a444d93bf081c25 Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 11 Dec 2025 13:33:24 -0600 Subject: [PATCH 09/11] existing unit test updates --- apps/hip-3-pusher/tests/test_price_state.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 52e023119a..392ed0bd3f 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -12,6 +12,7 @@ def get_config(): config: Config = Config.model_construct() config.stale_price_threshold_seconds = 5 config.hyperliquid = HyperliquidConfig.model_construct() + config.hyperliquid.market_name = "pyth" config.hyperliquid.asset_context_symbols = [SYMBOL] config.lazer = LazerConfig.model_construct() config.lazer.feed_ids = [1, 8] @@ -44,8 +45,8 @@ def test_good_hl_price(): now = time.time() price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "110000.0"} def test_fallback_lazer(): @@ -59,8 +60,8 @@ def test_fallback_lazer(): price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)) price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "111616.16"} @@ -79,8 +80,8 @@ def test_fallback_hermes(): price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "113265.31"} def test_all_fail(): @@ -98,5 +99,5 @@ def test_all_fail(): price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {} From 23419c9d7bf9c97e8ae82b16406c3cb09f1a36ee Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Thu, 11 Dec 2025 14:11:40 -0600 Subject: [PATCH 10/11] remove pair source rounding as Hyperliquid handles it --- apps/hip-3-pusher/src/pusher/price_state.py | 17 ++++++++++------- apps/hip-3-pusher/tests/test_price_state.py | 4 ++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index be7773cd07..5102d4b3b7 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -85,11 +85,14 @@ def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], oracle_ pxs = {} for symbol in symbol_configs: for source_config in symbol_configs[symbol]: - # find first valid price in the waterfall - px = self.get_price(source_config, oracle_update) - if px is not None: - pxs[f"{self.market_name}:{symbol}"] = str(px) - break + try: + # find first valid price in the waterfall + px = self.get_price(source_config, oracle_update) + if px is not None: + pxs[f"{self.market_name}:{symbol}"] = str(px) + break + except Exception as e: + logger.exception("get_price exception for symbol: {} source_config: {} error: {}", symbol, source_config, e) return pxs def get_price(self, price_source_config: PriceSourceConfig, oracle_update: OracleUpdate): @@ -125,10 +128,10 @@ def get_price_from_pair_source(self, base_source: PriceSource, quote_source: Pri if base_price is None: return None quote_price = self.get_price_from_single_source(quote_source) - if quote_price is None: + if quote_price is None or float(quote_price) == 0: return None - return str(round(float(base_price) / float(quote_price), 2)) + return str(float(base_price) / float(quote_price)) def get_price_from_oracle_mid_average(self, symbol: str, oracle_update: OracleUpdate): oracle_price = oracle_update.oracle.get(symbol) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 392ed0bd3f..39fee65b1d 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -61,7 +61,7 @@ def test_fallback_lazer(): price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) oracle_update = price_state.get_all_prices() - assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "111616.16"} + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "111616.16161616161"} @@ -81,7 +81,7 @@ def test_fallback_hermes(): PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)) oracle_update = price_state.get_all_prices() - assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "113265.31"} + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "113265.30612244898"} def test_all_fail(): From 8f664de3af4e7825befa5f206ed1bf7f495a667b Mon Sep 17 00:00:00 2001 From: Mike Rolish Date: Fri, 12 Dec 2025 10:17:36 -0600 Subject: [PATCH 11/11] More pusher error types --- apps/hip-3-pusher/src/pusher/publisher.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index 3e4816605b..eb71815cb0 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -28,8 +28,14 @@ class PushErrorReason(StrEnum): INTERNAL_ERROR = "internal_error" # Invalid nonce, if the pusher account pushes multiple transactions with the same ms timestamp INVALID_NONCE = "invalid_nonce" + # Invalid account + INVALID_DEPLOYER_ACCOUNT = "invalid_deployer_account" + # User not activated + ACCOUNT_DOES_NOT_EXIST = "account_does_not_exist" # Missing externalPerpPxs MISSING_EXTERNAL_PERP_PXS = "missing_external_perp_pxs" + # Invalid dex (e.g. pointing to mainnet or testnet by mistake) + INVALID_DEX = "invalid_dex" # Some error string we haven't categorized yet UNKNOWN = "unknown" @@ -250,6 +256,12 @@ def _get_error_reason(self, response): return PushErrorReason.INVALID_NONCE elif "externalPerpPxs missing perp" in response: return PushErrorReason.MISSING_EXTERNAL_PERP_PXS + elif "Invalid perp deployer or sub-deployer" in response: + return PushErrorReason.INVALID_DEPLOYER_ACCOUNT + elif "User or API Wallet" in response: + return PushErrorReason.ACCOUNT_DOES_NOT_EXIST + elif "Invalid perp DEX" in response: + return PushErrorReason.INVALID_DEX else: logger.warning("Unrecognized error response: {}", response) return PushErrorReason.UNKNOWN