Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

## PATCH Bug fixes/Technical Debt/Documentation
1. [984] - Fixes {server_default} from showing up in path.
2. [989] - This creates HTTP communicator and associated classes in the common library

# v2024.6.17.10.40

Expand Down Expand Up @@ -50,6 +51,8 @@
16. [981] - Fixes html injection that can occur from user name when displaying owner in schema dlg box.
17. [983] - Fixes google analytics by adding nonce which was broken.

12. [977] - Created HTTPCommunicator, StringMessage, HTTPCredential, HTTPSocket, and unit test classes.

# v2023.10.23.15.50

## MINOR Feature
Expand Down
20 changes: 18 additions & 2 deletions common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,26 @@ if( BUILD_COMMON )
"source/sockets/*.cpp")
if(BUILD_SHARED_LIBS)
add_library( common SHARED ${Sources})
target_link_libraries( common PUBLIC ${DATAFED_BOOST_DATE_TIME_LIBRARY_PATH} protobuf::libprotobuf libzmq datafed-protobuf)
target_link_libraries( common PUBLIC
${DATAFED_BOOST_DATE_TIME_LIBRARY_PATH}
protobuf::libprotobuf
libzmq
datafed-protobuf
${DATAFED_CURL_LIBRARIES}
${OPENSSL_SSL_LIBRARY}
curl)

else()
add_library( common STATIC ${Sources})
target_link_libraries( common PUBLIC ${DATAFED_BOOST_DATE_TIME_LIBRARY_PATH} protobuf::libprotobuf libzmq-static datafed-protobuf)
target_link_libraries( common PUBLIC
${DATAFED_BOOST_DATE_TIME_LIBRARY_PATH}
protobuf::libprotobuf
libzmq-static
datafed-protobuf
${DATAFED_CURL_LIBRARIES}
${OPENSSL_SSL_LIBRARY}
curl
)
endif()

set_target_properties(common PROPERTIES POSITION_INDEPENDENT_CODE ON SOVERSION ${DATAFED_COMMON_LIB_MAJOR} VERSION ${DATAFED_COMMON_LIB_MAJOR}.${DATAFED_COMMON_LIB_MINOR}.${DATAFED_COMMON_LIB_PATCH} )
Expand Down
2 changes: 1 addition & 1 deletion common/include/common/IMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum class MessageType { GOOGLE_PROTOCOL_BUFFER, STRING };
**/
enum class MessageState { REQUEST, RESPONSE };

enum class MessageAttribute { ID, KEY, STATE, CORRELATION_ID };
enum class MessageAttribute { ID, KEY, STATE, CORRELATION_ID, ENDPOINT, VERB };

inline const std::string toString(const MessageAttribute attribute) {
if (attribute == MessageAttribute::ID) {
Expand Down
1 change: 0 additions & 1 deletion common/include/common/Util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ void hexDump(const char *a_buffer, const char *a_buffer_end,
std::string escapeCSV(const std::string &a_value);
std::string escapeJSON(const std::string &a_value);
bool to_uint32(const char *a_str, uint32_t &a_out);

// std::vector<std::string> smartTokenize( const std::string & a_text, const
// std::string & a_delim );

Expand Down
8 changes: 8 additions & 0 deletions common/source/CommunicatorFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@

// Local private includes
#include "communicators/HTTPCommunicator.hpp"
#include "communicators/ZeroMQCommunicator.hpp"
#include "communicators/ZeroMQCommunicatorSecure.hpp"
#include "sockets/HTTPSocket.hpp"
#include "sockets/ZeroMQSocket.hpp"

// Local public includes
Expand All @@ -28,6 +30,12 @@ std::unique_ptr<ICommunicator> CommunicatorFactory::create(
m_log_context));
}
}
// FLAG Added due to suspected reason for memory issue
else if (socket_options.protocol_type == ProtocolType::HTTP) {
return std::unique_ptr<ICommunicator>(
new HTTPCommunicator(socket_options, credentials, timeout_on_receive,
timeout_on_poll, m_log_context));
}
return std::unique_ptr<ICommunicator>();
}

Expand Down
6 changes: 6 additions & 0 deletions common/source/CredentialFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

// Local private includes
#include "credentials/HTTPCredentials.hpp"
#include "credentials/ZeroMQSocketCredentials.hpp"

// Local public includes
Expand All @@ -17,6 +18,11 @@ std::unique_ptr<ICredentials> CredentialFactory::create(
if (protocol_type == ProtocolType::ZQTP) {
return std::unique_ptr<ICredentials>(new ZeroMQSocketCredentials(options));
}

else if (protocol_type == ProtocolType::HTTP) {
return std::unique_ptr<ICredentials>(new HTTPCredentials(options));
}

return std::unique_ptr<ICredentials>();
}

