Skip to content

Commit

Permalink
thrift_proxy: add upstream cluster metrics for MessageType (#15668)
Browse files Browse the repository at this point in the history
Signed-off-by: William Fu <[email protected]>
  • Loading branch information
williamsfu99 authored Apr 6, 2021
1 parent f4fb2e9 commit fe4e11d
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scenarios. The filter's main job is to follow the instructions specified in the
Statistics
----------

The filter outputs statistics in the *thrift.<stat_prefix>.* namespace.
The filter outputs generic routing error statistics in the *thrift.<stat_prefix>.* namespace.

.. csv-table::
:header: Name, Type, Description
Expand All @@ -23,3 +23,17 @@ The filter outputs statistics in the *thrift.<stat_prefix>.* namespace.
unknown_cluster, Counter, Total requests with a route that has an unknown cluster.
upstream_rq_maintenance_mode, Counter, Total requests with a destination cluster in maintenance mode.
no_healthy_upstream, Counter, Total requests with no healthy upstream endpoints available.


The filter also outputs MessageType statistics in the upstream cluster's stat scope.

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

request_call, Counter, Total requests with the "Call" message type.
request_oneway, Counter, Total requests with the "Oneway" message type.
request_invalid_type, Counter, Total requests with an unsupported message type.
response_reply, Counter, Total responses with the "Reply" message type. Includes both successes and errors.
response_exception, Counter, Total responses with the "Exception" message type.
response_invalid_type, Counter, Total responses with an unsupported message type.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ New Features
* tcp_proxy: added a :ref:`use_post field <envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.TunnelingConfig.use_post>` for using HTTP POST to proxy TCP streams.
* tcp_proxy: added a :ref:`headers_to_add field <envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.TunnelingConfig.headers_to_add>` for setting additional headers to the HTTP requests for TCP proxing.
* thrift_proxy: added a :ref:`max_requests_per_connection field <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.max_requests_per_connection>` for setting maximum requests for per downstream connection.
* thrift_proxy: added per upstream metrics within the :ref:`thrift router <envoy_v3_api_msg_extensions.filters.network.thrift_proxy.router.v3.Router>` for messagetype in request/response.
* tls peer certificate validation: added :ref:`SPIFFE validator <envoy_v3_api_msg_extensions.transport_sockets.tls.v3.SPIFFECertValidatorConfig>` for supporting isolated multiple trust bundles in a single listener or cluster.
* tracing: added the :ref:`pack_trace_reason <envoy_v3_api_field_extensions.request_id.uuid.v3.UuidRequestIdConfig.pack_trace_reason>`
field as well as explicit configuration for the built-in :ref:`UuidRequestIdConfig <envoy_v3_api_msg_extensions.request_id.uuid.v3.UuidRequestIdConfig>`
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class ConnectionManager : public Network::ReadFilter,
}
void resetDownstreamConnection() override { parent_.resetDownstreamConnection(); }
StreamInfo::StreamInfo& streamInfo() override { return parent_.streamInfo(); }
MessageMetadataSharedPtr responseMetadata() override { return parent_.responseMetadata(); }
bool responseSuccess() override { return parent_.responseSuccess(); }

ActiveRpc& parent_;
ThriftFilters::DecoderFilterSharedPtr handle_;
Expand Down Expand Up @@ -217,6 +219,8 @@ class ConnectionManager : public Network::ReadFilter,
ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override;
void resetDownstreamConnection() override;
StreamInfo::StreamInfo& streamInfo() override { return stream_info_; }
MessageMetadataSharedPtr responseMetadata() override { return response_decoder_->metadata_; }
bool responseSuccess() override { return response_decoder_->success_.value_or(false); }

