diff --git a/README.md b/README.md index cec69b0f..8173f36c 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,33 @@ int main() } ); + /// Select values inserted in the previous step using external data feature + /// See https://clickhouse.com/docs/engines/table-engines/special/external-data + { + Block block1, block2; + auto id = std::make_shared(); + id->Append(1); + block1.AppendColumn("id" , id); + + auto name = std::make_shared(); + name->Append("seven"); + block2.AppendColumn("name", name); + + const std::string _1 = "_1"; + const std::string _2 = "_2"; + + const ExternalTables external = {{_1, block1}, {_2, block2}}; + client.SelectWithExternalData("SELECT id, name FROM default.numbers where id in (_1) or name in (_2)", + external, [] (const Block& block) + { + for (size_t i = 0; i < block.GetRowCount(); ++i) { + std::cout << block[0]->As()->At(i) << " " + << block[1]->As()->At(i) << "\n"; + } + } + ); + } + /// Delete table. client.Execute("DROP TABLE default.numbers"); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 0922fcce..b41e0ba3 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -155,6 +155,8 @@ class Client::Impl { void ExecuteQuery(Query query); + void SelectWithExternalData(Query query, const ExternalTables& external_tables); + void SendCancel(); void Insert(const std::string& table_name, const std::string& query_id, const Block& block); @@ -174,10 +176,14 @@ class Client::Impl { bool ReceivePacket(uint64_t* server_packet = nullptr); - void SendQuery(const Query& query); + void SendQuery(const Query& query, bool finalize = true); + void FinalizeQuery(); void SendData(const Block& block); + void SendBlockData(const Block& block); + void SendExternalData(const ExternalTables& external_tables); + bool SendHello(); bool ReadBlock(InputStream& input, Block* block); @@ -291,6 +297,51 @@ void Client::Impl::ExecuteQuery(Query query) { } } + +void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) { + if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { + throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables"); + } + + EnsureNull en(static_cast(&query), &events_); + + if (options_.ping_before_query) { + RetryGuard([this]() { Ping(); }); + } + + SendQuery(query, false); + SendExternalData(external_tables); + FinalizeQuery(); + + while (ReceivePacket()) { + ; + } +} + +void Client::Impl::SendBlockData(const Block& block) { + if (compression_ == CompressionState::Enable) { + std::unique_ptr compressed_output = std::make_unique(output_.get(), options_.max_compression_chunk_size, options_.compression_method); + BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size); + + WriteBlock(block, buffered); + } else { + WriteBlock(block, *output_); + } +} + +void Client::Impl::SendExternalData(const ExternalTables& external_tables) { + for (const auto& table: external_tables) { + if (!table.data.GetRowCount()) { + // skip empty blocks to keep the connection in the consistent state as the current request would be marked as finished by such an empty block + continue; + } + WireFormat::WriteFixed(*output_, ClientCodes::Data); + WireFormat::WriteString(*output_, table.name); + SendBlockData(table.data); + } +} + + std::string NameToQueryString(const std::string &input) { std::string output; @@ -753,7 +804,7 @@ void Client::Impl::SendCancel() { output_->Flush(); } -void Client::Impl::SendQuery(const Query& query) { +void Client::Impl::SendQuery(const Query& query, bool finalize) { WireFormat::WriteUInt64(*output_, ClientCodes::Query); WireFormat::WriteString(*output_, query.GetQueryID()); @@ -858,7 +909,13 @@ void Client::Impl::SendQuery(const Query& query) { } WireFormat::WriteString(*output_, std::string()); // empty string after last param } + + if (finalize) { + FinalizeQuery(); + } +} +void Client::Impl::FinalizeQuery() { // Send empty block as marker of // end of data SendData(Block()); @@ -905,16 +962,7 @@ void Client::Impl::SendData(const Block& block) { if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { WireFormat::WriteString(*output_, std::string()); } - - if (compression_ == CompressionState::Enable) { - - std::unique_ptr compressed_output = std::make_unique(output_.get(), options_.max_compression_chunk_size, options_.compression_method); - BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size); - - WriteBlock(block, buffered); - } else { - WriteBlock(block, *output_); - } + SendBlockData(block); output_->Flush(); } @@ -1077,6 +1125,22 @@ void Client::Select(const Query& query) { Execute(query); } +void Client::SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb) { + impl_->SelectWithExternalData(Query(query).OnData(std::move(cb)), external_tables); +} + +void Client::SelectWithExternalData(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCallback cb) { + impl_->SelectWithExternalData(Query(query, query_id).OnData(std::move(cb)), external_tables); +} + +void Client::SelectWithExternalDataCancelable(const std::string& query, const ExternalTables& external_tables, SelectCancelableCallback cb) { + impl_->SelectWithExternalData(Query(query).OnDataCancelable(std::move(cb)), external_tables); +} + +void Client::SelectWithExternalDataCancelable(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCancelableCallback cb) { + impl_->SelectWithExternalData(Query(query, query_id).OnDataCancelable(std::move(cb)), external_tables); +} + void Client::Insert(const std::string& table_name, const Block& block) { impl_->Insert(table_name, Query::default_query_id, block); } diff --git a/clickhouse/client.h b/clickhouse/client.h index 28b89f5f..0b15a792 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -225,6 +225,13 @@ std::ostream& operator<<(std::ostream& os, const Endpoint& options); class SocketFactory; +struct ExternalTable { + const std::string_view name; + const Block& data; +}; + +using ExternalTables = std::vector; + /** * */ @@ -248,6 +255,16 @@ class Client { void SelectCancelable(const std::string& query, SelectCancelableCallback cb); void SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb); + // The same as Select but with an external data + // required for the query, see https://clickhouse.com/docs/engines/table-engines/special/external-data + void SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb); + void SelectWithExternalData(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCallback cb); + + // The same as SelectWithExternalData but can be canceled by returning false from + // the data handler function \p cb. + void SelectWithExternalDataCancelable(const std::string& query, const ExternalTables& external_tables, SelectCancelableCallback cb); + void SelectWithExternalDataCancelable(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCancelableCallback cb); + /// Alias for Execute. void Select(const Query& query);