diff --git a/README.md b/README.md index c190a090..6f7fd179 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ This monorepo contains two libraries: ## Why use this library? -- **Multiple backends**: DynamoDB, Elasticsearch, Memcached, MongoDB, Redis, +- **Multiple backends**: DynamoDB, S3, Elasticsearch, Memcached, MongoDB, Redis, RocksDB, Valkey, and In-memory, Disk, etc - **TTL support**: Automatic expiration handling across all store types - **Type-safe**: Full type hints with Protocol-based interfaces @@ -131,6 +131,7 @@ pip install py-key-value-aio pip install py-key-value-aio[memory] pip install py-key-value-aio[disk] pip install py-key-value-aio[dynamodb] +pip install py-key-value-aio[s3] pip install py-key-value-aio[elasticsearch] # or: redis, mongodb, memcached, valkey, vault, registry, rocksdb, see below for all options ``` @@ -191,7 +192,7 @@ categories: - **Local stores**: In-memory and disk-based storage (Memory, Disk, RocksDB, etc.) - **Secret stores**: Secure OS-level storage for sensitive data (Keyring, Vault) - **Distributed stores**: Network-based storage for multi-node apps (Redis, - DynamoDB, MongoDB, etc.) + DynamoDB, S3, MongoDB, etc.) Each store has a **stability rating** indicating likelihood of backwards-incompatible changes. Stable stores (Redis, Valkey, Disk, Keyring) diff --git a/docs/api/stores.md b/docs/api/stores.md index 50f2c8f8..ecb5320a 100644 --- a/docs/api/stores.md +++ b/docs/api/stores.md @@ -53,6 +53,16 @@ AWS DynamoDB-backed key-value store. members: - __init__ +## S3 Store + +AWS S3-backed key-value store. + +::: key_value.aio.stores.s3.S3Store + options: + show_source: false + members: + - __init__ + ## Elasticsearch Store Elasticsearch-backed key-value store. diff --git a/docs/stores.md b/docs/stores.md index ee6250a7..5852a6eb 100644 --- a/docs/stores.md +++ b/docs/stores.md @@ -397,6 +397,7 @@ Distributed stores provide network-based storage for multi-node applications. 
| Store | Stability | Async | Sync | Description | |-------|:---------:|:-----:|:----:|:------------| | DynamoDB | Unstable | ✅ | ✖️ | AWS DynamoDB key-value storage | +| S3 | Unstable | ✅ | ✖️ | AWS S3 object storage | | Elasticsearch | Unstable | ✅ | ✅ | Full-text search with key-value capabilities | | Memcached | Unstable | ✅ | ✖️ | High-performance distributed memory cache | | MongoDB | Unstable | ✅ | ✅ | Document database used as key-value store | @@ -503,6 +504,42 @@ pip install py-key-value-aio[dynamodb] --- +### S3Store + +AWS S3 object storage for durable, scalable key-value storage. + +```python +from key_value.aio.stores.s3 import S3Store + +store = S3Store( + bucket_name="my-kv-bucket", + region_name="us-east-1" +) +``` + +**Installation:** + +```bash +pip install py-key-value-aio[s3] +``` + +**Use Cases:** + +- Large value storage (up to 5TB per object) +- Durable, long-term storage +- Cost-effective archival +- Multi-region replication + +**Characteristics:** + +- 99.999999999% durability +- Automatic key sanitization for S3 path limits +- Supports lifecycle policies +- Pay-per-use pricing +- Storage format stability: **Unstable** + +--- + ### ElasticsearchStore Full-text search engine used as a key-value store. 
diff --git a/key-value/key-value-aio/pyproject.toml b/key-value/key-value-aio/pyproject.toml index 2bd8569a..2ea4a34e 100644 --- a/key-value/key-value-aio/pyproject.toml +++ b/key-value/key-value-aio/pyproject.toml @@ -42,6 +42,7 @@ vault = ["hvac>=2.3.0", "types-hvac>=2.3.0"] memcached = ["aiomcache>=0.8.0"] elasticsearch = ["elasticsearch>=8.0.0", "aiohttp>=3.12"] dynamodb = ["aioboto3>=13.3.0", "types-aiobotocore-dynamodb>=2.16.0"] +s3 = ["aioboto3>=13.3.0", "types-aiobotocore-s3>=2.16.0"] keyring = ["keyring>=25.6.0"] keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"] pydantic = ["pydantic>=2.11.9"] @@ -70,7 +71,7 @@ env_files = [".env"] [dependency-groups] dev = [ - "py-key-value-aio[memory,disk,filetree,redis,elasticsearch,memcached,mongodb,vault,dynamodb,rocksdb,duckdb]", + "py-key-value-aio[memory,disk,filetree,redis,elasticsearch,memcached,mongodb,vault,dynamodb,s3,rocksdb,duckdb]", "py-key-value-aio[valkey]; platform_system != 'Windows'", "py-key-value-aio[keyring]", "py-key-value-aio[pydantic]", diff --git a/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py b/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py index 46c0dafd..a75df8c0 100644 --- a/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py +++ b/key-value/key-value-aio/src/key_value/aio/stores/dynamodb/store.py @@ -40,6 +40,7 @@ class DynamoDBStore(BaseContextManagerStore, BaseStore): _endpoint_url: str | None _raw_client: Any # DynamoDB client from aioboto3 _client: DynamoDBClient | None + _owns_client: bool @overload def __init__(self, *, client: DynamoDBClient, table_name: str, default_collection: str | None = None) -> None: @@ -101,6 +102,8 @@ def __init__( self._table_name = table_name if client: self._client = client + self._raw_client = None + self._owns_client = False else: session: Session = aioboto3.Session( region_name=region_name, @@ -112,6 +115,7 @@ def __init__( self._raw_client = session.client(service_name="dynamodb", 
endpoint_url=endpoint_url) # pyright: ignore[reportUnknownMemberType] self._client = None + self._owns_client = True super().__init__(default_collection=default_collection) @@ -127,8 +131,8 @@ async def __aexit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None ) -> None: await super().__aexit__(exc_type, exc_value, traceback) - if self._client: - await self._client.__aexit__(exc_type, exc_value, traceback) + if self._owns_client and self._raw_client: + await self._raw_client.__aexit__(exc_type, exc_value, traceback) @property def _connected_client(self) -> DynamoDBClient: @@ -256,5 +260,5 @@ async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: @override async def _close(self) -> None: """Close the DynamoDB client.""" - if self._client: - await self._client.__aexit__(None, None, None) # pyright: ignore[reportUnknownMemberType] + if self._owns_client and self._raw_client: + await self._raw_client.__aexit__(None, None, None) # pyright: ignore[reportUnknownMemberType] diff --git a/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py b/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py new file mode 100644 index 00000000..95d93ceb --- /dev/null +++ b/key-value/key-value-aio/src/key_value/aio/stores/s3/__init__.py @@ -0,0 +1,13 @@ +"""AWS S3-based key-value store.""" + +from key_value.aio.stores.s3.store import ( + S3CollectionSanitizationStrategy, + S3KeySanitizationStrategy, + S3Store, +) + +__all__ = [ + "S3CollectionSanitizationStrategy", + "S3KeySanitizationStrategy", + "S3Store", +] diff --git a/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py b/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py new file mode 100644 index 00000000..843a032f --- /dev/null +++ b/key-value/key-value-aio/src/key_value/aio/stores/s3/store.py @@ -0,0 +1,447 @@ +from types import TracebackType +from typing import TYPE_CHECKING, Any, overload + +from 
key_value.shared.utils.managed_entry import ManagedEntry +from key_value.shared.utils.sanitization import SanitizationStrategy +from key_value.shared.utils.sanitize import hash_excess_length +from typing_extensions import Self, override + +from key_value.aio.stores.base import ( + BaseContextManagerStore, + BaseStore, +) + +HTTP_NOT_FOUND = 404 + +# S3 key length limit is 1024 bytes +# Allocating 500 bytes each for collection and key stays well under the limit +MAX_COLLECTION_LENGTH = 500 +MAX_KEY_LENGTH = 500 + +try: + import aioboto3 + from aioboto3.session import Session # noqa: TC002 +except ImportError as e: + msg = "S3Store requires py-key-value-aio[s3]" + raise ImportError(msg) from e + +# aioboto3 generates types at runtime, so we use AioBaseClient at runtime but S3Client during static type checking +if TYPE_CHECKING: + from types_aiobotocore_s3.client import S3Client +else: + from aiobotocore.client import AioBaseClient as S3Client + + +class S3KeySanitizationStrategy(SanitizationStrategy): + """Sanitization strategy for S3 keys with byte-aware length limits. + + S3 has a maximum key length of 1024 bytes (UTF-8 encoded). This strategy + hashes keys that exceed the specified byte limit to ensure compliance. + + Args: + max_bytes: Maximum key length in bytes. Defaults to 500. + """ + + def __init__(self, max_bytes: int = MAX_KEY_LENGTH) -> None: + """Initialize the S3 key sanitization strategy. + + Args: + max_bytes: Maximum key length in bytes. + """ + self.max_bytes = max_bytes + + def sanitize(self, value: str) -> str: + """Hash the value if it exceeds max_bytes when UTF-8 encoded. + + Args: + value: The key to sanitize. + + Returns: + The original value if within limit, or truncated+hashed if too long. 
+ """ + return hash_excess_length(value, self.max_bytes, length_is_bytes=True) + + def validate(self, value: str) -> None: + """No validation needed for S3 keys.""" + + +class S3CollectionSanitizationStrategy(S3KeySanitizationStrategy): + """Sanitization strategy for S3 collection names with byte-aware length limits. + + This is identical to S3KeySanitizationStrategy but uses a default of 500 bytes + for collection names to match the S3 key format {collection}/{key}. + """ + + def __init__(self, max_bytes: int = MAX_COLLECTION_LENGTH) -> None: + """Initialize the S3 collection sanitization strategy. + + Args: + max_bytes: Maximum collection name length in bytes. + """ + super().__init__(max_bytes=max_bytes) + + +class S3Store(BaseContextManagerStore, BaseStore): + """AWS S3-based key-value store. + + This store uses AWS S3 to store key-value pairs as objects. Each entry is stored + as a separate S3 object with the path format: {collection}/{key}. The ManagedEntry + is serialized to JSON and stored as the object body. TTL information is stored in + S3 object metadata and checked client-side during retrieval (S3 lifecycle policies + can be configured separately for background cleanup, but don't provide atomic TTL+retrieval). + + By default, collections and keys are not sanitized. This means you must ensure that + the combined "{collection}/{key}" path does not exceed S3's 1024-byte limit when UTF-8 encoded. + + To handle long collection or key names, use the S3CollectionSanitizationStrategy and + S3KeySanitizationStrategy which will hash values exceeding the byte limit. + + Example: + Basic usage with automatic AWS credentials: + + >>> async with S3Store(bucket_name="my-kv-store") as store: + ... await store.put(key="user:123", value={"name": "Alice"}, ttl=3600) + ... user = await store.get(key="user:123") + + With sanitization for long keys/collections: + + >>> async with S3Store( + ... bucket_name="my-kv-store", + ... 
collection_sanitization_strategy=S3CollectionSanitizationStrategy(), + ... key_sanitization_strategy=S3KeySanitizationStrategy(), + ... ) as store: + ... await store.put(key="very_long_key" * 100, value={"data": "test"}) + + With custom AWS credentials: + + >>> async with S3Store( + ... bucket_name="my-kv-store", + ... region_name="us-west-2", + ... aws_access_key_id="...", + ... aws_secret_access_key="...", + ... ) as store: + ... await store.put(key="config", value={"setting": "value"}) + + For local testing with LocalStack: + + >>> async with S3Store( + ... bucket_name="test-bucket", + ... endpoint_url="http://localhost:4566", + ... ) as store: + ... await store.put(key="test", value={"data": "test"}) + """ + + _bucket_name: str + _endpoint_url: str | None + _raw_client: Any + _client: S3Client | None + _owns_client: bool + + @overload + def __init__( + self, + *, + client: S3Client, + bucket_name: str, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store with a pre-configured client. + + Note: When you provide an existing client, you retain ownership and must manage + its lifecycle yourself. The store will not close the client when the store is closed. + + Args: + client: The S3 client to use. You must have entered the context manager before passing this in. + bucket_name: The name of the S3 bucket to use. + default_collection: The default collection to use if no collection is provided. + collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization). + key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization). 
+ """ + + @overload + def __init__( + self, + *, + bucket_name: str, + region_name: str | None = None, + endpoint_url: str | None = None, + aws_access_key_id: str | None = None, + aws_secret_access_key: str | None = None, + aws_session_token: str | None = None, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store with AWS credentials. + + Args: + bucket_name: The name of the S3 bucket to use. + region_name: AWS region name. Defaults to None (uses AWS default). + endpoint_url: Custom endpoint URL (useful for LocalStack/MinIO). Defaults to None. + aws_access_key_id: AWS access key ID. Defaults to None (uses AWS default credentials). + aws_secret_access_key: AWS secret access key. Defaults to None (uses AWS default credentials). + aws_session_token: AWS session token. Defaults to None (uses AWS default credentials). + default_collection: The default collection to use if no collection is provided. + collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization). + key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization). + """ + + def __init__( + self, + *, + client: S3Client | None = None, + bucket_name: str, + region_name: str | None = None, + endpoint_url: str | None = None, + aws_access_key_id: str | None = None, + aws_secret_access_key: str | None = None, + aws_session_token: str | None = None, + default_collection: str | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the S3 store. + + Args: + client: The S3 client to use. Defaults to None (creates a new client). + bucket_name: The name of the S3 bucket to use. + region_name: AWS region name. Defaults to None (uses AWS default). 
+ endpoint_url: Custom endpoint URL (useful for LocalStack/MinIO). Defaults to None. + aws_access_key_id: AWS access key ID. Defaults to None (uses AWS default credentials). + aws_secret_access_key: AWS secret access key. Defaults to None (uses AWS default credentials). + aws_session_token: AWS session token. Defaults to None (uses AWS default credentials). + default_collection: The default collection to use if no collection is provided. + collection_sanitization_strategy: Strategy for sanitizing collection names. Defaults to None (no sanitization). + key_sanitization_strategy: Strategy for sanitizing keys. Defaults to None (no sanitization). + """ + self._bucket_name = bucket_name + self._endpoint_url = endpoint_url + + if client: + self._client = client + self._raw_client = None + self._owns_client = False + else: + session: Session = aioboto3.Session( + region_name=region_name, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + aws_session_token=aws_session_token, + ) + + self._raw_client = session.client(service_name="s3", endpoint_url=endpoint_url) # pyright: ignore[reportUnknownMemberType] + self._client = None + self._owns_client = True + + super().__init__( + default_collection=default_collection, + collection_sanitization_strategy=collection_sanitization_strategy, + key_sanitization_strategy=key_sanitization_strategy, + ) + + @override + async def __aenter__(self) -> Self: + if self._raw_client: + self._client = await self._raw_client.__aenter__() + await super().__aenter__() + return self + + @override + async def __aexit__( + self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None + ) -> None: + await super().__aexit__(exc_type, exc_value, traceback) + if self._owns_client and self._raw_client: + await self._raw_client.__aexit__(exc_type, exc_value, traceback) + + @property + def _connected_client(self) -> S3Client: + """Get the connected S3 client. 
+ + Raises: + ValueError: If the client is not connected. + + Returns: + The connected S3 client. + """ + if not self._client: + msg = "Client not connected" + raise ValueError(msg) + return self._client + + @override + async def _setup(self) -> None: + """Setup the S3 client and ensure bucket exists. + + This method creates the S3 bucket if it doesn't already exist. It uses the + HeadBucket operation to check for bucket existence and creates it if not found. + """ + from botocore.exceptions import ClientError + + try: + await self._connected_client.head_bucket(Bucket=self._bucket_name) # pyright: ignore[reportUnknownMemberType] + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code", "") # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + http_status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode", 0) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + + if error_code in ("404", "NoSuchBucket") or http_status == HTTP_NOT_FOUND: + import contextlib + + with contextlib.suppress(self._connected_client.exceptions.BucketAlreadyOwnedByYou): # pyright: ignore[reportUnknownMemberType] + create_params: dict[str, Any] = {"Bucket": self._bucket_name} + region_name = getattr(self._connected_client.meta, "region_name", None) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + + # For regions other than us-east-1, specify LocationConstraint + # Skip for custom endpoints (LocalStack, MinIO) which may not support it + if region_name and region_name != "us-east-1" and not self._endpoint_url: + create_params["CreateBucketConfiguration"] = {"LocationConstraint": region_name} + + await self._connected_client.create_bucket(**create_params) # pyright: ignore[reportUnknownMemberType] + else: + raise + + def _get_s3_key(self, *, collection: str, key: str) -> str: + """Generate the S3 object key for a given collection and key. 
+ + The collection and key are sanitized using the configured sanitization strategies + before being combined into the S3 object key format: {collection}/{key}. + + Args: + collection: The collection name. + key: The key within the collection. + + Returns: + The S3 object key in format: {collection}/{key} + """ + sanitized_collection, sanitized_key = self._sanitize_collection_and_key(collection=collection, key=key) + return f"{sanitized_collection}/{sanitized_key}" + + @override + async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: + """Retrieve a managed entry from S3. + + This method fetches the object from S3, deserializes the JSON body to a ManagedEntry, + and checks for client-side TTL expiration. If the entry has expired, it is deleted + and None is returned. + + Args: + key: The key to retrieve. + collection: The collection to retrieve from. + + Returns: + The ManagedEntry if found and not expired, otherwise None. + """ + s3_key = self._get_s3_key(collection=collection, key=key) + + try: + response = await self._connected_client.get_object( # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + Bucket=self._bucket_name, + Key=s3_key, + ) + + async with response["Body"] as stream: # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + body_bytes = await stream.read() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + json_value = body_bytes.decode("utf-8") # pyright: ignore[reportUnknownMemberType] + + managed_entry = self._serialization_adapter.load_json(json_str=json_value) + + if managed_entry.is_expired: + await self._connected_client.delete_object( # type: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + ) + return None + return managed_entry # noqa: TRY300 + + except self._connected_client.exceptions.NoSuchKey: # pyright: ignore[reportUnknownMemberType] + return None + + @override + async def _put_managed_entry( + self, + *, + key: str, 
+ collection: str, + managed_entry: ManagedEntry, + ) -> None: + """Store a managed entry in S3. + + This method serializes the ManagedEntry to JSON and stores it as an S3 object. + TTL information is stored in the object metadata for potential use by S3 lifecycle + policies (though lifecycle policies don't support atomic TTL+retrieval, so client-side + checking is still required). + + Args: + key: The key to store. + collection: The collection to store in. + managed_entry: The ManagedEntry to store. + """ + s3_key = self._get_s3_key(collection=collection, key=key) + json_value = self._serialization_adapter.dump_json(entry=managed_entry) + + metadata: dict[str, str] = {} + if managed_entry.expires_at: + metadata["expires-at"] = managed_entry.expires_at.isoformat() + if managed_entry.created_at: + metadata["created-at"] = managed_entry.created_at.isoformat() + + await self._connected_client.put_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + Body=json_value.encode("utf-8"), + ContentType="application/json", + Metadata=metadata, + ) + + @override + async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: + """Delete a managed entry from S3. + + Args: + key: The key to delete. + collection: The collection to delete from. + + Returns: + True if an object was deleted, False if the object didn't exist. 
+ """ + s3_key = self._get_s3_key(collection=collection, key=key) + + from botocore.exceptions import ClientError + + try: + await self._connected_client.head_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + ) + + except ClientError as e: + error = e.response.get("Error", {}) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + metadata = e.response.get("ResponseMetadata", {}) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + error_code = error.get("Code", "") # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + http_status = metadata.get("HTTPStatusCode", 0) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + + if error_code in ("404", "NoSuchKey") or http_status == HTTP_NOT_FOUND: + return False + + if error_code in ("403", "AccessDenied"): + await self._connected_client.delete_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + ) + return True + + raise + + await self._connected_client.delete_object( # pyright: ignore[reportUnknownMemberType] + Bucket=self._bucket_name, + Key=s3_key, + ) + return True + + @override + async def _close(self) -> None: + """Close the S3 client.""" + if self._owns_client and self._raw_client: + await self._raw_client.__aexit__(None, None, None) diff --git a/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py b/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py index 235b3572..f2a748d4 100644 --- a/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py +++ b/key-value/key-value-aio/tests/stores/dynamodb/test_dynamodb.py @@ -40,7 +40,7 @@ async def ping_dynamodb() -> bool: session = aioboto3.Session( aws_access_key_id="test", - aws_secret_access_key="test", # noqa: S106 + aws_secret_access_key="test", region_name="us-east-1", ) async with session.client(service_name="dynamodb", endpoint_url=DYNAMODB_ENDPOINT) as client: # type: ignore @@ 
-89,7 +89,7 @@ async def store(self, setup_dynamodb: None) -> DynamoDBStore: table_name=DYNAMODB_TEST_TABLE, endpoint_url=DYNAMODB_ENDPOINT, aws_access_key_id="test", - aws_secret_access_key="test", # noqa: S106 + aws_secret_access_key="test", region_name="us-east-1", ) @@ -98,7 +98,7 @@ async def store(self, setup_dynamodb: None) -> DynamoDBStore: session = aioboto3.Session( aws_access_key_id="test", - aws_secret_access_key="test", # noqa: S106 + aws_secret_access_key="test", region_name="us-east-1", ) async with session.client(service_name="dynamodb", endpoint_url=DYNAMODB_ENDPOINT) as client: # type: ignore diff --git a/key-value/key-value-aio/tests/stores/s3/__init__.py b/key-value/key-value-aio/tests/stores/s3/__init__.py new file mode 100644 index 00000000..d1936388 --- /dev/null +++ b/key-value/key-value-aio/tests/stores/s3/__init__.py @@ -0,0 +1 @@ +"""Tests for S3Store.""" diff --git a/key-value/key-value-aio/tests/stores/s3/test_s3.py b/key-value/key-value-aio/tests/stores/s3/test_s3.py new file mode 100644 index 00000000..3fd838a9 --- /dev/null +++ b/key-value/key-value-aio/tests/stores/s3/test_s3.py @@ -0,0 +1,120 @@ +import contextlib +from collections.abc import AsyncGenerator + +import pytest +from key_value.shared.stores.wait import async_wait_for_true +from typing_extensions import override + +from key_value.aio.stores.base import BaseStore +from key_value.aio.stores.s3 import S3Store +from tests.conftest import docker_container, should_skip_docker_tests +from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin + +# S3 test configuration (using LocalStack) +S3_HOST = "localhost" +S3_HOST_PORT = 4566 +S3_ENDPOINT = f"http://{S3_HOST}:{S3_HOST_PORT}" +S3_TEST_BUCKET = "kv-store-test" + +WAIT_FOR_S3_TIMEOUT = 30 + +# LocalStack versions to test +LOCALSTACK_VERSIONS_TO_TEST = [ + "4.0.3", # Latest stable version +] + +LOCALSTACK_CONTAINER_PORT = 4566 + + +async def ping_s3() -> bool: + """Check if LocalStack S3 is running.""" + try: + 
import aioboto3 + + session = aioboto3.Session( + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + ) + async with session.client(service_name="s3", endpoint_url=S3_ENDPOINT) as client: # type: ignore + await client.list_buckets() # type: ignore + except Exception: + return False + else: + return True + + +class S3FailedToStartError(Exception): + pass + + +@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available") +class TestS3Store(ContextManagerStoreTestMixin, BaseStoreTests): + @pytest.fixture(scope="session", params=LOCALSTACK_VERSIONS_TO_TEST) + async def setup_s3(self, request: pytest.FixtureRequest) -> AsyncGenerator[None, None]: + version = request.param + + # LocalStack container for S3 + with docker_container( + f"s3-test-{version}", + f"localstack/localstack:{version}", + {str(LOCALSTACK_CONTAINER_PORT): S3_HOST_PORT}, + environment={"SERVICES": "s3"}, + ): + if not await async_wait_for_true(bool_fn=ping_s3, tries=WAIT_FOR_S3_TIMEOUT, wait_time=2): + msg = f"LocalStack S3 {version} failed to start" + raise S3FailedToStartError(msg) + + yield + + @override + @pytest.fixture + async def store(self, setup_s3: None) -> S3Store: + from key_value.aio.stores.s3 import S3CollectionSanitizationStrategy, S3KeySanitizationStrategy + + store = S3Store( + bucket_name=S3_TEST_BUCKET, + endpoint_url=S3_ENDPOINT, + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + # Use sanitization strategies for tests to handle long collection/key names + collection_sanitization_strategy=S3CollectionSanitizationStrategy(), + key_sanitization_strategy=S3KeySanitizationStrategy(), + ) + + # Clean up test bucket if it exists + import aioboto3 + + session = aioboto3.Session( + aws_access_key_id="test", + aws_secret_access_key="test", + region_name="us-east-1", + ) + async with session.client(service_name="s3", endpoint_url=S3_ENDPOINT) as client: # type: ignore + with 
contextlib.suppress(Exception): + # Delete all objects in the bucket (handle pagination) + continuation_token: str | None = None + while True: + list_kwargs = {"Bucket": S3_TEST_BUCKET} + if continuation_token: + list_kwargs["ContinuationToken"] = continuation_token + response = await client.list_objects_v2(**list_kwargs) # type: ignore + + # Delete objects from this page + for obj in response.get("Contents", []): # type: ignore + await client.delete_object(Bucket=S3_TEST_BUCKET, Key=obj["Key"]) # type: ignore + + # Check if there are more pages + continuation_token = response.get("NextContinuationToken") # type: ignore + if not continuation_token: + break + + # Delete the bucket + await client.delete_bucket(Bucket=S3_TEST_BUCKET) # type: ignore + + return store + + @pytest.mark.skip(reason="Distributed Caches are unbounded") + @override + async def test_not_unbounded(self, store: BaseStore): ... diff --git a/key-value/key-value-aio/tests/stores/vault/test_vault.py b/key-value/key-value-aio/tests/stores/vault/test_vault.py index e9704641..b7e25ec0 100644 --- a/key-value/key-value-aio/tests/stores/vault/test_vault.py +++ b/key-value/key-value-aio/tests/stores/vault/test_vault.py @@ -13,7 +13,7 @@ # Vault test configuration VAULT_HOST = "localhost" VAULT_PORT = 8200 -VAULT_TOKEN = "dev-root-token" # noqa: S105 +VAULT_TOKEN = "dev-root-token" VAULT_MOUNT_POINT = "secret" VAULT_CONTAINER_PORT = 8200 diff --git a/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py b/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py index ce1e5df3..d6c84ddf 100644 --- a/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py +++ b/key-value/key-value-shared/src/key_value/shared/utils/sanitize.py @@ -61,6 +61,37 @@ def sanitize_characters_in_string(value: str, allowed_characters: str, replace_w return new_value +def _truncate_to_bytes(value: str, max_bytes: int, encoding: str = "utf-8") -> str: + """Truncate a string to fit within max_bytes when 
encoded, without splitting multi-byte characters. + + Args: + value: The string to truncate. + max_bytes: The maximum number of bytes. + encoding: The encoding to use (default: utf-8). + + Returns: + The truncated string that fits within max_bytes. + """ + encoded = value.encode(encoding) + if len(encoded) <= max_bytes: + return value + + # Binary search to find the longest substring that fits + left, right = 0, len(value) + result = "" + + while left <= right: + mid = (left + right) // 2 + candidate = value[:mid] + if len(candidate.encode(encoding)) <= max_bytes: + result = candidate + left = mid + 1 + else: + right = mid - 1 + + return result + + @bear_enforce def sanitize_string( value: str, @@ -70,6 +101,7 @@ def sanitize_string( hash_fragment_separator: str = DEFAULT_HASH_FRAGMENT_SEPARATOR, hash_fragment_mode: HashFragmentMode = HashFragmentMode.ONLY_IF_CHANGED, hash_fragment_length: int = DEFAULT_HASH_FRAGMENT_SIZE, + length_is_bytes: bool = False, ) -> str: """Sanitize the value, replacing characters and optionally adding a fragment a hash of the value if requested. @@ -81,9 +113,10 @@ def sanitize_string( Args: value: The value to sanitize. allowed_characters: The allowed characters in the value. - max_length: The maximum length of the value (with the hash fragment added). + max_length: The maximum length of the value (with hash fragment). Interpreted as bytes if length_is_bytes is True. hash_fragment_separator: The separator to add between the value and the hash fragment. hash_fragment_mode: The mode to add the hash fragment. + length_is_bytes: If True, max_length is interpreted as bytes instead of characters. 
""" if max_length < MINIMUM_MAX_LENGTH: msg = f"max_length must be greater than or equal to {MINIMUM_MAX_LENGTH}" @@ -94,7 +127,11 @@ def sanitize_string( raise ValueError(msg) hash_fragment: str = generate_hash_fragment(value=value, size=hash_fragment_length) - hash_fragment_size_required: int = len(hash_fragment_separator) + len(hash_fragment) + hash_fragment_size_required: int = ( + len((hash_fragment_separator + hash_fragment).encode("utf-8")) + if length_is_bytes + else len(hash_fragment_separator) + len(hash_fragment) + ) sanitized_value: str = ( sanitize_characters_in_string(value=value, allowed_characters=allowed_characters, replace_with=replacement_character) @@ -106,8 +143,7 @@ def sanitize_string( if hash_fragment_mode == HashFragmentMode.ALWAYS: actual_max_length = max_length - hash_fragment_size_required - - sanitized_value = sanitized_value[:actual_max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, actual_max_length) if length_is_bytes else sanitized_value[:actual_max_length] if not sanitized_value: return hash_fragment @@ -115,14 +151,13 @@ def sanitize_string( return sanitized_value + hash_fragment_separator + hash_fragment if hash_fragment_mode == HashFragmentMode.ONLY_IF_CHANGED: - sanitized_value = sanitized_value[:max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, max_length) if length_is_bytes else sanitized_value[:max_length] if value == sanitized_value: return value actual_max_length = max_length - hash_fragment_size_required - - sanitized_value = sanitized_value[:actual_max_length] + sanitized_value = _truncate_to_bytes(sanitized_value, actual_max_length) if length_is_bytes else sanitized_value[:actual_max_length] if not sanitized_value: return hash_fragment @@ -133,18 +168,19 @@ def sanitize_string( msg = "Entire value was sanitized and hash_fragment_mode is HashFragmentMode.NEVER" raise ValueError(msg) - return sanitized_value + return _truncate_to_bytes(sanitized_value, max_length) if length_is_bytes else 
sanitized_value[:max_length] @bear_enforce -def hash_excess_length(value: str, max_length: int) -> str: +def hash_excess_length(value: str, max_length: int, length_is_bytes: bool = False) -> str: """Hash part of the value if it exceeds the maximum length. This operation will truncate the value to the maximum length minus 8 characters and will swap the last 8 characters with the first 8 characters of the generated hash. Args: value: The value to hash. - max_length: The maximum length of the value. Must be greater than 32. + max_length: The maximum length of the value. Must be greater than 16. If length_is_bytes is True, this is interpreted as bytes. + length_is_bytes: If True, max_length is interpreted as bytes instead of characters. Returns: The hashed value if the value exceeds the maximum length, otherwise the original value. @@ -153,10 +189,13 @@ def hash_excess_length(value: str, max_length: int) -> str: msg = f"max_length must be greater than {MINIMUM_MAX_LENGTH}" raise ValueError(msg) - if len(value) <= max_length: + # Check if truncation is needed + current_length = len(value.encode("utf-8")) if length_is_bytes else len(value) + if current_length <= max_length: return value - truncated_value = value[: max_length - 8] + # Truncate to max_length - 8 to make room for hash + truncated_value = _truncate_to_bytes(value, max_length - 8) if length_is_bytes else value[: max_length - 8] hash_of_value = hashlib.sha256(value.encode()).hexdigest() first_eight_of_hash = hash_of_value[:8] diff --git a/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py b/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py index 6eddf1f1..d0cbc6d3 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py +++ b/key-value/key-value-sync/tests/code_gen/stores/vault/test_vault.py @@ -14,7 +14,7 @@ # Vault test configuration VAULT_HOST = "localhost" VAULT_PORT = 8200 -VAULT_TOKEN = "dev-root-token" # noqa: S105 +VAULT_TOKEN = "dev-root-token" 
VAULT_MOUNT_POINT = "secret" VAULT_CONTAINER_PORT = 8200 diff --git a/pyproject.toml b/pyproject.toml index 8ea8b0a3..97c16599 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,8 @@ line-length = 140 [tool.ruff.lint.extend-per-file-ignores] "**/tests/*.py" = [ "S101", # Ignore asserts + "S105", # Ignore hardcoded password string (test credentials) + "S106", # Ignore hardcoded password function argument (test credentials) "DTZ005", # Ignore datetime.UTC "PLR2004", # Ignore magic values "E501", # Ignore line length diff --git a/scripts/build_sync_library.py b/scripts/build_sync_library.py index e1f6d9a4..05bc58bb 100644 --- a/scripts/build_sync_library.py +++ b/scripts/build_sync_library.py @@ -54,10 +54,12 @@ EXCLUDE_DIRECTORIES = [ "key-value/key-value-aio/src/key_value/aio/stores/dynamodb", "key-value/key-value-aio/tests/stores/dynamodb", - "key-value/key-value-aio/src/key_value/aio/stores/memcached", - "key-value/key-value-aio/tests/stores/memcached", "key-value/key-value-aio/src/key_value/aio/stores/filetree", "key-value/key-value-aio/tests/stores/filetree", + "key-value/key-value-aio/src/key_value/aio/stores/memcached", + "key-value/key-value-aio/tests/stores/memcached", + "key-value/key-value-aio/src/key_value/aio/stores/s3", + "key-value/key-value-aio/tests/stores/s3", "key-value/key-value-aio/src/key_value/aio/wrappers/timeout", "key-value/key-value-aio/tests/wrappers/timeout", ] diff --git a/uv.lock b/uv.lock index 5eff3617..1d106ddf 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.12' and sys_platform != 'win32'", @@ -1815,6 +1815,10 @@ redis = [ rocksdb = [ { name = "rocksdict" }, ] +s3 = [ + { name = "aioboto3" }, + { name = "types-aiobotocore-s3" }, +] valkey = [ { name = "valkey-glide" }, ] @@ -1829,13 +1833,14 @@ wrappers-encryption = [ [package.dev-dependencies] dev = [ { name = "py-key-value", extra = ["dev"] }, - { name 
= "py-key-value-aio", extra = ["disk", "duckdb", "dynamodb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, + { name = "py-key-value-aio", extra = ["disk", "duckdb", "dynamodb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "s3", "vault", "wrappers-encryption"] }, { name = "py-key-value-aio", extra = ["valkey"], marker = "sys_platform != 'win32'" }, ] [package.metadata] requires-dist = [ { name = "aioboto3", marker = "extra == 'dynamodb'", specifier = ">=13.3.0" }, + { name = "aioboto3", marker = "extra == 's3'", specifier = ">=13.3.0" }, { name = "aiofile", marker = "extra == 'filetree'", specifier = ">=3.5.0" }, { name = "aiohttp", marker = "extra == 'elasticsearch'", specifier = ">=3.12" }, { name = "aiomcache", marker = "extra == 'memcached'", specifier = ">=0.8.0" }, @@ -1859,16 +1864,17 @@ requires-dist = [ { name = "rocksdict", marker = "python_full_version >= '3.12' and extra == 'rocksdb'", specifier = ">=0.3.24" }, { name = "rocksdict", marker = "python_full_version < '3.12' and extra == 'rocksdb'", specifier = ">=0.3.2" }, { name = "types-aiobotocore-dynamodb", marker = "extra == 'dynamodb'", specifier = ">=2.16.0" }, + { name = "types-aiobotocore-s3", marker = "extra == 's3'", specifier = ">=2.16.0" }, { name = "types-hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "valkey-glide", marker = "extra == 'valkey'", specifier = ">=2.1.0" }, ] -provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "dynamodb", "keyring", "keyring-linux", "pydantic", "rocksdb", "duckdb", "wrappers-encryption"] +provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "dynamodb", "s3", "keyring", "keyring-linux", "pydantic", "rocksdb", "duckdb", "wrappers-encryption"] 
[package.metadata.requires-dev] dev = [ { name = "py-key-value", extras = ["dev"], editable = "." }, { name = "py-key-value-aio", extras = ["keyring"] }, - { name = "py-key-value-aio", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "memcached", "mongodb", "vault", "dynamodb", "rocksdb", "duckdb"] }, + { name = "py-key-value-aio", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "memcached", "mongodb", "vault", "dynamodb", "s3", "rocksdb", "duckdb"] }, { name = "py-key-value-aio", extras = ["pydantic"] }, { name = "py-key-value-aio", extras = ["valkey"], marker = "sys_platform != 'win32'" }, { name = "py-key-value-aio", extras = ["wrappers-encryption"] }, @@ -2712,6 +2718,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ca/4f/05d80aa8b5a95b82ddb89547c3a037b9460702286c66ca6e0fbb8fa2ce86/types_aiobotocore_dynamodb-2.25.0-py3-none-any.whl", hash = "sha256:de791dfcef90eb3431c09b63419301f9ff824d82970623e149a427d5fd325430", size = 57971, upload-time = "2025-10-11T01:27:39.639Z" }, ] +[[package]] +name = "types-aiobotocore-s3" +version = "2.25.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.12'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8e/c7/5ceb5faf9475470cf01e764def05e96a11db734139acd0a114195ff7e353/types_aiobotocore_s3-2.25.1.tar.gz", hash = "sha256:d7729dea44d144e9a24fdd17f4dc66b98ad71864f9bf6eb6db05958e0e59d01e", size = 76444, upload-time = "2025-10-29T01:52:02.49Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/89/5b/fbef89694f035c69cfcfd998fefe2c82ff1975ab9cd963b5a9627298afae/types_aiobotocore_s3-2.25.1-py3-none-any.whl", hash = "sha256:c6f1f4f60c253248fbf463ed179b9302e5258e07073e3786eafefff2891332e3", size = 83906, upload-time = "2025-10-29T01:52:01.029Z" }, +] + [[package]] name = "types-hvac" version = "2.3.0.20250914"