Expand Down
21 changes: 19 additions & 2 deletions common/source/MessageFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@

// Local private includes
#include "messages/GoogleProtoMessage.hpp"
#include "messages/StringMessage.hpp"

// Local public includes
#include "common/IMessage.hpp"
#include "common/MessageFactory.hpp"
#include "common/TraceException.hpp"

// Standard includes
#include <memory>

Expand All @@ -17,20 +17,24 @@ MessageFactory::create(const MessageType msg_type) const {

if (msg_type == MessageType::GOOGLE_PROTOCOL_BUFFER) {
return std::unique_ptr<IMessage>(new GoogleProtoMessage());
} else if (msg_type == MessageType::STRING) {
return std::unique_ptr<IMessage>(new StringMessage());
}
EXCEPT(1, "Unsupported MessageType specified in MessageFactory.");
}

std::unique_ptr<IMessage>
MessageFactory::createResponseEnvelope(const IMessage &msg) const {

std::cout << 1 << std::endl;
if (msg.type() == MessageType::GOOGLE_PROTOCOL_BUFFER) {
std::cout << 2 << std::endl;
auto new_msg = std::unique_ptr<IMessage>(new GoogleProtoMessage());
new_msg->setRoutes(msg.getRoutes());
new_msg->set(MessageAttribute::STATE, MessageState::RESPONSE);
new_msg->set(
MessageAttribute::CORRELATION_ID,
std::get<std::string>(msg.get(MessageAttribute::CORRELATION_ID)));
std::cout << 3 << std::endl;
// The context is needed so when the response is sent the client knows what
// request it is associated with it
uint16_t context = 0;
Expand All @@ -46,6 +50,19 @@ MessageFactory::createResponseEnvelope(const IMessage &msg) const {
}
new_msg->set(constants::message::google::CONTEXT, context);
return new_msg;
} else if (msg.type() == MessageType::STRING) {
std::cout << 4 << std::endl;
auto new_msg = std::unique_ptr<IMessage>(new StringMessage());
std::cout << 5 << std::endl;
new_msg->setRoutes(msg.getRoutes());
std::cout << 6 << std::endl;
new_msg->set(MessageAttribute::STATE, MessageState::RESPONSE);
std::cout << 7 << std::endl;
new_msg->set(
MessageAttribute::CORRELATION_ID,
std::get<std::string>(msg.get(MessageAttribute::CORRELATION_ID)));
std::cout << 8 << std::endl;
return new_msg;
}
EXCEPT(1, "Unsupported MessageType specified in MessageFactory.");
}
Expand Down
8 changes: 8 additions & 0 deletions common/source/SocketFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

// Local private includes
#include "communicators/HTTPCommunicator.hpp"
#include "communicators/ZeroMQCommunicator.hpp"
#include "sockets/HTTPSocket.hpp"
#include "sockets/ZeroMQSocket.hpp"

// Local public includes
Expand All @@ -20,6 +22,12 @@ SocketFactory::create(const SocketOptions &socket_options,
return std::unique_ptr<ISocket>(
new ZeroMQSocket(socket_options, credentials));
}

else if (socket_options.protocol_type == ProtocolType::HTTP) {
return std::unique_ptr<ISocket>(
new HTTPSocket(socket_options, credentials));
}

EXCEPT(1, "Unsupported ProtocolType specified in SocketFactory.");
}

Expand Down
192 changes: 192 additions & 0 deletions common/source/communicators/HTTPCommunicator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Local Private Includes
#include "HTTPCommunicator.hpp"

// Local public includes
#include "common/DynaLog.hpp"
#include "common/IMessage.hpp"
#include "common/ISocket.hpp"
#include "common/SocketFactory.hpp"
#include "common/SocketOptions.hpp"
#include "common/Util.hpp"

#include "common/ProtoBufMap.hpp"
// Standard includes
#include <curl/curl.h>
#include <list> // Include list header
#include <memory>
#include <string>
#include <unordered_map>

