From dc3b3b87be1f3630e05576f426dfa38d1afc0525 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Fri, 27 Nov 2020 09:18:48 +0200 Subject: [PATCH] Added some basic debug and relay json-rpc calls (#305) --- tests/v2/test_jsonrpc_waku.nim | 93 +++++++++++++++++++---- waku/v2/node/jsonrpc/debug_api.nim | 14 ++++ waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 11 +++ waku/v2/node/jsonrpc/relay_api.nim | 58 ++++++++++++++ waku/v2/node/jsonrpc/store_api.nim | 2 +- 5 files changed, 164 insertions(+), 14 deletions(-) create mode 100644 waku/v2/node/jsonrpc/debug_api.nim create mode 100644 waku/v2/node/jsonrpc/relay_api.nim diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index d47f3b57b9..c52665e5dd 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -7,10 +7,11 @@ import libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], libp2p/crypto/crypto, + libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/rpc/message, ../../waku/v2/waku_types, ../../waku/v2/node/wakunode2, - ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api], + ../../waku/v2/node/jsonrpc/[jsonrpc_types,store_api,relay_api,debug_api], ../../waku/v2/protocol/message_notifier, ../../waku/v2/protocol/waku_store/waku_store, ../test_helpers @@ -19,21 +20,87 @@ template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim" createRpcSigs(RpcHttpClient, sigPath) -suite "Waku v2 JSON-RPC API": +procSuite "Waku v2 JSON-RPC API": + const defaultTopic = "/waku/2/default-waku/proto" + const testCodec = "/waku/2/default-waku/codec" - asyncTest "get_waku_v2_store_v1_messages": - const defaultTopic = "/waku/2/default-waku/proto" - const testCodec = "/waku/2/default-waku/codec" + let + rng = crypto.newRng() + privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + port = Port(9000) + node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port)) - # WakuNode setup + asyncTest "debug_api": + waitFor node.start() + + waitFor node.mountRelay() + + # RPC server setup let - rng = crypto.newRng() - privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - port = Port(9000) - node = WakuNode.init(privkey, bindIp, port, some(extIp), some(port)) + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installDebugApiHandlers(node, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + let response = await client.get_waku_v2_debug_v1_info() + + check: + response.listenStr == $node.peerInfo.addrs[0] & "/p2p/" & $node.peerInfo.peerId + + server.stop() + server.close() + waitfor node.stop() + + asyncTest "relay_api": + waitFor node.start() + + waitFor node.mountRelay() + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installRelayApiHandlers(node, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + check: + # At this stage the node is only subscribed to the default topic + PubSub(node.wakuRelay).topics.len == 1 + + # Subscribe to new topics + let newTopics = @["1","2","3"] + var response = await client.post_waku_v2_relay_v1_subscriptions(newTopics) + + check: + # Node is now subscribed to default + new topics + PubSub(node.wakuRelay).topics.len == 1 + newTopics.len + response == true + + # Unsubscribe from new topics + response = await client.delete_waku_v2_relay_v1_subscriptions(newTopics) + + check: + # Node is now unsubscribed from new topics + PubSub(node.wakuRelay).topics.len == 1 + response == true + + server.stop() + server.close() + waitfor node.stop() + asyncTest "store_api": waitFor node.start() waitFor node.mountRelay(@[defaultTopic]) @@ -44,7 +111,7 @@ suite "Waku v2 JSON-RPC API": ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - setupWakuJSONRPC(node, server) + installStoreApiHandlers(node, server) server.start() # WakuStore setup diff --git a/waku/v2/node/jsonrpc/debug_api.nim b/waku/v2/node/jsonrpc/debug_api.nim new file mode 100644 index 0000000000..cea37b172b --- /dev/null +++ b/waku/v2/node/jsonrpc/debug_api.nim @@ -0,0 +1,14 @@ +import + json_rpc/rpcserver, + ../../waku_types, + ../wakunode2 + +proc installDebugApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = + + ## Debug API version 1 definitions + + rpcsrv.rpc("get_waku_v2_debug_v1_info") do() -> WakuInfo: + ## Returns information about WakuNode + debug "get_waku_v2_debug_v1_info" + + return node.info() diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index c6b5d97f2e..000f8176df 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -1 +1,12 @@ +# Debug API + +proc get_waku_v2_debug_v1_info(): WakuInfo + +# Relay API + +proc post_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool +proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool + +# Store API + proc get_waku_v2_store_v1_messages(topics: seq[ContentTopic], pagingOptions: Option[StorePagingOptions]): StoreResponse diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim new file mode 100644 index 0000000000..5295d8350b --- /dev/null +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -0,0 +1,58 @@ +import + json_rpc/rpcserver, + eth/[common, rlp, keys, p2p], + ../../waku_types, + ../wakunode2 + +proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = + const futTimeout = 5.seconds + + ## Relay API version 1 definitions + + rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: + ## Subscribes a node to a list of PubSub topics + debug "post_waku_v2_relay_v1_subscriptions" + + proc topicHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + debug "WakuMessage received", msg=msg, topic=topic + # @TODO handle message + else: + debug "WakuMessage received but failed to decode", msg=msg, topic=topic + # @TODO handle message decode failure + + var failedTopics: seq[string] + + # Subscribe to all requested topics + for topic in topics: + # If any topic fails to subscribe, add to list of failedTopics + if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)): + failedTopics.add(topic) + + if (failedTopics.len() == 0): + # Successfully subscribed to all requested topics + return true + else: + # Failed to subscribe to one or more topics + raise newException(ValueError, "Failed to subscribe to topics " & repr(failedTopics)) + + rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: + ## Unsubscribes a node from a list of PubSub topics + debug "delete_waku_v2_relay_v1_subscriptions" + + var failedTopics: seq[string] + + # Unsubscribe all handlers from requested topics + for topic in topics: + # If any topic fails to unsubscribe, add to list of failedTopics + if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)): + failedTopics.add(topic) + + if (failedTopics.len() == 0): + # Successfully unsubscribed from all requested topics + return true + else: + # Failed to unsubscribe from one or more topics + raise newException(ValueError, "Failed to unsubscribe from topics " & repr(failedTopics)) + diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 3e9e3501ab..f37bc4ecba 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -6,7 +6,7 @@ import ../wakunode2, ./jsonrpc_types, ./jsonrpc_utils -proc setupWakuJSONRPC*(node: WakuNode, rpcsrv: RpcServer) = +proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = const futTimeout = 5.seconds ## Store API version 1 definitions