Skip to content

Commit 5ede61b

Browse files
PR follow-up: Add WaitUntilStopped to the base interface class
1 parent 08ae1c8 commit 5ede61b

File tree

2 files changed

+20
-8
lines changed

2 files changed

+20
-8
lines changed

src/aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ namespace Aws
4848
return SubmitToThread(std::move(callable));
4949
}
5050

51+
/**
52+
* Call to wait until all tasks have finished.
53+
*/
54+
virtual void WaitUntilStopped() { return; };
55+
5156
protected:
5257
/**
5358
* To implement your own executor implementation, then simply subclass Executor and implement this method.
@@ -64,6 +69,8 @@ namespace Aws
6469
public:
6570
DefaultExecutor() : m_state(State::Free) {}
6671
~DefaultExecutor();
72+
73+
void WaitUntilStopped() override;
6774
protected:
6875
enum class State
6976
{
@@ -102,25 +109,25 @@ namespace Aws
102109
/**
103110
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
104111
*/
105-
void WaitUntilStopped();
112+
void WaitUntilStopped() override;
106113

107114
protected:
108115
bool SubmitToThread(std::function<void()>&&) override;
109116

110117
private:
111118
Aws::Queue<std::function<void()>*> m_tasks;
112-
std::mutex m_queueLock;
119+
mutable std::mutex m_queueLock;
113120
Aws::Utils::Threading::Semaphore m_sync;
114121
Aws::Vector<ThreadTask*> m_threadTaskHandles;
115-
size_t m_poolSize;
116-
OverflowPolicy m_overflowPolicy;
122+
size_t m_poolSize = 0;
123+
OverflowPolicy m_overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS;
117124
bool m_stopped{false};
118125

119126
/**
120127
* Once you call this, you are responsible for freeing the memory pointed to by task.
121128
*/
122129
std::function<void()>* PopTask();
123-
bool HasTasks();
130+
bool HasTasks() const;
124131

125132
friend class ThreadTask;
126133
};

src/aws-cpp-sdk-core/source/utils/threading/Executor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,20 @@ void DefaultExecutor::Detach(std::thread::id id)
6060
while(expected != State::Shutdown);
6161
}
6262

63-
DefaultExecutor::~DefaultExecutor()
63+
void DefaultExecutor::WaitUntilStopped()
6464
{
6565
auto expected = State::Free;
6666
while(!m_state.compare_exchange_strong(expected, State::Shutdown))
6767
{
6868
//spin while currently detaching threads finish
6969
assert(expected == State::Locked);
70-
expected = State::Free;
70+
expected = State::Free;
7171
}
72+
}
73+
74+
DefaultExecutor::~DefaultExecutor()
75+
{
76+
WaitUntilStopped();
7277

7378
auto it = m_threads.begin();
7479
while(!m_threads.empty())
@@ -162,7 +167,7 @@ std::function<void()>* PooledThreadExecutor::PopTask()
162167
return nullptr;
163168
}
164169

165-
bool PooledThreadExecutor::HasTasks()
170+
bool PooledThreadExecutor::HasTasks() const
166171
{
167172
std::lock_guard<std::mutex> locker(m_queueLock);
168173
return m_tasks.size() > 0;

0 commit comments

Comments
 (0)