diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index c70e6351d..241446cb0 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -37,10 +37,6 @@ else() endif (NOT APPLE) endif (WIN32) -# Add includes for cppzmq -find_package(cppzmq REQUIRED) -target_include_directories(catkit_core PUBLIC ${CPPZMQ_INCLUDE_DIR}) - # Link Eigen find_package(Eigen3 REQUIRED NO_MODULE) target_link_libraries (catkit_core PUBLIC Eigen3::Eigen) diff --git a/catkit_core/Client.cpp b/catkit_core/Client.cpp index b9445b2e4..7034c3faf 100644 --- a/catkit_core/Client.cpp +++ b/catkit_core/Client.cpp @@ -4,81 +4,85 @@ #include "Timing.h" #include "Finally.h" -#include - #include #include #include #include #include #include +#include using namespace std; -using namespace zmq; const int SOCKET_TIMEOUT = 60000; // milliseconds. Client::Client(std::string host, int port) : m_Host(host), m_Port(port) { + m_Context = zmq_ctx_new(); } Client::~Client() { + while (!m_Sockets.empty()) + { + zmq_close(m_Sockets.top()); + m_Sockets.pop(); + } + + zmq_ctx_term(m_Context); } string Client::MakeRequest(const string &what, const string &request) { auto socket = GetSocket(); - zmq::multipart_t request_msg; - - request_msg.addstr(what); - request_msg.addstr(request); - - request_msg.send(*socket); + // Send the request. + zmq_send(socket.get(), what.c_str(), what.size(), ZMQ_SNDMORE); + zmq_send(socket.get(), request.c_str(), request.size(), 0); Timer timer; + int more = 1; + std::vector response; - try + if (zmq_recv_multipart(socket.get(), std::back_inserter(response)) < 0) { - zmq::multipart_t reply_msg; - auto res = zmq::recv_multipart(*socket, std::back_inserter(reply_msg)); - - if (!res.has_value()) + if (zmq_errno() == EAGAIN) { LOG_ERROR("The server took too long to respond to our request."); throw std::runtime_error("The server did not respond in time. Is it running?"); } - - if (reply_msg.size() != 2) + else { - LOG_ERROR("The server responded with " + std::to_string(reply_msg.size()) + " parts rather than 2."); - throw std::runtime_error("The server responded in a wrong format."); + LOG_ERROR("An error occurred while sending the request to the server: "s + zmq_strerror(zmq_errno())); + throw std::runtime_error("An error occurred while sending the request to the server."); } + } - std::string reply_type = reply_msg.popstr(); - std::string reply_data = reply_msg.popstr(); + if (response.size() != 2) + { + LOG_ERROR("The server did not respond with the expected number of parts."); + throw std::runtime_error("The server responded in a wrong format."); + } - if (reply_type == "OK") - { - return reply_data; - } - else if (reply_type == "ERROR") - { - throw std::runtime_error(reply_data); - } - else - { - LOG_ERROR("The server responded with \"" + reply_type + "\" rather than OK or ERROR."); - throw std::runtime_error("The server responded in a wrong format."); - } + const std::string &reply_type(response[0]); + const std::string &reply_data(response[1]); + + if (reply_type == "OK") + { + return reply_data; } - catch (const zmq::error_t &e) + else if (reply_type == "ERROR") { - LOG_ERROR(std::string("ZeroMQ error: ") + e.what()); - throw; + throw std::runtime_error(reply_data); } + else + { + LOG_ERROR("The server responded with \"" + reply_type + "\" rather than OK or ERROR."); + throw std::runtime_error("The server responded in a wrong format."); + } + + return reply_data; } std::string Client::GetHost() @@ -95,28 +99,38 @@ Client::socket_ptr Client::GetSocket() { std::scoped_lock lock(m_Mutex); - zmq::socket_t *socket; + socket_t *socket; if (m_Sockets.empty()) { LOG_DEBUG("Creating new socket."); - socket = new zmq::socket_t(m_Context, ZMQ_REQ); + socket = (socket_t *) zmq_socket(m_Context, ZMQ_REQ); + + int timeout = SOCKET_TIMEOUT; + int linger = 0; + int req_relaxed = 1; + int req_correlate = 1; - socket->set(zmq::sockopt::rcvtimeo, SOCKET_TIMEOUT); - socket->set(zmq::sockopt::linger, 0); - socket->set(zmq::sockopt::req_relaxed, 1); - socket->set(zmq::sockopt::req_correlate, 1); + zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(int)); + zmq_setsockopt(socket, ZMQ_REQ_RELAXED, &req_relaxed, sizeof(int)); + zmq_setsockopt(socket, ZMQ_REQ_CORRELATE, &req_correlate, sizeof(int)); - socket->connect("tcp://"s + m_Host + ":" + to_string(m_Port)); + std::string endpoint = "tcp://"s + m_Host + ":" + to_string(m_Port); + if (zmq_connect(socket, endpoint.c_str()) == -1) + { + LOG_ERROR("Failed to connect to " + endpoint); + throw std::runtime_error("Failed to connect to " + endpoint); + } } else { - socket = m_Sockets.top().release(); + socket = m_Sockets.top(); m_Sockets.pop(); } - return socket_ptr(socket, [this](zmq::socket_t *ptr) + return socket_ptr(socket, [this](socket_t *ptr) { this->m_Sockets.emplace(ptr); }); -} \ No newline at end of file +} diff --git a/catkit_core/Client.h b/catkit_core/Client.h index 25535eede..2bbb84e9e 100644 --- a/catkit_core/Client.h +++ b/catkit_core/Client.h @@ -1,14 +1,14 @@ #ifndef CLIENT_H #define CLIENT_H +#include "Networking.h" + #include #include #include #include #include -#include - class Client { public: @@ -24,13 +24,13 @@ class Client std::string m_Host; int m_Port; - zmq::context_t m_Context; + void *m_Context; - typedef std::unique_ptr> socket_ptr; + typedef std::unique_ptr> socket_ptr; socket_ptr GetSocket(); std::mutex m_Mutex; - std::stack> m_Sockets; + std::stack m_Sockets; }; template diff --git a/catkit_core/LogForwarder.cpp b/catkit_core/LogForwarder.cpp index d8a0b1928..a263613d9 100644 --- a/catkit_core/LogForwarder.cpp +++ b/catkit_core/LogForwarder.cpp @@ -1,11 +1,13 @@ #include "LogForwarder.h" +#include "Networking.h" + +#include #include #include -#include +#include #include -using namespace zmq; using json = nlohmann::json; LogForwarder::LogForwarder() @@ -53,12 +55,15 @@ void LogForwarder::AddLogEntry(const LogEntry &entry) void LogForwarder::MessageLoop() { - context_t context; - socket_t socket(context, ZMQ_PUSH); + context_t *context = (context_t *) zmq_ctx_new(); + socket_t *socket = (socket_t *) zmq_socket(context, ZMQ_PUSH); + + int linger = 0; + int sndtimeo = 10; + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_setsockopt(socket, ZMQ_SNDTIMEO, &sndtimeo, sizeof(sndtimeo)); - socket.set(zmq::sockopt::linger, 0); - socket.set(zmq::sockopt::sndtimeo, 10); - socket.connect(m_Host); + zmq_connect(socket, m_Host.c_str()); std::string log_message; @@ -81,18 +86,21 @@ void LogForwarder::MessageLoop() } // Construct and send message. - message_t message_zmq(log_message.size()); - memcpy(message_zmq.data(), log_message.c_str(), log_message.size()); - - send_result_t res; - do + while (!m_ShutDown) { - res = socket.send(message_zmq, zmq::send_flags::none); + if (zmq_send(socket, log_message.c_str(), log_message.size(), 0) < 0) + { + if (zmq_errno() == EAGAIN) + continue; + + // There was an error while sending, but we cannot emit log messages here so ignoring it. + break; + } } - while (!res.has_value() && !m_ShutDown); } - socket.close(); + zmq_close(socket); + zmq_ctx_destroy(context); } void LogForwarder::ShutDown() diff --git a/catkit_core/LogForwarder.h b/catkit_core/LogForwarder.h index 2fddeacda..1c8b3fe8d 100644 --- a/catkit_core/LogForwarder.h +++ b/catkit_core/LogForwarder.h @@ -1,8 +1,6 @@ #ifndef LOGFORWARDER_H #define LOGFORWARDER_H -#include - #include #include #include diff --git a/catkit_core/Networking.cpp b/catkit_core/Networking.cpp new file mode 100644 index 000000000..0d1af94b5 --- /dev/null +++ b/catkit_core/Networking.cpp @@ -0,0 +1,32 @@ +#include "Networking.h" + +int zmq_recv_multipart(socket_t *socket, std::back_insert_iterator> inserter) +{ + while (true) + { + zmq_msg_t msg; + zmq_msg_init(&msg); + + if (zmq_msg_recv(&msg, socket, 0) < 0) + { + // Clean up state. + zmq_msg_close(&msg); + + return -1; + } + + // Insert into the inserter. + std::string part_str(static_cast(zmq_msg_data(&msg)), zmq_msg_size(&msg)); + inserter = std::move(part_str); + + bool more = zmq_msg_more(&msg); + + zmq_msg_close(&msg); + + if (!more) + break; + } + + // All parts received successfully. + return 0; +} diff --git a/catkit_core/Networking.h b/catkit_core/Networking.h new file mode 100644 index 000000000..c004bc162 --- /dev/null +++ b/catkit_core/Networking.h @@ -0,0 +1,19 @@ +#ifndef NETWORK_H +#define NETWORK_H + +#include +#include +#include +#include + +struct socket_t +{ +}; + +struct context_t +{ +}; + +int zmq_recv_multipart(socket_t *socket, std::back_insert_iterator> inserter); + +#endif // NETWORK_H diff --git a/catkit_core/Server.cpp b/catkit_core/Server.cpp index 323ef85d2..4d187261c 100644 --- a/catkit_core/Server.cpp +++ b/catkit_core/Server.cpp @@ -4,17 +4,17 @@ #include "Timing.h" #include "Finally.h" #include "Util.h" +#include "Networking.h" -#include - +#include #include #include #include #include #include +#include using namespace std; -using namespace zmq; Server::Server(int port) : m_Port(port), m_IsRunning(false), m_ShouldShutDown(false) @@ -54,16 +54,23 @@ void Server::RunInternal() { LOG_INFO("Starting server on port "s + to_string(m_Port) + "."); - zmq::context_t context; + context_t *context = (context_t *) zmq_ctx_new(); + + socket_t *socket = (socket_t *) zmq_socket(context, ZMQ_ROUTER); + + if (zmq_bind(socket, ("tcp://*:"s + to_string(m_Port)).c_str()) == -1) + throw runtime_error("Failed to bind socket. Error: "s + zmq_strerror(zmq_errno())); + + int recv_timeout = 20; + int linger = 0; - zmq::socket_t socket(context, ZMQ_ROUTER); - socket.bind("tcp://*:"s + std::to_string(m_Port)); - socket.set(zmq::sockopt::rcvtimeo, 20); - socket.set(zmq::sockopt::linger, 0); + zmq_setsockopt(socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof(int)); + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(int)); - Finally finally([this, &socket]() + Finally finally([this, &socket, &context]() { - socket.close(); + zmq_close(socket); + zmq_ctx_destroy(context); this->m_ShouldShutDown = true; this->m_IsRunning = false; @@ -73,12 +80,13 @@ void Server::RunInternal() while (!m_ShouldShutDown) { - zmq::multipart_t request_msg; - auto res = zmq::recv_multipart(socket, std::back_inserter(request_msg)); + std::vector request_msg; - if (!res.has_value()) + if (zmq_recv_multipart(socket, std::back_inserter(request_msg)) < 0) { - // Server has received no message. + if (zmq_errno() != EAGAIN) + LOG_ERROR("The server has received an error while receiving a message: "s + zmq_strerror(zmq_errno())); + continue; } @@ -89,11 +97,11 @@ void Server::RunInternal() continue; } - std::string client_identity = request_msg.popstr(); - std::string request_id = request_msg.popstr(); - std::string empty = request_msg.popstr(); - std::string request_type = request_msg.popstr(); - std::string request_data = request_msg.popstr(); + std::string &client_identity = request_msg[0]; + std::string &request_id = request_msg[1]; + std::string &empty = request_msg[2]; + std::string &request_type = request_msg[3]; + std::string &request_data = request_msg[4]; LOG_DEBUG("Request received: "s + request_type); @@ -126,15 +134,11 @@ void Server::RunInternal() } // Send reply to the client. - multipart_t msg; - - msg.addstr(client_identity); - msg.addstr(request_id); - msg.addstr(""); - msg.addstr(reply_type); - msg.addstr(reply_data); - - msg.send(socket); + zmq_send(socket, client_identity.c_str(), client_identity.size(), ZMQ_SNDMORE); + zmq_send(socket, request_id.c_str(), request_id.size(), ZMQ_SNDMORE); + zmq_send(socket, "", 0, ZMQ_SNDMORE); + zmq_send(socket, reply_type.c_str(), reply_type.size(), ZMQ_SNDMORE); + zmq_send(socket, reply_data.c_str(), reply_data.size(), 0); LOG_DEBUG("Sent reply: "s + reply_type); } diff --git a/catkit_core/Service.cpp b/catkit_core/Service.cpp index 7a935bc64..21c62a304 100644 --- a/catkit_core/Service.cpp +++ b/catkit_core/Service.cpp @@ -12,13 +12,11 @@ #include #include #include -#include #include #include #include using namespace std; -using namespace zmq; using json = nlohmann::json; using namespace std::string_literals; diff --git a/catkit_core/Service.h b/catkit_core/Service.h index 8c878f172..a75cb8db3 100644 --- a/catkit_core/Service.h +++ b/catkit_core/Service.h @@ -6,7 +6,6 @@ #include #include -#include #include #include "Property.h" diff --git a/catkit_core/ServiceProxy.h b/catkit_core/ServiceProxy.h index 883b69a96..ac8b2b6f5 100644 --- a/catkit_core/ServiceProxy.h +++ b/catkit_core/ServiceProxy.h @@ -6,7 +6,6 @@ #include "ServiceState.h" #include "Client.h" -#include #include #include diff --git a/catkit_core/TestbedProxy.cpp b/catkit_core/TestbedProxy.cpp index 596435f01..a77c86fcc 100644 --- a/catkit_core/TestbedProxy.cpp +++ b/catkit_core/TestbedProxy.cpp @@ -8,7 +8,6 @@ #include using namespace std; -using namespace zmq; using json = nlohmann::json; using namespace std::string_literals; diff --git a/catkit_core/TestbedProxy.h b/catkit_core/TestbedProxy.h index d54be6fb8..5fa414548 100644 --- a/catkit_core/TestbedProxy.h +++ b/catkit_core/TestbedProxy.h @@ -9,7 +9,6 @@ #include "ServiceState.h" #include "Util.h" -#include #include #include diff --git a/catkit_core/Tracing.cpp b/catkit_core/Tracing.cpp index 31c9903c1..2bb5829bf 100644 --- a/catkit_core/Tracing.cpp +++ b/catkit_core/Tracing.cpp @@ -3,10 +3,9 @@ #include "Timing.h" #include "Util.h" #include "Log.h" +#include "Networking.h" #include "tracing.pb.h" -#include - using namespace std; TracingProxy tracing_proxy; @@ -161,13 +160,16 @@ struct BuildProtoEvent void TracingProxy::MessageLoop() { - zmq::context_t context; - zmq::socket_t socket(context, ZMQ_PUSH); + context_t *context = (context_t *) zmq_ctx_new(); + socket_t *socket = (socket_t *) zmq_socket(context, ZMQ_PUSH); + + int linger = 0; + int sndtimeo = 10; - socket.set(zmq::sockopt::linger, 0); - socket.set(zmq::sockopt::sndtimeo, 10); + zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_setsockopt(socket, ZMQ_SNDTIMEO, &sndtimeo, sizeof(sndtimeo)); - socket.connect("tcp://"s + m_Host + ":" + to_string(m_Port)); + zmq_connect(socket, ("tcp://"s + m_Host + ":" + to_string(m_Port)).c_str()); TraceEvent event; @@ -192,20 +194,22 @@ void TracingProxy::MessageLoop() // Convert the TraceEvent to a ProtoBuf serialized string. string message = std::visit(BuildProtoEvent{}, event); - // Construct message. - zmq::message_t message_zmq(message.size()); - memcpy(message_zmq.data(), message.c_str(), message.size()); - - // Send message to socket. - zmq::send_result_t res; - do + // Send message. + while (!m_ShutDown) { - res = socket.send(message_zmq, zmq::send_flags::none); + if (zmq_send(socket, message.c_str(), message.size(), 0) < 0) + { + if (zmq_errno() == EAGAIN) + continue; + + LOG_ERROR(std::string("Error sending message to tracer: ") + zmq_strerror(zmq_errno())); + break; + } } - while (!res.has_value() && m_ShutDown); } - socket.close(); + zmq_close(socket); + zmq_ctx_destroy(context); } void TracingProxy::SetProcessName(string process_name) diff --git a/environment.yml b/environment.yml index 96395ab17..ec81054be 100644 --- a/environment.yml +++ b/environment.yml @@ -27,7 +27,6 @@ dependencies: - protobuf==3.20.3 - libprotobuf==3.20.3 - conda-forge::hcipy - - conda-forge::cppzmq==4.8.1 - conda-forge::pybind11==2.13.6 - conda-forge::eigen==3.4.0 - conda-forge::nlohmann_json==3.9.1