diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6649c3b --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +default: + g++ main.cpp txradar.cpp util.cpp $(shell pkg-config --cflags --libs libczmq++ libbitcoin) -o txradar + diff --git a/README b/README new file mode 100644 index 0000000..c69b0c0 --- /dev/null +++ b/README @@ -0,0 +1,11 @@ +Make sure you have libbitcoin installed. + + $ make + $ ./txradar + +Check out define.hpp to change the ports and other settings. + +To see only txradar output use: + + $ ./txradar 2>&1 | grep txradar + diff --git a/define.hpp b/define.hpp new file mode 100644 index 0000000..6c4f698 --- /dev/null +++ b/define.hpp @@ -0,0 +1,14 @@ +#ifndef TXRADAR_DEFINE_HPP +#define TXRADAR_DEFINE_HPP + +#define ONLY_LOCALHOST_CONNECTIONS + +const size_t notify_port = 7678; + +// How many connections radar should try to maintain. +const size_t target_connections = 40; + +#define LOG_TXR "txradar" + +#endif + diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..3fa07ab --- /dev/null +++ b/main.cpp @@ -0,0 +1,10 @@ +#include "txradar.hpp" + +int main() +{ + txradar radar; + radar.start(true, 1, target_connections); + pause(); + return 0; +} + diff --git a/txradar.cpp b/txradar.cpp new file mode 100644 index 0000000..974c919 --- /dev/null +++ b/txradar.cpp @@ -0,0 +1,158 @@ +#include "txradar.hpp" + +#include "util.hpp" + +using std::placeholders::_1; +using std::placeholders::_2; +using std::placeholders::_3; + +void log_nothing( + bc::log_level level, const std::string& domain, const std::string& body) +{ +} +void log_to_file(std::ofstream& file, + bc::log_level level, const std::string& domain, const std::string& body) +{ + if (body.empty()) + return; + file << level_repr(level); + if (!domain.empty()) + file << " [" << domain << "]"; + file << ": " << body << std::endl; +} + +txradar::txradar() + : engine_(device_()), auth_(ctx_), + hosts_(pool_), handshake_(pool_), network_(pool_), + p2p_(pool_, hosts_, handshake_, network_) +{ + BITCOIN_ASSERT(ctx_.self()); + BITCOIN_ASSERT(auth_.self()); +} +txradar::~txradar() +{ + pool_.stop(); + pool_.join(); +} + +void p2p_started(bc::network::protocol& p2p, const std::error_code& ec); + +// Start the p2p network. Is called repeatedly until connected. +void start_p2p(bc::network::protocol& p2p) +{ + // Keep trying to connect until successful. + p2p.start(std::bind(p2p_started, std::ref(p2p), _1)); +} + +// If there's an error then attempt to reconnect until successful. +void p2p_started(bc::network::protocol& p2p, const std::error_code& ec) +{ + if (ec) + { + bc::log_warning(LOG_TXR) << "Restarting connection..."; + start_p2p(p2p); + return; + } + // Success. Call finish callback to signal success. + bc::log_info(LOG_TXR) << "Radar started."; +} + +void txradar::start(bool display_output, size_t threads, size_t number_hosts) +{ +#ifdef ONLY_LOCALHOST_CONNECTIONS + auth_.allow("127.0.0.1"); +#endif + + pool_.spawn(threads); + + // Set connection counts. + p2p_.set_max_outbound(number_hosts); + // Notify us of new connections so we can subscribe to 'inv' packets. + p2p_.subscribe_channel( + std::bind(&txradar::connection_started, this, _1, _2)); + // Start connecting to p2p networks for broadcasting and monitor txs. + start_p2p(p2p_); +} + +void txradar::connection_started( + const std::error_code& ec, bc::network::channel_ptr node) +{ + if (ec) + { + bc::log_warning(LOG_TXR) + << "Couldn't start connection: " << ec.message(); + return; + } + bc::log_info(LOG_TXR) << "Connection established."; + node_id_type node_id = + engine_() % std::numeric_limits::max(); + // Subscribe to 'inv' packets. + node->subscribe_inventory( + std::bind(&txradar::inventory_received, this, + _1, _2, node, node_id)); + // Resubscribe to new nodes. + p2p_.subscribe_channel( + std::bind(&txradar::connection_started, this, _1, _2)); +} + +template +void notify_transaction( + Context& ctx, NodeID node_id, const bc::hash_digest& tx_hash) +{ + static czmqpp::socket socket(ctx, ZMQ_PUB); + static bool is_initialized = false; + if (!is_initialized) + { + BITCOIN_ASSERT(socket.self()); + int bind_rc = socket.bind(listen_transport(notify_port)); + BITCOIN_ASSERT(bind_rc != -1); + is_initialized = true; + bc::log_info(LOG_TXR) << "Initialized ZMQ socket."; + } + bc::log_info(LOG_TXR) << "Sending (" << node_id << ", " + << bc::encode_hash(tx_hash) << ")"; + // Create a message. + czmqpp::message msg; + // node_id + const auto data_id = bc::to_little_endian(node_id); + BITCOIN_ASSERT(data_id.size() == 4); + msg.append(bc::to_data_chunk(data_id)); + // tx_hash + const czmqpp::data_chunk data_hash(tx_hash.begin(), tx_hash.end()); + msg.append(data_hash); + // Send it. + bool rc = msg.send(socket); + BITCOIN_ASSERT(rc); +} + +void txradar::inventory_received( + const std::error_code& ec, const bc::inventory_type& packet, + bc::network::channel_ptr node, node_id_type node_id) +{ + if (ec) + { + bc::log_error(LOG_TXR) << "inventory: " << ec.message(); + return; + } + for (const bc::inventory_vector_type& ivec: packet.inventories) + { + if (ivec.type == bc::inventory_type_id::transaction) + { + notify_transaction(ctx_, node_id, ivec.hash); + } + else if (ivec.type == bc::inventory_type_id::block); + // Do nothing. + else + bc::log_warning(LOG_TXR) << "Ignoring unknown inventory type"; + } + // Resubscribe to 'inv' packets. + node->subscribe_inventory( + std::bind(&txradar::inventory_received, this, + _1, _2, node, node_id)); +} + +size_t txradar::total_connections() const +{ + return p2p_.total_connections(); +} + diff --git a/txradar.hpp b/txradar.hpp new file mode 100644 index 0000000..514e9f8 --- /dev/null +++ b/txradar.hpp @@ -0,0 +1,48 @@ +#ifndef TXRADAR_TXRADAR_HPP +#define TXRADAR_TXRADAR_HPP + +#include +#include +#include "define.hpp" + +class txradar +{ +public: + txradar(); + ~txradar(); + + void start(bool display_output, size_t threads, size_t number_hosts); + + size_t total_connections() const; + +private: + typedef uint32_t node_id_type; + + void connection_started( + const std::error_code& ec, bc::network::channel_ptr node); + void inventory_received( + const std::error_code& ec, const bc::inventory_type& packet, + bc::network::channel_ptr node, node_id_type node_id); + + // Used for generating node IDs + std::random_device device_; + std::default_random_engine engine_; + + // ZeroMQ stuff + czmqpp::context ctx_; + czmqpp::authenticator auth_; + + // For logging + std::ofstream outfile_, errfile_; + + // Threadpool + bc::threadpool pool_; + // Bitcoin network components. + bc::network::hosts hosts_; + bc::network::handshake handshake_; + bc::network::network network_; + bc::network::protocol p2p_; +}; + +#endif + diff --git a/util.cpp b/util.cpp new file mode 100644 index 0000000..ebf689c --- /dev/null +++ b/util.cpp @@ -0,0 +1,11 @@ +#include "util.hpp" + +#include + +const std::string listen_transport(const size_t port) +{ + std::string transport = "tcp://*:"; + transport += boost::lexical_cast(port); + return transport; +} + diff --git a/util.hpp b/util.hpp new file mode 100644 index 0000000..0e7e753 --- /dev/null +++ b/util.hpp @@ -0,0 +1,9 @@ +#ifndef TXRADAR_UTIL_HPP +#define TXRADAR_UTIL_HPP + +#include + +const std::string listen_transport(const size_t port); + +#endif +