// Thrift::FilterChainFactoryCallbacks
void addDecoderFilter(ThriftFilters::DecoderFilterSharedPtr filter) override {
Expand Down
11 changes: 11 additions & 0 deletions source/extensions/filters/network/thrift_proxy/filters/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ class DecoderFilterCallbacks {
* @return StreamInfo for logging purposes.
*/
virtual StreamInfo::StreamInfo& streamInfo() PURE;

/**
* @return Response decoder metadata created by the connection manager.
*/
virtual MessageMetadataSharedPtr responseMetadata() PURE;

/**
* @return Signal indicating whether or not the response decoder encountered a successful/void
* reply.
*/
virtual bool responseSuccess() PURE;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,19 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
cluster_ = cluster->info();
ENVOY_STREAM_LOG(debug, "cluster '{}' match for method '{}'", *callbacks_, cluster_name,
metadata->methodName());
switch (metadata->messageType()) {
case MessageType::Call:
incClusterScopeCounter(request_call_);
break;

case MessageType::Oneway:
incClusterScopeCounter(request_oneway_);
break;

default:
incClusterScopeCounter(request_invalid_type_);
break;
}

if (cluster_->maintenanceMode()) {
stats_.upstream_rq_maintenance_mode_.inc();
Expand Down Expand Up @@ -338,6 +351,24 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data);
if (status == ThriftFilters::ResponseStatus::Complete) {
ENVOY_STREAM_LOG(debug, "response complete", *callbacks_);
switch (callbacks_->responseMetadata()->messageType()) {
case MessageType::Reply:
incClusterScopeCounter(response_reply_);
if (callbacks_->responseSuccess()) {
incClusterScopeCounter(response_reply_success_);
} else {
incClusterScopeCounter(response_reply_error_);
}
break;

case MessageType::Exception:
incClusterScopeCounter(response_exception_);
break;

default:
incClusterScopeCounter(response_invalid_type_);
break;
}
upstream_request_->onResponseComplete();
cleanup();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
: cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
request_call_(stat_name_set_->add("request_call")),
request_oneway_(stat_name_set_->add("request_oneway")),
request_invalid_type_(stat_name_set_->add("request_invalid_type")),
response_reply_(stat_name_set_->add("response_reply")),
response_reply_success_(stat_name_set_->add("response_success")),
response_reply_error_(stat_name_set_->add("response_error")),
response_exception_(stat_name_set_->add("response_exception")),
response_invalid_type_(stat_name_set_->add("response_invalid_type")),
passthrough_supported_(false) {}

~Router() override = default;
Expand All @@ -188,6 +197,11 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
void setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) override;
bool passthroughSupported() const override { return passthrough_supported_; }

// Stats
void incClusterScopeCounter(Stats::StatName name) {
cluster_->statsScope().counterFromStatName(name).inc();
}

// ProtocolConverter
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus transportEnd() override;
Expand Down Expand Up @@ -259,6 +273,15 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,

Upstream::ClusterManager& cluster_manager_;
RouterStats stats_;
Stats::StatNameSetPtr stat_name_set_;
const Stats::StatName request_call_;
const Stats::StatName request_oneway_;
const Stats::StatName request_invalid_type_;
const Stats::StatName response_reply_;
const Stats::StatName response_reply_success_;
const Stats::StatName response_reply_error_;
const Stats::StatName response_exception_;
const Stats::StatName response_invalid_type_;

ThriftFilters::DecoderFilterCallbacks* callbacks_{};
RouteConstSharedPtr route_{};
Expand Down
3 changes: 3 additions & 0 deletions test/extensions/filters/network/thrift_proxy/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,13 @@ class MockDecoderFilterCallbacks : public DecoderFilterCallbacks {
MOCK_METHOD(ResponseStatus, upstreamData, (Buffer::Instance&));
MOCK_METHOD(void, resetDownstreamConnection, ());
MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ());
MOCK_METHOD(MessageMetadataSharedPtr, responseMetadata, ());
MOCK_METHOD(bool, responseSuccess, ());

uint64_t stream_id_{1};
NiceMock<Network::MockConnection> connection_;
NiceMock<StreamInfo::MockStreamInfo> stream_info_;
MessageMetadataSharedPtr metadata_;
std::shared_ptr<Router::MockRoute> route_;
};

Expand Down
Loading

0 comments on commit fe4e11d

Please sign in to comment.