Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions quarkchain/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def run_master(self):
extra_cmd += " --enable_profiler=true"
master = await run_master(self.config.json_filepath, extra_cmd)
prefix = "{}MASTER".format(self.cluster_id)
asyncio.ensure_future(print_output(prefix, master.stdout))
asyncio.create_task(print_output(prefix, master.stdout))
self.procs.append((prefix, master))

async def run_slaves(self):
Expand All @@ -117,7 +117,7 @@ async def run_slaves(self):
slave.ID in self.args.profile.split(","),
)
prefix = "{}SLAVE_{}".format(self.cluster_id, slave.ID)
asyncio.ensure_future(print_output(prefix, s.stdout))
asyncio.create_task(print_output(prefix, s.stdout))
self.procs.append((prefix, s))

async def run_prom(self):
Expand Down Expand Up @@ -149,10 +149,10 @@ async def shutdown(self):

def start_and_loop(self):
try:
asyncio.get_event_loop().run_until_complete(self.run())
asyncio.run(self.run())
except KeyboardInterrupt:
try:
asyncio.get_event_loop().run_until_complete(self.shutdown())
asyncio.run(self.shutdown())
except Exception:
pass

Expand Down
32 changes: 16 additions & 16 deletions quarkchain/cluster/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,31 +470,31 @@ def _parse_log_request(
# noinspection PyPep8Naming
class JSONRPCHttpServer:
@classmethod
def start_public_server(cls, env, master_server):
async def start_public_server(cls, env, master_server):
server = cls(
env,
master_server,
env.cluster_config.JSON_RPC_PORT,
env.cluster_config.JSON_RPC_HOST,
public_methods,
)
server.start()
await server.start()
return server

@classmethod
def start_private_server(cls, env, master_server):
async def start_private_server(cls, env, master_server):
server = cls(
env,
master_server,
env.cluster_config.PRIVATE_JSON_RPC_PORT,
env.cluster_config.PRIVATE_JSON_RPC_HOST,
private_methods,
)
server.start()
await server.start()
return server

@classmethod
def start_test_server(cls, env, master_server):
async def start_test_server(cls, env, master_server):
methods = AsyncMethods()
for method in public_methods.values():
methods.add(method)
Expand All @@ -507,7 +507,7 @@ def start_test_server(cls, env, master_server):
env.cluster_config.JSON_RPC_HOST,
methods,
)
server.start()
await server.start()
return server

def __init__(
Expand Down Expand Up @@ -549,7 +549,7 @@ async def __handle(self, request):
return web.Response()
return web.json_response(response, status=response.http_status)

def start(self):
async def start(self):
app = web.Application(client_max_size=JSON_RPC_CLIENT_REQUEST_MAX_SIZE)
cors = aiohttp_cors.setup(app)
route = app.router.add_post("/", self.__handle)
Expand All @@ -565,12 +565,12 @@ def start(self):
},
)
self.runner = web.AppRunner(app, access_log=None)
self.loop.run_until_complete(self.runner.setup())
await self.runner.setup()
site = web.TCPSite(self.runner, self.host, self.port)
self.loop.run_until_complete(site.start())
await site.start()

def shutdown(self):
self.loop.run_until_complete(self.runner.cleanup())
async def shutdown(self):
await self.runner.cleanup()

# JSON RPC handlers
@public_methods.add
Expand Down Expand Up @@ -1452,15 +1452,15 @@ def get_data_default(key, decoder, default=None):

class JSONRPCWebsocketServer:
@classmethod
def start_websocket_server(cls, env, slave_server):
async def start_websocket_server(cls, env, slave_server):
server = cls(
env,
slave_server,
env.slave_config.WEBSOCKET_JSON_RPC_PORT,
env.slave_config.HOST,
public_methods,
)
server.start()
await server.start()
return server

