87 changes: 86 additions & 1 deletion docs/source/store-and-retrieve/object-store.md
@@ -38,13 +38,27 @@ class ObjectStoreItem:
metadata: dict[str, str] | None # Custom key-value metadata (optional)
```

### ObjectStoreListItem
The `ObjectStoreListItem` model represents only an object's metadata.
```python
class ObjectStoreListItem:
key: str # The object's unique key
size: int # Size in bytes
content_type: str | None # MIME type (e.g., "video/mp4")
metadata: dict[str, str] | None # Custom metadata
last_modified: datetime | None # Last modification timestamp
```

Note: Unlike `ObjectStoreItem`, this model does not include the `data` field, making listings fast and memory-efficient.

### ObjectStore Interface
The `ObjectStore` abstract interface defines the four standard operations:
The `ObjectStore` abstract interface defines the five standard operations:

- **put_object(key, item)**: Store a new object with a unique key. Raises if the key already exists.
- **upsert_object(key, item)**: Update (or insert) an object with the given key, creating it if it does not exist.
- **get_object(key)**: Retrieve an object by its key. Raises if the key doesn't exist.
- **delete_object(key)**: Remove an object from the store. Raises if the key doesn't exist.
- **list_objects(prefix)**: List objects in the store, optionally filtered by key prefix.

```python
class ObjectStore(ABC):
@@ -63,6 +77,10 @@ class ObjectStore(ABC):
@abstractmethod
async def delete_object(self, key: str) -> None:
...

@abstractmethod
async def list_objects(self, prefix: str | None = None) -> list["ObjectStoreListItem"]:
...
```

## Included Object Stores
@@ -152,10 +170,20 @@ async def my_function(config: MyFunctionConfig, builder: Builder):
retrieved_item = await object_store.get_object("greeting.txt")
print(retrieved_item.data.decode("utf-8"))

# List objects with optional prefix filtering
all_objects = await object_store.list_objects()
for obj in all_objects:
print(f"{obj.key}: {obj.size} bytes, {obj.content_type}")

# List only objects with specific prefix
greetings = await object_store.list_objects(prefix="greeting")

# Delete an object
await object_store.delete_object("greeting.txt")
```

The `list_objects()` method returns metadata for stored objects without downloading their content. This is efficient for building file browsers, galleries, or managing large files. The optional `prefix` parameter filters objects by key prefix, similar to listing files in a directory.
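Since keys are flat strings, a directory-like view is built client-side by grouping on key prefixes. A small sketch of that pattern (the helper name is hypothetical, and keys are plain strings to keep it self-contained):

```python
from collections import defaultdict


def group_by_folder(keys: list[str]) -> dict[str, list[str]]:
    """Group listed keys by their top-level 'folder' segment."""
    groups: dict[str, list[str]] = defaultdict(list)
    for key in keys:
        # "videos/a.mp4" -> folder "videos"; a key with no "/" groups
        # under the whole key itself.
        folder, _, _name = key.partition("/")
        groups[folder].append(key)
    return dict(groups)
```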

### File Server Integration
By adding the `object_store` field in the `general.front_end` block of the configuration, clients can directly download and upload files to the connected object store:

@@ -194,10 +222,67 @@ This enables HTTP endpoints for object store operations:
$ curl -X DELETE http://localhost:9000/static/folder/data.txt
```

### Video Upload Integration

When `object_store` is configured in the FastAPI front end, the UI also exposes video upload endpoints. Uploaded videos are stored with the `videos/` prefix and can be accessed from your workflow functions using the same ObjectStore instance.

```yaml
general:
front_end:
object_store: my_video_store # Enables video routes
_type: fastapi

object_stores:
my_video_store:
_type: s3
endpoint_url: http://localhost:9000
access_key: minioadmin
secret_key: minioadmin
bucket_name: my-video-bucket

functions:
my_video_function:
_type: my_video_processor
object_store: my_video_store # Same store as frontend
```

This enables HTTP endpoints for video operations:

- **GET** `/videos` - List uploaded videos
```console
$ curl -X GET http://localhost:8000/videos
```
- **POST** `/videos` - Upload a new video
```console
$ curl -X POST -F "file=@my_video.mp4;type=video/mp4" http://localhost:8000/videos
```
- **DELETE** `/videos/{video_key}` - Delete a video
```console
$ curl -X DELETE http://localhost:8000/videos/videos_12345.mp4
```

Your workflow functions can access uploaded videos using `list_objects(prefix="videos/")` and `get_object(video_key)` as shown in the usage examples above.

## Examples
The following examples demonstrate how to use the object store module in the NeMo Agent toolkit:
* `examples/object_store/user_report` - A complete workflow that stores and retrieves user diagnostic reports using different object store backends

## Running Tests

Run these from the repository root after installing dependencies:

```bash
# In-memory object store unit tests
pytest tests/nat/object_store/test_in_memory_object_store.py -v

# FastAPI video upload routes
pytest tests/nat/front_ends/fastapi/test_video_upload_routes.py -v

# S3 provider integration tests (requires MinIO or S3 running and S3 plugin installed)
# Install S3 plugin first: uv pip install -e packages/nvidia_nat_s3
pytest packages/nvidia_nat_s3/tests/test_s3_object_store.py --run_integration -v
```

## Error Handling
Object stores may raise specific exceptions:
- **KeyAlreadyExistsError**: When trying to store an object with a key that already exists (for `put_object`)
55 changes: 55 additions & 0 deletions packages/nvidia_nat_s3/src/nat/plugins/s3/s3_object_store.py
@@ -17,12 +17,14 @@

import aioboto3
from botocore.client import BaseClient
from botocore.client import Config
from botocore.exceptions import ClientError

from nat.data_models.object_store import KeyAlreadyExistsError
from nat.data_models.object_store import NoSuchKeyError
from nat.object_store.interfaces import ObjectStore
from nat.object_store.models import ObjectStoreItem
from nat.object_store.models import ObjectStoreListItem

logger = logging.getLogger(__name__)

@@ -55,6 +57,10 @@ def __init__(self,
self._client_args["region_name"] = region
if endpoint_url:
self._client_args["endpoint_url"] = endpoint_url
# Use path-style addressing for non-AWS endpoints (MinIO, etc.) to avoid
# DNS-based virtual host lookups that fail for local endpoints
if 'amazonaws.com' not in endpoint_url.lower():
self._client_args["config"] = Config(s3={'addressing_style': 'path'})

async def __aenter__(self) -> "S3ObjectStore":

@@ -164,3 +170,52 @@ async def delete_object(self, key: str) -> None:

if results.get('DeleteMarker', False):
raise NoSuchKeyError(key=key, additional_message="Object was a delete marker")

async def list_objects(self, prefix: str | None = None) -> list[ObjectStoreListItem]:
"""
List objects in the S3 bucket, optionally filtered by key prefix.
"""
if self._client is None:
raise RuntimeError("Connection not established")

objects = []

try:
paginator = self._client.get_paginator('list_objects_v2')

pagination_args = {"Bucket": self.bucket_name}
if prefix is not None:
pagination_args["Prefix"] = prefix

async for page in paginator.paginate(**pagination_args):

if 'Contents' not in page:
continue

for obj in page['Contents']:
key = obj['Key']

if key.endswith('/'):
continue

try:
head_response = await self._client.head_object(Bucket=self.bucket_name, Key=key)
content_type = head_response.get('ContentType')
metadata = head_response.get('Metadata', {})
except ClientError as e:
logger.warning(f"Failed to get metadata for {key}: {e}")
content_type = None
metadata = {}

objects.append(
ObjectStoreListItem(key=key,
size=obj.get('Size', 0),
content_type=content_type,
metadata=metadata if metadata else None,
last_modified=obj.get('LastModified')))

except ClientError as e:
logger.error(f"Error listing objects with prefix '{prefix}': {e}", exc_info=True)
raise RuntimeError(f"Failed to list objects: {str(e)}") from e

return objects
55 changes: 55 additions & 0 deletions packages/nvidia_nat_s3/tests/conftest.py
@@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pytest fixtures for S3 integration tests."""

import socket

import pytest


def is_port_open(host: str, port: int) -> bool:
"""Check if a port is open on the given host."""
try:
with socket.create_connection((host, port), timeout=1):
return True
except (TimeoutError, ConnectionRefusedError, OSError):
return False


@pytest.fixture(scope="session")
def minio_server():
"""
Fixture that checks if MinIO is running on localhost:9000.

To run S3 integration tests, start MinIO with:

docker run --rm -p 9000:9000 -p 9001:9001 \\
-e MINIO_ROOT_USER=minioadmin \\
-e MINIO_ROOT_PASSWORD=minioadmin \\
minio/minio server /data --console-address ":9001"

Then run tests with:
pytest packages/nvidia_nat_s3/tests/test_s3_object_store.py --run_integration -v
"""
if not is_port_open("localhost", 9000):
pytest.skip("MinIO not running on localhost:9000. "
"Start MinIO to run S3 integration tests.")

return {
"bucket_name": "test-bucket",
"endpoint_url": "http://localhost:9000",
"aws_access_key_id": "minioadmin",
"aws_secret_access_key": "minioadmin",
}
61 changes: 61 additions & 0 deletions packages/nvidia_nat_test/src/nat/test/object_store_tests.py
@@ -24,6 +24,7 @@
from nat.data_models.object_store import NoSuchKeyError
from nat.object_store.interfaces import ObjectStore
from nat.object_store.models import ObjectStoreItem
from nat.object_store.models import ObjectStoreListItem


@pytest.mark.asyncio(loop_scope="class")
@@ -115,3 +116,63 @@ async def test_delete_object(self, store: ObjectStore):
# Try to delete the object again
with pytest.raises(NoSuchKeyError):
await store.delete_object(key)

async def test_list_objects(self, store: ObjectStore):
"""Test listing objects with and without prefix filtering"""

test_id = str(uuid.uuid4())[:8]

test_objects = {
f"videos/{test_id}/video1.mp4":
ObjectStoreItem(data=b"video1_data", content_type="video/mp4", metadata={"title": "Video 1"}),
f"videos/{test_id}/video2.mp4":
ObjectStoreItem(data=b"video2_data", content_type="video/mp4", metadata={"title": "Video 2"}),
f"images/{test_id}/image1.png":
ObjectStoreItem(data=b"image1_data", content_type="image/png", metadata={"title": "Image 1"}),
f"docs/{test_id}/doc1.txt":
ObjectStoreItem(data=b"doc1_data", content_type="text/plain")
}

for key, item in test_objects.items():
await store.put_object(key, item)

# Test 1: List all objects (no prefix)
all_objects = await store.list_objects()
all_keys = {obj.key for obj in all_objects}

for key in test_objects.keys():
assert key in all_keys, f"Expected key {key} not found in all_objects"

# Test 2: List with videos prefix
video_objects = await store.list_objects(prefix=f"videos/{test_id}/")
assert len(video_objects) == 2, f"Expected 2 video objects, got {len(video_objects)}"

video_keys = {obj.key for obj in video_objects}
assert f"videos/{test_id}/video1.mp4" in video_keys
assert f"videos/{test_id}/video2.mp4" in video_keys

for obj in video_objects:
assert isinstance(obj, ObjectStoreListItem)
assert obj.key.startswith(f"videos/{test_id}/")
assert obj.size > 0
assert obj.content_type == "video/mp4"
assert obj.metadata is not None
assert "title" in obj.metadata

# Test 3: List with images prefix
image_objects = await store.list_objects(prefix=f"images/{test_id}/")
assert len(image_objects) == 1
assert image_objects[0].key == f"images/{test_id}/image1.png"
assert image_objects[0].content_type == "image/png"

# Test 4: List with non-existent prefix
empty_objects = await store.list_objects(prefix=f"nonexistent/{test_id}/")
assert len(empty_objects) == 0

# Test 5: List with partial prefix
all_test_objects = await store.list_objects(prefix=f"videos/{test_id}")
assert len(all_test_objects) >= 2 # At least our video objects

# Cleanup
for key in test_objects.keys():
await store.delete_object(key)