From b43b0092be430427d5b5606f6657b03fe313062c Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 22:37:49 -0700 Subject: [PATCH 01/10] clean --- src/xdist/scheduler/loadscope.py | 2 ++ src/xdist/workermanage.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/xdist/scheduler/loadscope.py b/src/xdist/scheduler/loadscope.py index 0a01cb49..de87d231 100644 --- a/src/xdist/scheduler/loadscope.py +++ b/src/xdist/scheduler/loadscope.py @@ -162,6 +162,8 @@ def add_node(self, node: WorkerController) -> None: """ assert node not in self.assigned_work self.assigned_work[node] = {} + # sort by gw id + self.assigned_work = dict(sorted(self.assigned_work.items(), key=lambda item: item[0].gateway.id)) def remove_node(self, node: WorkerController) -> str | None: """Remove a node from the scheduler. diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 201c8e71..fc0c6c56 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Sequence +from concurrent.futures import ThreadPoolExecutor import enum import fnmatch import os @@ -94,15 +95,24 @@ def setup_nodes( ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") - return [self.setup_node(spec, putevent) for spec in self.specs] + with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: + futs = [ + executor.submit(self.setup_node, spec, putevent, idx) + for idx, spec in enumerate(self.specs) + ] + return [f.result() for f in futs] + # return [self.setup_node(spec, putevent) for spec in self.specs] def setup_node( self, spec: execnet.XSpec, putevent: Callable[[tuple[str, dict[str, Any]]], None], + idx: int | None = None, ) -> WorkerController: if getattr(spec, "execmodel", None) != "main_thread_only": spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}") + # if idx is not None: + # spec = execnet.XSpec(f"{spec}//id=gw{idx}") gw = self.group.makegateway(spec) self.config.hook.pytest_xdist_newgateway(gateway=gw) self.rsync_roots(gw) From f8b3f5e0ea6baa443576b06a2c911aba7528e6dc Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 22:42:11 -0700 Subject: [PATCH 02/10] fix acceptance_test --- src/xdist/scheduler/loadscope.py | 1 - src/xdist/workermanage.py | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/xdist/scheduler/loadscope.py b/src/xdist/scheduler/loadscope.py index de87d231..1286332b 100644 --- a/src/xdist/scheduler/loadscope.py +++ b/src/xdist/scheduler/loadscope.py @@ -162,7 +162,6 @@ def add_node(self, node: WorkerController) -> None: """ assert node not in self.assigned_work self.assigned_work[node] = {} - # sort by gw id self.assigned_work = dict(sorted(self.assigned_work.items(), key=lambda item: item[0].gateway.id)) def remove_node(self, node: WorkerController) -> str | None: diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index fc0c6c56..b1af81c5 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -93,6 +93,10 @@ def setup_nodes( self, putevent: Callable[[tuple[str, dict[str, Any]]], None], ) -> list[WorkerController]: + # create basetemp directory only once + if hasattr(self.config, "_tmp_path_factory"): + self.config._tmp_path_factory.getbasetemp() + self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: From 82f215e6fbd893b196d74d96d6ebd066be1c8ba8 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:05:16 -0700 Subject: [PATCH 03/10] fix everything --- src/xdist/dsession.py | 2 ++ src/xdist/workermanage.py | 19 ++++++++++++++++--- testing/test_workermanage.py | 4 ++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 5bf7d980..5dc94dcd 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -188,6 +188,7 @@ def worker_workerready( node.shutdown() else: assert self.sched is not None + print('scheduler:', self.sched.__class__.__name__) self.sched.add_node(node) def worker_workerfinished(self, node: WorkerController) -> None: @@ -522,6 +523,7 @@ def pytest_xdist_setupnodes(self, specs: Sequence[execnet.XSpec]) -> None: @pytest.hookimpl def pytest_xdist_newgateway(self, gateway: execnet.Gateway) -> None: + print('pytest_xdist_newgateway', gateway.id, gateway.spec) if self.config.option.verbose > 0: rinfo = gateway._rinfo() different_interpreter = rinfo.executable != sys.executable diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index b1af81c5..c50bba5b 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -99,12 +99,17 @@ def setup_nodes( self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") + import threading + lock = threading.Lock() with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: futs = [ - executor.submit(self.setup_node, spec, putevent, idx) + executor.submit(self.setup_node, spec, putevent, idx, lock) for idx, spec in enumerate(self.specs) ] - return [f.result() for f in futs] + results = [f.result() for f in futs] + for r in results: + self.config.hook.pytest_xdist_newgateway(gateway=r.gateway) + return results # return [self.setup_node(spec, putevent) for spec in self.specs] def setup_node( @@ -112,13 +117,21 @@ def setup_node( spec: execnet.XSpec, putevent: Callable[[tuple[str, dict[str, Any]]], None], idx: int | None = None, + lock = None, ) -> WorkerController: + if lock is None: + import threading + lock = threading.Lock() if getattr(spec, "execmodel", None) != "main_thread_only": spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}") # if idx is not None: # spec = execnet.XSpec(f"{spec}//id=gw{idx}") + print('theoretical gateway id', idx, spec.id) gw = self.group.makegateway(spec) - self.config.hook.pytest_xdist_newgateway(gateway=gw) + # with lock: + # print('calling pytest_xdist_newgateway with gateway id', gw.id) + # self.config.hook.pytest_xdist_newgateway(gateway=gw) + print(f"setup_node: {gw} {spec}") self.rsync_roots(gw) node = WorkerController(self, gw, self.config, putevent) # Keep the node alive. diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index b3e8a1c7..968c99c6 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -51,7 +51,7 @@ def dest(tmp_path: Path) -> Path: def workercontroller(monkeypatch: pytest.MonkeyPatch) -> None: class MockController: def __init__(self, *args: object) -> None: - pass + self.gateway = args[1] def setup(self) -> None: pass @@ -83,7 +83,7 @@ def test_popen_makegateway_events( assert len(call.specs) == 2 call = hookrecorder.popcall("pytest_xdist_newgateway") - assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen") + # assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen") assert call.gateway.id == "gw0" call = hookrecorder.popcall("pytest_xdist_newgateway") assert call.gateway.id == "gw1" From 23f52e9799ca8d0c09d105b5262b599659712f1a Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:06:29 -0700 Subject: [PATCH 04/10] clean up --- src/xdist/workermanage.py | 16 +--------------- testing/test_workermanage.py | 2 +- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index c50bba5b..46dc9219 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -99,39 +99,25 @@ def setup_nodes( self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") - import threading - lock = threading.Lock() with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: futs = [ - executor.submit(self.setup_node, spec, putevent, idx, lock) + executor.submit(self.setup_node, spec, putevent, idx) for idx, spec in enumerate(self.specs) ] results = [f.result() for f in futs] for r in results: self.config.hook.pytest_xdist_newgateway(gateway=r.gateway) return results - # return [self.setup_node(spec, putevent) for spec in self.specs] def setup_node( self, spec: execnet.XSpec, putevent: Callable[[tuple[str, dict[str, Any]]], None], idx: int | None = None, - lock = None, ) -> WorkerController: - if lock is None: - import threading - lock = threading.Lock() if getattr(spec, "execmodel", None) != "main_thread_only": spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}") - # if idx is not None: - # spec = execnet.XSpec(f"{spec}//id=gw{idx}") - print('theoretical gateway id', idx, spec.id) gw = self.group.makegateway(spec) - # with lock: - # print('calling pytest_xdist_newgateway with gateway id', gw.id) - # self.config.hook.pytest_xdist_newgateway(gateway=gw) - print(f"setup_node: {gw} {spec}") self.rsync_roots(gw) node = WorkerController(self, gw, self.config, putevent) # Keep the node alive. diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index 968c99c6..144416d0 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -83,7 +83,7 @@ def test_popen_makegateway_events( assert len(call.specs) == 2 call = hookrecorder.popcall("pytest_xdist_newgateway") - # assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen") + assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen") assert call.gateway.id == "gw0" call = hookrecorder.popcall("pytest_xdist_newgateway") assert call.gateway.id == "gw1" From f49c64d0aa2cd94ccaa52725eefd7a5ce6c2ef22 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:07:20 -0700 Subject: [PATCH 05/10] moree --- src/xdist/dsession.py | 1 - src/xdist/workermanage.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 5dc94dcd..7aefb044 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -523,7 +523,6 @@ def pytest_xdist_setupnodes(self, specs: Sequence[execnet.XSpec]) -> None: @pytest.hookimpl def pytest_xdist_newgateway(self, gateway: execnet.Gateway) -> None: - print('pytest_xdist_newgateway', gateway.id, gateway.spec) if self.config.option.verbose > 0: rinfo = gateway._rinfo() different_interpreter = rinfo.executable != sys.executable diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 46dc9219..f74d163f 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -101,8 +101,8 @@ def setup_nodes( self.trace("setting up nodes") with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: futs = [ - executor.submit(self.setup_node, spec, putevent, idx) - for idx, spec in enumerate(self.specs) + executor.submit(self.setup_node, spec, putevent) + for spec in self.specs ] results = [f.result() for f in futs] for r in results: @@ -113,7 +113,6 @@ def setup_node( self, spec: execnet.XSpec, putevent: Callable[[tuple[str, dict[str, Any]]], None], - idx: int | None = None, ) -> WorkerController: if getattr(spec, "execmodel", None) != "main_thread_only": spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}") From 65d1c12a603a8ba0832513f40f456f85ece5c882 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:09:38 -0700 Subject: [PATCH 06/10] fix --- src/xdist/dsession.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 7aefb044..79c463dc 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -188,7 +188,7 @@ def worker_workerready( node.shutdown() else: assert self.sched is not None - print('scheduler:', self.sched.__class__.__name__) + # print('scheduler:', self.sched.__class__.__name__) self.sched.add_node(node) def worker_workerfinished(self, node: WorkerController) -> None: From 01e30d92ba16718ea547bc857aca7bedc63b8c3e Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:14:54 -0700 Subject: [PATCH 07/10] hacky --- src/xdist/workermanage.py | 6 ++---- testing/test_workermanage.py | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index f74d163f..08bd9b6c 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -104,10 +104,7 @@ def setup_nodes( executor.submit(self.setup_node, spec, putevent) for spec in self.specs ] - results = [f.result() for f in futs] - for r in results: - self.config.hook.pytest_xdist_newgateway(gateway=r.gateway) - return results + return [f.result() for f in futs] def setup_node( self, @@ -117,6 +114,7 @@ def setup_node( if getattr(spec, "execmodel", None) != "main_thread_only": spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}") gw = self.group.makegateway(spec) + self.config.hook.pytest_xdist_newgateway(gateway=gw) self.rsync_roots(gw) node = WorkerController(self, gw, self.config, putevent) # Keep the node alive. diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index 144416d0..b3e8a1c7 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -51,7 +51,7 @@ def dest(tmp_path: Path) -> Path: def workercontroller(monkeypatch: pytest.MonkeyPatch) -> None: class MockController: def __init__(self, *args: object) -> None: - self.gateway = args[1] + pass def setup(self) -> None: pass From fc7e83de23aca255d79f583dcc0bcb29eb1aa924 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:18:32 -0700 Subject: [PATCH 08/10] fix --- testing/test_workermanage.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index b3e8a1c7..829e8887 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -82,11 +82,11 @@ def test_popen_makegateway_events( call = hookrecorder.popcall("pytest_xdist_setupnodes") assert len(call.specs) == 2 - call = hookrecorder.popcall("pytest_xdist_newgateway") - assert call.gateway.spec == execnet.XSpec("execmodel=main_thread_only//popen") - assert call.gateway.id == "gw0" - call = hookrecorder.popcall("pytest_xdist_newgateway") - assert call.gateway.id == "gw1" + # check expected gateways + gw_calls = [hookrecorder.popcall("pytest_xdist_newgateway"), + hookrecorder.popcall("pytest_xdist_newgateway")] + assert {c.gateway.id for c in gw_calls} == {"gw0", "gw1"} + assert {c.gateway.spec for c in gw_calls} == {execnet.XSpec("execmodel=main_thread_only//popen")} assert len(hm.group) == 2 hm.teardown_nodes() assert not len(hm.group) From f89edf998901d94c4b623e473301bf3510001464 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 25 Jul 2025 23:21:15 -0700 Subject: [PATCH 09/10] clean up --- src/xdist/dsession.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 79c463dc..5bf7d980 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -188,7 +188,6 @@ def worker_workerready( node.shutdown() else: assert self.sched is not None - # print('scheduler:', self.sched.__class__.__name__) self.sched.add_node(node) def worker_workerfinished(self, node: WorkerController) -> None: From ba07d6b8e19cc402f176e561bd174d9bc8373cf9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Jul 2025 06:21:33 +0000 Subject: [PATCH 10/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/xdist/scheduler/loadscope.py | 4 +++- src/xdist/workermanage.py | 3 +-- testing/test_workermanage.py | 10 +++++++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/xdist/scheduler/loadscope.py b/src/xdist/scheduler/loadscope.py index 1286332b..6fddd97c 100644 --- a/src/xdist/scheduler/loadscope.py +++ b/src/xdist/scheduler/loadscope.py @@ -162,7 +162,9 @@ def add_node(self, node: WorkerController) -> None: """ assert node not in self.assigned_work self.assigned_work[node] = {} - self.assigned_work = dict(sorted(self.assigned_work.items(), key=lambda item: item[0].gateway.id)) + self.assigned_work = dict( + sorted(self.assigned_work.items(), key=lambda item: item[0].gateway.id) + ) def remove_node(self, node: WorkerController) -> str | None: """Remove a node from the scheduler. diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 08bd9b6c..3d944d50 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -101,8 +101,7 @@ def setup_nodes( self.trace("setting up nodes") with ThreadPoolExecutor(max_workers=len(self.specs)) as executor: futs = [ - executor.submit(self.setup_node, spec, putevent) - for spec in self.specs + executor.submit(self.setup_node, spec, putevent) for spec in self.specs ] return [f.result() for f in futs] diff --git a/testing/test_workermanage.py b/testing/test_workermanage.py index 829e8887..3f729e94 100644 --- a/testing/test_workermanage.py +++ b/testing/test_workermanage.py @@ -83,10 +83,14 @@ def test_popen_makegateway_events( assert len(call.specs) == 2 # check expected gateways - gw_calls = [hookrecorder.popcall("pytest_xdist_newgateway"), - hookrecorder.popcall("pytest_xdist_newgateway")] + gw_calls = [ + hookrecorder.popcall("pytest_xdist_newgateway"), + hookrecorder.popcall("pytest_xdist_newgateway"), + ] assert {c.gateway.id for c in gw_calls} == {"gw0", "gw1"} - assert {c.gateway.spec for c in gw_calls} == {execnet.XSpec("execmodel=main_thread_only//popen")} + assert {c.gateway.spec for c in gw_calls} == { + execnet.XSpec("execmodel=main_thread_only//popen") + } assert len(hm.group) == 2 hm.teardown_nodes() assert not len(hm.group)