
Coalesce and parallelize partial shard reads #3004

Open
wants to merge 6 commits into base: main from coalesce-shard-reads

Conversation

aldenks
Contributor

@aldenks aldenks commented Apr 22, 2025

This PR optimizes reading more than one, but not all, of the chunks in a shard. Towards #1758.

  • Combines requests for chunks that are nearby within a shard into a single call to the byte_getter. There's a configurable maximum gap between requests and a configurable maximum coalesced total request size -- if either is exceeded, a new group is started (see the sketch below).
  • The coalesced groups are requested with concurrent_map.
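
As a rough sketch of the grouping idea -- a simplified standalone version, not the exact code in this PR; the function name and the (offset, length) range representation are just for illustration:

    def coalesce_ranges(
        ranges: list[tuple[int, int]], max_gap: int, max_total: int
    ) -> list[list[tuple[int, int]]]:
        """Group (offset, length) byte ranges so that ranges within max_gap
        bytes of each other are fetched together, as long as the coalesced
        span stays under max_total."""
        groups: list[list[tuple[int, int]]] = []
        for offset, length in sorted(ranges):
            if groups:
                group = groups[-1]
                prev_offset, prev_length = group[-1]
                gap = offset - (prev_offset + prev_length)
                coalesced_size = (offset + length) - group[0][0]
                if gap <= max_gap and coalesced_size <= max_total:
                    group.append((offset, length))
                    continue
            groups.append([(offset, length)])
        return groups

Each group can then be fetched with a single byte-range request spanning from the first range's start to the last range's end, and the groups themselves fetched concurrently.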

To be explicit, in my mind the optimizations in this have two goals:

  1. Reduce the time it takes to read many chunks from a single shard. Most of the benefit here comes from using concurrent_map, but with especially small chunks coalescing also helps by reducing per-request overhead.
  2. Reduce the total number of calls to the byte_getter. If the store (e.g. object storage) charges per operation, this directly reduces costs.

Mostly these goals aren't in conflict. The maximum coalesced size option provides a knob to ensure we aren't coalescing requests into a single very large request which would be faster to make as multiple concurrent calls.

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/user-guide/*.rst
  • Changes documented as a new file in changes/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

@github-actions github-actions bot added the needs release notes label Apr 22, 2025
@aldenks
Contributor Author

aldenks commented Apr 22, 2025

I'm curious what folks think of this and am looking for input! One particular design question to note: this adds a nested concurrent_map (there's already one in the codec pipeline's read/read_batch), so it's technically possible to reach a concurrency of async.concurrency**2. That said, I think we want some concurrency here.
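
To illustrate the worst case with a generic asyncio sketch (not zarr's concurrent_map itself -- just two nested levels of semaphore-bounded fan-out, which can have up to outer_limit * inner_limit tasks in flight at once):

    import asyncio

    async def bounded_gather(coros, limit: int):
        """Run coroutines concurrently, at most `limit` at a time."""
        sem = asyncio.Semaphore(limit)

        async def run(coro):
            async with sem:
                return await coro

        return await asyncio.gather(*(run(c) for c in coros))

    async def read_group(group_id: int, inner_limit: int):
        # Inner fan-out, e.g. one task per coalesced byte range in a shard.
        return await bounded_gather(
            (asyncio.sleep(0.01, result=(group_id, i)) for i in range(4)),
            inner_limit,
        )

    async def main():
        # Outer fan-out, e.g. one task per shard. With both limits at 4,
        # up to 16 simulated "requests" can be in flight at the same time.
        results = await bounded_gather(
            (read_group(g, inner_limit=4) for g in range(8)), 4
        )
        print(sum(len(r) for r in results))  # 32

    asyncio.run(main())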

@jhamman jhamman requested a review from normanrz April 22, 2025 04:51
@d-v-b
Contributor

d-v-b commented Apr 24, 2025

@aldenks thanks for working on this, it's a great optimization direction to pursue.

That being said, since this is a performance optimization, it might be helpful to get some metrics for how much perf we get with these changes, in terms of speed and also latency / number of requests. If reducing requests is the goal, you could consider implementing a WrapperStore as a demonstration or test fixture that tracks the number of times its methods are invoked, and use that to show that your approach reduces requests. But any other demonstration would work too.
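
For example, something along these lines -- a rough sketch assuming zarr-python 3's zarr.storage.WrapperStore and a Store.get(key, prototype, byte_range=None) signature (treat both as assumptions rather than verified API):

    from collections import Counter

    from zarr.storage import WrapperStore

    class CountingStore(WrapperStore):
        """Delegates everything to the wrapped store but counts get calls."""

        def __init__(self, store) -> None:
            super().__init__(store)
            self.counts: Counter[str] = Counter()

        async def get(self, key, prototype, byte_range=None):
            # Record the request, then delegate to the wrapped store.
            self.counts["get"] += 1
            return await super().get(key, prototype, byte_range=byte_range)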

As for the specific code changes (and the nested concurrent map), nothing jumps out to me as problematic, but that might just be my ignorance of async python and the sharding code 😁

@aldenks
Contributor Author

aldenks commented Apr 28, 2025

I'm working on a couple of performance tests, as Davis suggested above. I'm trying airspeed velocity, which is new to me but looks useful.

@dcherian
Contributor

Something like this might be more useful than asv here. A regression test of this form would also be great, if it can be made to work

https://github.com/pydata/xarray/blob/e37bfbf1bff63476de9e9d95eedbbd66d864f588/xarray/tests/test_backends.py#L3415

@aldenks aldenks force-pushed the coalesce-shard-reads branch 2 times, most recently from dafc09e to 2c18e2e on May 1, 2025 02:10
@aldenks
Contributor Author

aldenks commented May 1, 2025

Got pulled away after pushing those changes but before describing them -- I'll update this tomorrow with profiling results.

@aldenks aldenks force-pushed the coalesce-shard-reads branch from 2c18e2e to 9f6727d on May 1, 2025 14:39
@aldenks
Contributor Author

aldenks commented May 1, 2025

alright!

  • Tests added that check for coalesced calls to the store (i.e. a reduced call count)
  • Profiled execution time and store request count

I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have, or give a thumbs up, I'll work on the remaining TODOs in the description.

Tests

Added test_sharding_multiple_chunks_partial_shard_read, which is parameterized to test both with and without coalescing, and I'm seeing the behavior I'd expect, e.g.:

    if coalesce_reads:
        # 2 shard index requests + 2 coalesced chunk data byte ranges (one for each shard)
        assert store_mock.get.call_count == 4
    else:
        # 2 shard index requests + 6 chunks
        assert store_mock.get.call_count == 8

Profiling

I added a slow test which reads a few subsets of a large-ish shard containing small-ish chunks under a range of settings. See test_partial_shard_read_performance for details.

The test is currently skipped, and I don't necessarily think it should stay a test -- it probably belongs in bench, or I'm happy to remove it now that it's done its job.

To recreate the figures below:

  • Check out the commit before the optimization changes, 91fbdf9, and run hatch env run --env test.py3.12-2.1-optional -- pytest -s tests/test_codecs/test_sharding.py::test_partial_shard_read_performance --run-slow-hypothesis. Then check out the latest commit, 9f6727d, and run the same command again. (Yes, this is what airspeed velocity is for, but I opted for the simpler approach for now.)
  • Run the plotting code below
Plotting code
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

df_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-with-coalesce.json"
)
df_no_coalesce = pd.read_json(
    "zarr-python-partial-shard-read-performance-no-coalesce.json"
)
df_no_coalesce["coalesce_max_gap"] = "Before optimization"
df = pd.concat([df_no_coalesce, df_coalesce])

# Define statements and metrics
statements = df["statement"].unique()
metrics = ["time", "store_get_calls"]
metric_titles = ["Execution Time", "Number of Store Get Calls"]
metric_ylabels = ["Time (s)", "Number of Calls"]

# Create labels for coalesce values
coalesce_values = df["coalesce_max_gap"].unique()
coalesce_labels = [
    x if isinstance(x, str) else "Disabled" if x == -1 else f"{x/1024/1024:.0f}MiB"
    for x in coalesce_values
]

# Get unique get_latency values
latency_values = df["get_latency"].unique()

# For each unique get_latency value, create a separate figure
for latency in latency_values:
    latency_df = df[df["get_latency"] == latency]

    max_time = latency_df["time"].max()
    max_store_get_calls = latency_df["store_get_calls"].max()

    plt.figure(figsize=(16, 12))
    plt.suptitle(
        "Performance when reading a subset of a shard\n"
        "Execution time and count of store `get` calls for 3 array access patterns\n"
        "Array and shard shape: (512, 512, 512), chunk shape: (64, 64, 64)\n"
        "Store get latency: local ssd"
        + (
            ""
            if latency == 0
            else f" + {latency * 1000:.0f}ms (simulated object storage)"
        ),
        fontsize=14,
    )

    # One row per statement, two columns: time and store_get_calls
    for i, statement in enumerate(statements):
        statement_df = latency_df[latency_df["statement"] == statement]

        # Plot for time
        plt.subplot(3, 2, i * 2 + 1)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="time",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[0]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[0])
        plt.ylim(0, max_time * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

        # Plot for store_get_calls
        plt.subplot(3, 2, i * 2 + 2)
        sns.barplot(
            data=statement_df,
            x="concurrency",
            y="store_get_calls",
            hue="coalesce_max_gap",
            hue_order=coalesce_values,
            palette="muted",
            errorbar=None,
        )
        plt.title(f"{statement} - {metric_titles[1]}")
        plt.xlabel("Concurrency")
        plt.ylabel(metric_ylabels[1])
        plt.ylim(0, max_store_get_calls * 1.1)
        plt.legend(title="Coalesce Max Gap", labels=coalesce_labels)

    plt.tight_layout()
    plt.savefig(f"zarr-metrics-by-statement-with-latency-{latency}s.png", dpi=300)
    plt.show()
    plt.close()

Local SSD

The results shown in this figure were produced reading from my local SSD. Before this optimization, and when run on my SSD, the time to read chunks serially in the loop in ShardingCodec._decode_partial_single was about the same as the time to decode them (which was already parallelized within the self.codec_pipeline.read call).

(Figure: zarr-metrics-by-statement-with-latency-0.0s.png)

Simulated Object Storage

This optimization matters most when reading from higher-latency storage. To simulate this I added 10ms of latency to each get call. (I've read that with specific networking setups you can get down to 1ms latency from S3; I also regularly see >100ms in practice, so 10ms feels fair.)

(Figure: zarr-metrics-by-statement-with-latency-0.01s.png)
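
Conceptually the latency injection is just a wrapper store that sleeps before delegating get -- roughly like the sketch below (again assuming zarr-python 3's zarr.storage.WrapperStore and a Store.get(key, prototype, byte_range=None) signature; this isn't the exact test code):

    import asyncio

    from zarr.storage import WrapperStore

    class LatencyStore(WrapperStore):
        """Adds a fixed delay to each get call to mimic object storage latency."""

        def __init__(self, store, get_latency: float = 0.01) -> None:
            super().__init__(store)
            self.get_latency = get_latency

        async def get(self, key, prototype, byte_range=None):
            # Simulate network latency, then delegate to the wrapped store.
            await asyncio.sleep(self.get_latency)
            return await super().get(key, prototype, byte_range=byte_range)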

Takeaways

None of these are particularly surprising, but my takeaways are:

  • Most of the execution time benefit comes from adding concurrency
  • We can reduce the store request count by enabling coalescing, with performance equal to or better than adding concurrency alone
  • If we coalesce over gaps that are too large, execution time increases a bit, since we're requesting data we don't need. 1MiB seems like a decent middle ground for the maximum gap between chunks to coalesce.

@JackKelly
Contributor

JackKelly commented May 2, 2025

This is very cool!

On the topic of benchmarking: When reading compressed & sharded datasets, LDeakin's zarr_benchmarks suggests that zarr-python (without this PR) is quite slow. Note that the size of the performance difference isn't immediately obvious from a quick glance at the zarr_benchmark graphs, until you read the numbers above each bar in the bar charts! For example, in the "round trip benchmark", zarrs takes 3 seconds to read a compressed and sharded dataset from a local SSD, whilst zarr-python takes 181 seconds!

Hopefully this PR will help zarr-python to catch up. Thank you, Alden!

If it's helpful, and if this PR is ready, then I could try running LDeakin's zarr_benchmarks with this PR, if you'd like?

@aldenks
Contributor Author

aldenks commented May 5, 2025

Thanks @JackKelly! I'd be happy for more benchmarking of this. I took a look at LDeakin's zarr_benchmarks and I would expect the changes in this PR to have no impact there: all the read tests appear to read complete shards, and this PR doesn't touch that path. If you're adding benchmarks, I might advocate for one that reads subsets of multiple shards, as I find that's a pretty common read pattern in practice.
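
For example, with a hypothetical (1024, 1024, 1024) array sharded as (512, 512, 512) with (64, 64, 64) chunks, a read like

    subset = arr[256:768, 256:768, 256:768]

touches all 8 shards but only needs a fraction of the chunks in each, so no shard is read in full.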

@JackKelly
Contributor

JackKelly commented May 6, 2025

Noted! I've started making some notes for my plans for benchmarking here. Feel free to comment!

(BTW, please don't let my benchmarks hold up this PR! IMHO, the benchmarks that Alden has presented above are more than sufficient to demonstrate that this PR speeds up zarr-python. The aim with my benchmarking is more to compare zarr-python (with and without this PR) against zarrs.)

@JackKelly
Contributor

(Just a quick note to say that I'm afraid I'm now planning to experiment with storing NWP data in Parquet (using Polars)... so I might not get round to benchmarking Zarr readers soon, I'm sorry...)

@aldenks
Contributor Author

aldenks commented May 13, 2025

Any maintainers: I don't necessarily need a detailed review at this point -- if y'all let me know any open questions you have, or give a thumbs up, I'll work on the remaining TODOs in the description.
