Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/ucx/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ class communicator_impl : public communicator_base<communicator_impl>
}
else
{
while (ucp_worker_progress(m_recv_worker->get())) {}
for (unsigned int i=0; i<10; ++i)
{
if (m_mutex.try_lock())
{
auto p = ucp_worker_progress(m_recv_worker->get());
m_mutex.unlock();
if (!p) break;
}
}
//while (ucp_worker_progress(m_recv_worker->get())) {}
}
// work through ready recv callbacks, which were pushed to the queue by other threads
// (including this thread)
Expand Down
4 changes: 3 additions & 1 deletion src/ucx/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class context_impl : public context_base
if (m_thread_safe) m_mutex.lock();
ucp_request_cancel(m_worker->get(), s->m_ucx_ptr);
while (ucp_worker_progress(m_worker->get())) {}
if (m_thread_safe) m_mutex.unlock();

// check whether the cancelled callback was enqueued by consuming all queued cancelled
// callbacks and putting them in a temporary vector
bool found = false;
Expand All @@ -212,6 +212,8 @@ class context_impl : public context_base
for (auto x : m_cancel_recv_req_vec)
while (!m_cancel_recv_req_queue.push(x)) {}

if (m_thread_safe) m_mutex.unlock();

// delete callback here if it was actually cancelled
if (found)
{
Expand Down