diff --git a/quarkchain/cluster/jsonrpc.py b/quarkchain/cluster/jsonrpc.py index 8106fed1f..477b4d8fb 100644 --- a/quarkchain/cluster/jsonrpc.py +++ b/quarkchain/cluster/jsonrpc.py @@ -7,11 +7,7 @@ import websockets import rlp from aiohttp import web -from async_armor import armor from decorator import decorator -from jsonrpcserver import config -from jsonrpcserver.async_methods import AsyncMethods -from jsonrpcserver.exceptions import InvalidParams, InvalidRequest, ServerError from quarkchain.cluster.master import MasterServer from quarkchain.cluster.rpc import AccountBranchData @@ -38,6 +34,7 @@ import uuid from quarkchain.cluster.log_filter import LogFilter from quarkchain.cluster.subscription import SUB_LOGS +from quarkchain.cluster.jsonrpc_server import RpcMethods, InvalidParams # defaults DEFAULT_STARTGAS = 100 * 1000 @@ -47,13 +44,9 @@ # TODO: revisit this parameter JSON_RPC_CLIENT_REQUEST_MAX_SIZE = 16 * 1024 * 1024 -# Disable jsonrpcserver logging -config.log_requests = False -config.log_responses = False EMPTY_TX_ID = "0x" + "0" * Constant.TX_ID_HEX_LENGTH - def quantity_decoder(hex_str, allow_optional=False): """Decode `hexStr` representing a quantity.""" if allow_optional and hex_str is None: @@ -463,8 +456,8 @@ def _parse_log_request( return addresses, topics -public_methods = AsyncMethods() -private_methods = AsyncMethods() +public_methods = RpcMethods() +private_methods = RpcMethods() # noinspection PyPep8Naming @@ -495,7 +488,7 @@ def start_private_server(cls, env, master_server): @classmethod def start_test_server(cls, env, master_server): - methods = AsyncMethods() + methods = RpcMethods() for method in public_methods.values(): methods.add(method) for method in private_methods.values(): @@ -511,7 +504,7 @@ def start_test_server(cls, env, master_server): return server def __init__( - self, env, master_server: MasterServer, port, host, methods: AsyncMethods + self, env, master_server: MasterServer, port, host, methods: RpcMethods ): self.loop = asyncio.get_event_loop() self.port = port @@ -521,7 +514,7 @@ def __init__( self.counters = dict() # Bind RPC handler functions to this instance - self.handlers = AsyncMethods() + self.handlers = RpcMethods() for rpc_name in methods: func = methods[rpc_name] self.handlers[rpc_name] = func.__get__(self, self.__class__) @@ -540,14 +533,14 @@ async def __handle(self, request): self.counters[method] += 1 else: self.counters[method] = 1 - # Use armor to prevent the handler from being cancelled when + # Use asyncio.shield to prevent the handler from being cancelled when # aiohttp server loses connection to client - response = await armor(self.handlers.dispatch(request)) + response = await asyncio.shield(self.handlers.dispatch(d)) + if response is None: + return web.Response() if "error" in response: Logger.error(response) - if response.is_notification: - return web.Response() - return web.json_response(response, status=response.http_status) + return web.json_response(response) def start(self): app = web.Application(client_max_size=JSON_RPC_CLIENT_REQUEST_MAX_SIZE) @@ -1464,7 +1457,7 @@ def start_websocket_server(cls, env, slave_server): return server def __init__( - self, env, slave_server: SlaveServer, port, host, methods: AsyncMethods + self, env, slave_server: SlaveServer, port, host, methods: RpcMethods ): self.loop = asyncio.get_event_loop() self.port = port @@ -1475,14 +1468,14 @@ def __init__( self.pending_tx_cache = LRUCache(maxsize=1024) # Bind RPC handler functions to this instance - self.handlers = AsyncMethods() + self.handlers = RpcMethods() for rpc_name in methods: func = methods[rpc_name] self.handlers[rpc_name] = func.__get__(self, self.__class__) self.shard_subscription_managers = self.slave.shard_subscription_managers - async def __handle(self, websocket, path): + async def __handle(self, websocket): sub_ids = dict() # per-websocket var, Dict[sub_id, full_shard_id] try: async for message in websocket: @@ -1501,7 +1494,7 @@ async def __handle(self, websocket, path): msg_id = d.get("id", 0) response = await self.handlers.dispatch( - message, + d, context={ "websocket": websocket, "msg_id": msg_id, @@ -1509,6 +1502,8 @@ async def __handle(self, websocket, path): }, ) + if response is None: + continue if "error" in response: Logger.error(response) else: @@ -1519,8 +1514,7 @@ async def __handle(self, websocket, path): elif method == "unsubscribe": sub_id = d.get("params")[0] del sub_ids[sub_id] - if not response.is_notification: - await websocket.send(json.dumps(response)) + await websocket.send(json.dumps(response)) finally: # current websocket connection terminates, remove subscribers in this connection for sub_id, full_shard_id in sub_ids.items(): try: @@ -1536,7 +1530,8 @@ def start(self): self.loop.run_until_complete(start_server) def shutdown(self): - pass # TODO + if hasattr(self, '_server') and self._server is not None: + self._server.close() @staticmethod def response_transcoder(sub_id, result): diff --git a/quarkchain/cluster/jsonrpc_server.py b/quarkchain/cluster/jsonrpc_server.py new file mode 100644 index 000000000..b56cda0dc --- /dev/null +++ b/quarkchain/cluster/jsonrpc_server.py @@ -0,0 +1,173 @@ +import inspect +import logging +from typing import Any, Callable, Dict, Optional, Awaitable + +from aiohttp import web + +logger = logging.getLogger(__name__) + + +class JsonRpcError(Exception): + code = -32000 + message = "Server error" + + def __init__(self, message=None, data=None): + super().__init__(message or self.message) + self.message = message or self.message + self.data = data + + def to_dict(self): + error = { + "code": self.code, + "message": self.message, + } + if self.data is not None: + error["data"] = self.data + return error + +class InvalidRequest(JsonRpcError): + code = -32600 + message = "Invalid Request" + +class MethodNotFound(JsonRpcError): + code = -32601 + message = "Method not found" + +class InvalidParams(JsonRpcError): + code = -32602 + message = "Invalid params" + +class ServerError(JsonRpcError): + code = -32000 + message = "Server error" + + +class RpcMethods: + def __init__(self): + self._methods: Dict[str, Callable[..., Awaitable[Any]]] = {} + + # ========== dict ========== + def __iter__(self): + return iter(self._methods) + + def __getitem__(self, key): + return self._methods[key] + + def __setitem__(self, key, value): + self._methods[key] = value + + def items(self): + return self._methods.items() + + def keys(self): + return self._methods.keys() + + def values(self): + return self._methods.values() + + # ========== decorator ========== + def add(self, func: Callable[..., Awaitable[Any]] = None, *, name: str = None): + """ + Usage: + + @methods.add + async def foo(...): + + or: + + @methods.add(name="customName") + async def foo(...): + """ + if func is None: + def wrapper(f): + method_name = name or f.__name__ + self._methods[method_name] = f + return f + return wrapper + + method_name = name or func.__name__ + self._methods[method_name] = func + return func + + async def dispatch(self, request_json: Dict[str, Any], context=None) -> Optional[Dict[str, Any]]: + req_id = None + + try: + if not isinstance(request_json, dict): + raise InvalidRequest("Request must be object") + + req_id = request_json.get("id") + + if request_json.get("jsonrpc") != "2.0": + raise InvalidRequest("Invalid JSON-RPC version") + + method = request_json.get("method") + if not isinstance(method, str): + raise InvalidRequest("Method must be string") + + is_notification = "id" not in request_json + + if method not in self._methods: + raise MethodNotFound() + + handler = self._methods[method] + params = request_json.get("params", []) + + # Check if handler accepts a context parameter + sig = inspect.signature(handler) + pass_context = context is not None and "context" in sig.parameters + + if isinstance(params, list): + result = await handler(*params, context=context) if pass_context else await handler(*params) + elif isinstance(params, dict): + result = await handler(**params, context=context) if pass_context else await handler(**params) + else: + raise InvalidParams() + + if is_notification: + return None + + return { + "jsonrpc": "2.0", + "result": result, + "id": req_id, + } + + except JsonRpcError as e: + return { + "jsonrpc": "2.0", + "error": e.to_dict(), + "id": req_id, + } + + except TypeError as e: + # Could be missing/extra arguments → treat as invalid params + return { + "jsonrpc": "2.0", + "error": { + "code": -32602, + "message": str(e), + }, + "id": req_id, + } + except Exception: + logger.exception("Internal JSON-RPC error for method %s", method) + return { + "jsonrpc": "2.0", + "error": { + "code": -32603, + "message": "Internal error", + }, + "id": req_id, + } + + async def aiohttp_handler(self, request: web.Request) -> web.Response: + body = await request.json() + + # support batch + if isinstance(body, list): + responses = [await self.dispatch(item) for item in body] + return web.json_response(responses) + + response = await self.dispatch(body) + return web.json_response(response) diff --git a/quarkchain/cluster/prom.py b/quarkchain/cluster/prom.py index 51e58ce87..d70d7a4de 100644 --- a/quarkchain/cluster/prom.py +++ b/quarkchain/cluster/prom.py @@ -15,11 +15,6 @@ print("======") raise e -import jsonrpcclient - -# Disable jsonrpcclient verbose logging. -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) TIMEOUT = 10 fetcher = None @@ -54,9 +49,7 @@ def get_highest() -> int: global fetcher assert isinstance(fetcher, Fetcher) - res = fetcher.cli.send( - jsonrpcclient.Request("getRootBlockByHeight"), timeout=TIMEOUT - ) + res = fetcher.cli.call("getRootBlockByHeight") if not res: raise RuntimeError("Failed to get latest block height") return int(res["height"], 16) diff --git a/quarkchain/cluster/subscription.py b/quarkchain/cluster/subscription.py index 11b6c84a7..eba13e644 100644 --- a/quarkchain/cluster/subscription.py +++ b/quarkchain/cluster/subscription.py @@ -1,9 +1,8 @@ import asyncio import json -from typing import List, Dict, Tuple, Optional, Callable +from typing import Any, List, Dict, Tuple, Optional, Callable -from jsonrpcserver.exceptions import InvalidParams -from websockets import WebSocketServerProtocol +from quarkchain.cluster.jsonrpc_server import InvalidParams from quarkchain.core import MinorBlock @@ -20,7 +19,7 @@ def __init__(self): SUB_NEW_PENDING_TX: {}, SUB_LOGS: {}, SUB_SYNC: {}, - } # type: Dict[str, Dict[str, WebSocketServerProtocol]] + } # type: Dict[str, Dict[str, Any]] self.log_filter_gen = {} # type: Dict[str, Callable] def add_subscriber(self, sub_type, sub_id, conn, extra=None): diff --git a/quarkchain/cluster/tests/test_jsonrpc.py b/quarkchain/cluster/tests/test_jsonrpc.py index 5a4050e98..7b93ba682 100644 --- a/quarkchain/cluster/tests/test_jsonrpc.py +++ b/quarkchain/cluster/tests/test_jsonrpc.py @@ -1,12 +1,8 @@ -import asyncio import json -import logging import unittest from contextlib import contextmanager -import aiohttp -from jsonrpcclient.aiohttp_client import aiohttpClient -from jsonrpcclient.exceptions import ReceivedErrorResponse +import rlp import websockets from quarkchain.cluster.cluster_config import ClusterConfig @@ -36,11 +32,7 @@ from quarkchain.evm.messages import mk_contract_address from quarkchain.evm.transactions import Transaction as EvmTransaction from quarkchain.utils import call_async, sha3_256, token_id_encode - - -# disable jsonrpcclient verbose logging -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) +from quarkchain.jsonrpc_client import AsyncJsonRpcClient, JsonRpcError @contextmanager @@ -57,14 +49,10 @@ def jrpc_http_server_context(master): server.shutdown() -def send_request(*args): - async def __send_request(*args): - async with aiohttp.ClientSession(loop=asyncio.get_event_loop()) as session: - client = aiohttpClient(session, "http://localhost:38391") - response = await client.request(*args) - return response +rpc_client = AsyncJsonRpcClient("http://localhost:38391") - return call_async(__send_request(*args)) +def send_request(method, *args): + return call_async(rpc_client.call(method, *args)) class TestJSONRPCHttp(unittest.TestCase): @@ -852,12 +840,12 @@ def test_getLogs(self): # no filter object as wild cards resp = req({}) self.assertEqual(1, len(resp)) - self.assertDictContainsSubset(expected_log_parts, resp[0]) + self.assertTrue(expected_log_parts.items() <= resp[0].items()) # filter with from/to blocks resp = req({"fromBlock": "0x0", "toBlock": "0x1"}) self.assertEqual(1, len(resp)) - self.assertDictContainsSubset(expected_log_parts, resp[0]) + self.assertTrue(expected_log_parts.items() <= resp[0].items()) resp = req({"fromBlock": "0x0", "toBlock": "0x0"}) self.assertEqual(0, len(resp)) @@ -893,7 +881,7 @@ def test_getLogs(self): for f in (filter_obj, filter_obj_nested): resp = req(f) self.assertEqual(1, len(resp)) - self.assertDictContainsSubset(expected_log_parts, resp[0]) + self.assertTrue(expected_log_parts.items() <= resp[0].items()) self.assertEqual( "0xa9378d5bd800fae4d5b8d4c6712b2b64e8ecc86fdc831cb51944000fc7c8ecfa", resp[0]["topics"][0], @@ -927,13 +915,13 @@ def test_getLogs(self): expected_log_parts["transactionIndex"] = "0x3" # after root block coinbase expected_log_parts["transactionHash"] = "0x" + tx.get_hash().hex() expected_log_parts["blockHash"] = "0x" + block.header.get_hash().hex() - self.assertDictContainsSubset(expected_log_parts, resp[0]) + self.assertTrue(expected_log_parts.items() <= resp[0].items()) self.assertEqual(2, len(resp[0]["topics"])) # missing shard ID should fail for endpoint in ("getLogs", "eth_getLogs"): - with self.assertRaises(ReceivedErrorResponse): + with self.assertRaises(JsonRpcError): send_request(endpoint, [{}]) - with self.assertRaises(ReceivedErrorResponse): + with self.assertRaises(JsonRpcError): send_request(endpoint, [{}, None]) def test_estimateGas(self): @@ -1210,6 +1198,395 @@ def test_createTransactions(self): send_request("createTransactions", {"numTxPerShard": 1, "xShardPercent": 0}) + async def test_echoQuantity(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + resp = await send_request("echoQuantity", ["0x1234"]) + self.assertEqual(resp, "0x1234") + + resp = await send_request("echoQuantity", ["0x0"]) + self.assertEqual(resp, "0x0") + + async def test_echoData(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + resp = await send_request("echoData", ["0xdeadbeef"]) + self.assertEqual(resp, "0xdeadbeef") + + async def test_networkInfo(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + resp = await send_request("networkInfo") + self.assertEqual( + resp["networkId"], + quantity_encoder(master.env.quark_chain_config.NETWORK_ID), + ) + self.assertEqual( + resp["chainSize"], + quantity_encoder(master.env.quark_chain_config.CHAIN_SIZE), + ) + self.assertEqual(len(resp["shardSizes"]), master.env.quark_chain_config.CHAIN_SIZE) + self.assertFalse(resp["syncing"]) + self.assertFalse(resp["mining"]) + self.assertEqual(resp["shardServerCount"], len(master.slave_pool)) + + async def test_getAccountData(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + + # without include_shards + resp = await send_request( + "getAccountData", ["0x" + acc1.serialize().hex()] + ) + primary = resp["primary"] + self.assertEqual(primary["transactionCount"], "0x0") + self.assertFalse(primary["isContract"]) + self.assertEqual( + primary["balances"], + [{"tokenId": "0x8bb0", "tokenStr": "QKC", "balance": "0xf4240"}], + ) + + # with include_shards + resp = await send_request( + "getAccountData", ["0x" + acc1.serialize().hex(), None, True] + ) + self.assertIsNotNone(resp["primary"]) + # should have one entry per shard + self.assertEqual( + len(resp["shards"]), + len(master.env.quark_chain_config.shards), + ) + + async def test_sendRawTransaction(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + acc2 = Address.create_random_account(full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + slaves = clusters[0].slave_list + master = clusters[0].master + + evm_tx = EvmTransaction( + nonce=0, + gasprice=6, + startgas=30000, + to=acc2.recipient, + value=15, + data=b"", + from_full_shard_key=acc1.full_shard_key, + to_full_shard_key=acc2.full_shard_key, + network_id=slaves[0].env.quark_chain_config.NETWORK_ID, + gas_token_id=master.env.quark_chain_config.genesis_token, + transfer_token_id=master.env.quark_chain_config.genesis_token, + ) + evm_tx.sign(id1.get_key()) + + raw_tx_data = "0x" + rlp.encode(evm_tx).hex() + response = await send_request("sendRawTransaction", [raw_tx_data]) + tx = TypedTransaction(SerializedEvmTransaction.from_evm_tx(evm_tx)) + self.assertEqual(response, "0x" + tx.get_hash().hex() + "00000000") + + state = clusters[0].get_shard_state(2 | 0) + self.assertEqual(len(state.tx_queue), 1) + + # eth_sendRawTransaction should also work + evm_tx2 = EvmTransaction( + nonce=1, + gasprice=6, + startgas=30000, + to=acc2.recipient, + value=10, + data=b"", + from_full_shard_key=acc1.full_shard_key, + to_full_shard_key=acc2.full_shard_key, + network_id=slaves[0].env.quark_chain_config.NETWORK_ID, + gas_token_id=master.env.quark_chain_config.genesis_token, + transfer_token_id=master.env.quark_chain_config.genesis_token, + ) + evm_tx2.sign(id1.get_key()) + + raw_tx_data2 = "0x" + rlp.encode(evm_tx2).hex() + response2 = await send_request("eth_sendRawTransaction", [raw_tx_data2]) + tx2 = TypedTransaction(SerializedEvmTransaction.from_evm_tx(evm_tx2)) + self.assertEqual(response2, "0x" + tx2.get_hash().hex() + "00000000") + self.assertEqual(len(state.tx_queue), 2) + + async def test_getRootBlockById(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=None + ) + await master.add_root_block(block) + + resp = await send_request( + "getRootBlockById", ["0x" + block.header.get_hash().hex(), True] + ) + self.assertEqual(resp["hash"], data_encoder(block.header.get_hash())) + self.assertEqual(resp["height"], quantity_encoder(block.header.height)) + self.assertEqual( + resp["miner"], + "0x" + block.header.coinbase_address.serialize().hex(), + ) + self.assertEqual( + resp["difficulty"], quantity_encoder(block.header.difficulty) + ) + + # non-existent block + resp = await send_request( + "getRootBlockById", ["0x" + "ff" * 32, True] + ) + self.assertIsNone(resp) + + async def test_getRootBlockByHeight(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=None + ) + await master.add_root_block(block) + + # by specific height + resp = await send_request( + "getRootBlockByHeight", [quantity_encoder(block.header.height), True] + ) + self.assertEqual(resp["hash"], data_encoder(block.header.get_hash())) + self.assertEqual(resp["height"], quantity_encoder(block.header.height)) + + # latest (no height) should return the same block + resp = await send_request("getRootBlockByHeight", [None, True]) + self.assertEqual(resp["hash"], data_encoder(block.header.get_hash())) + self.assertEqual(resp["height"], quantity_encoder(block.header.height)) + + async def test_getAllTransactions(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + slaves = clusters[0].slave_list + + # send 3 transactions, each in its own block + txs = [] + values = [12345, 67890, 99999] + for v in values: + tx = create_transfer_transaction( + shard_state=clusters[0].get_shard_state(2 | 0), + key=id1.get_key(), + from_address=acc1, + to_address=acc1, + value=v, + ) + self.assertTrue(slaves[0].add_tx(tx)) + txs.append(tx) + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=0b10 + ) + self.assertTrue( + (await clusters[0].get_shard(2 | 0).add_block(block)) + ) + + # fetch all + resp = await send_request("getAllTransactions", ["0x0", "0x", "0xa"]) + self.assertEqual(len(resp["txList"]), 3) + tx_ids = {item["txId"] for item in resp["txList"]} + for tx in txs: + expected_id = ( + "0x" + + tx.get_hash().hex() + + acc1.full_shard_key.to_bytes(4, "big").hex() + ) + self.assertIn(expected_id, tx_ids) + returned_values = {item["value"] for item in resp["txList"]} + self.assertEqual(returned_values, {hex(v) for v in values}) + + # test limit: only fetch 2 + resp = await send_request("getAllTransactions", ["0x0", "0x", "0x2"]) + self.assertEqual(len(resp["txList"]), 2) + + # use "next" to fetch the remaining 1 + resp2 = await send_request( + "getAllTransactions", ["0x0", resp["next"], "0xa"] + ) + self.assertEqual(len(resp2["txList"]), 1) + + async def test_getTransactionsByAddress(self): + id1 = Identity.create_random_identity() + id2 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + acc2 = Address.create_from_identity(id2, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + slaves = clusters[0].slave_list + + # tx1: acc1 -> acc1, value=12345 + tx1 = create_transfer_transaction( + shard_state=clusters[0].get_shard_state(2 | 0), + key=id1.get_key(), + from_address=acc1, + to_address=acc1, + value=12345, + ) + self.assertTrue(slaves[0].add_tx(tx1)) + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=0b10 + ) + self.assertTrue((await clusters[0].get_shard(2 | 0).add_block(block))) + + # tx2: acc1 -> acc2, value=67890 + tx2 = create_transfer_transaction( + shard_state=clusters[0].get_shard_state(2 | 0), + key=id1.get_key(), + from_address=acc1, + to_address=acc2, + value=67890, + ) + self.assertTrue(slaves[0].add_tx(tx2)) + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=0b10 + ) + self.assertTrue((await clusters[0].get_shard(2 | 0).add_block(block))) + + # query by acc1: should see both txs (as sender) + resp = await send_request( + "getTransactionsByAddress", + ["0x" + acc1.serialize().hex(), "0x", "0xa"], + ) + self.assertEqual(len(resp["txList"]), 2) + for item in resp["txList"]: + self.assertEqual( + item["fromAddress"], "0x" + acc1.serialize().hex() + ) + returned_values = {item["value"] for item in resp["txList"]} + self.assertEqual(returned_values, {hex(12345), hex(67890)}) + + # query by acc2: should see 1 tx (as receiver) + resp = await send_request( + "getTransactionsByAddress", + ["0x" + acc2.serialize().hex(), "0x", "0xa"], + ) + self.assertEqual(len(resp["txList"]), 1) + self.assertEqual(resp["txList"][0]["value"], hex(67890)) + + async def test_getTotalSupply(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + resp = await send_request("getTotalSupply") + total_supply = master.get_total_supply() + self.assertEqual(resp, quantity_encoder(total_supply)) + + async def test_net_version(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + resp = await send_request("net_version") + self.assertEqual( + resp, + quantity_encoder(master.env.quark_chain_config.NETWORK_ID), + ) + + async def test_getJrpcCalls(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + # call networkInfo twice to register counters + await send_request("networkInfo") + await send_request("networkInfo") + + resp = await send_request("getJrpcCalls") + self.assertEqual(resp["networkInfo"], 2) + # getJrpcCalls itself should also be counted + self.assertEqual(resp["getJrpcCalls"], 1) + + async def test_eth_getBlockByNumber(self): + id1 = Identity.create_random_identity() + acc1 = Address.create_from_identity(id1, full_shard_key=0) + + async with ClusterContext( + 1, acc1, small_coinbase=True + ) as clusters, jrpc_http_server_context(clusters[0].master): + master = clusters[0].master + + block = await master.get_next_block_to_mine( + address=acc1, branch_value=0b10 + ) + self.assertTrue((await clusters[0].get_shard(2 | 0).add_block(block))) + + # by height + resp = await send_request("eth_getBlockByNumber", ["0x1", False]) + self.assertEqual(resp["number"], "0x1") + self.assertEqual(resp["hash"], data_encoder(block.header.get_hash())) + self.assertEqual( + resp["parentHash"], data_encoder(block.header.hash_prev_minor_block) + ) + self.assertEqual( + resp["miner"], + "0x" + block.header.coinbase_address.serialize().hex(), + ) + + # latest should return the same block + resp = await send_request("eth_getBlockByNumber", ["latest", False]) + self.assertEqual(resp["number"], "0x1") + self.assertEqual(resp["hash"], data_encoder(block.header.get_hash())) + + # non-existent + resp = await send_request("eth_getBlockByNumber", ["0xff", False]) + self.assertIsNone(resp) + + + # ------------------------------- Test for JSONRPCWebsocketServer ------------------------------- @contextmanager @@ -1632,7 +2009,7 @@ def test_logs(self): response = call_async(websocket.recv()) count += 1 d = json.loads(response) - self.assertDictContainsSubset(expected_log_parts, d["params"]["result"]) + self.assertTrue(expected_log_parts.items() <= d["params"]["result"].items()) self.assertEqual( "0xa9378d5bd800fae4d5b8d4c6712b2b64e8ecc86fdc831cb51944000fc7c8ecfa", d["params"]["result"]["topics"][0], diff --git a/quarkchain/jsonrpc_client.py b/quarkchain/jsonrpc_client.py new file mode 100644 index 000000000..16d978abb --- /dev/null +++ b/quarkchain/jsonrpc_client.py @@ -0,0 +1,74 @@ +# httpx is chosen over aiohttp because it provides both sync and async clients +# with a unified API, keeping this module simple and consistent. aiohttp is +# async-only, so the sync client (JsonRpcClient) would need a separate HTTP +# implementation (e.g. urllib.request). httpx as a pure client library is +# lightweight (~200KB with deps) and doesn't overlap with aiohttp's server role. +import httpx +import uuid + +class JsonRpcError(Exception): + def __init__(self, error): + self.code = error.get("code") + self.message = error.get("message") + self.data = error.get("data") + super().__init__(f"JSON-RPC Error {self.code}: {self.message}") + +class JsonRpcClient: + def __init__(self, url, timeout=10): + self.client = httpx.Client(base_url=url, timeout=timeout) + + def call(self, method, *params): + payload = { + "jsonrpc": "2.0", + "method": method, + "params": list(params), + "id": str(uuid.uuid4()), + } + + resp = self.client.post("", json=payload) + resp.raise_for_status() + data = resp.json() + + if "error" in data: + raise JsonRpcError(data["error"]) + + return data.get("result") + + def close(self): + self.client.close() + + +class AsyncJsonRpcClient: + def __init__(self, url, timeout=10): + self.client = httpx.AsyncClient(base_url=url, timeout=timeout) + + async def call(self, method, *params): + # JSON-RPC "params" can be a list (positional) or dict (named). + # The old jsonrpcclient library handled this internally; since we + # replaced it with a hand-rolled client we replicate the logic here: + # call("method", [a, b]) -> params = [a, b] (positional) + # call("method", {k: v}) -> params = {k: v} (named) + # call("method", a, b) -> params = [a, b] (positional) + if len(params) == 1 and isinstance(params[0], (dict, list)): + rpc_params = params[0] + else: + rpc_params = list(params) + + payload = { + "jsonrpc": "2.0", + "method": method, + "params": rpc_params, + "id": str(uuid.uuid4()), + } + + resp = await self.client.post("", json=payload) + resp.raise_for_status() + data = resp.json() + + if "error" in data: + raise JsonRpcError(data["error"]) + + return data.get("result") + + async def close(self): + await self.client.aclose() \ No newline at end of file diff --git a/quarkchain/tools/adjust_difficulty.py b/quarkchain/tools/adjust_difficulty.py index 26782ff24..7a65b0519 100644 --- a/quarkchain/tools/adjust_difficulty.py +++ b/quarkchain/tools/adjust_difficulty.py @@ -1,10 +1,10 @@ -import monitoring - import argparse import asyncio import json from datetime import datetime -from jsonrpc_async import Server + +from quarkchain.tools import monitoring +from quarkchain.jsonrpc_client import AsyncJsonRpcClient """ this is a centralized place that sets mining difficulty @@ -15,9 +15,9 @@ async def async_adjust(idx, server, root, minor, mining): - response = await server.setTargetBlockTime(root, minor) + response = await server.call("setTargetBlockTime", root, minor) print("idx={};response={}".format(idx, response)) - await server.setMining(mining) + await server.call("setMining", mining) async def async_adjust_difficulty(args): @@ -35,7 +35,7 @@ async def async_adjust_difficulty(args): if count == num_nodes: raise Exception("no change") servers = [ - (idx, Server("http://{}".format(cluster))) + (idx, AsyncJsonRpcClient("http://{}".format(cluster))) for idx, cluster in enumerate(clusters) ] await asyncio.gather( @@ -89,11 +89,11 @@ async def adjust_imbalanced_hashpower(args): clusters_rich = clusters[:num_rich] clusters_poor = clusters[num_rich:] servers_rich = [ - (idx, Server("http://{}".format(cluster))) + (idx, AsyncJsonRpcClient("http://{}".format(cluster))) for idx, cluster in enumerate(clusters_rich) ] servers_poor = [ - (idx, Server("http://{}".format(cluster))) + (idx, AsyncJsonRpcClient("http://{}".format(cluster))) for idx, cluster in enumerate(clusters_poor) ] rich_root = int(num_nodes * args.base_root / 9) diff --git a/quarkchain/tools/balance_watcher.py b/quarkchain/tools/balance_watcher.py index 3fc103517..972ef35bb 100644 --- a/quarkchain/tools/balance_watcher.py +++ b/quarkchain/tools/balance_watcher.py @@ -1,18 +1,15 @@ -import jsonrpcclient import time import logging import argparse import smtplib from email.message import EmailMessage - +from quarkchain.jsonrpc_client import JsonRpcClient HOST = "http://jrpc.mainnet.quarkchain.io" PORT = "38391" FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(format=FORMAT) -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -20,21 +17,24 @@ def query(endpoint, *args): retry, resp = 0, None while retry <= 5: + cli = JsonRpcClient(HOST + ":" + PORT) try: - resp = jsonrpcclient.request(HOST + ":" + PORT, endpoint, *args) + resp = cli.call(endpoint, *args) break except Exception: retry += 1 time.sleep(0.5) + finally: + cli.close() return resp def query_balance(recipient, chain_id, token_str): resp = query( "getBalances", - recipient.lower() + chain_id.to_bytes(2, byteorder="big").hex() + "0000", + "0x" + recipient.lower().lstrip("0x") + chain_id.to_bytes(2, byteorder="big").hex() + "0000", ) - for balance in resp.data.result["balances"]: + for balance in resp["balances"]: if balance["tokenStr"] == token_str: return int(balance["balance"], 16) return 0 diff --git a/quarkchain/tools/batch_deploy_contract.py b/quarkchain/tools/batch_deploy_contract.py index f1fd1cfcf..a91014276 100644 --- a/quarkchain/tools/batch_deploy_contract.py +++ b/quarkchain/tools/batch_deploy_contract.py @@ -1,26 +1,19 @@ import argparse -import aiohttp import asyncio -import logging import rlp -from jsonrpcclient.aiohttp_client import aiohttpClient from quarkchain.env import DEFAULT_ENV from quarkchain.core import Address, Identity from quarkchain.evm.transactions import Transaction as EvmTransaction +from quarkchain.jsonrpc_client import AsyncJsonRpcClient class Endpoint: def __init__(self, url): - self.url = url - asyncio.get_event_loop().run_until_complete(self.__create_session()) + self.client = AsyncJsonRpcClient(url) - async def __create_session(self): - self.session = aiohttp.ClientSession() - - async def __send_request(self, *args): - client = aiohttpClient(self.session, self.url) - response = await client.request(*args) + async def __send_request(self, method, *args): + response = await self.client.call(method, *args) return response async def send_transaction(self, tx): @@ -33,7 +26,7 @@ async def get_contract_address(self, tx_id): resp = await self.__send_request("getTransactionReceipt", tx_id) if not resp: return None - return resp["contract_address"] + return resp["contractAddress"] async def get_nonce(self, account): addressHex = "0x" + account.serialize().hex() @@ -42,11 +35,11 @@ async def get_nonce(self, account): async def get_shard_size(self): resp = await self.__send_request("networkInfo") - return int(resp["shard_size"], 16) + return int(resp["chainSize"], 16) async def get_network_id(self): resp = await self.__send_request("networkInfo") - return int(resp["network_id"], 16) + return int(resp["networkId"], 16) def create_transaction(address, key, nonce, data, network_id) -> EvmTransaction: @@ -110,10 +103,6 @@ def main(): parser.add_argument("--log_jrpc", default=False, type=bool) args = parser.parse_args() - if not args.log_jrpc: - logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) - logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) - data = bytes.fromhex(args.data) genesisId = Identity.create_from_key(DEFAULT_ENV.config.GENESIS_KEY) diff --git a/quarkchain/tools/bootnode_health_check.py b/quarkchain/tools/bootnode_health_check.py index 4b4138c3e..20dfc48b9 100644 --- a/quarkchain/tools/bootnode_health_check.py +++ b/quarkchain/tools/bootnode_health_check.py @@ -8,28 +8,18 @@ """ import argparse import asyncio -import logging +import os +import smtplib +import tempfile import time from datetime import datetime -import jsonrpcclient -import psutil -import numpy -from decimal import Decimal -import smtplib from quarkchain.cluster.cluster_config import ClusterConfig -from quarkchain.cluster.master import MasterServer from quarkchain.cluster.cluster import Cluster -import jsonrpcclient -import logging -import time -from datetime import datetime -import smtplib -import os -import tempfile +from quarkchain.jsonrpc_client import JsonRpcClient TIMEOUT = 10 PRIVATE_ENDPOINT = "http://{}:38491".format("localhost") -PRIVATE_CLIENT = jsonrpcclient.HTTPClient(PRIVATE_ENDPOINT) +PRIVATE_CLIENT = JsonRpcClient(PRIVATE_ENDPOINT, TIMEOUT) def now(): @@ -45,9 +35,7 @@ async def run(self): def check_routing_table(timeout=TIMEOUT): - result = PRIVATE_CLIENT.send( - jsonrpcclient.Request("getKadRoutingTable"), timeout=timeout - ) + result = PRIVATE_CLIENT.call("getKadRoutingTable") if len(result) == 0: print("Bootstrap node can not provide the routing table for a while!") subject = "Boostrap Node Alert!" diff --git a/quarkchain/tools/check_syncing_state.py b/quarkchain/tools/check_syncing_state.py index bd1ce861a..2763cf6f8 100644 --- a/quarkchain/tools/check_syncing_state.py +++ b/quarkchain/tools/check_syncing_state.py @@ -1,39 +1,25 @@ #! /usr/bin/env pypy3 import argparse -import logging import time from datetime import datetime -import jsonrpcclient -import psutil -import numpy -from decimal import Decimal +from quarkchain.jsonrpc_client import JsonRpcClient TIMEOUT=10 -# disable jsonrpcclient verbose logging -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) - - def now(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") -def checkHeight(private_client, public_client, timeout=TIMEOUT): - result_private = private_client.send( - jsonrpcclient.Request("getRootBlockByHeight"), - timeout=timeout,) - result_public = public_client.send( - jsonrpcclient.Request("getRootBlockByHeight"), - timeout=timeout,) +def checkHeight(private_client: JsonRpcClient, public_client: JsonRpcClient): + result_private = private_client.call("getRootBlockByHeight") + result_public = public_client.call("getRootBlockByHeight") return { "height": int(result_private["height"], 16), "currentHeight": int(result_public["height"], 16), } - -def query_height(private_client, public_client, args): +def query_height(private_client: JsonRpcClient, public_client: JsonRpcClient, args): format = "{time:20} {syncing:>15}{height:>30}{currentHeight:>30}" print( format.format( @@ -75,10 +61,10 @@ def main(): args = parser.parse_args() private_endpoint = "http://{}:38391".format(args.ip) - private_client = jsonrpcclient.HTTPClient(private_endpoint) + private_client = JsonRpcClient(private_endpoint, TIMEOUT) public_endpoint = "http://{}:38391".format(args.bootstrapip) - public_client = jsonrpcclient.HTTPClient(public_endpoint) + public_client = JsonRpcClient(public_endpoint, TIMEOUT) diff --git a/quarkchain/tools/count_total_balance.py b/quarkchain/tools/count_total_balance.py index 0cd83d685..d29673bdf 100644 --- a/quarkchain/tools/count_total_balance.py +++ b/quarkchain/tools/count_total_balance.py @@ -2,37 +2,29 @@ import functools import logging from typing import List, Tuple, Dict, Any - -import jsonrpcclient +from quarkchain.jsonrpc_client import JsonRpcClient logging.root.setLevel(logging.INFO) log_format = "%(asctime)s: %(message)s" logging.basicConfig(format=log_format, datefmt="%Y-%m-%d %H:%M:%S") -# disable jsonrpcclient verbose logging -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) - TIMEOUT = 10 TOTAL_SHARD = 8 @functools.lru_cache(maxsize=5) -def get_jsonrpc_cli(jrpc_url): - return jsonrpcclient.HTTPClient(jrpc_url) +def get_jsonrpc_cli(jrpc_url, timeout=10): + return JsonRpcClient(jrpc_url, timeout) class Fetcher(object): def __init__(self, host: str, timeout: int): - self.cli = get_jsonrpc_cli(host) + self.cli = get_jsonrpc_cli(host, timeout) self.timeout = timeout self.shard_to_latest_id = {} def _get_root_block(self, root_block_height: int) -> Dict[str, Any]: - res = self.cli.send( - jsonrpcclient.Request("getRootBlockByHeight", hex(root_block_height)), - timeout=self.timeout, - ) + res = self.cli.call("getRootBlockByHeight", hex(root_block_height)) if not res: raise RuntimeError( "Failed to query root block at height" % root_block_height @@ -62,12 +54,7 @@ def get_latest_minor_block_id_from_root_block( def count_total_balance( self, block_id: str, root_block_id: str, token_id: int, start: str ) -> Tuple[int, str]: - res = self.cli.send( - jsonrpcclient.Request( - "getTotalBalance", block_id, root_block_id, hex(token_id), start - ), - timeout=self.timeout, - ) + res = self.cli.call("getTotalBalance", block_id, root_block_id, hex(token_id), start) if not res: raise RuntimeError("Failed to count total balance") return int(res["totalBalance"], 16), res["next"] diff --git a/quarkchain/tools/erc20_balance_watcher.py b/quarkchain/tools/erc20_balance_watcher.py index 4456c0ad1..110636422 100644 --- a/quarkchain/tools/erc20_balance_watcher.py +++ b/quarkchain/tools/erc20_balance_watcher.py @@ -1,11 +1,9 @@ -# jsonrpcclient==3.* -# requests==2.* -import jsonrpcclient import time import logging import argparse import smtplib from email.message import EmailMessage +from quarkchain.jsonrpc_client import JsonRpcClient HOST = "https://eth.llamarpc.com" @@ -13,8 +11,6 @@ FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(format=FORMAT) -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -22,12 +18,15 @@ def query(endpoint, args): retry, resp = 0, None while retry <= 5: + cli = JsonRpcClient(HOST + ":" + PORT) try: - resp = jsonrpcclient.request(HOST + ":" + PORT, endpoint, *args) + resp = cli.call(endpoint, *args) break except Exception: retry += 1 time.sleep(0.5) + finally: + cli.close() return resp @@ -36,7 +35,7 @@ def query_balance(recipient): "eth_call", [{"from": None, "to": "0xea26c4ac16d4a5a106820bc8aee85fd0b7b2b664", "data": "0x70a08231"+int(recipient, 0).to_bytes(32, byteorder="big").hex()}, "latest"] ) - return int(resp.data.result, 0) + return int(resp, 0) def main(): diff --git a/quarkchain/tools/external_miner.py b/quarkchain/tools/external_miner.py index 481fc66bb..4b14a29cb 100644 --- a/quarkchain/tools/external_miner.py +++ b/quarkchain/tools/external_miner.py @@ -1,7 +1,6 @@ import argparse import copy import functools -import logging import random import signal import threading @@ -9,17 +8,12 @@ from itertools import cycle from typing import Dict, Optional, List, Tuple -import jsonrpcclient from queue import LifoQueue from quarkchain.cluster.miner import Miner, MiningWork, MiningResult from quarkchain.cluster.cluster_config import ClusterConfig from quarkchain.utils import int_left_most_bit - -# disable jsonrpcclient verbose logging - -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) +from quarkchain.jsonrpc_client import JsonRpcClient TIMEOUT = 10 @@ -29,7 +23,7 @@ @functools.lru_cache(maxsize=5) def get_jsonrpc_cli(jrpc_url): - return jsonrpcclient.HTTPClient(jrpc_url) + return JsonRpcClient(jrpc_url, TIMEOUT) def get_work_rpc( @@ -40,12 +34,7 @@ def get_work_rpc( ) -> MiningWork: jrpc_url = "http://{}:{}".format(host, jrpc_port) cli = get_jsonrpc_cli(jrpc_url) - header_hash, height, diff = cli.send( - jsonrpcclient.Request( - "getWork", hex(full_shard_id) if full_shard_id is not None else None - ), - timeout=timeout, - ) + header_hash, height, diff = cli.call("getWork", hex(full_shard_id) if full_shard_id is not None else None) return MiningWork(bytes.fromhex(header_hash[2:]), int(height, 16), int(diff, 16)) @@ -58,15 +47,12 @@ def submit_work_rpc( ) -> bool: jrpc_url = "http://{}:{}".format(host, jrpc_port) cli = get_jsonrpc_cli(jrpc_url) - success = cli.send( - jsonrpcclient.Request( - "submitWork", - hex(full_shard_id) if full_shard_id is not None else None, - "0x" + res.header_hash.hex(), - hex(res.nonce), - "0x" + res.mixhash.hex(), - ), - timeout=timeout, + success = cli.call( + "submitWork", + hex(full_shard_id) if full_shard_id is not None else None, + "0x" + res.header_hash.hex(), + hex(res.nonce), + "0x" + res.mixhash.hex(), ) return success diff --git a/quarkchain/tools/fund_testnet.py b/quarkchain/tools/fund_testnet.py index b550fd5a3..088316f32 100644 --- a/quarkchain/tools/fund_testnet.py +++ b/quarkchain/tools/fund_testnet.py @@ -1,33 +1,25 @@ import argparse -import aiohttp import asyncio import logging -import pickle import random import rlp from collections import defaultdict -from jsonrpcclient.aiohttp_client import aiohttpClient from typing import Dict, List from quarkchain.env import DEFAULT_ENV from quarkchain.core import Address, Identity from quarkchain.evm.transactions import Transaction as EvmTransaction - +from quarkchain.jsonrpc_client import AsyncJsonRpcClient class Endpoint: def __init__(self, url): - self.url = url - asyncio.get_event_loop().run_until_complete(self.__create_session()) - - async def __create_session(self): - self.session = aiohttp.ClientSession() + self.client = AsyncJsonRpcClient(url) - async def __send_request(self, *args): - client = aiohttpClient(self.session, self.url) + async def __send_request(self, method, *args): # manual retry since the library has hard-coded timeouts while True: try: - response = await client.request(*args) + response = await self.client.call(method, *args) break except Exception as e: print("{} !timeout! retrying {}".format(self.url, e)) @@ -51,11 +43,11 @@ async def get_nonce(self, account): async def get_shard_size(self): resp = await self.__send_request("networkInfo") - return int(resp["shard_size"], 16) + return int(resp["chainSize"], 16) async def get_network_id(self): resp = await self.__send_request("networkInfo") - return int(resp["network_id"], 16) + return int(resp["networkId"], 16) def create_transaction(address, key, nonce, to, network_id, amount) -> EvmTransaction: @@ -93,7 +85,7 @@ async def fund_shard(endpoint, genesisId, to, network_id, shard, amount): print("retry tx={}".format(tx_id)) await endpoint.send_transaction(tx) - height = int(resp["block_height"], 16) + height = int(resp["blockHeight"], 16) status = int(resp["status"], 16) print( "shard={} tx={} block={} status={} amount={}".format( @@ -167,10 +159,6 @@ def main(): parser.add_argument("--tqkc_file", required=True, type=str) args = parser.parse_args() - if not args.log_jrpc: - logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) - logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) - genesisId = Identity.create_from_key(DEFAULT_ENV.config.GENESIS_KEY) endpoint = Endpoint("http://" + args.jrpc_endpoint) diff --git a/quarkchain/tools/monitoring.py b/quarkchain/tools/monitoring.py index 3ce3139a6..0ccf6b28f 100644 --- a/quarkchain/tools/monitoring.py +++ b/quarkchain/tools/monitoring.py @@ -1,4 +1,3 @@ -import jsonrpcclient import ipaddress import argparse @@ -7,7 +6,7 @@ from datetime import datetime import asyncio -from jsonrpc_async import Server +from quarkchain.jsonrpc_client import JsonRpcClient, AsyncJsonRpcClient """ @@ -19,7 +18,15 @@ def fetch_peers(ip, jrpc_port): json_rpc_url = "http://{}:{}".format(ip, jrpc_port) print("calling {}".format(json_rpc_url)) - peers = jsonrpcclient.request(json_rpc_url, "getPeers") + cli = JsonRpcClient(json_rpc_url) + try: + peers = cli.call("getPeers") + print("success {}".format(json_rpc_url)) + except Exception: + print("Failed to get peers from {}".format(json_rpc_url)) + return [] + finally: + cli.close() return [ "{}:{}".format(ipaddress.ip_address(int(p["ip"], 16)), int(p["port"], 16)) for p in peers["peers"] @@ -32,13 +39,13 @@ async def fetch_peers_async(node): :return: list of tuple(ip, p2p_port, jrpc_port) """ json_rpc_url = "http://{}:{}".format(node[0], node[2]) - server = Server(json_rpc_url) + server = AsyncJsonRpcClient(json_rpc_url, timeout=5) try: - peers = await asyncio.wait_for(server.get_peers(), 5) + peers = await server.call("getPeers") except Exception: print("Failed to get peers from {}".format(json_rpc_url)) peers = {"peers": []} - await server.session.close() + await server.close() return [ ( str(ipaddress.ip_address(int(p["ip"], 16))), @@ -149,13 +156,13 @@ def print_all_clusters(ip, p2p_port, jrpc_port, ip_lookup={}): async def async_stats(idx, server): - response = await server.get_stats() + response = await server.call("getStats") print("idx={};{}={}".format(idx, CONST_METRIC, response[CONST_METRIC])) async def async_watch(clusters): servers = [ - (idx, Server("http://{}".format(cluster))) + (idx, AsyncJsonRpcClient("http://{}".format(cluster))) for idx, cluster in enumerate(clusters) ] while True: diff --git a/quarkchain/tools/reorg_detector.py b/quarkchain/tools/reorg_detector.py index 774ade891..a2d0fc72d 100644 --- a/quarkchain/tools/reorg_detector.py +++ b/quarkchain/tools/reorg_detector.py @@ -1,9 +1,9 @@ -import jsonrpcclient import time import logging import argparse import smtplib from email.message import EmailMessage +from quarkchain.jsonrpc_client import JsonRpcClient HOST = "http://jrpc.mainnet.quarkchain.io" @@ -11,8 +11,6 @@ FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(format=FORMAT) -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -20,25 +18,28 @@ def query(endpoint, *args): retry, resp = 0, None while retry <= 5: + cli = JsonRpcClient(HOST + ":" + PORT) try: - resp = jsonrpcclient.request(HOST + ":" + PORT, endpoint, *args) + resp = cli.call(endpoint, *args) break except Exception: retry += 1 time.sleep(0.5) + finally: + cli.close() return resp def query_tip(): resp = query("getRootBlockByHeight", None) - return int(resp.data.result["height"], 16), resp.data.result["hash"] + return int(resp["height"], 16), resp["hash"] def query_rblock_with_height(height): if isinstance(height, int): height = hex(height) resp = query("getRootBlockByHeight", height) - return int(resp.data.result["height"], 16), resp.data.result["hash"] + return int(resp["height"], 16), resp["hash"] def main(): diff --git a/quarkchain/tools/stats b/quarkchain/tools/stats.py old mode 100755 new mode 100644 similarity index 84% rename from quarkchain/tools/stats rename to quarkchain/tools/stats.py index fe637d8e2..d2688432a --- a/quarkchain/tools/stats +++ b/quarkchain/tools/stats.py @@ -1,18 +1,12 @@ #! /usr/bin/env python3 import argparse -import logging import time from datetime import datetime -import jsonrpcclient import psutil import numpy from decimal import Decimal - - -# disable jsonrpcclient verbose logging -logging.getLogger("jsonrpcclient.client.request").setLevel(logging.WARNING) -logging.getLogger("jsonrpcclient.client.response").setLevel(logging.WARNING) +from quarkchain.jsonrpc_client import JsonRpcClient def now(): @@ -23,8 +17,8 @@ def fstr(v: float): return "{:.2f}".format(v) -def basic(client, ip): - s = client.send(jsonrpcclient.Request("getStats")) +def basic(client: JsonRpcClient, ip): + s = client.call("getStats") msg = "QuarkChain Cluster Stats\n\n" msg += "CPU: {}\n".format(psutil.cpu_count()) msg += "Memory: {} GB\n".format( @@ -37,8 +31,8 @@ def basic(client, ip): return msg -def stats(client): - s = client.send(jsonrpcclient.Request("getStats")) +def stats(client: JsonRpcClient): + s = client.call("getStats") return { "time": now(), "syncing": str(s["syncing"]), @@ -60,7 +54,7 @@ def stats(client): } -def query_stats(client, args): +def query_stats(client: JsonRpcClient, args): if args.verbose: format = "{time:20} {syncing:>8} {tps:>5} {pendingTx:>10} {confirmedTx:>10} {bps:>9} {sbps:>9} {cpu:>9} {root:>7} {shards}" else: @@ -91,7 +85,7 @@ def format_qkc(qkc: Decimal): return "{:.18f}".format(qkc).rstrip("0").rstrip(".") -def query_address(client, args): +def query_address(client: JsonRpcClient, args): address_hex = args.address.lower().lstrip("0").lstrip("x") token_str = args.token.upper() assert len(address_hex) == 48 @@ -103,11 +97,7 @@ def query_address(client, args): ) while True: - data = client.send( - jsonrpcclient.Request( - "getAccountData", address="0x" + address_hex, include_shards=True - ) - ) + data = client.call("getAccountData", address="0x" + address_hex, include_shards=True) shards_wei = [] for shard_balance in data["shards"]: for token_balances in shard_balance["balances"]: @@ -152,9 +142,9 @@ def main(): args = parser.parse_args() private_endpoint = "http://{}:38491".format(args.ip) - private_client = jsonrpcclient.HTTPClient(private_endpoint) + private_client = JsonRpcClient(private_endpoint) public_endpoint = "http://{}:38391".format(args.ip) - public_client = jsonrpcclient.HTTPClient(public_endpoint) + public_client = JsonRpcClient(public_endpoint) print(basic(private_client, args.ip))