Skip to content

Commit 574da81

Browse files
authored
Merge pull request #236 from den818/ProfileEvents
Profile events for query
2 parents cac2a03 + 333a1eb commit 574da81

File tree

4 files changed

+67
-2
lines changed

4 files changed

+67
-2
lines changed

clickhouse/client.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
4040
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
4141
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
42+
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448
43+
#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449
44+
#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451
4245

43-
#define REVISION DBMS_MIN_REVISION_WITH_OPENTELEMETRY
46+
#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS
4447

4548
namespace clickhouse {
4649

@@ -476,6 +479,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
476479
return true;
477480
}
478481

482+
case ServerCodes::ProfileEvents: {
483+
if (!WireFormat::SkipString(*input_)) {
484+
return false;
485+
}
486+
487+
Block block;
488+
if (!ReadBlock(*input_, &block)) {
489+
return false;
490+
}
491+
492+
if (events_) {
493+
events_->OnProfileEvents(block);
494+
}
495+
return true;
496+
}
497+
479498
default:
480499
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
481500
break;
@@ -649,6 +668,9 @@ void Client::Impl::SendQuery(const Query& query) {
649668
WireFormat::WriteString(*output_, info.initial_user);
650669
WireFormat::WriteString(*output_, info.initial_query_id);
651670
WireFormat::WriteString(*output_, info.initial_address);
671+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME) {
672+
WireFormat::WriteFixed<int64_t>(*output_, 0);
673+
}
652674
WireFormat::WriteFixed(*output_, info.iface_type);
653675

654676
WireFormat::WriteString(*output_, info.os_user);
@@ -660,6 +682,8 @@ void Client::Impl::SendQuery(const Query& query) {
660682

661683
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
662684
WireFormat::WriteString(*output_, info.quota_key);
685+
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH)
686+
WireFormat::WriteUInt64(*output_, 0u);
663687
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
664688
WireFormat::WriteUInt64(*output_, info.client_version_patch);
665689
}

clickhouse/protocol.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ namespace clickhouse {
1717
TablesStatusResponse = 9, /// Response to TableStatus.
1818
Log = 10, /// Query execution log.
1919
TableColumns = 11, /// Columns' description for default values calculation
20+
PartUUIDs = 12, /// List of unique parts ids.
21+
ReadTaskRequest = 13, /// String (UUID) describes a request for which next task is needed
22+
/// This is such an inverted logic, where server sends requests
23+
/// And client returns back response
24+
ProfileEvents = 14, /// Packet with profile events from server.
2025
};
2126
}
2227

clickhouse/query.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ class QueryEvents {
6666
*/
6767
virtual void OnServerLog(const Block& block) = 0;
6868

69+
/// Handle query execution profile events.
70+
virtual void OnProfileEvents(const Block& block) = 0;
71+
6972
virtual void OnFinish() = 0;
7073
};
7174

@@ -75,6 +78,7 @@ using ProgressCallback = std::function<void(const Progress& progress)>;
7578
using SelectCallback = std::function<void(const Block& block)>;
7679
using SelectCancelableCallback = std::function<bool(const Block& block)>;
7780
using SelectServerLogCallback = std::function<bool(const Block& block)>;
81+
using ProfileEventsCallback = std::function<bool(const Block& block)>;
7882

7983

8084
class Query : public QueryEvents {
@@ -148,6 +152,12 @@ class Query : public QueryEvents {
148152
return *this;
149153
}
150154

155+
/// Set handler for receiving profile events.
156+
inline Query& OnProfileEvents(ProfileEventsCallback cb) {
157+
profile_events_callback_cb_ = std::move(cb);
158+
return *this;
159+
}
160+
151161
static const std::string default_query_id;
152162

153163
private:
@@ -187,6 +197,12 @@ class Query : public QueryEvents {
187197
}
188198
}
189199

200+
void OnProfileEvents(const Block& block) override {
201+
if (profile_events_callback_cb_) {
202+
profile_events_callback_cb_(block);
203+
}
204+
}
205+
190206
void OnFinish() override {
191207
}
192208

@@ -200,6 +216,7 @@ class Query : public QueryEvents {
200216
SelectCallback select_cb_;
201217
SelectCancelableCallback select_cancelable_cb_;
202218
SelectServerLogCallback select_server_log_cb_;
219+
ProfileEventsCallback profile_events_callback_cb_;
203220
};
204221

205222
}

ut/client_ut.cpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,6 @@ TEST_P(ClientCase, ServerLogs) {
11261126
EXPECT_GT(received_row_count, 0U);
11271127
}
11281128

1129-
11301129
TEST_P(ClientCase, TracingContext) {
11311130
Block block;
11321131
createTableWithOneColumn<ColumnString>(block);
@@ -1152,6 +1151,26 @@ TEST_P(ClientCase, TracingContext) {
11521151
EXPECT_GT(received_rows, 0u);
11531152
}
11541153

1154+
TEST_P(ClientCase, OnProfileEvents) {
1155+
Block block;
1156+
createTableWithOneColumn<ColumnString>(block);
1157+
1158+
client_->Execute("INSERT INTO " + table_name + " (*) VALUES (\'Foo\'), (\'Bar\')");
1159+
size_t received_row_count = 0;
1160+
Query query("SELECT * FROM " + table_name);
1161+
1162+
query.OnProfileEvents([&](const Block& block) {
1163+
received_row_count += block.GetRowCount();
1164+
return true;
1165+
});
1166+
client_->Execute(query);
1167+
1168+
const int DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS = 54451;
1169+
if (client_->GetServerInfo().revision >= DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS) {
1170+
EXPECT_GT(received_row_count, 0U);
1171+
}
1172+
}
1173+
11551174
const auto LocalHostEndpoint = ClientOptions()
11561175
.SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
11571176
.SetPort( getEnvOrDefault<size_t>("CLICKHOUSE_PORT", "9000"))

0 commit comments

Comments
 (0)