namespace SDMS {

/******************************************************************************
* Public Class Methods
******************************************************************************/

// Created contructor for HTTPCommunicator
HTTPCommunicator::HTTPCommunicator(const SocketOptions &socket_options,
const ICredentials &credentials,
uint32_t timeout_on_receive_milliseconds,
long timeout_on_poll_milliseconds,
const LogContext &log_context)
: m_timeout_on_receive_milliseconds(timeout_on_receive_milliseconds),
m_timeout_on_poll_milliseconds(timeout_on_poll_milliseconds) {
// Add the socket fact here
m_log_context = log_context;
auto socket_factory = SocketFactory();
m_socket = socket_factory.create(socket_options, credentials);

std::string id = m_socket->getID();
if (id.size() > constants::communicator::MAX_COMMUNICATOR_IDENTITY_SIZE) {
std::string error_msg =
"HTTP exceeds max number of characters allowed, allowed: ";
error_msg +=
std::to_string(constants::communicator::MAX_COMMUNICATOR_IDENTITY_SIZE);
error_msg +=
" number provided " + std::to_string(id.size()) + " identity: " + id;
DL_ERROR(m_log_context, error_msg);
// EXCEPT_PARAM(1, error_msg);
}
}

ICommunicator::Response HTTPCommunicator::poll(const MessageType message_type) {
// Put send and recieve here and make sure response = response
// Step 1: Create and send message
std::unique_ptr<IMessage> message = m_msg_factory.create(message_type);
send(*message);

// Step 2: Receive response
ICommunicator::Response response = receive(message_type);

// Step 3: Process response
LogContext log_context = m_log_context;
if (!response.error && !response.time_out) {
log_context.correlation_id =
std::get<std::string>(message->get(MessageAttribute::CORRELATION_ID));

std::cout << "Correlation ID Checker in Poll func" << std::endl;
std::cout << log_context.correlation_id << std::endl;
std::string log_message = "Received message on communicator id: " + id();
log_message += ", receiving from address: " + address();
DL_TRACE(log_context, log_message);
} else {
if (response.error) {
std::string error_message =
"Error encountered for communicator id: " + id();
error_message += ", error is: " + response.error_msg;
error_message += ", receiving from address: " + address();
DL_ERROR(log_context, error_message);
} else if (response.time_out) {
std::string error_message =
"Timeout encountered for communicator id: " + id();
error_message += ", timeout occurred after: " +
std::to_string(m_timeout_on_poll_milliseconds);
error_message += ", receiving from address: " + address();
DL_TRACE(log_context, error_message);
}
}

return response;
}

/**
* This is technical debt in the future get rid of MsgBuf and replace with
* IMessage
**/
void HTTPCommunicator::send(IMessage &message) {
std::cout << "Attempting to send" << std::endl;
// add curl here
// create a std list of type IComm::Response, to be our buffer that
CURL *curl;
CURLcode res;
std::string readBuffer;
// Initialize CURL session
curl = curl_easy_init();

if (curl) {
std::string verb =
std::get<std::string>(message.get(MessageAttribute::VERB));
std::string endpoint =
std::get<std::string>(message.get(MessageAttribute::ENDPOINT));
// Parsing the message for the if statement below:
// Variables to store the parsed values
const std::string body = std::get<std::string>(message.getPayload());

// Instead of parsing we need to use getAttribute to get the endpoint, verb,
// we dont need body as its just the payload
std::cout << "Setting curl options" << std::endl;
// Setting string buffer
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlResponseWriteCB);

curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
// Set CURL options: URL, HTTP method, headers, body, etc.
curl_easy_setopt(curl, CURLOPT_URL,
endpoint.c_str()); // Set URL with localhost and port
// Example: Set headers if needed
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);

// Put a check here that breaks the message and if it is a post then do the
// below if not then do whatever it says to do:
if (verb == "POST") {
curl_easy_setopt(curl, CURLOPT_POST, 1);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
}
// Get Request
else if (verb == "GET") {
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
}

// Perform the request
res = curl_easy_perform(curl);

// Check for errors
if (res != CURLE_OK) {
fprintf(stderr, "curl_easy_perform() failed: %s\n",
curl_easy_strerror(res));
}

// Cleanup
curl_slist_free_all(headers); // Free headers list
curl_easy_cleanup(curl);
ICommunicator::Response response;
MessageFactory msg_factory;
response.message = msg_factory.create(MessageType::STRING);
response.message->setPayload(readBuffer); // CHANGED . to ->
auto correlation_id_value =
std::get<std::string>(message.get(MessageAttribute::CORRELATION_ID));
// std::cout << "Correlation ID: "<< correlation_id_value << std::endl;
response.message->set(MessageAttribute::CORRELATION_ID,
correlation_id_value); // changed . to ->
// Store response in buffer
responseBuffer.push_back(
std::move(response)); // Assuming `response` is movable
}
}

/* Ideally in the future get rid of MsgBuf and replace with IMessage
**/
ICommunicator::Response HTTPCommunicator::receive(const MessageType) {
// FIFO from send's (imaginary) buffer
ICommunicator::Response response;

if (!responseBuffer.empty()) {
response =
std::move(responseBuffer.front()); // Use move semantics if possible
responseBuffer.pop_front(); // Remove response from buffer
} else {
// Handle case when buffer is empty
}
std::cout << "Successfully finished receiving" << std::endl;
return response;
}

const std::string HTTPCommunicator::id() const noexcept {
return std::string("ClientID");
}

const std::string HTTPCommunicator::address() const noexcept {
return std::string("ClientAddress");
}
} // namespace SDMS
Loading
Loading