Skip to content

Commit 08ae1c8

Browse files
František BořánekSergeyRyabinin
František Bořánek
authored andcommitted
feat: added PooledThreadExecutor::WaitUntilStopped function
1 parent 2d9c1cf commit 08ae1c8

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

src/aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ namespace Aws
9999
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
100100
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;
101101

102+
/**
103+
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
104+
*/
105+
void WaitUntilStopped();
106+
102107
protected:
103108
bool SubmitToThread(std::function<void()>&&) override;
104109

@@ -109,6 +114,7 @@ namespace Aws
109114
Aws::Vector<ThreadTask*> m_threadTaskHandles;
110115
size_t m_poolSize;
111116
OverflowPolicy m_overflowPolicy;
117+
bool m_stopped{false};
112118

113119
/**
114120
* Once you call this, you are responsible for freeing the memory pointed to by task.

src/aws-cpp-sdk-core/source/utils/threading/Executor.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@ PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overf
8989

9090
PooledThreadExecutor::~PooledThreadExecutor()
9191
{
92+
WaitUntilStopped();
93+
}
94+
95+
void PooledThreadExecutor::WaitUntilStopped()
96+
{
97+
{
98+
std::lock_guard<std::mutex> locker(m_queueLock);
99+
m_stopped = true;
100+
}
92101
for(auto threadTask : m_threadTaskHandles)
93102
{
94103
threadTask->StopProcessingWork();
@@ -100,6 +109,7 @@ PooledThreadExecutor::~PooledThreadExecutor()
100109
{
101110
Aws::Delete(threadTask);
102111
}
112+
m_threadTaskHandles.clear();
103113

104114
while(m_tasks.size() > 0)
105115
{
@@ -111,7 +121,6 @@ PooledThreadExecutor::~PooledThreadExecutor()
111121
Aws::Delete(fn);
112122
}
113123
}
114-
115124
}
116125

117126
bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
@@ -122,7 +131,7 @@ bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
122131
{
123132
std::lock_guard<std::mutex> locker(m_queueLock);
124133

125-
if (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize)
134+
if (m_stopped || (m_overflowPolicy == OverflowPolicy::REJECT_IMMEDIATELY && m_tasks.size() >= m_poolSize))
126135
{
127136
Aws::Delete(fnCpy);
128137
return false;

0 commit comments

Comments
 (0)