Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions catkit_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ else()
endif (NOT APPLE)
endif (WIN32)

# Add includes for cppzmq
find_package(cppzmq REQUIRED)
target_include_directories(catkit_core PUBLIC ${CPPZMQ_INCLUDE_DIR})

# Link Eigen
find_package(Eigen3 REQUIRED NO_MODULE)
target_link_libraries (catkit_core PUBLIC Eigen3::Eigen)
Expand Down
106 changes: 60 additions & 46 deletions catkit_core/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,81 +4,85 @@
#include "Timing.h"
#include "Finally.h"

#include <zmq_addon.hpp>

#include <algorithm>
#include <chrono>
#include <thread>
#include <iostream>
#include <string>
#include <mutex>
#include <zmq.h>

using namespace std;
using namespace zmq;

const int SOCKET_TIMEOUT = 60000; // milliseconds.

Client::Client(std::string host, int port)
: m_Host(host), m_Port(port)
{
m_Context = zmq_ctx_new();
}

Client::~Client()
{
while (!m_Sockets.empty())
{
zmq_close(m_Sockets.top());
m_Sockets.pop();
}

zmq_ctx_term(m_Context);
}

string Client::MakeRequest(const string &what, const string &request)
{
auto socket = GetSocket();

zmq::multipart_t request_msg;

request_msg.addstr(what);
request_msg.addstr(request);

request_msg.send(*socket);
// Send the request.
zmq_send(socket.get(), what.c_str(), what.size(), ZMQ_SNDMORE);
zmq_send(socket.get(), request.c_str(), request.size(), 0);

Timer timer;
int more = 1;
std::vector<std::string> response;

try
if (zmq_recv_multipart(socket.get(), std::back_inserter(response)) < 0)
{
zmq::multipart_t reply_msg;
auto res = zmq::recv_multipart(*socket, std::back_inserter(reply_msg));

if (!res.has_value())
if (zmq_errno() == EAGAIN)
{
LOG_ERROR("The server took too long to respond to our request.");
throw std::runtime_error("The server did not respond in time. Is it running?");
}

if (reply_msg.size() != 2)
else
{
LOG_ERROR("The server responded with " + std::to_string(reply_msg.size()) + " parts rather than 2.");
throw std::runtime_error("The server responded in a wrong format.");
LOG_ERROR("An error occurred while sending the request to the server: "s + zmq_strerror(zmq_errno()));
throw std::runtime_error("An error occurred while sending the request to the server.");
}
}

std::string reply_type = reply_msg.popstr();
std::string reply_data = reply_msg.popstr();
if (response.size() != 2)
{
LOG_ERROR("The server did not respond with the expected number of parts.");
throw std::runtime_error("The server responded in a wrong format.");
}

if (reply_type == "OK")
{
return reply_data;
}
else if (reply_type == "ERROR")
{
throw std::runtime_error(reply_data);
}
else
{
LOG_ERROR("The server responded with \"" + reply_type + "\" rather than OK or ERROR.");
throw std::runtime_error("The server responded in a wrong format.");
}
const std::string &reply_type(response[0]);
const std::string &reply_data(response[1]);

if (reply_type == "OK")
{
return reply_data;
}
catch (const zmq::error_t &e)
else if (reply_type == "ERROR")
{
LOG_ERROR(std::string("ZeroMQ error: ") + e.what());
throw;
throw std::runtime_error(reply_data);
}
else
{
LOG_ERROR("The server responded with \"" + reply_type + "\" rather than OK or ERROR.");
throw std::runtime_error("The server responded in a wrong format.");
}

return reply_data;
}

std::string Client::GetHost()
Expand All @@ -95,28 +99,38 @@ Client::socket_ptr Client::GetSocket()
{
std::scoped_lock<std::mutex> lock(m_Mutex);

zmq::socket_t *socket;
socket_t *socket;
if (m_Sockets.empty())
{
LOG_DEBUG("Creating new socket.");

socket = new zmq::socket_t(m_Context, ZMQ_REQ);
socket = (socket_t *) zmq_socket(m_Context, ZMQ_REQ);

int timeout = SOCKET_TIMEOUT;
int linger = 0;
int req_relaxed = 1;
int req_correlate = 1;

socket->set(zmq::sockopt::rcvtimeo, SOCKET_TIMEOUT);
socket->set(zmq::sockopt::linger, 0);
socket->set(zmq::sockopt::req_relaxed, 1);
socket->set(zmq::sockopt::req_correlate, 1);
zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(int));
zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(int));
zmq_setsockopt(socket, ZMQ_REQ_RELAXED, &req_relaxed, sizeof(int));
zmq_setsockopt(socket, ZMQ_REQ_CORRELATE, &req_correlate, sizeof(int));

