Skip to content

New list hosts rebased #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ PROJECT (CLICKHOUSE-CLIENT)
ut
)
ENDIF (BUILD_TESTS)


13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,16 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block)
client.Execute("DROP TABLE test.numbers");
```
Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques.

## Features
### Multiple host
It is possible to specify multiple hosts to connect to. The connection
will be set to the first available host.
```cpp
Client client(ClientOptions()
.SetHost({
{"host1.com", 8000},
{"host2.com"}, /// port is ClientOptions.port
}));

```
3 changes: 3 additions & 0 deletions clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ IF (WIN32 OR MINGW)
TARGET_LINK_LIBRARIES (clickhouse-cpp-lib wsock32 ws2_32)
TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static wsock32 ws2_32)
ENDIF ()

add_executable(chcptest miniproj/example.cpp)
target_link_libraries(chcptest clickhouse-cpp-lib)
131 changes: 131 additions & 0 deletions clickhouse/base/endpoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#pragma once

#include "../exceptions.h"

#include <optional>
#include <queue>
#include <iostream>
namespace clickhouse {
class NetworkAddress;

/// List of hostnames with service ports
struct Endpoint {
std::string host;
std::optional<unsigned int> port = std::nullopt;
};

class EndpointConnector {
public:
std::vector<Endpoint> endpoints;
explicit EndpointConnector(std::vector<Endpoint> endpoints) {
std::cout << "connector constructor" << endpoints.size() << std::endl;
this->endpoints = endpoints;
}

struct Iterator {
Iterator() {
endpoints_ = nullptr;
finished_ = true;
}
Iterator(const std::vector<Endpoint> *endpoints, const std::vector<Endpoint>::const_iterator & start_with, bool finished = false) {
if (finished) {
finished_ = true;
}
it_ = start_with;
start_with_ = start_with;
endpoints_ = endpoints;
}
Iterator& operator++() {
++it_;
if (it_ == endpoints_->end()) {
it_ = endpoints_->begin();
}
if (it_ == start_with_) {
finished_ = true;
}
return *this;
}

bool operator!=(const Iterator& other) {
std::cout << "compare start" << std::endl;
if (finished_ && other.finished_) {
return false;
}
std::cout << "first" << std::endl;
if (finished_ != other.finished_) {
return true;
}
std::cout << "second" << std::endl;
if (other.it_ != it_) {
return true;
}
std::cout << "third" << std::endl;
return false;
}
const Endpoint& operator*() const {
std::cout << "dereference" << std::endl;
return *it_;
}

std::vector<Endpoint>::const_iterator getInsideIterator() {
return it_;
}

private:
bool finished_ = false;
std::vector<Endpoint>::const_iterator start_with_;
std::vector<Endpoint>::const_iterator it_;
const std::vector<Endpoint> *endpoints_;
};

Iterator begin() const {
return Iterator(&endpoints, endpoints.begin());
}
Iterator end() const {
return Iterator(&endpoints, endpoints.begin(), true);
}

bool isConnected();

void setCurrentEndpoint(const Iterator& iter) const {
current_endpoint_ = iter;
}
Endpoint getCurrentEndpoint();

void setNetworkAddress(std::shared_ptr<NetworkAddress> addr) const {
addr_ = addr;
}
std::shared_ptr<NetworkAddress> getNetworkAddress() {
return addr_;
}


enum ReconnectType {
ONLY_CURRENT,
ALL
};

ReconnectType getReconnectType() {
return reconnectType_;
}

void setReconnectType(ReconnectType reconnectType) {
if (reconnectType == ONLY_CURRENT) {
begin_ = current_endpoint_;
end_ = current_endpoint_;
++end_;
} else if (reconnectType == ALL) {
begin_ = current_endpoint_;
end_ = Iterator(&endpoints, current_endpoint_.getInsideIterator(), true);
} else {
throw AssertionError("There is no such Reconnect Type: " + std::to_string(reconnectType));
}
}

ReconnectType reconnectType_;
Iterator begin_;
Iterator end_;
mutable Iterator current_endpoint_;
mutable std::shared_ptr<NetworkAddress> addr_;
};
}
92 changes: 50 additions & 42 deletions clickhouse/base/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,50 +120,60 @@ ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept {
#endif
}

SOCKET SocketConnect(const NetworkAddress& addr) {
SOCKET SocketConnect(const EndpointConnector& endpointConnector) {
int last_err = 0;
for (auto res = addr.Info(); res != nullptr; res = res->ai_next) {
SOCKET s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));

if (s == -1) {
continue;
}
std::cout << "start iterations" << std::endl;
for (auto it = endpointConnector.begin(); it != endpointConnector.end(); ++it) {
std::cout << "host is" << (*it).host << std::endl;
auto endpoint = *it;
std::cout << "host " << endpoint.host << " port " << endpoint.port.value() << std::endl;
const auto addr = NetworkAddress(endpoint.host, std::to_string(endpoint.port.value()));

for (auto res = addr.Info(); res != nullptr; res = res->ai_next) {
SOCKET s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));

if (s == -1) {
continue;
}

SetNonBlock(s, true);
SetNonBlock(s, true);

if (connect(s, res->ai_addr, (int)res->ai_addrlen) != 0) {
int err = getSocketErrorCode();
if (
err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK
if (connect(s, res->ai_addr, (int)res->ai_addrlen) != 0) {
int err = getSocketErrorCode();
if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK
#if defined(_win_)
|| err == WSAEWOULDBLOCK || err == WSAEINPROGRESS
|| err == WSAEWOULDBLOCK || err == WSAEINPROGRESS
#endif
) {
pollfd fd;
fd.fd = s;
fd.events = POLLOUT;
fd.revents = 0;
ssize_t rval = Poll(&fd, 1, 5000);

if (rval == -1) {
throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to connect");
}
if (rval > 0) {
socklen_t len = sizeof(err);
getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);

if (!err) {
SetNonBlock(s, false);
return s;
) {
pollfd fd;
fd.fd = s;
fd.events = POLLOUT;
fd.revents = 0;
ssize_t rval = Poll(&fd, 1, 5000);

if (rval == -1) {
throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to connect");
}
if (rval > 0) {
socklen_t len = sizeof(err);
getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);

if (!err) {
SetNonBlock(s, false);
return s;
}
last_err = err;
}
last_err = err;
}
} else {
SetNonBlock(s, false);
return s;
}
} else {
SetNonBlock(s, false);
return s;
endpointConnector.setCurrentEndpoint(it);
//endpointConnector.setNetworkAddress(std::make_shared<NetworkAddress>(addr));
}
}
std::cout << "finish iterations" << std::endl;
if (last_err > 0) {
throw std::system_error(last_err, getErrorCategory(), "fail to connect");
}
Expand Down Expand Up @@ -224,8 +234,8 @@ void SocketFactory::sleepFor(const std::chrono::milliseconds& duration) {
}


Socket::Socket(const NetworkAddress& addr)
: handle_(SocketConnect(addr))
Socket::Socket(EndpointConnector& endpointConnector)
: handle_(SocketConnect(endpointConnector))
{}

Socket::Socket(Socket&& other) noexcept
Expand Down Expand Up @@ -299,17 +309,15 @@ std::unique_ptr<OutputStream> Socket::makeOutputStream() const {

NonSecureSocketFactory::~NonSecureSocketFactory() {}

std::unique_ptr<SocketBase> NonSecureSocketFactory::connect(const ClientOptions &opts) {
const auto address = NetworkAddress(opts.host, std::to_string(opts.port));

auto socket = doConnect(address);
std::unique_ptr<SocketBase> NonSecureSocketFactory::connect(const ClientOptions &opts, EndpointConnector& endpointConnector) {
auto socket = doConnect(endpointConnector);
setSocketOptions(*socket, opts);

return socket;
}

std::unique_ptr<Socket> NonSecureSocketFactory::doConnect(const NetworkAddress& address) {
return std::make_unique<Socket>(address);
std::unique_ptr<Socket> NonSecureSocketFactory::doConnect(EndpointConnector& endpointConnector) {
return std::make_unique<Socket>(endpointConnector);
}

void NonSecureSocketFactory::setSocketOptions(Socket &socket, const ClientOptions &opts) {
Expand Down
9 changes: 5 additions & 4 deletions clickhouse/base/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "platform.h"
#include "input.h"
#include "output.h"
#include "endpoint.h"

#include <cstddef>
#include <string>
Expand Down Expand Up @@ -76,15 +77,15 @@ class SocketFactory {

// TODO: move connection-related options to ConnectionOptions structure.

virtual std::unique_ptr<SocketBase> connect(const ClientOptions& opts) = 0;
virtual std::unique_ptr<SocketBase> connect(const ClientOptions& opts, EndpointConnector& endpointConnector) = 0;

virtual void sleepFor(const std::chrono::milliseconds& duration);
};


class Socket : public SocketBase {
public:
Socket(const NetworkAddress& addr);
Socket(EndpointConnector& endpointConnector);
Socket(Socket&& other) noexcept;
Socket& operator=(Socket&& other) noexcept;

Expand Down Expand Up @@ -116,10 +117,10 @@ class NonSecureSocketFactory : public SocketFactory {
public:
~NonSecureSocketFactory() override;

std::unique_ptr<SocketBase> connect(const ClientOptions& opts) override;
std::unique_ptr<SocketBase> connect(const ClientOptions& opts, EndpointConnector& endpointConnector) override;

protected:
virtual std::unique_ptr<Socket> doConnect(const NetworkAddress& address);
virtual std::unique_ptr<Socket> doConnect(EndpointConnector& endpointConnector);

void setSocketOptions(Socket& socket, const ClientOptions& opts);
};
Expand Down
7 changes: 4 additions & 3 deletions clickhouse/base/sslsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,17 @@ SSL_CTX * SSLContext::getContext() {
<< "\n\t handshake state: " << SSL_get_state(ssl_) \
<< std::endl
*/
SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params,
SSLSocket::SSLSocket(EndpointConnector& endpointConnector, const SSLParams & ssl_params,
SSLContext& context)
: Socket(addr)
: Socket(endpointConnector)
, ssl_(SSL_new(context.getContext()), &SSL_free)
{
auto ssl = ssl_.get();
if (!ssl)
throw clickhouse::OpenSSLError("Failed to create SSL instance");

std::unique_ptr<ASN1_OCTET_STRING, decltype(&ASN1_OCTET_STRING_free)> ip_addr(a2i_IPADDRESS(addr.Host().c_str()), &ASN1_OCTET_STRING_free);
auto addr = endpointConnector.getNetworkAddress().Host().c_str();
std::unique_ptr<ASN1_OCTET_STRING, decltype(&ASN1_OCTET_STRING_free)> ip_addr(a2i_IPADDRESS(addr), &ASN1_OCTET_STRING_free);

HANDLE_SSL_ERROR(ssl, SSL_set_fd(ssl, handle_));
if (ssl_params.use_SNI)
Expand Down
2 changes: 1 addition & 1 deletion clickhouse/base/sslsocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SSLContext

class SSLSocket : public Socket {
public:
explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context);
explicit SSLSocket(EndpointConnector& addr, const SSLParams & ssl_params, SSLContext& context);
SSLSocket(SSLSocket &&) = default;
~SSLSocket() override = default;

Expand Down
Loading