def __init__(
Expand Down Expand Up @@ -1531,11 +1531,11 @@ async def __handle(self, websocket, path):
except:
pass

def start(self):
async def start(self):
start_server = websockets.serve(self.__handle, self.host, self.port)
self.loop.run_until_complete(start_server)
await start_server

def shutdown(self):
async def shutdown(self):
pass # TODO

@staticmethod
Expand Down
57 changes: 32 additions & 25 deletions quarkchain/cluster/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def __init__(
self.full_shard_id_list = full_shard_id_list
check(len(full_shard_id_list) > 0)

asyncio.ensure_future(self.active_and_loop_forever())
self._loop_task = asyncio.create_task(self.active_and_loop_forever())

def get_connection_to_forward(self, metadata):
"""Override ProxyConnection.get_connection_to_forward()
Expand Down Expand Up @@ -763,7 +763,7 @@ class MasterServer:
"""

def __init__(self, env, root_state, name="master"):
self.loop = asyncio.get_event_loop()
self.loop = asyncio.get_running_loop()
self.env = env
self.root_state = root_state # type: RootState
self.network = None # will be set by network constructor
Expand Down Expand Up @@ -849,7 +849,7 @@ async def __connect(self, host, port):
while True:
try:
reader, writer = await asyncio.open_connection(
host, port, loop=self.loop
host, port
)
break
except Exception as e:
Expand Down Expand Up @@ -1068,29 +1068,31 @@ async def __init_cluster(self):
self.cluster_active_future.set_result(None)

def start(self):
self.loop.create_task(self.__init_cluster())
self._init_task = self.loop.create_task(self.__init_cluster())

def do_loop(self, callbacks: List[Callable]):
async def do_loop(self, callbacks: List[Callable]):
if self.env.arguments.enable_profiler:
profile = cProfile.Profile()
profile.enable()

try:
self.loop.run_until_complete(self.shutdown_future)
await self.shutdown_future
except KeyboardInterrupt:
pass
finally:
for callback in callbacks:
if callable(callback):
callback()
result = callback()
if asyncio.iscoroutine(result):
await result

if self.env.arguments.enable_profiler:
profile.disable()
profile.print_stats("time")

def wait_until_cluster_active(self):
async def wait_until_cluster_active(self):
# Wait until cluster is ready
self.loop.run_until_complete(self.cluster_active_future)
await self.cluster_active_future

def shutdown(self):
# TODO: May set exception and disconnect all slaves
Expand All @@ -1100,6 +1102,8 @@ def shutdown(self):
self.cluster_active_future.set_exception(
RuntimeError("failed to start the cluster")
)
if hasattr(self, '_init_task') and self._init_task and not self._init_task.done():
self._init_task.cancel()

def get_shutdown_future(self):
return self.shutdown_future
Expand Down Expand Up @@ -1848,21 +1852,17 @@ def parse_args():
return env


def main():
async def _main_async(env):
from quarkchain.cluster.jsonrpc import JSONRPCHttpServer

os.chdir(os.path.dirname(os.path.abspath(__file__)))

env = parse_args()
loop = asyncio.get_event_loop()
root_state = RootState(env)
master = MasterServer(env, root_state)

if env.arguments.check_db:
master.start()
master.wait_until_cluster_active()
asyncio.ensure_future(master.check_db())
master.do_loop([])
await master.wait_until_cluster_active()
asyncio.create_task(master.check_db())
await master.do_loop([])
return

# p2p discovery mode will disable master-slave communication and JSONRPC
Expand All @@ -1875,31 +1875,38 @@ def main():
# only start the cluster if not in discovery-only mode
if start_master:
master.start()
master.wait_until_cluster_active()
await master.wait_until_cluster_active()

# kick off simulated mining if enabled
if env.cluster_config.START_SIMULATED_MINING:
asyncio.ensure_future(master.start_mining())
asyncio.create_task(master.start_mining())

if env.cluster_config.use_p2p():
network = P2PManager(env, master, loop)
network = P2PManager(env, master)
else:
network = SimpleNetwork(env, master, loop)
network.start()
network = SimpleNetwork(env, master)
await network.start()

callbacks = [network.shutdown]
if env.cluster_config.ENABLE_PUBLIC_JSON_RPC:
public_json_rpc_server = JSONRPCHttpServer.start_public_server(env, master)
public_json_rpc_server = await JSONRPCHttpServer.start_public_server(env, master)
callbacks.append(public_json_rpc_server.shutdown)

if env.cluster_config.ENABLE_PRIVATE_JSON_RPC:
private_json_rpc_server = JSONRPCHttpServer.start_private_server(env, master)
private_json_rpc_server = await JSONRPCHttpServer.start_private_server(env, master)
callbacks.append(private_json_rpc_server.shutdown)

master.do_loop(callbacks)
await master.do_loop(callbacks)

Logger.info("Master server is shutdown")


def main():
os.chdir(os.path.dirname(os.path.abspath(__file__)))

env = parse_args()
asyncio.run(_main_async(env))


if __name__ == "__main__":
main()
8 changes: 6 additions & 2 deletions quarkchain/cluster/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@ def __init__(
# key can be None, meaning default coinbase address from local config
self.current_works = LRUCache(128)
self.root_signer_private_key = root_signer_private_key
self._mining_task = None

def start(self):
self.enabled = True
self._mine_new_block_async()
self._mining_task = self._mine_new_block_async()

def is_enabled(self):
return self.enabled
Expand All @@ -201,6 +202,9 @@ def disable(self):
# end the mining process
self.input_q.put((None, {}))
self.enabled = False
if self._mining_task and not self._mining_task.done():
self._mining_task.cancel()
self._mining_task = None

def _mine_new_block_async(self):
async def handle_mined_block():
Expand Down Expand Up @@ -266,7 +270,7 @@ async def mine_new_block():
# no-op if enabled or mining remotely
if not self.enabled or self.remote:
return None
return asyncio.ensure_future(mine_new_block())
return asyncio.create_task(mine_new_block())

async def get_work(self, coinbase_addr: Address, now=None) -> (MiningWork, Block):
if not self.remote:
Expand Down
19 changes: 6 additions & 13 deletions quarkchain/cluster/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop=None,
metadata_class=None,
name=None,
command_size_limit=None,
Expand All @@ -34,7 +33,6 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop=loop,
metadata_class=metadata_class,
name=name,
command_size_limit=command_size_limit,
Expand Down Expand Up @@ -110,12 +108,11 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop=None,
metadata_class=Metadata,
name=None,
):
super().__init__(
op_ser_map, op_non_rpc_map, op_rpc_map, loop, metadata_class, name=name
op_ser_map, op_non_rpc_map, op_rpc_map, metadata_class, name=name
)
self.read_deque = deque()
self.read_event = asyncio.Event()
Expand Down Expand Up @@ -147,7 +144,7 @@ def get_metadata_to_write(self, metadata):

class NullConnection(AbstractConnection):
def __init__(self):
super().__init__(dict(), dict(), dict(), None, Metadata, name="NULL_CONNECTION")
super().__init__(dict(), dict(), dict(), name="NULL_CONNECTION")

def write_raw_data(self, metadata, raw_data):
pass
Expand Down Expand Up @@ -192,8 +189,7 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop=None,
metadata_class=None,
name=None,
command_size_limit=None,
):
super().__init__(
Expand All @@ -203,9 +199,8 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop,
P2PMetadata,
name=metadata_class,
metadata_class=P2PMetadata,
name=name,
command_size_limit=command_size_limit,
)

Expand Down Expand Up @@ -233,7 +228,6 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop=None,
name=None,
):
super().__init__(
Expand All @@ -243,8 +237,7 @@ def __init__(
op_ser_map,
op_non_rpc_map,
op_rpc_map,
loop,
ClusterMetadata,
metadata_class=ClusterMetadata,
name=name,
)
self.peer_rpc_ids = dict()
Expand Down
Loading
Loading