socket->connect("tcp://"s + m_Host + ":" + to_string(m_Port));
std::string endpoint = "tcp://"s + m_Host + ":" + to_string(m_Port);
if (zmq_connect(socket, endpoint.c_str()) == -1)
{
LOG_ERROR("Failed to connect to " + endpoint);
throw std::runtime_error("Failed to connect to " + endpoint);
}
}
else
{
socket = m_Sockets.top().release();
socket = m_Sockets.top();
m_Sockets.pop();
}

return socket_ptr(socket, [this](zmq::socket_t *ptr)
return socket_ptr(socket, [this](socket_t *ptr)
{
this->m_Sockets.emplace(ptr);
});
}
}
10 changes: 5 additions & 5 deletions catkit_core/Client.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#ifndef CLIENT_H
#define CLIENT_H

#include "Networking.h"

#include <string>
#include <mutex>
#include <functional>
#include <memory>
#include <stack>

#include <zmq.hpp>

class Client
{
public:
Expand All @@ -24,13 +24,13 @@ class Client
std::string m_Host;
int m_Port;

zmq::context_t m_Context;
void *m_Context;

typedef std::unique_ptr<zmq::socket_t, std::function<void(zmq::socket_t *)>> socket_ptr;
typedef std::unique_ptr<socket_t, std::function<void(socket_t *)>> socket_ptr;
socket_ptr GetSocket();

std::mutex m_Mutex;
std::stack<std::unique_ptr<zmq::socket_t>> m_Sockets;
std::stack<socket_t *> m_Sockets;
};

template<typename ProtoRequest>
Expand Down
38 changes: 23 additions & 15 deletions catkit_core/LogForwarder.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#include "LogForwarder.h"

#include "Networking.h"

#include <zmq.h>
#include <nlohmann/json.hpp>
#include <iostream>
#include <string>
#include <string>
#include <thread>

using namespace zmq;
using json = nlohmann::json;

LogForwarder::LogForwarder()
Expand Down Expand Up @@ -53,12 +55,15 @@ void LogForwarder::AddLogEntry(const LogEntry &entry)

void LogForwarder::MessageLoop()
{
context_t context;
socket_t socket(context, ZMQ_PUSH);
context_t *context = (context_t *) zmq_ctx_new();
socket_t *socket = (socket_t *) zmq_socket(context, ZMQ_PUSH);

int linger = 0;
int sndtimeo = 10;
zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_setsockopt(socket, ZMQ_SNDTIMEO, &sndtimeo, sizeof(sndtimeo));

socket.set(zmq::sockopt::linger, 0);
socket.set(zmq::sockopt::sndtimeo, 10);
socket.connect(m_Host);
zmq_connect(socket, m_Host.c_str());

std::string log_message;

Expand All @@ -81,18 +86,21 @@ void LogForwarder::MessageLoop()
}

// Construct and send message.
message_t message_zmq(log_message.size());
memcpy(message_zmq.data(), log_message.c_str(), log_message.size());

send_result_t res;
do
while (!m_ShutDown)
{
res = socket.send(message_zmq, zmq::send_flags::none);
if (zmq_send(socket, log_message.c_str(), log_message.size(), 0) < 0)
{
if (zmq_errno() == EAGAIN)
continue;

// There was an error while sending, but we cannot emit log messages here so ignoring it.
break;
}
}
while (!res.has_value() && !m_ShutDown);
}

socket.close();
zmq_close(socket);
zmq_ctx_destroy(context);
}

void LogForwarder::ShutDown()
Expand Down
2 changes: 0 additions & 2 deletions catkit_core/LogForwarder.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef LOGFORWARDER_H
#define LOGFORWARDER_H

#include <zmq.hpp>

#include <string>
#include <queue>
#include <mutex>
Expand Down
32 changes: 32 additions & 0 deletions catkit_core/Networking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "Networking.h"

int zmq_recv_multipart(socket_t *socket, std::back_insert_iterator<std::vector<std::string>> inserter)
{
while (true)
{
zmq_msg_t msg;
zmq_msg_init(&msg);

if (zmq_msg_recv(&msg, socket, 0) < 0)
{
// Clean up state.
zmq_msg_close(&msg);

return -1;
}

// Insert into the inserter.
std::string part_str(static_cast<char*>(zmq_msg_data(&msg)), zmq_msg_size(&msg));
inserter = std::move(part_str);

bool more = zmq_msg_more(&msg);

zmq_msg_close(&msg);

if (!more)
break;
}

// All parts received successfully.
return 0;
}
19 changes: 19 additions & 0 deletions catkit_core/Networking.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#ifndef NETWORK_H
#define NETWORK_H

#include <zmq.h>
#include <vector>
#include <string>
#include <iterator>

struct socket_t
{
};

struct context_t
{
};

int zmq_recv_multipart(socket_t *socket, std::back_insert_iterator<std::vector<std::string>> inserter);

#endif // NETWORK_H
Loading
Loading