diff --git a/apps/hip-3-pusher/pyproject.toml b/apps/hip-3-pusher/pyproject.toml index 0e06c775a5..e93c42c505 100644 --- a/apps/hip-3-pusher/pyproject.toml +++ b/apps/hip-3-pusher/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "hip-3-pusher" -version = "0.2.3" +version = "0.2.4" description = "Hyperliquid HIP-3 market oracle pusher" readme = "README.md" requires-python = "==3.13.*" diff --git a/apps/hip-3-pusher/src/pusher/config.py b/apps/hip-3-pusher/src/pusher/config.py index 74acea4e81..9bd49996fe 100644 --- a/apps/hip-3-pusher/src/pusher/config.py +++ b/apps/hip-3-pusher/src/pusher/config.py @@ -4,6 +4,8 @@ from typing import Literal STALE_TIMEOUT_SECONDS = 5 +USER_LIMIT_INTERVAL_SECONDS = 1800 +HYPERLIQUID_WS_PING_INTERVAL_SECONDS = 20 class KMSConfig(BaseModel): @@ -37,6 +39,8 @@ class HyperliquidConfig(BaseModel): publish_interval: float publish_timeout: float enable_publish: bool + user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS + ws_ping_interval: int = HYPERLIQUID_WS_PING_INTERVAL_SECONDS @model_validator(mode="after") def set_default_urls(self): diff --git a/apps/hip-3-pusher/src/pusher/hermes_listener.py b/apps/hip-3-pusher/src/pusher/hermes_listener.py index 72f481c627..32fc1c1234 100644 --- a/apps/hip-3-pusher/src/pusher/hermes_listener.py +++ b/apps/hip-3-pusher/src/pusher/hermes_listener.py @@ -1,9 +1,8 @@ import asyncio import json from loguru import logger -import time import websockets -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError @@ -37,11 +36,12 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), - wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type(Exception), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, url): + logger.info("Starting Hermes listener loop: {}", url) return await self.subscribe_single_inner(url) async def subscribe_single_inner(self, url): @@ -58,8 +58,10 @@ async def subscribe_single_inner(self, url): data = json.loads(message) self.parse_hermes_message(data) except asyncio.TimeoutError: + logger.warning("HermesListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting") except websockets.ConnectionClosed: + logger.warning("HermesListener: Connection closed, reconnecting...") raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {}", e) @@ -83,7 +85,6 @@ def parse_hermes_message(self, data): expo = price_object["expo"] publish_time = price_object["publish_time"] logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) - now = time.time() - self.hermes_state.put(id, PriceUpdate(price, now)) + self.hermes_state.put(id, PriceUpdate(price, publish_time)) except Exception as e: logger.error("parse_hermes_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py index 60faf4c51a..7c6eb5dc18 100644 --- a/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py +++ b/apps/hip-3-pusher/src/pusher/hyperliquid_listener.py @@ -1,8 +1,10 @@ import asyncio import json +from enum import StrEnum + import websockets from loguru import logger -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed import time from pusher.config import Config, STALE_TIMEOUT_SECONDS @@ -14,6 +16,15 @@ HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws" HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws" +class HLChannel(StrEnum): + CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx" + CHANNEL_ALL_MIDS = "allMids" + CHANNEL_SUBSCRIPTION_RESPONSE = "subscriptionResponse" + CHANNEL_PONG = "pong" + CHANNEL_ERROR = "error" + +DATA_CHANNELS = [HLChannel.CHANNEL_ACTIVE_ASSET_CTX, HLChannel.CHANNEL_ALL_MIDS] + class HyperliquidListener: """ @@ -27,6 +38,7 @@ def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_st self.hl_oracle_state = hl_oracle_state self.hl_mark_state = hl_mark_state self.hl_mid_state = hl_mid_state + self.ws_ping_interval = config.hyperliquid.ws_ping_interval def get_subscribe_request(self, asset): return { @@ -38,11 +50,12 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), - wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type(Exception), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, url): + logger.info("Starting Hyperliquid listener loop: {}", url) return await self.subscribe_single_inner(url) async def subscribe_single_inner(self, url): @@ -59,48 +72,69 @@ async def subscribe_single_inner(self, url): await ws.send(json.dumps(subscribe_all_mids_request)) logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url) + now = time.time() + channel_last_message_timestamp = {channel: now for channel in HLChannel} + last_ping_timestamp = now + # listen for updates while True: try: message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS) data = json.loads(message) channel = data.get("channel", None) + now = time.time() if not channel: logger.error("No channel in message: {}", data) - elif channel == "subscriptionResponse": - logger.debug("Received subscription response: {}", data) - elif channel == "error": + elif channel == HLChannel.CHANNEL_SUBSCRIPTION_RESPONSE: + logger.info("Received subscription response: {}", data) + elif channel == HLChannel.CHANNEL_ERROR: logger.error("Received Hyperliquid error response: {}", data) - elif channel == "activeAssetCtx": - self.parse_hyperliquid_active_asset_ctx_update(data) - elif channel == "allMids": - self.parse_hyperliquid_all_mids_update(data) + elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX: + self.parse_hyperliquid_active_asset_ctx_update(data, now) + channel_last_message_timestamp[channel] = now + elif channel == HLChannel.CHANNEL_ALL_MIDS: + self.parse_hyperliquid_all_mids_update(data, now) + channel_last_message_timestamp[channel] = now + elif channel == HLChannel.CHANNEL_PONG: + logger.debug("Received pong") else: logger.error("Received unknown channel: {}", channel) + + # check for stale channels + for channel in DATA_CHANNELS: + if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS: + logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS) + raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") + + # ping if we need to + if now - last_ping_timestamp > self.ws_ping_interval: + await ws.send(json.dumps({"method": "ping"})) + last_ping_timestamp = now except asyncio.TimeoutError: - raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") - except websockets.ConnectionClosed: + logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) + raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...") + except websockets.ConnectionClosed as e: + rc, rr = e.rcvd.code if e.rcvd else None, e.rcvd.reason if e.rcvd else None + logger.warning("HyperliquidListener: Websocket connection closed (code={} reason={}); reconnecting...", rc, rr) raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {} error: {}", message, e) except Exception as e: logger.error("Unexpected exception: {}", e) - def parse_hyperliquid_active_asset_ctx_update(self, message): + def parse_hyperliquid_active_asset_ctx_update(self, message, now): try: ctx = message["data"]["ctx"] symbol = message["data"]["coin"] - now = time.time() self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now)) self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now)) logger.debug("activeAssetCtx symbol: {} oraclePx: {} markPx: {}", symbol, ctx["oraclePx"], ctx["markPx"]) except Exception as e: logger.error("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, e) - def parse_hyperliquid_all_mids_update(self, message): + def parse_hyperliquid_all_mids_update(self, message, now): try: mids = message["data"]["mids"] - now = time.time() for mid in mids: self.hl_mid_state.put(mid, PriceUpdate(mids[mid], now)) logger.debug("allMids: {}", mids) diff --git a/apps/hip-3-pusher/src/pusher/lazer_listener.py b/apps/hip-3-pusher/src/pusher/lazer_listener.py index 43aa18aa34..298e93a4fe 100644 --- a/apps/hip-3-pusher/src/pusher/lazer_listener.py +++ b/apps/hip-3-pusher/src/pusher/lazer_listener.py @@ -1,9 +1,8 @@ import asyncio import json from loguru import logger -import time import websockets -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, wait_fixed from pusher.config import Config, STALE_TIMEOUT_SECONDS from pusher.exception import StaleConnectionError @@ -41,11 +40,12 @@ async def subscribe_all(self): await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls)) @retry( - retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)), - wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type(Exception), + wait=wait_fixed(1), reraise=True, ) async def subscribe_single(self, router_url): + logger.info("Starting Lazer listener loop: {}", router_url) return await self.subscribe_single_inner(router_url) async def subscribe_single_inner(self, router_url): @@ -66,8 +66,10 @@ async def subscribe_single_inner(self, router_url): data = json.loads(message) self.parse_lazer_message(data) except asyncio.TimeoutError: + logger.warning("LazerListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS) raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting") except websockets.ConnectionClosed: + logger.warning("LazerListener: Connection closed, reconnecting...") raise except json.JSONDecodeError as e: logger.error("Failed to decode JSON message: {}", e) @@ -85,14 +87,14 @@ def parse_lazer_message(self, data): if data.get("type", "") != "streamUpdated": return price_feeds = data["parsed"]["priceFeeds"] - logger.debug("price_feeds: {}", price_feeds) - now = time.time() + timestamp = int(data["parsed"]["timestampUs"]) / 1_000_000.0 + logger.debug("price_feeds: {} timestamp: {}", price_feeds, timestamp) for feed_update in price_feeds: feed_id = feed_update.get("priceFeedId", None) price = feed_update.get("price", None) if feed_id is None or price is None: continue else: - self.lazer_state.put(feed_id, PriceUpdate(price, now)) + self.lazer_state.put(feed_id, PriceUpdate(price, timestamp)) except Exception as e: logger.error("parse_lazer_message error: {}", e) diff --git a/apps/hip-3-pusher/src/pusher/main.py b/apps/hip-3-pusher/src/pusher/main.py index 9cbbac1d30..3f7e6b00e9 100644 --- a/apps/hip-3-pusher/src/pusher/main.py +++ b/apps/hip-3-pusher/src/pusher/main.py @@ -13,6 +13,7 @@ from pusher.price_state import PriceState from pusher.publisher import Publisher from pusher.metrics import Metrics +from pusher.user_limit_listener import UserLimitListener def load_config(): @@ -52,6 +53,7 @@ async def main(): lazer_listener = LazerListener(config, price_state.lazer_state) hermes_listener = HermesListener(config, price_state.hermes_state) seda_listener = SedaListener(config, price_state.seda_state) + user_limit_listener = UserLimitListener(config, metrics, publisher.user_limit_address) await asyncio.gather( publisher.run(), @@ -59,6 +61,7 @@ async def main(): lazer_listener.subscribe_all(), hermes_listener.subscribe_all(), seda_listener.run(), + user_limit_listener.run(), ) logger.info("Exiting hip-3-pusher..") diff --git a/apps/hip-3-pusher/src/pusher/metrics.py b/apps/hip-3-pusher/src/pusher/metrics.py index c0c3ca3f75..c73ee374c9 100644 --- a/apps/hip-3-pusher/src/pusher/metrics.py +++ b/apps/hip-3-pusher/src/pusher/metrics.py @@ -48,6 +48,11 @@ def _init_metrics(self): name="hip_3_relayer_price_config", description="Price source config", ) + # labels: dex, user + self.user_request_balance = self.meter.create_gauge( + name="hip_3_relayer_user_request_balance", + description="Number of update requests left before rate limit", + ) def set_price_configs(self, dex: str, price_config: PriceConfig): self._set_price_config_type(dex, price_config.oracle, "oracle") diff --git a/apps/hip-3-pusher/src/pusher/price_state.py b/apps/hip-3-pusher/src/pusher/price_state.py index be7773cd07..5102d4b3b7 100644 --- a/apps/hip-3-pusher/src/pusher/price_state.py +++ b/apps/hip-3-pusher/src/pusher/price_state.py @@ -85,11 +85,14 @@ def get_prices(self, symbol_configs: dict[str, list[PriceSourceConfig]], oracle_ pxs = {} for symbol in symbol_configs: for source_config in symbol_configs[symbol]: - # find first valid price in the waterfall - px = self.get_price(source_config, oracle_update) - if px is not None: - pxs[f"{self.market_name}:{symbol}"] = str(px) - break + try: + # find first valid price in the waterfall + px = self.get_price(source_config, oracle_update) + if px is not None: + pxs[f"{self.market_name}:{symbol}"] = str(px) + break + except Exception as e: + logger.exception("get_price exception for symbol: {} source_config: {} error: {}", symbol, source_config, e) return pxs def get_price(self, price_source_config: PriceSourceConfig, oracle_update: OracleUpdate): @@ -125,10 +128,10 @@ def get_price_from_pair_source(self, base_source: PriceSource, quote_source: Pri if base_price is None: return None quote_price = self.get_price_from_single_source(quote_source) - if quote_price is None: + if quote_price is None or float(quote_price) == 0: return None - return str(round(float(base_price) / float(quote_price), 2)) + return str(float(base_price) / float(quote_price)) def get_price_from_oracle_mid_average(self, symbol: str, oracle_update: OracleUpdate): oracle_price = oracle_update.oracle.get(symbol) diff --git a/apps/hip-3-pusher/src/pusher/publisher.py b/apps/hip-3-pusher/src/pusher/publisher.py index de954b3b3f..eb71815cb0 100644 --- a/apps/hip-3-pusher/src/pusher/publisher.py +++ b/apps/hip-3-pusher/src/pusher/publisher.py @@ -2,6 +2,7 @@ from enum import StrEnum import time +from hyperliquid.utils.types import Meta, SpotMeta from loguru import logger from pathlib import Path @@ -27,6 +28,14 @@ class PushErrorReason(StrEnum): INTERNAL_ERROR = "internal_error" # Invalid nonce, if the pusher account pushes multiple transactions with the same ms timestamp INVALID_NONCE = "invalid_nonce" + # Invalid account + INVALID_DEPLOYER_ACCOUNT = "invalid_deployer_account" + # User not activated + ACCOUNT_DOES_NOT_EXIST = "account_does_not_exist" + # Missing externalPerpPxs + MISSING_EXTERNAL_PERP_PXS = "missing_external_perp_pxs" + # Invalid dex (e.g. pointing to mainnet or testnet by mistake) + INVALID_DEX = "invalid_dex" # Some error string we haven't categorized yet UNKNOWN = "unknown" @@ -50,9 +59,12 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): oracle_pusher_key = Path(config.hyperliquid.oracle_pusher_key_path).read_text().strip() self.oracle_account: LocalAccount = Account.from_key(oracle_pusher_key) logger.info("oracle pusher local pubkey: {}", self.oracle_account.address) + self.user_limit_address = self.oracle_account.address self.publisher_exchanges = [Exchange(wallet=self.oracle_account, base_url=url, - timeout=config.hyperliquid.publish_timeout) + timeout=config.hyperliquid.publish_timeout, + meta=Meta(universe=[]), + spot_meta=SpotMeta(universe=[], tokens=[])) for url in self.push_urls] if config.kms.enable_kms: # TODO: Add KMS/multisig support @@ -61,11 +73,13 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics): self.enable_kms = True self.kms_signer = KMSSigner(config, self.publisher_exchanges) + self.user_limit_address = self.kms_signer.address if config.multisig.enable_multisig: if not config.multisig.multisig_address: raise Exception("Multisig enabled but missing multisig address") self.multisig_address = config.multisig.multisig_address + self.user_limit_address = self.multisig_address else: self.multisig_address = None @@ -81,11 +95,11 @@ async def run(self): while True: await asyncio.sleep(self.publish_interval) try: - self.publish() + await self.publish() except Exception as e: logger.exception("Publisher.publish() exception: {}", repr(e)) - def publish(self): + async def publish(self): oracle_update = self.price_state.get_all_prices() logger.debug("oracle_update: {}", oracle_update) @@ -106,13 +120,13 @@ def publish(self): external_perp_pxs=external_perp_pxs, ) elif self.multisig_address: - push_response = self._send_multisig_update( + push_response = await self._send_multisig_update( oracle_pxs=oracle_pxs, all_mark_pxs=mark_pxs, external_perp_pxs=external_perp_pxs, ) else: - push_response = self._send_update( + push_response = await self._send_update( oracle_pxs=oracle_pxs, all_mark_pxs=mark_pxs, external_perp_pxs=external_perp_pxs, @@ -129,20 +143,23 @@ def publish(self): self._record_push_interval_metric() - def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): for exchange in self.publisher_exchanges: try: - return exchange.perp_deploy_set_oracle( - dex=self.market_name, - oracle_pxs=oracle_pxs, - all_mark_pxs=all_mark_pxs, - external_perp_pxs=external_perp_pxs, - ) + return await asyncio.to_thread(self._request_single, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs) except Exception as e: logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e)) raise PushError("all push endpoints failed") + def _request_single(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): + return exchange.perp_deploy_set_oracle( + dex=self.market_name, + oracle_pxs=oracle_pxs, + all_mark_pxs=all_mark_pxs, + external_perp_pxs=external_perp_pxs, + ) + def _handle_response(self, response, symbols: list[str]): logger.debug("oracle update response: {}", response) status = response.get("status") @@ -172,10 +189,10 @@ def _record_push_interval_metric(self): self.last_push_time = now logger.debug("Push interval: {}", push_interval) - def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): for exchange in self.publisher_exchanges: try: - return self._send_single_multisig_update( + return await self._send_single_multisig_update( exchange=exchange, oracle_pxs=oracle_pxs, all_mark_pxs=all_mark_pxs, @@ -186,7 +203,7 @@ def _send_multisig_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs): raise PushError("all push endpoints failed for multisig") - def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): + async def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, external_perp_pxs): timestamp = get_timestamp_ms() oracle_pxs_wire = sorted(list(oracle_pxs.items())) mark_pxs_wire = [sorted(list(mark_pxs.items())) for mark_pxs in all_mark_pxs] @@ -210,6 +227,9 @@ def _send_single_multisig_update(self, exchange, oracle_pxs, all_mark_pxs, exter payload_multi_sig_user=self.multisig_address, outer_signer=self.oracle_account.address, )] + return await asyncio.to_thread(self._request_multi_sig, exchange, action, signatures, timestamp) + + def _request_multi_sig(self, exchange, action, signatures, timestamp): return exchange.multi_sig(self.multisig_address, action, signatures, timestamp) def _update_attempts_total(self, status: str, error_reason: str | None, symbols: list[str]): @@ -234,6 +254,14 @@ def _get_error_reason(self, response): return PushErrorReason.USER_LIMIT elif "Invalid nonce" in response: return PushErrorReason.INVALID_NONCE + elif "externalPerpPxs missing perp" in response: + return PushErrorReason.MISSING_EXTERNAL_PERP_PXS + elif "Invalid perp deployer or sub-deployer" in response: + return PushErrorReason.INVALID_DEPLOYER_ACCOUNT + elif "User or API Wallet" in response: + return PushErrorReason.ACCOUNT_DOES_NOT_EXIST + elif "Invalid perp DEX" in response: + return PushErrorReason.INVALID_DEX else: logger.warning("Unrecognized error response: {}", response) return PushErrorReason.UNKNOWN diff --git a/apps/hip-3-pusher/src/pusher/user_limit_listener.py b/apps/hip-3-pusher/src/pusher/user_limit_listener.py new file mode 100644 index 0000000000..c8398fa908 --- /dev/null +++ b/apps/hip-3-pusher/src/pusher/user_limit_listener.py @@ -0,0 +1,49 @@ +import asyncio +import time + +from hyperliquid.utils.types import SpotMeta, Meta +from loguru import logger + +from hyperliquid.info import Info +from hyperliquid.utils import constants + +from pusher.config import Config +from pusher.metrics import Metrics + + +class UserLimitListener: + def __init__(self, config: Config, metrics: Metrics, address: str): + self.address = address.lower() + self.metrics = metrics + self.interval = config.hyperliquid.user_limit_interval + self.dex = config.hyperliquid.market_name + + base_url = constants.TESTNET_API_URL if config.hyperliquid.use_testnet else constants.MAINNET_API_URL + self.info = Info(base_url=base_url, skip_ws=True, meta=Meta(universe=[]), spot_meta=SpotMeta(universe=[], tokens=[])) + + async def run(self): + logger.info("Starting user limit listener url: {} address: {} interval: {}", self.info.base_url, self.address, self.interval) + most_recent_timestamp = None + most_recent_balance = None + + while True: + try: + now = time.time() + if not most_recent_timestamp or now - most_recent_timestamp > self.interval: + response = await asyncio.to_thread(self._request) + logger.debug("userRateLimit response: {}", response) + new_balance = response["nRequestsSurplus"] + response["nRequestsCap"] - response["nRequestsUsed"] + logger.debug("userRateLimit user: {} balance: {}", self.address, new_balance) + + most_recent_timestamp = now + most_recent_balance = new_balance + + self.metrics.user_request_balance.set(most_recent_balance, {"dex": self.dex, "user": self.address}) + except Exception as e: + logger.error("userRateLimit query failed: {}", e) + + # want to update every 60s to keep metric populated in Grafana + await asyncio.sleep(60) + + def _request(self): + return self.info.user_rate_limit(self.address) diff --git a/apps/hip-3-pusher/tests/test_price_state.py b/apps/hip-3-pusher/tests/test_price_state.py index 52e023119a..39fee65b1d 100644 --- a/apps/hip-3-pusher/tests/test_price_state.py +++ b/apps/hip-3-pusher/tests/test_price_state.py @@ -12,6 +12,7 @@ def get_config(): config: Config = Config.model_construct() config.stale_price_threshold_seconds = 5 config.hyperliquid = HyperliquidConfig.model_construct() + config.hyperliquid.market_name = "pyth" config.hyperliquid.asset_context_symbols = [SYMBOL] config.lazer = LazerConfig.model_construct() config.lazer.feed_ids = [1, 8] @@ -44,8 +45,8 @@ def test_good_hl_price(): now = time.time() price_state.hl_oracle_state.put(SYMBOL, PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "110000.0"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "110000.0"} def test_fallback_lazer(): @@ -59,8 +60,8 @@ def test_fallback_lazer(): price_state.lazer_state.put(1, PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)) price_state.lazer_state.put(8, PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "111616.16"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "111616.16161616161"} @@ -79,8 +80,8 @@ def test_fallback_hermes(): price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {f"{DEX}:{SYMBOL}": "113265.31"} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {f"{DEX}:{SYMBOL}": "113265.30612244898"} def test_all_fail(): @@ -98,5 +99,5 @@ def test_all_fail(): price_state.hermes_state.put("2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b", PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)) - oracle_px, _, _ = price_state.get_all_prices(DEX) - assert oracle_px == {} + oracle_update = price_state.get_all_prices() + assert oracle_update.oracle == {} diff --git a/apps/hip-3-pusher/uv.lock b/apps/hip-3-pusher/uv.lock index 488cd0f4ec..d629b998bd 100644 --- a/apps/hip-3-pusher/uv.lock +++ b/apps/hip-3-pusher/uv.lock @@ -351,7 +351,7 @@ wheels = [ [[package]] name = "hip-3-pusher" -version = "0.2.3" +version = "0.2.4" source = { editable = "." } dependencies = [ { name = "boto3" },