Fix apparent race condition between message processing and Cluster.scheduler_info #9064

Open · wants to merge 9 commits into main

Conversation

@uellue uellue commented Apr 30, 2025

When messages about removed workers arrive too late, Cluster.scheduler_info might already have been updated by Cluster._wait_for_workers(). In that case, looking up the worker name, and the subsequent `del`, fail with a `KeyError`.

With this fix, SpecCluster._update_worker_status() doesn't depend on looking up the name, but gets it supplied in the message.

Furthermore, Cluster._update_worker_status() is made robust against a missing key.
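
For illustration, here is a minimal sketch of the two changes described above (hypothetical class and method bodies, not the actual distributed code): the "remove" message carries the worker's name alongside its address, and removal tolerates a key that has already disappeared from scheduler_info.

```python
# Minimal sketch only: illustrates the shape of the fix, not distributed's real classes.

class ClusterSketch:
    def __init__(self):
        # Mirrors what the scheduler reports; keyed by worker address.
        self.scheduler_info = {"workers": {}}

    def _update_worker_status(self, op, msg):
        if op == "add":
            address, info = msg
            self.scheduler_info["workers"][address] = info
        elif op == "remove":
            # msg is assumed to carry (address, name) rather than only the
            # address, so the name never has to be looked up in scheduler_info.
            address, name = msg
            # pop(..., None) instead of `del`: if _wait_for_workers() already
            # refreshed scheduler_info and dropped this worker, a late message
            # no longer raises KeyError.
            self.scheduler_info["workers"].pop(address, None)
            self._on_worker_removed(address, name)

    def _on_worker_removed(self, address, name):
        # A SpecCluster-like subclass would use `name` here for its own
        # spec/worker bookkeeping instead of looking it up.
        pass
```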

The issue is easy to reproduce with the added test case on a 24-core EPYC. On machines with fewer cores (tested on a quad-core laptop) it is hard to reproduce. It seems to appear more often under high system load, for example during a video conference.
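
As a rough standalone sketch of that kind of reproducer (not the test added in this PR; worker counts and iteration counts are only illustrative), rapidly scaling a LocalCluster down and back up and then shutting it down while "remove" messages may still be in flight looks roughly like this:

```python
# Sketch of a reproducer: scale down, scale up, wait, then tear down quickly.
from distributed import Client, LocalCluster


def main():
    for _ in range(20):  # the race is timing-dependent, so repeat
        cluster = LocalCluster(n_workers=8, threads_per_worker=1, processes=True)
        client = Client(cluster)
        try:
            cluster.scale(2)            # scale down ...
            cluster.scale(8)            # ... and immediately back up
            client.wait_for_workers(8)  # refreshes Cluster.scheduler_info
        finally:
            # Late "remove" messages for the downscaled workers can now refer to
            # workers no longer in scheduler_info -> KeyError before the fix.
            client.close()
            cluster.close()


if __name__ == "__main__":
    main()
```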

I still don't fully understand the implications of adding an argument to remove_worker(). Maybe someone with a better understanding can chime in?

Unfortunately, following https://distributed.dask.org/en/latest/develop.html#test to run the tests produced a long list of errors on my machine and didn't run any tests. Perhaps someone can help with testing this, and also help adapt the test case to common practice?

Thanks to @sk1p for writing the minimal reproducer!

Refs LiberTEM/LiberTEM#1690 where the flakiness appeared in tests.

Closes #xxxx

  • Tests added / passed
  • Passes pre-commit run --all-files

Scaling can fail when discrepancies develop between Cluster.scheduler_info
and delayed incoming messages: a delayed message references a worker that is
no longer in scheduler_info because scheduler_info has been updated by
wait_for_workers() in the meantime.

sk1p added a commit to matbryan52/LiberTEM that referenced this pull request Apr 30, 2025
Shutting down a spec cluster shortly after scaling down and up results in
`KeyError`s. Instead, explicitly synchronize after scaling down, such
that all distributed components have "settled", and we can be more
confident that a shutdown in the near future will succeed.

(as I understand it, "remove" ops from scaling down are still coming in
when shutting down, and they are no longer expected, because the state
was changed by scaling up again in between, forgetting about the workers
that were in the process of being removed)

See dask/distributed#9064 for the upstream
reproducer and proposed fix
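
For context, a hedged sketch of that downstream workaround (not LiberTEM's actual code; the helper name and timeout are made up) could look like this: after scaling down, block until the scheduler reports the reduced worker count before scaling up again or shutting down.

```python
# Sketch of "synchronize after scaling down" as a downstream workaround.
import time

from distributed import Client, LocalCluster


def scale_down_and_settle(cluster, client, n_workers, timeout=30.0):
    """Scale down and wait until the scheduler reports at most ``n_workers``."""
    cluster.scale(n_workers)
    deadline = time.monotonic() + timeout
    while len(client.scheduler_info()["workers"]) > n_workers:
        if time.monotonic() > deadline:
            raise TimeoutError("workers did not settle after scaling down")
        time.sleep(0.1)


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    scale_down_and_settle(cluster, client, 2)
    cluster.scale(4)  # scale up (or shut down) only once things have settled
    client.close()
    cluster.close()
```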

github-actions bot commented Apr 30, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

21 files ±0  ·  21 suites ±0  ·  8h 26m 9s ⏱️ +3m 3s
4 087 tests +1  ·  3 972 ✅ −1  ·  111 💤 ±0  ·  4 ❌ +2
39 327 runs +10  ·  37 426 ✅ +9  ·  1 897 💤 −1  ·  4 ❌ +2

For more details on these failures, see this check.

Results for commit b9401b5. ± Comparison against base commit 0e3b344.

♻️ This comment has been updated with latest results.

@jacobtomlinson jacobtomlinson left a comment

Thanks for picking this up! It looks like your new test is hanging in CI.

Comment on lines 343 to 344
@pytest.mark.slow
def test_stress_scale():

Member

We don't have any strict guidelines regarding test runtimes, but this one looks like it could easily run for minutes on CI. If that's the case, this would be a -1 for me. This edge case isn't worth adding minutes to CI runtime.


Author

Yes, I fully agree that this shouldn't run routinely in CI. I was hoping that placing it in test_stress.py and marking it as slow would accomplish that. Is there a place or a mark for tests that are run only on demand, for example only on a release candidate? (One generic pytest pattern is sketched at the end of this comment.)

The test is best run on a machine with many cores anyway. Using supercomputers for CI on scientific codes, including continuous benchmarking, is a topic for our supercomputing colleagues at Forschungszentrum Jülich, by the way. Since Dask is a pretty fundamental project for Python data science, maybe something can be worked out in this direction.
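
As a sketch of the generic pytest pattern mentioned above (not necessarily how distributed's test suite is organized; the option and marker names are made up), an "on demand" marker can be gated behind a command-line flag in conftest.py:

```python
# conftest.py -- skip tests marked "ondemand" unless --run-ondemand is given,
# e.g. when validating a release candidate on a large machine.
import pytest


def pytest_addoption(parser):
    parser.addoption(
        "--run-ondemand",
        action="store_true",
        default=False,
        help="run tests marked as 'ondemand'",
    )


def pytest_configure(config):
    config.addinivalue_line("markers", "ondemand: run only when explicitly requested")


def pytest_collection_modifyitems(config, items):
    if config.getoption("--run-ondemand"):
        return
    skip = pytest.mark.skip(reason="needs --run-ondemand to run")
    for item in items:
        if "ondemand" in item.keywords:
            item.add_marker(skip)
```

A test decorated with `@pytest.mark.ondemand` would then only run when `pytest --run-ondemand` is invoked explicitly.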


Contributor

I think the test case should fail even with fewer workers. Reproducing on a 24-core system took only a few seconds, and in our CI the test was flaky even with two processes. Maybe artificially slowing down the worker scale-down somehow could make reproduction more likely?


Author

Managed to make reproduction more likely with a delay in message processing. Thanks for the idea! 👍


Author

The test is now much faster; timings from my laptop:

13.45s call     distributed/tests/test_stress.py::test_chaos_rechunk
13.28s call     distributed/tests/test_stress.py::test_close_connections
10.07s call     distributed/tests/test_stress.py::test_stress_creation_and_deletion
7.91s call     distributed/tests/test_stress.py::test_stress_gc[inc-1000]
5.38s call     distributed/tests/test_stress.py::test_cancel_stress_sync

4.09s call     distributed/tests/test_stress.py::test_stress_scale

3.83s call     distributed/tests/test_stress.py::test_stress_gc[slowinc-100]
3.09s call     distributed/tests/test_stress.py::test_cancel_stress
1.00s call     distributed/tests/test_stress.py::test_stress_scatter_death
0.69s call     distributed/tests/test_stress.py::test_stress_1
0.03s call     distributed/tests/test_stress.py::test_no_delay_during_large_transfer

What is the threshold to be marked as "slow"?

sk1p added a commit to LiberTEM/LiberTEM that referenced this pull request May 5, 2025
Shutting down a spec cluster shortly after scaling down and up results in
`KeyError`s. Instead, explicitly synchronize after scaling down, such
that all distributed components have "settled", and we can be more
confident that a shutdown in the near future will succeed.

(as I understand it, "remove" ops from scaling down are still coming in
when shutting down, and they are no longer expected, because the state
was changed by scaling up again in between, forgetting about the workers
that were in the process of being removed)

See dask/distributed#9064 for the upstream
reproducer and proposed fix
Monkeypatch a slight delay, and an opportunity to run other async tasks,
into worker status message processing.

In tests, this made it much more likely to trigger the error: usually on the first attempt
with five workers on a 24-core EPYC. That should make the test much faster, with no repetitions needed.

To be confirmed on a quad-core laptop!
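
One possible way to emulate that delay (a sketch under the assumption that Cluster._update_worker_status() is the synchronous handler these messages pass through; this is not the test's exact code) is to defer each message onto the event loop with a short sleep, which widens the window in which scheduler_info can be refreshed underneath a late "remove" message:

```python
# Sketch: delay worker-status message processing to make the race easier to hit.
import asyncio

from distributed.deploy.cluster import Cluster


def install_delay(monkeypatch, delay=0.05):
    original = Cluster._update_worker_status

    def delayed(self, op, msg):
        async def _later():
            # Yield to other tasks first, then apply the message "late".
            await asyncio.sleep(delay)
            original(self, op, msg)

        # Called from within the cluster's event loop, so a loop is running.
        asyncio.get_running_loop().create_task(_later())

    monkeypatch.setattr(Cluster, "_update_worker_status", delayed)
```

In a test, this could be applied via pytest's monkeypatch fixture before creating the cluster, with `delay` tuned to the machine.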

uellue commented May 8, 2025

Thanks for the feedback! I've tried to address it and to make pre-commit happy as well.

@uellue uellue changed the title Fix apparent race condition between message processing and Cluster.scheduler_info WIP: Fix apparent race condition between message processing and Cluster.scheduler_info May 8, 2025

uellue commented May 8, 2025

I'll work in another change to prevent the test from locking up and will update here tomorrow. Please don't merge just yet, WIP! :-)

uellue and others added 7 commits May 9, 2025 10:38
For some reason the context manager may lock up the test run. This way it terminates.

Confirmed that the current test is likely to trigger the issue also on a quad-core laptop.
SpecCluster.scheduler_info might be updated by Cluster._wait_for_workers() while
messages are in flight. When scaling down, this can lead to a KeyError when trying
to get the worker's name from scheduler_info if that worker is no longer present
in scheduler_info.

As a fix, this sends both worker key and name in the message queue so that
the name doesn't need to be looked up.
TODO: Fully understand impact of adding the name parameter on other functions.
del raises a KeyError; the two-argument pop does not.
mypy and black mostly
@uellue uellue changed the title WIP: Fix apparent race condition between message processing and Cluster.scheduler_info Fix apparent race condition between message processing and Cluster.scheduler_info May 9, 2025

uellue commented May 9, 2025

Done! FYI, the reason for the force push is that we rewrote the git history so that it contains the test case first and then the fix, for easier testing.

uellue commented May 15, 2025

The missing diff coverage seems to be a glitch. I don't see a connection between the other test failures and this change; if there is one, I'd need some help to understand it.

@jacobtomlinson
Member

Rerunning CI
