Skip to content

[DO NOT MERGE] Bisect: multitenancy test at 886ae8b8c9~1 (before BlockEntry)#64203

Closed
TimothySeah wants to merge 3 commits into
ray-project:masterfrom
TimothySeah:tseah/bisect-before-blockentry
Closed

[DO NOT MERGE] Bisect: multitenancy test at 886ae8b8c9~1 (before BlockEntry)#64203
TimothySeah wants to merge 3 commits into
ray-project:masterfrom
TimothySeah:tseah/bisect-before-blockentry

Conversation

@TimothySeah

Copy link
Copy Markdown
Contributor

Multitenancy release test on master @ 886ae8b8c9~1 (just before #63654 'Introduce BlockEntry on RefBundle') with PR #63375 cherry-picked on top. Pin/PASS decides whether #63654 caused the regression.

@TimothySeah TimothySeah requested a review from a team as a code owner June 18, 2026 05:35

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces multitenancy support to the Ray Data cluster autoscaler by allowing resource requests and node resources to be bucketed by a subcluster label (__subcluster__). It also adds a multitenancy variant of the heterogeneous memory batch inference release test. The code review feedback is highly constructive, pointing out critical key mismatches between 'subcluster' and 'subcluster' across the benchmark scripts and YAML configuration. It also identifies potential bugs, such as strict inequality mismatches between None and empty dictionaries in subcluster selectors, unpropagated exceptions in benchmark threads, and a PEP 8 violation regarding lambda assignment.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +102 to +104
ray.data.DataContext.get_current().execution_options.label_selector = {
"subcluster": subcluster
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a key mismatch between the label selector key used in the dataset pipeline ("subcluster") and the key expected by the cluster autoscaler ("__subcluster__" as defined by SUBCLUSTER_LABEL_KEY in default_autoscaling_coordinator.py).

Because of this mismatch, the autoscaler will fail to associate the resource requests and node resources with their respective subclusters, effectively bypassing the multitenancy isolation logic at the autoscaler level.

Please update the key to "__subcluster__" to ensure consistency.

Suggested change
ray.data.DataContext.get_current().execution_options.label_selector = {
"subcluster": subcluster
}
ray.data.DataContext.get_current().execution_options.label_selector = {
"__subcluster__": subcluster
}


if subcluster is not None:
# Also pin on the Dataset's own context so chained ops inherit it.
ds.context.execution_options.label_selector = {"subcluster": subcluster}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Update the label selector key here to "__subcluster__" as well to match the autoscaler's expected key.

Suggested change
ds.context.execution_options.label_selector = {"subcluster": subcluster}
ds.context.execution_options.label_selector = {"__subcluster__": subcluster}

Comment on lines +20 to +21
labels:
subcluster: tenant_a

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Update the node label key from subcluster to __subcluster__ to match the SUBCLUSTER_LABEL_KEY expected by the cluster autoscaler. Please apply this change to all node groups in this YAML file.

      labels:
        __subcluster__: tenant_a

raise after metrics have been written.
"""
nodes = list_nodes(detail=True, limit=1000)
node_subcluster = {n.node_id: (n.labels or {}).get("subcluster") for n in nodes}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Update the label key from "subcluster" to "__subcluster__" to match the autoscaler's expected key.

Suggested change
node_subcluster = {n.node_id: (n.labels or {}).get("subcluster") for n in nodes}
node_subcluster = {n.node_id: (n.labels or {}).get("__subcluster__") for n in nodes}

node_sc = node_subcluster.get(t.node_id)
if node_sc in SUBCLUSTERS:
tasks_on_labeled += 1
want = _selector_for(t).get("subcluster")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Update the label key from "subcluster" to "__subcluster__" to match the autoscaler's expected key.

Suggested change
want = _selector_for(t).get("subcluster")
want = _selector_for(t).get("__subcluster__")

Comment on lines +284 to +292
if (
requester_id in self._subcluster_selectors
and self._subcluster_selectors[requester_id] != subcluster_selector
):
raise ValueError(
"Cannot change subcluster_selector of an ongoing request "
f"from {self._subcluster_selectors[requester_id]!r} to "
f"{subcluster_selector!r}."
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Python, None and {} are semantically equivalent when representing an empty or missing subcluster selector, but they are not equal under strict inequality (!=). If a requester first registers with subcluster_selector=None and later calls request_resources with subcluster_selector={}, the strict inequality check self._subcluster_selectors[requester_id] != subcluster_selector will evaluate to True and raise an unexpected ValueError.

To prevent this, normalize both selectors to a dictionary (e.g., using or {}) before comparing them.

Suggested change
if (
requester_id in self._subcluster_selectors
and self._subcluster_selectors[requester_id] != subcluster_selector
):
raise ValueError(
"Cannot change subcluster_selector of an ongoing request "
f"from {self._subcluster_selectors[requester_id]!r} to "
f"{subcluster_selector!r}."
)
if (
requester_id in self._subcluster_selectors
and self._subcluster_selectors[requester_id] != (subcluster_selector or {})
):
raise ValueError(
"Cannot change subcluster_selector of an ongoing request "
f"from {self._subcluster_selectors[requester_id]!r} to "
f"{subcluster_selector!r}."
)

Comment on lines +79 to +96
def run_concurrent(args: argparse.Namespace) -> Dict[str, float]:
"""Run both tenants on threads concurrently; return per-tenant wall-time."""
per_tenant: Dict[str, float] = {}

def _run(sc: str) -> None:
t0 = time.perf_counter()
run_pipeline(sc, args)
per_tenant[sc] = time.perf_counter() - t0

threads = [
threading.Thread(target=_run, args=(sc,), name=f"pipeline-{sc}")
for sc in SUBCLUSTERS
]
for t in threads:
t.start()
for t in threads:
t.join()
return per_tenant

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Python, exceptions raised inside a threading.Thread do not propagate to the main thread when calling join(). If one of the concurrent pipelines fails or crashes, the main thread will continue executing silently, which can lead to misleading test results or false positives.

To prevent this, capture any exceptions raised in the threads and raise a RuntimeError in the main thread if any failures occurred.

def run_concurrent(args: argparse.Namespace) -> Dict[str, float]:
    """Run both tenants on threads concurrently; return per-tenant wall-time."""
    per_tenant: Dict[str, float] = {}
    errors: List[Exception] = []

    def _run(sc: str) -> None:
        try:
            t0 = time.perf_counter()
            run_pipeline(sc, args)
            per_tenant[sc] = time.perf_counter() - t0
        except Exception as e:
            errors.append(e)

    threads = [
        threading.Thread(target=_run, args=(sc,), name=f"pipeline-{sc}")
        for sc in SUBCLUSTERS
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    if errors:
        raise RuntimeError(f"Concurrent pipeline run failed: {errors}")
    return per_tenant

)
# Write subcluster after all validations so a rejected call
# never leaves the registry on a new subcluster.
self._subcluster_selectors[requester_id] = subcluster_selector

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Normalize subcluster_selector to an empty dictionary when storing it in self._subcluster_selectors to ensure consistency with the comparison check and avoid potential None vs {} mismatches.

Suggested change
self._subcluster_selectors[requester_id] = subcluster_selector
self._subcluster_selectors[requester_id] = subcluster_selector or {}

Comment on lines +240 to +242
get_node_counts = lambda: _get_node_resource_spec_and_count( # noqa: E731
subcluster=subcluster
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to PEP 8, you should always use a def statement instead of an assignment statement that binds a lambda expression directly to an identifier (e.g., get_node_counts = lambda: ...). Using a named local function is cleaner and avoids the need for the # noqa: E731 suppression comment.

            def get_node_counts():
                return _get_node_resource_spec_and_count(subcluster=subcluster)

@TimothySeah TimothySeah marked this pull request as draft June 18, 2026 08:06
TimothySeah and others added 2 commits June 18, 2026 01:09
…r resources by subcluster label (ray-project#63375)

The end goal is to support 2 ray data datasets in 1 cluster with
subcluster label scheduling. In such a setup, we have 2 datasets sharing the same AutoscalingCoordinator. The previous PR in
this stack (ray-project#63331) made sure
that each dataset's tasks ended up in the correct subcluster. This PR
ensures that all requesters, whether they are trainers or datasets, only
request and receive resources in their subcluster.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
Adds the 4 files from the 71d95 baseline branch needed to run the
heterogeneous_memory_batch_inference_multitenancy release test. Includes
the release_data_tests.yaml entry so the test is actually scheduled.

Signed-off-by: Timothy Seah <tseah@anyscale.com>
@TimothySeah TimothySeah force-pushed the tseah/bisect-before-blockentry branch from 5e1a0cb to 6639bcd Compare June 18, 2026 08:10
The merged 5d2c4e7 shipped with SUBCLUSTER_LABEL_KEY = "__subcluster__",
which was reverted to "subcluster" in a later master PR. The YAML in
this bisect branch (copied from the 71d95 baseline) uses "subcluster:".
Realign code to YAML so autoscaler routing actually activates for the
bisect test.

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant