[Feature] Compressed storage gpu #3062


Open · wants to merge 8 commits into `main` from `compressed-storage-gpu`

Conversation

AdrianOrenstein

@AdrianOrenstein AdrianOrenstein commented Jul 12, 2025

Description

Replay buffers store large amounts of data and feed neural networks with batched samples to learn from, so ideally this data should live as close as possible to where the network is being updated. These buffers often hold raw sensory observations such as images, audio, or text, which consume many gigabytes of precious memory. CPU memory and accelerator VRAM may be limited, and memory transfer between these devices may be costly. This PR therefore aims to streamline data compression to enable efficient storage and memory transfer.

Mainly, creating a compressed storage object will aid in training state-of-the-art RL methods on benchmarks such as the Arcade Learning Environment. The `torchrl.data.replay_buffers.storages.CompressedStorage` class provides the memory savings through compression.

closes #3058
closes #2983

  • I have raised an issue to propose this change (required for new features and bug fixes)

Types of changes

What types of changes does your code introduce? Remove all that do not apply:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds core functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation (update in the documentation)
  • Example (update in the folder of examples)

Checklist

Go over all the following points, and put an x in all the boxes that apply.
If you are unsure about any of these, don't hesitate to ask. We are here to help!

  • I have read the CONTRIBUTION guide (required)
  • My change requires a change to the documentation.
  • I have updated the tests accordingly (required for a bug fix or a new feature).
  • I have updated the documentation accordingly.


pytorch-bot bot commented Jul 12, 2025

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/pytorch/rl/3062

Note: Links to docs will display an error until the docs builds have been completed.

❌ 13 New Failures, 1 Pending, 3 Unrelated Failures

As of commit 0dbd233 with merge base db0e30d (image):

NEW FAILURES - The following jobs have failed:

FLAKY - The following job failed but was likely due to flakiness present on trunk:

BROKEN TRUNK - The following jobs failed but were already failing on the merge base:

👉 Rebase onto the `viable/strict` branch to avoid these failures

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Jul 12, 2025
@vmoens vmoens added the enhancement New feature or request label Jul 14, 2025
@vmoens vmoens changed the title Compressed storage gpu [Feature] Compressed storage gpu Jul 14, 2025
@AdrianOrenstein
Author

AdrianOrenstein commented Jul 14, 2025

When the tensor is on the CPU, NumPy is the fastest way to convert it to a bytestream.

---------------------------- benchmark 'tensor_to_bytestream_speed': 5 tests ----------------------------
Name (time in us)                                                  Mean                     OPS          
---------------------------------------------------------------------------------------------------------
test_tensor_to_bytestream_speed[numpy]                           1.1852 (1.0)      843,727.6370 (1.0)    
test_tensor_to_bytestream_speed[safetensors]                    11.7078 (9.88)      85,413.0849 (0.10)   
test_tensor_to_bytestream_speed[pickle]                         17.5312 (14.79)     57,041.2807 (0.07)   
test_tensor_to_bytestream_speed[torch.save]                     29.3144 (24.73)     34,112.9736 (0.04)   
test_tensor_to_bytestream_speed[tensor.untyped_storage]     37,213.3849 (>1000.0)       26.8721 (0.00)   
---------------------------------------------------------------------------------------------------------
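The winning NumPy path from the benchmark can be sketched as follows. This is a minimal illustration using a plain NumPy array as a stand-in; for a CPU torch tensor the equivalent would be `tensor.numpy().tobytes()`:

```python
import numpy as np

# Stand-in for tensor.numpy(): a float32 frame shaped like an image observation.
frame = np.random.rand(3, 84, 84).astype(np.float32)

# Convert the CPU array to a raw bytestream (this is what gets compressed).
byte_stream = frame.tobytes()

# Round-trip: rebuild an array of the same shape and dtype from the bytes.
restored = np.frombuffer(byte_stream, dtype=np.float32).reshape(frame.shape)
assert np.array_equal(frame, restored)
```

The bytestream length is simply `num_elements * itemsize`, so no metadata is carried; shape and dtype must be stored alongside the bytes to reconstruct the tensor.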

@vmoens vmoens force-pushed the compressed-storage-gpu branch 2 times, most recently from 74d85fa to 95f532e Compare July 16, 2025 22:22
@vmoens vmoens force-pushed the compressed-storage-gpu branch from 95f532e to 5581cf6 Compare July 16, 2025 22:45
@AdrianOrenstein
Author

AdrianOrenstein commented Jul 23, 2025

Added some examples of compressing on the CPU and doing batched decompression on the GPU.

I noticed that in my Atari rollout example, the CompressedListStorage object would compress each observation twice: once with the first transition (as the obs) and again when it is reused in the next transition (as the next obs). I think this slightly hurts the usability of CompressedListStorage in an RL rollout. In the examples below I compressed manually and performed batched decompression in the replay buffer's collate function.

❯ python examples/replay-buffers/compressed_cpu_decompressed_gpu_replay_buffer.py
A.L.E: Arcade Learning Environment (version 0.11.2+ecc1138)
[Powered by Stella]
passed correctness checks

=== ListStorage + ReplayBuffer (CPU compress, GPU decompress) Example ===

Creating compressed storage...
Starting rollout benchmark
...adding 2000 transitions to replay buffer
done rollout with Zstd and BitstreamKind.RAW, avg_compression_ratio=191 @ transitions/s=1998

Sampling from replay buffer...
done batch sampling and decompression with Zstd and BitstreamKind.RAW @ transitions/s=8255
❯ python examples/replay-buffers/compressed_gpu_decompressed_gpu_replay_buffer.py
A.L.E: Arcade Learning Environment (version 0.11.2+ecc1138)
[Powered by Stella]
passed correctness checks

=== ListStorage + ReplayBuffer (GPU) Example ===

Creating compressed storage...
Starting rollout benchmark
...adding 2000 transitions to replay buffer
done rollout with Zstd and BitstreamKind.RAW, avg_compression_ratio=98 @ transitions/s=1105

Sampling from replay buffer...
done batch sampling and decompression with Zstd and BitstreamKind.RAW @ transitions/s=8530

Compressing on the CPU first, then transferring, and reusing the compressed observation for the next transition yields about double the Atari transitions per second.
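The compress-once-reuse-twice pattern can be sketched as follows. This is a minimal illustration using zlib from the standard library as a stand-in for zstd/nvCOMP; the helper names are hypothetical, not the PR's API:

```python
import zlib
import numpy as np

def compress_obs(obs: np.ndarray, level: int = 6) -> bytes:
    # Compress once on the CPU. The same bytes can be stored as both this
    # transition's "next_obs" and the following transition's "obs",
    # avoiding the double compression described above.
    return zlib.compress(obs.tobytes(), level)

def decompress_batch(blobs, shape, dtype=np.float32):
    # Batched decompression at sample time; the replay buffer's collate
    # function would do this step (optionally on the GPU with nvCOMP).
    return np.stack([
        np.frombuffer(zlib.decompress(b), dtype=dtype).reshape(shape)
        for b in blobs
    ])

obs = np.random.rand(3, 84, 84).astype(np.float32)
blob = compress_obs(obs)
buffer = [blob, blob]  # the same blob is reused, not re-compressed
batch = decompress_batch(buffer, obs.shape)
assert batch.shape == (2, 3, 84, 84)
```

Because the compressed bytes are opaque, they transfer to the GPU as-is; only the decompression step needs to know the original shape and dtype.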

@AdrianOrenstein
Author

@vmoens I think we're essentially done with this PR, except for a cleanup pass.

Do we want CompressedListStorage to be mentioned in the documentation? Maybe a page on compression to showcase the VRAM savings on the GPU?

Collaborator

@vmoens vmoens left a comment


Happy with this! Mainly nits and minor aesthetic comments on the examples and doc but otherwise good to go!

@@ -20,7 +20,7 @@ repos:
- libcst == 0.4.7

- repo: https://github.com/pycqa/flake8
rev: 4.0.1
rev: 6.0.0
Collaborator

Not against upgrades but can you comment why we need it?

Author

4.0.1 has an issue that causes the error below:

flake8...................................................................Failed
- hook id: flake8
- exit code: 1

Traceback (most recent call last):
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/bin/flake8", line 8, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/main/cli.py", line 22, in main
    app.run(argv)
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 375, in run
    self._run(argv)
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 363, in _run
    self.initialize(argv)
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 343, in initialize
    self.find_plugins(config_finder)
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/main/application.py", line 157, in find_plugins
    self.check_plugins = plugin_manager.Checkers(local_plugins.extension)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 363, in __init__
    self.manager = PluginManager(
                   ^^^^^^^^^^^^^^
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 243, in __init__
    self._load_entrypoint_plugins()
  File "/home/adrian/.cache/pre-commit/repoxdgdrlah/py_env-python3.12/lib/python3.12/site-packages/flake8/plugins/manager.py", line 261, in _load_entrypoint_plugins
    eps = importlib_metadata.entry_points().get(self.namespace, ())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'EntryPoints' object has no attribute 'get'

From what I understand from the issues recommending the upgrade, there was a deprecation in importlib-metadata, which is fixed in flake8 5.x or greater.

As an aside, you might like the speed of Ruff, I use it for my own projects.

- **Data Integrity**: Maintains full data fidelity through lossless compression
- **Flexible Compression**: Supports custom compression algorithms or uses zstd by default
- **TensorDict Support**: Seamlessly works with TensorDict structures
- **Checkpointing**: Full support for saving and loading compressed data
Collaborator

Maybe add GPU support here

Comment on lines +216 to +231
>>> import torch
>>> from torchrl.data import ReplayBuffer, CompressedListStorage
>>> from tensordict import TensorDict
>>>
>>> # Create a compressed storage for image data
>>> storage = CompressedListStorage(max_size=1000, compression_level=3)
>>> rb = ReplayBuffer(storage=storage, batch_size=32)
>>>
>>> # Add image data
>>> images = torch.randn(100, 3, 84, 84) # Atari-like frames
>>> data = TensorDict({"obs": images}, batch_size=[100])
>>> rb.extend(data)
>>>
>>> # Sample data (automatically decompressed)
>>> sample = rb.sample(16)
>>> print(sample["obs"].shape) # torch.Size([16, 3, 84, 84])
Collaborator

@AdrianOrenstein Can you check that this still makes sense and runs?

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
Collaborator

We need Meta headers here


from torchrl.data.replay_buffers.storages import ListStorage

gym.register_envs(ale_py)
Collaborator

maybe after the imports?

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Example demonstrating the use of CompressedStorage for memory-efficient replay buffers on the GPU.
Collaborator

more info?

Collaborator

Is it worth having 2 distinct examples? I'm not particularly unhappy with this, but it means more repeated code, and it could be harder for people to spot the differences between the two (hard to know where to focus your attention).

# === CompressedListStorage + ReplayBuffer with GPU compression ===
print("\n=== ListStorage + ReplayBuffer (GPU) Example ===\n")

codec = nvcomp.Codec(algorithm=algorithm, bitstream_kind=bitstream_kind)
Collaborator

I think this deserves a bit of explanation :)
Also I would check if the lib is installed beforehand and raise a warning /exception if it cannot be found

has_nvcomp = importlib.util.find_spec("nvcomp") is not None
if not has_nvcomp:
    raise ImportError(...)

Collaborator

do we need this one?

storage = CompressedListStorage(max_size=1000, compression_level=6)

# Create some sample data with different shapes and types
print("Creating sample data...")
Collaborator

we should remove the prints in favor of logger
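A minimal sketch of the suggested change using the standard-library `logging` module (the logger name is hypothetical; the project may prefer its own logger):

```python
import logging

# One module-level logger instead of scattered print() calls; level and
# formatting become configurable by the user rather than hard-coded.
logger = logging.getLogger("compressed_storage_example")
logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s")

# Instead of: print("Creating sample data...")
logger.info("Creating sample data...")
```

Unlike `print`, this lets users silence or redirect the example's output via the standard logging configuration.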

@AdrianOrenstein AdrianOrenstein marked this pull request as ready for review July 24, 2025 00:24
@AdrianOrenstein
Author

Thanks for the code review!

Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. enhancement New feature or request
Development

Successfully merging this pull request may close these issues.

[Feature Request] Compressing data stored in the Replay Buffer
3 participants