[Ray data optimization] Make min num chunks to trigger combine configurable (#36)

Combining chunks can be expensive and is not needed here, since the number of
chunks won't be more than 10 or so. Note that this threshold has been updated
to 10 in the latest Ray master. Signed-off-by: ShaochenYu-YW
<[email protected]>
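The pattern this commit introduces, an environment variable overriding the combine-chunks threshold, can be sketched as follows. The variable name is taken from the diff below; the `should_combine` helper is illustrative only and not part of the commit:

```python
import os

# Threshold below which combine_chunks is skipped. Defaults to 2, but can be
# overridden via an environment variable, mirroring the change in this commit.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = int(
    os.getenv("RAY_DATA_MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS", 2)
)


def should_combine(num_chunks: int) -> bool:
    """Return True when a column has enough chunks to justify combining."""
    return num_chunks >= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
```

Setting `RAY_DATA_MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS=100` before starting the process would then effectively disable combining for blocks with fewer than 100 chunks.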

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: ShaochenYu-YW <[email protected]>
ShaochenYu-YW authored Feb 11, 2025
1 parent b1152ea commit 5699d59
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion python/ray/data/_internal/batcher.py
@@ -1,3 +1,6 @@
import logging
import os

from typing import Optional

from ray.data._internal.arrow_block import ArrowBlockAccessor
@@ -11,7 +14,8 @@
# See https://github.com/ray-project/ray/issues/31108 for more details.
# TODO(jjyao): remove this once
# https://github.com/apache/arrow/issues/35126 is resolved.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = 2
# Make this configurable to avoid unnecessary combine_chunks calls.
MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS = int(
    os.getenv("RAY_DATA_MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS", 2)
)

# Delay compaction until the shuffle buffer has reached this ratio over the min
# shuffle buffer size. Setting this to 1 minimizes memory usage, at the cost of
@@ -20,6 +24,8 @@
SHUFFLE_BUFFER_COMPACTION_RATIO = 1.5


logger = logging.getLogger(__name__)

class BatcherInterface:
def add(self, block: Block):
"""Add a block to the block buffer.
@@ -142,6 +148,7 @@ def next_batch(self) -> Block:
and block.column(0).num_chunks
>= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
):
logger.warning(
    f"Detected combine_chunks in Batcher. "
    f"Number of chunks: {block.column(0).num_chunks}, "
    f"threshold: {MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS}"
)
accessor = BlockAccessor.for_block(
transform_pyarrow.combine_chunks(block)
)
@@ -313,6 +320,7 @@ def next_batch(self) -> Block:
and self._shuffle_buffer.column(0).num_chunks
>= MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS
):
logger.warning(
    f"Detected combine_chunks in ShuffleBatcher. "
    f"Number of chunks: {self._shuffle_buffer.column(0).num_chunks}, "
    f"threshold: {MIN_NUM_CHUNKS_TO_TRIGGER_COMBINE_CHUNKS}"
)
self._shuffle_buffer = transform_pyarrow.combine_chunks(
self._shuffle_buffer
)
Expand Down
