Skip to content

Commit f0ba75b

Browse files
Fix high CPU utilization regression on event streaming introduced by #3302 (#3318)
1 parent 44dacf9 commit f0ba75b

File tree

9 files changed

+54
-19
lines changed

9 files changed

+54
-19
lines changed

src/aws-cpp-sdk-core/include/aws/core/client/AWSClientAsyncCRTP.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ namespace Aws
1414
namespace Client
1515
{
1616
class AsyncCallerContext;
17+
template <typename OutcomeT, typename ClientT, typename AWSEndpointT, typename RequestT, typename HandlerT>
18+
class BidirectionalEventStreamingTask;
1719

1820
/**
1921
* A helper to determine if AWS Operation is EventStream-enabled or not (based on const-ness of the request)
@@ -204,6 +206,9 @@ namespace Client
204206
return Aws::Client::MakeCallableOperation(AwsServiceClientT::GetAllocationTag(), operationFunc, clientThis, clientThis->m_clientConfiguration.executor.get());
205207
}
206208
protected:
209+
template <typename OutcomeT, typename ClientT, typename AWSEndpointT, typename RequestT, typename HandlerT>
210+
friend class BidirectionalEventStreamingTask; // allow BidirectionalEventStreamingTask to access m_isInitialized
211+
207212
std::atomic<bool> m_isInitialized;
208213
mutable std::atomic<size_t> m_operationsProcessed;
209214
mutable std::condition_variable m_shutdownSignal;

src/aws-cpp-sdk-core/include/aws/core/client/AWSClientEventStreamingAsyncTask.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,16 @@ class AWS_CORE_LOCAL BidirectionalEventStreamingTask final {
9191
* @return OutcomeT, operation response, NoResult on success. (Check InitialResponse for the actual service reply).
9292
*/
9393
OutcomeT operator()() {
94+
if(!m_clientThis->m_isInitialized)
95+
{
96+
AWS_LOGSTREAM_ERROR("BidirectionalEventStreamingTask", "Unable to call " <<
97+
m_pRequest->GetServiceRequestName() << ": client is not initialized (or already terminated)");
98+
m_handler(m_clientThis, *m_pRequest, Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED,
99+
"NOT_INITIALIZED", "Client is not initialized or already terminated", false), m_handlerContext);
100+
return OutcomeT(NoResult());
101+
}
102+
Aws::Utils::RAIICounter raiiGuard(m_clientThis->m_operationsProcessed, &m_clientThis->m_shutdownSignal);
103+
94104
const auto outcome = m_clientThis->MakeRequest(*m_pRequest, m_endpoint, m_method, m_signerName);
95105
if (outcome.IsSuccess()) {
96106
m_handler(m_clientThis, *m_pRequest, OutcomeT(NoResult()), m_handlerContext);

src/aws-cpp-sdk-core/include/aws/core/utils/logging/ErrorMacros.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ if(!m_isInitialized) \
6464
AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \
6565
return Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false); \
6666
} \
67-
Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal)
67+
Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal)
6868

6969
#define AWS_ASYNC_OPERATION_GUARD(OPERATION) \
7070
if(!m_isInitialized) \
7171
{ \
7272
AWS_LOGSTREAM_ERROR(#OPERATION, "Unable to call " #OPERATION ": client is not initialized (or already terminated)"); \
7373
return handler(this, request, Aws::Client::AWSError<CoreErrors>(CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED", "Client is not initialized or already terminated", false), handlerContext); \
7474
} \
75-
Aws::Utils::RAIICounter(this->m_operationsProcessed, &this->m_shutdownSignal)
75+
Aws::Utils::RAIICounter raiiGuard(this->m_operationsProcessed, &this->m_shutdownSignal)

src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ namespace Aws
4141
*/
4242
bool WaitForDrain(int64_t timeoutMs);
4343

44+
/**
45+
* A flag returned by underflow() if there is no data available at the moment but stream must not be closed yet.
46+
*/
47+
static const int noData;
48+
4449
protected:
4550
std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;
4651
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;

src/aws-cpp-sdk-core/source/client/AWSClient.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,14 @@ HttpResponseOutcome AWSClient::AttemptExhaustively(const Aws::Http::URI& uri,
357357
{
358358
break;
359359
}
360+
if (request.IsEventStreamRequest() &&
361+
(request.GetBody()->eof() ||
362+
(outcome.GetError().GetResponseCode() != Http::HttpResponseCode::REQUEST_NOT_MADE &&
363+
outcome.GetError().GetResponseCode() != Http::HttpResponseCode::NETWORK_CONNECT_TIMEOUT &&
364+
outcome.GetError().GetResponseCode() != Http::HttpResponseCode::SERVICE_UNAVAILABLE))) {
365+
AWS_LOGSTREAM_ERROR(AWS_CLIENT_LOG_TAG, "SDK is not able to retry EventStream request after the connection was established");
366+
break;
367+
}
360368

361369
AWS_LOGSTREAM_WARN(AWS_CLIENT_LOG_TAG, "Request failed, now waiting " << sleepMillis << " ms before attempting again.");
362370
if(request.GetBody())

src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <aws/core/utils/logging/LogMacros.h>
1616
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
1717
#include <aws/core/utils/stream/AwsChunkedStream.h>
18+
#include <aws/core/utils/stream/ConcurrentStreamBuf.h>
1819

1920
#include <algorithm>
2021
#include <cassert>
@@ -315,12 +316,18 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo
315316
{
316317
size_t amountRead = 0;
317318
if (isStreaming) {
318-
if (!ioStream->eof() && ioStream->peek() != EOF) {
319-
amountRead = (size_t)ioStream->readsome(ptr, amountToRead);
319+
if (ioStream->bad()) {
320+
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Input stream is bad!");
321+
return CURL_READFUNC_ABORT;
320322
}
321-
if (amountRead == 0 && !ioStream->eof()) {
323+
const int peekVal = ioStream->peek();
324+
if (peekVal == ConcurrentStreamBuf::noData) {
322325
return CURL_READFUNC_PAUSE;
323326
}
327+
if (ioStream->eof() || peekVal == EOF) {
328+
return 0;
329+
}
330+
amountRead = (size_t)ioStream->readsome(ptr, amountToRead);
324331
} else if (isAwsChunked && context->m_chunkedStream != nullptr) {
325332
amountRead = context->m_chunkedStream->BufferedRead(ptr, amountToRead);
326333
} else {
@@ -417,9 +424,9 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double,
417424
}
418425

419426
const int peekVal = ioStream->peek();
420-
if (ioStream->eof() && peekVal == std::char_traits<char>::eof()) {
427+
if (ioStream->eof() || peekVal == std::char_traits<char>::eof()) {
421428
// curl won't call ReadBody after the last ReadBody call returns 0
422-
// however, this Progress method is still called few times for incoming data.
429+
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
423430
return 0;
424431
}
425432

@@ -432,10 +439,10 @@ int CurlHttpClient::CurlProgressCallback(void *userdata, double, double, double,
432439
// we should use multi handle or another HTTP client in the future to avoid this
433440
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
434441
} else {
435-
if (peekVal != std::char_traits<char>::eof()) {
436-
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
437-
} else {
442+
if (peekVal == ConcurrentStreamBuf::noData) {
438443
curl_easy_pause(context->m_curlHandle, CURLPAUSE_SEND);
444+
} else {
445+
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);
439446
}
440447
}
441448

src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ namespace Aws
1515
namespace Stream
1616
{
1717
const char TAG[] = "ConcurrentStreamBuf";
18+
19+
const int ConcurrentStreamBuf::noData = ((((('n' << 8) | 'z') << 8) | 'm') << 8) | 'a';
20+
1821
ConcurrentStreamBuf::ConcurrentStreamBuf(size_t bufferLength) :
19-
m_putArea(bufferLength), // we access [0] of the put area below so we must initialize it.
20-
m_eofInput(false),
21-
m_eofOutput(false)
22+
m_putArea(bufferLength) // we access [0] of the put area below so we must initialize it.
2223
{
2324
m_getArea.reserve(bufferLength);
2425
m_backbuf.reserve(bufferLength);
@@ -163,7 +164,7 @@ namespace Aws
163164
if (!lock.try_lock())
164165
{
165166
// don't block consumer, it will retry asking later
166-
return 'z'; // just returning some valid value other than EOF
167+
return noData;
167168
}
168169

169170
if (m_eofInput && m_backbuf.empty())
@@ -187,10 +188,10 @@ namespace Aws
187188
char* gbegin = reinterpret_cast<char*>(m_getArea.data());
188189
setg(gbegin, gbegin, gbegin + m_getArea.size());
189190

190-
if (!m_getArea.empty())
191+
if (!m_getArea.empty()) {
191192
return std::char_traits<char>::to_int_type(*gptr());
192-
else
193-
return 'a'; // just returning some valid value other than EOF
193+
}
194+
return noData;
194195
}
195196

196197
int ConcurrentStreamBuf::uflow()

src/aws-cpp-sdk-core/source/utils/threading/DefaultExecutor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ void DefaultExecutor::impl::Detach(std::thread::id id) {
126126
assert(it != m_tasks.end());
127127
it->second.first.detach();
128128
m_tasks.erase(it);
129-
m_state = State::Free;
130129
m_cv.notify_one();
131130
}
132131

tests/aws-cpp-sdk-transcribestreaming-integ-tests/TranscribeErrorCaseTests.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ TEST_F(TranscribeStreamingErrorTests, TranscribeTerminateByLowSpeedLimit) {
217217
}
218218
TestTrace(Aws::String("Writing good event"));
219219
if (!stream.WriteAudioEvent(event)) {
220-
AWS_ADD_FAILURE("Failed to write an audio event");
220+
// the stream may be force closed by timeout, no test assertion here.
221221
break;
222222
}
223223
}

0 commit comments

Comments
 (0)