Skip to content

Commit

Permalink
new txradar code.
Browse files Browse the repository at this point in the history
  • Loading branch information
genjix committed Jan 12, 2015
0 parents commit b87d212
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
default:
g++ main.cpp txradar.cpp util.cpp $(shell pkg-config --cflags --libs libczmq++ libbitcoin) -o txradar

11 changes: 11 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Make sure you have libbitcoin installed.

$ make
$ ./txradar

Check out define.hpp to change the ports and other settings.

To see only txradar output use:

$ ./txradar 2>&1 | grep txradar

14 changes: 14 additions & 0 deletions define.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef TXRADAR_DEFINE_HPP
#define TXRADAR_DEFINE_HPP

#define ONLY_LOCALHOST_CONNECTIONS

const size_t notify_port = 7678;

// How many connections radar should try to maintain.
const size_t target_connections = 40;

#define LOG_TXR "txradar"

#endif

10 changes: 10 additions & 0 deletions main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "txradar.hpp"

int main()
{
txradar radar;
radar.start(true, 1, target_connections);
pause();
return 0;
}

158 changes: 158 additions & 0 deletions txradar.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "txradar.hpp"

#include "util.hpp"

using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;

void log_nothing(
bc::log_level level, const std::string& domain, const std::string& body)
{
}
void log_to_file(std::ofstream& file,
bc::log_level level, const std::string& domain, const std::string& body)
{
if (body.empty())
return;
file << level_repr(level);
if (!domain.empty())
file << " [" << domain << "]";
file << ": " << body << std::endl;
}

txradar::txradar()
: engine_(device_()), auth_(ctx_),
hosts_(pool_), handshake_(pool_), network_(pool_),
p2p_(pool_, hosts_, handshake_, network_)
{
BITCOIN_ASSERT(ctx_.self());
BITCOIN_ASSERT(auth_.self());
}
txradar::~txradar()
{
pool_.stop();
pool_.join();
}

void p2p_started(bc::network::protocol& p2p, const std::error_code& ec);

// Start the p2p network. Is called repeatedly until connected.
void start_p2p(bc::network::protocol& p2p)
{
// Keep trying to connect until successful.
p2p.start(std::bind(p2p_started, std::ref(p2p), _1));
}

// If there's an error then attempt to reconnect until successful.
void p2p_started(bc::network::protocol& p2p, const std::error_code& ec)
{
if (ec)
{
bc::log_warning(LOG_TXR) << "Restarting connection...";
start_p2p(p2p);
return;
}
// Success. Call finish callback to signal success.
bc::log_info(LOG_TXR) << "Radar started.";
}

void txradar::start(bool display_output, size_t threads, size_t number_hosts)
{
#ifdef ONLY_LOCALHOST_CONNECTIONS
auth_.allow("127.0.0.1");
#endif

pool_.spawn(threads);

// Set connection counts.
p2p_.set_max_outbound(number_hosts);
// Notify us of new connections so we can subscribe to 'inv' packets.
p2p_.subscribe_channel(
std::bind(&txradar::connection_started, this, _1, _2));
// Start connecting to p2p networks for broadcasting and monitor txs.
start_p2p(p2p_);
}

void txradar::connection_started(
const std::error_code& ec, bc::network::channel_ptr node)
{
if (ec)
{
bc::log_warning(LOG_TXR)
<< "Couldn't start connection: " << ec.message();
return;
}
bc::log_info(LOG_TXR) << "Connection established.";
node_id_type node_id =
engine_() % std::numeric_limits<node_id_type>::max();
// Subscribe to 'inv' packets.
node->subscribe_inventory(
std::bind(&txradar::inventory_received, this,
_1, _2, node, node_id));
// Resubscribe to new nodes.
p2p_.subscribe_channel(
std::bind(&txradar::connection_started, this, _1, _2));
}

template <typename Context, typename NodeID>
void notify_transaction(
Context& ctx, NodeID node_id, const bc::hash_digest& tx_hash)
{
static czmqpp::socket socket(ctx, ZMQ_PUB);
static bool is_initialized = false;
if (!is_initialized)
{
BITCOIN_ASSERT(socket.self());
int bind_rc = socket.bind(listen_transport(notify_port));
BITCOIN_ASSERT(bind_rc != -1);
is_initialized = true;
bc::log_info(LOG_TXR) << "Initialized ZMQ socket.";
}
bc::log_info(LOG_TXR) << "Sending (" << node_id << ", "
<< bc::encode_hash(tx_hash) << ")";
// Create a message.
czmqpp::message msg;
// node_id
const auto data_id = bc::to_little_endian(node_id);
BITCOIN_ASSERT(data_id.size() == 4);
msg.append(bc::to_data_chunk(data_id));
// tx_hash
const czmqpp::data_chunk data_hash(tx_hash.begin(), tx_hash.end());
msg.append(data_hash);
// Send it.
bool rc = msg.send(socket);
BITCOIN_ASSERT(rc);
}

void txradar::inventory_received(
const std::error_code& ec, const bc::inventory_type& packet,
bc::network::channel_ptr node, node_id_type node_id)
{
if (ec)
{
bc::log_error(LOG_TXR) << "inventory: " << ec.message();
return;
}
for (const bc::inventory_vector_type& ivec: packet.inventories)
{
if (ivec.type == bc::inventory_type_id::transaction)
{
notify_transaction(ctx_, node_id, ivec.hash);
}
else if (ivec.type == bc::inventory_type_id::block);
// Do nothing.
else
bc::log_warning(LOG_TXR) << "Ignoring unknown inventory type";
}
// Resubscribe to 'inv' packets.
node->subscribe_inventory(
std::bind(&txradar::inventory_received, this,
_1, _2, node, node_id));
}

size_t txradar::total_connections() const
{
return p2p_.total_connections();
}

48 changes: 48 additions & 0 deletions txradar.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef TXRADAR_TXRADAR_HPP
#define TXRADAR_TXRADAR_HPP

#include <czmq++/czmqpp.hpp>
#include <bitcoin/bitcoin.hpp>
#include "define.hpp"

class txradar
{
public:
txradar();
~txradar();

void start(bool display_output, size_t threads, size_t number_hosts);

size_t total_connections() const;

private:
typedef uint32_t node_id_type;

void connection_started(
const std::error_code& ec, bc::network::channel_ptr node);
void inventory_received(
const std::error_code& ec, const bc::inventory_type& packet,
bc::network::channel_ptr node, node_id_type node_id);

// Used for generating node IDs
std::random_device device_;
std::default_random_engine engine_;

// ZeroMQ stuff
czmqpp::context ctx_;
czmqpp::authenticator auth_;

// For logging
std::ofstream outfile_, errfile_;

// Threadpool
bc::threadpool pool_;
// Bitcoin network components.
bc::network::hosts hosts_;
bc::network::handshake handshake_;
bc::network::network network_;
bc::network::protocol p2p_;
};

#endif

11 changes: 11 additions & 0 deletions util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include "util.hpp"

#include <boost/lexical_cast.hpp>

const std::string listen_transport(const size_t port)
{
std::string transport = "tcp://*:";
transport += boost::lexical_cast<std::string>(port);
return transport;
}

9 changes: 9 additions & 0 deletions util.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#ifndef TXRADAR_UTIL_HPP
#define TXRADAR_UTIL_HPP

#include <string>

const std::string listen_transport(const size_t port);

#endif

0 comments on commit b87d212

Please sign in to comment.