diff --git a/key-value/key-value-aio/pyproject.toml b/key-value/key-value-aio/pyproject.toml
index 8f41827c..85075adf 100644
--- a/key-value/key-value-aio/pyproject.toml
+++ b/key-value/key-value-aio/pyproject.toml
@@ -41,6 +41,7 @@ valkey = ["valkey-glide>=2.1.0"]
 vault = ["hvac>=2.3.0", "types-hvac>=2.3.0"]
 memcached = ["aiomcache>=0.8.0"]
 elasticsearch = ["elasticsearch>=8.0.0", "aiohttp>=3.12"]
+opensearch = ["opensearch-py[async]>=2.0.0"]
 dynamodb = ["aioboto3>=13.3.0", "types-aiobotocore-dynamodb>=2.16.0"]
 keyring = ["keyring>=25.6.0"]
 keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"]
@@ -69,7 +70,7 @@ env_files = [".env"]
 
 [dependency-groups]
 dev = [
-    "py-key-value-aio[memory,disk,filetree,redis,elasticsearch,memcached,mongodb,vault,dynamodb,rocksdb,duckdb]",
+    "py-key-value-aio[memory,disk,filetree,redis,elasticsearch,opensearch,memcached,mongodb,vault,dynamodb,rocksdb,duckdb]",
     "py-key-value-aio[valkey]; platform_system != 'Windows'",
     "py-key-value-aio[keyring]",
     "py-key-value-aio[pydantic]",
diff --git a/key-value/key-value-aio/src/key_value/aio/stores/opensearch/__init__.py b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/__init__.py
new file mode 100644
index 00000000..d7f02bcc
--- /dev/null
+++ b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/__init__.py
@@ -0,0 +1,3 @@
+from key_value.aio.stores.opensearch.store import OpenSearchStore
+
+__all__ = ["OpenSearchStore"]
diff --git a/key-value/key-value-aio/src/key_value/aio/stores/opensearch/store.py b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/store.py
new file mode 100644
index 00000000..c62ea220
--- /dev/null
+++ b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/store.py
@@ -0,0 +1,501 @@
+import contextlib
+import logging
+from collections.abc import Sequence
+from typing import Any, overload
+
+from key_value.shared.errors import DeserializationError, SerializationError
+from key_value.shared.utils.managed_entry import ManagedEntry
+from key_value.shared.utils.sanitization import (
+    AlwaysHashStrategy,
+    HashFragmentMode,
+    HybridSanitizationStrategy,
+    SanitizationStrategy,
+)
+from key_value.shared.utils.sanitize import (
+    ALPHANUMERIC_CHARACTERS,
+    LOWERCASE_ALPHABET,
+    NUMBERS,
+    UPPERCASE_ALPHABET,
+)
+from key_value.shared.utils.serialization import SerializationAdapter
+from key_value.shared.utils.time_to_live import now_as_epoch
+from typing_extensions import override
+
+from key_value.aio.stores.base import (
+    BaseContextManagerStore,
+    BaseCullStore,
+    BaseDestroyCollectionStore,
+    BaseEnumerateCollectionsStore,
+    BaseEnumerateKeysStore,
+    BaseStore,
+)
+from key_value.aio.stores.opensearch.utils import LessCapableJsonSerializer
+
+try:
+    from opensearchpy import AsyncOpenSearch
+    from opensearchpy.exceptions import RequestError
+
+    from key_value.aio.stores.opensearch.utils import (
+        get_aggregations_from_body,
+        get_body_from_response,
+        get_first_value_from_field_in_hit,
+        get_hits_from_response,
+        get_source_from_body,
+    )
+except ImportError as e:
+    msg = "OpenSearchStore requires opensearch-py[async]>=2.0.0. Install with: pip install 'py-key-value-aio[opensearch]'"
+    raise ImportError(msg) from e
+
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_INDEX_PREFIX = "opensearch_kv_store"
+
+DEFAULT_MAPPING = {
+    "properties": {
+        "created_at": {
+            "type": "date",
+        },
+        "expires_at": {
+            "type": "date",
+        },
+        "collection": {
+            "type": "keyword",
+        },
+        "key": {
+            "type": "keyword",
+        },
+        "value": {
+            "properties": {
+                "flat": {
+                    "type": "flat_object",
+                },
+            },
+        },
+    },
+}
+
+DEFAULT_PAGE_SIZE = 10000
+PAGE_LIMIT = 10000
+
+MAX_KEY_LENGTH = 256
+ALLOWED_KEY_CHARACTERS: str = ALPHANUMERIC_CHARACTERS
+
+MAX_INDEX_LENGTH = 200
+ALLOWED_INDEX_CHARACTERS: str = LOWERCASE_ALPHABET + NUMBERS + "_" + "-" + "."
+
+
+class OpenSearchSerializationAdapter(SerializationAdapter):
+    """Adapter for OpenSearch."""
+
+    def __init__(self) -> None:
+        """Initialize the OpenSearch adapter"""
+        super().__init__()
+
+        self._date_format = "isoformat"
+        self._value_format = "dict"
+
+    @override
+    def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]:
+        value = data.pop("value")
+
+        data["value"] = {
+            "flat": value,
+        }
+
+        return data
+
+    @override
+    def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]:
+        data["value"] = data.pop("value").get("flat")
+
+        return data
+
+
+class OpenSearchV1KeySanitizationStrategy(AlwaysHashStrategy):
+    def __init__(self) -> None:
+        super().__init__(
+            hash_length=64,
+        )
+
+
+class OpenSearchV1CollectionSanitizationStrategy(HybridSanitizationStrategy):
+    def __init__(self) -> None:
+        super().__init__(
+            replacement_character="_",
+            max_length=MAX_INDEX_LENGTH,
+            allowed_characters=UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS,
+            hash_fragment_mode=HashFragmentMode.ALWAYS,
+        )
+
+
+class OpenSearchStore(
+    BaseEnumerateCollectionsStore, BaseEnumerateKeysStore, BaseDestroyCollectionStore, BaseCullStore, BaseContextManagerStore, BaseStore
+):
+    """An OpenSearch-based store.
+
+    Stores collections in their own indices and stores values in Flattened fields.
+
+    This store has specific restrictions on what is allowed in keys and collections. Keys and collections are not sanitized
+    by default which may result in errors when using the store.
+
+    To avoid issues, you may want to consider leveraging the `OpenSearchV1KeySanitizationStrategy` and
+    `OpenSearchV1CollectionSanitizationStrategy` strategies.
+    """
+
+    _client: AsyncOpenSearch
+
+    _index_prefix: str
+
+    _default_collection: str | None
+
+    _serializer: SerializationAdapter
+
+    _key_sanitization_strategy: SanitizationStrategy
+    _collection_sanitization_strategy: SanitizationStrategy
+
+    @overload
+    def __init__(
+        self,
+        *,
+        opensearch_client: AsyncOpenSearch,
+        index_prefix: str,
+        default_collection: str | None = None,
+        key_sanitization_strategy: SanitizationStrategy | None = None,
+        collection_sanitization_strategy: SanitizationStrategy | None = None,
+    ) -> None:
+        """Initialize the opensearch store.
+
+        Args:
+            opensearch_client: The opensearch client to use.
+            index_prefix: The index prefix to use. Collections will be prefixed with this prefix.
+            default_collection: The default collection to use if no collection is provided.
+            key_sanitization_strategy: The sanitization strategy to use for keys.
+            collection_sanitization_strategy: The sanitization strategy to use for collections.
+ """ + + @overload + def __init__( + self, + *, + url: str, + api_key: str | None = None, + index_prefix: str, + default_collection: str | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the opensearch store. + + Args: + url: The url of the opensearch cluster. + api_key: The api key to use. + index_prefix: The index prefix to use. Collections will be prefixed with this prefix. + default_collection: The default collection to use if no collection is provided. + """ + + def __init__( + self, + *, + opensearch_client: AsyncOpenSearch | None = None, + url: str | None = None, + api_key: str | None = None, + index_prefix: str, + default_collection: str | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the opensearch store. + + Args: + opensearch_client: The opensearch client to use. + url: The url of the opensearch cluster. + api_key: The api key to use. + index_prefix: The index prefix to use. Collections will be prefixed with this prefix. + default_collection: The default collection to use if no collection is provided. + key_sanitization_strategy: The sanitization strategy to use for keys. + collection_sanitization_strategy: The sanitization strategy to use for collections. + """ + if opensearch_client is None and url is None: + msg = "Either opensearch_client or url must be provided" + raise ValueError(msg) + + if opensearch_client: + self._client = opensearch_client + elif url: + client_kwargs: dict[str, Any] = { + "hosts": [url], + "http_compress": True, + "timeout": 10, + "max_retries": 3, + } + if api_key: + client_kwargs["api_key"] = api_key + + self._client = AsyncOpenSearch(**client_kwargs) + else: + msg = "Either opensearch_client or url must be provided" + raise ValueError(msg) + + LessCapableJsonSerializer.install_serializer(client=self._client) + + self._index_prefix = index_prefix.lower() + + self._serializer = OpenSearchSerializationAdapter() + + super().__init__( + default_collection=default_collection, + collection_sanitization_strategy=collection_sanitization_strategy, + key_sanitization_strategy=key_sanitization_strategy, + ) + + @override + async def _setup(self) -> None: + # OpenSearch doesn't have serverless mode, so we can skip the cluster info check + pass + + @override + async def _setup_collection(self, *, collection: str) -> None: + index_name = self._get_index_name(collection=collection) + + if await self._client.indices.exists(index=index_name): + return + + try: + _ = await self._client.indices.create(index=index_name, body={"mappings": DEFAULT_MAPPING, "settings": {}}) + except RequestError as e: + if "resource_already_exists_exception" in str(e).lower(): + return + raise + + def _get_index_name(self, collection: str) -> str: + return self._index_prefix + "-" + self._sanitize_collection(collection=collection).lower() + + def _get_document_id(self, key: str) -> str: + return self._sanitize_key(key=key) + + def _get_destination(self, *, collection: str, key: str) -> tuple[str, str]: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + return index_name, document_id + + @override + async def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: + index_name, document_id = self._get_destination(collection=collection, 
key=key) + + try: + opensearch_response = await self._client.get(index=index_name, id=document_id) + except Exception: + return None + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + + if not (source := get_source_from_body(body=body)): + return None + + try: + return self._serializer.load_dict(data=source) + except DeserializationError: + return None + + @override + async def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) -> list[ManagedEntry | None]: + if not keys: + return [] + + # Use mget for efficient batch retrieval + index_name = self._get_index_name(collection=collection) + document_ids = [self._get_document_id(key=key) for key in keys] + docs = [{"_id": document_id} for document_id in document_ids] + + try: + opensearch_response = await self._client.mget(index=index_name, body={"docs": docs}) + except Exception: + return [None] * len(keys) + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + docs_result = body.get("docs", []) + + entries_by_id: dict[str, ManagedEntry | None] = {} + for doc in docs_result: + if not (doc_id := doc.get("_id")): + continue + + if "found" not in doc or not doc.get("found"): + entries_by_id[doc_id] = None + continue + + if not (source := doc.get("_source")): + entries_by_id[doc_id] = None + continue + + try: + entries_by_id[doc_id] = self._serializer.load_dict(data=source) + except DeserializationError as e: + logger.error( + "Failed to deserialize OpenSearch document in batch operation", + extra={ + "collection": collection, + "document_id": doc_id, + "error": str(e), + }, + exc_info=True, + ) + entries_by_id[doc_id] = None + + # Return entries in the same order as input keys + return [entries_by_id.get(document_id) for document_id in document_ids] + + @override + async def _put_managed_entry( + self, + *, + key: str, + collection: str, + managed_entry: ManagedEntry, + ) -> None: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + document: dict[str, Any] = self._serializer.dump_dict(entry=managed_entry, key=key, collection=collection) + + try: + _ = await self._client.index( # type: ignore[reportUnknownVariableType] + index=index_name, + id=document_id, + body=document, + params={"refresh": "true"}, + ) + except Exception as e: + msg = f"Failed to serialize document: {e}" + raise SerializationError(message=msg) from e + + @override + async def _delete_managed_entry(self, *, key: str, collection: str) -> bool: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + try: + opensearch_response = await self._client.delete(index=index_name, id=document_id) + except Exception: + return False + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + + if not (result := body.get("result")) or not isinstance(result, str): + return False + + return result == "deleted" + + @override + async def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> list[str]: + """Get up to 10,000 keys in the specified collection (eventually consistent).""" + + limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT) + + try: + result = await self._client.search( + index=self._get_index_name(collection=collection), + body={ + "query": { + "term": { + "collection": collection, + }, + }, + "_source": False, + "fields": ["key"], + "size": limit, + }, + ) + except Exception: + return [] + + if not (hits := 
+            return []
+
+        all_keys: list[str] = []
+
+        for hit in hits:
+            if not (key := get_first_value_from_field_in_hit(hit=hit, field="key", value_type=str)):
+                continue
+
+            all_keys.append(key)
+
+        return all_keys
+
+    @override
+    async def _get_collection_names(self, *, limit: int | None = None) -> list[str]:
+        """List up to 10,000 collections in the opensearch store (eventually consistent)."""
+
+        limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT)
+
+        try:
+            search_response = await self._client.search(
+                index=f"{self._index_prefix}-*",
+                body={
+                    "aggs": {
+                        "collections": {
+                            "terms": {
+                                "field": "collection",
+                                "size": limit,
+                            },
+                        },
+                    },
+                    "size": 0,
+                },
+            )
+        except Exception:
+            return []
+
+        body: dict[str, Any] = get_body_from_response(response=search_response)
+        aggregations: dict[str, Any] = get_aggregations_from_body(body=body)
+
+        if not aggregations or "collections" not in aggregations:
+            return []
+
+        buckets: list[Any] = aggregations["collections"].get("buckets", [])
+
+        return [bucket["key"] for bucket in buckets if isinstance(bucket, dict) and "key" in bucket]
+
+    @override
+    async def _delete_collection(self, *, collection: str) -> bool:
+        try:
+            result = await self._client.delete_by_query(
+                index=self._get_index_name(collection=collection),
+                body={
+                    "query": {
+                        "term": {
+                            "collection": collection,
+                        },
+                    },
+                },
+            )
+        except Exception:
+            return False
+
+        body: dict[str, Any] = get_body_from_response(response=result)
+
+        if not (deleted := body.get("deleted")) or not isinstance(deleted, int):
+            return False
+
+        return deleted > 0
+
+    @override
+    async def _cull(self) -> None:
+        ms_epoch = int(now_as_epoch() * 1000)
+        with contextlib.suppress(Exception):
+            _ = await self._client.delete_by_query(
+                index=f"{self._index_prefix}-*",
+                body={
+                    "query": {
+                        "range": {
+                            "expires_at": {"lt": ms_epoch},
+                        },
+                    },
+                },
+            )
+
+    @override
+    async def _close(self) -> None:
+        await self._client.close()
diff --git a/key-value/key-value-aio/src/key_value/aio/stores/opensearch/utils.py b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/utils.py
new file mode 100644
index 00000000..ebc75a9c
--- /dev/null
+++ b/key-value/key-value-aio/src/key_value/aio/stores/opensearch/utils.py
@@ -0,0 +1,133 @@
+from typing import Any, TypeVar, cast
+
+from opensearchpy import AsyncOpenSearch
+from opensearchpy.serializer import JSONSerializer
+
+
+def get_body_from_response(response: Any) -> dict[str, Any]:
+    if not response:
+        return {}
+
+    if isinstance(response, dict):
+        return cast("dict[str, Any]", response)
+
+    # OpenSearch response objects might have a body attribute
+    if hasattr(response, "body"):
+        body = response.body
+        if not body:
+            return {}
+        if isinstance(body, dict):
+            return cast("dict[str, Any]", body)
+
+    return {}
+
+
+def get_source_from_body(body: dict[str, Any]) -> dict[str, Any]:
+    if not (source := body.get("_source")):
+        return {}
+
+    if not isinstance(source, dict) or not all(isinstance(key, str) for key in source):  # pyright: ignore[reportUnknownVariableType]
+        return {}
+
+    return cast("dict[str, Any]", source)
+
+
+def get_aggregations_from_body(body: dict[str, Any]) -> dict[str, Any]:
+    if not (aggregations := body.get("aggregations")):
+        return {}
+
+    if not isinstance(aggregations, dict) or not all(
+        isinstance(key, str)
+        for key in aggregations  # pyright: ignore[reportUnknownVariableType]
+    ):
+        return {}
+
+    return cast("dict[str, Any]", aggregations)
+
+
+def get_hits_from_response(response: Any) -> list[dict[str, Any]]:
+    body = get_body_from_response(response=response)
+
+    if not body:
+        return []
+
+    if not (hits := body.get("hits")):
+        return []
+
+    hits_dict: dict[str, Any] = cast("dict[str, Any]", hits)
+
+    if not (hits_list := hits_dict.get("hits")):
+        return []
+
+    if not all(isinstance(hit, dict) for hit in hits_list):  # pyright: ignore[reportAny]
+        return []
+
+    hits_list_dict: list[dict[str, Any]] = cast("list[dict[str, Any]]", hits_list)
+
+    return hits_list_dict
+
+
+T = TypeVar("T")
+
+
+def get_fields_from_hit(hit: dict[str, Any]) -> dict[str, list[Any]]:
+    if not (fields := hit.get("fields")):
+        return {}
+
+    if not isinstance(fields, dict) or not all(isinstance(key, str) for key in fields):  # pyright: ignore[reportUnknownVariableType]
+        msg = f"Fields in hit {hit} is not a dict"
+        raise TypeError(msg)
+
+    if not all(isinstance(value, list) for value in fields.values()):  # pyright: ignore[reportUnknownVariableType]
+        msg = f"Fields in hit {hit} is not a dict of lists"
+        raise TypeError(msg)
+
+    return cast("dict[str, list[Any]]", fields)
+
+
+def get_field_from_hit(hit: dict[str, Any], field: str) -> list[Any]:
+    if not (fields := get_fields_from_hit(hit=hit)):
+        return []
+
+    if not (value := fields.get(field)):
+        msg = f"Field {field} is not in hit {hit}"
+        raise TypeError(msg)
+
+    return value
+
+
+def get_values_from_field_in_hit(hit: dict[str, Any], field: str, value_type: type[T]) -> list[T]:
+    if not (value := get_field_from_hit(hit=hit, field=field)):
+        msg = f"Field {field} is not in hit {hit}"
+        raise TypeError(msg)
+
+    if not all(isinstance(item, value_type) for item in value):  # pyright: ignore[reportAny]
+        msg = f"Field {field} in hit {hit} is not a list of {value_type}"
+        raise TypeError(msg)
+
+    return cast("list[T]", value)
+
+
+def get_first_value_from_field_in_hit(hit: dict[str, Any], field: str, value_type: type[T]) -> T:
+    values: list[T] = get_values_from_field_in_hit(hit=hit, field=field, value_type=value_type)
+    if len(values) != 1:
+        msg: str = f"Field {field} in hit {hit} is not a single value"
+        raise TypeError(msg)
+    return values[0]
+
+
+def new_bulk_action(action: str, index: str, document_id: str) -> dict[str, Any]:
+    return {action: {"_index": index, "_id": document_id}}
+
+
+class LessCapableJsonSerializer(JSONSerializer):
+    """A JSON Serializer that doesn't try to be smart with datetime, floats, etc."""
+
+    def default(self, data: Any) -> Any:  # type: ignore[reportIncompatibleMethodOverride]
+        msg = f"Unable to serialize to JSON: {data!r} (type: {type(data).__name__})"
+        raise TypeError(msg)
+
+    @classmethod
+    def install_serializer(cls, client: AsyncOpenSearch) -> None:
+        # OpenSearch uses a different serializer architecture
+        client.transport.serializer = cls()  # type: ignore[reportUnknownMemberType]
diff --git a/key-value/key-value-aio/tests/stores/base.py b/key-value/key-value-aio/tests/stores/base.py
index 43177ee2..c2e57acf 100644
--- a/key-value/key-value-aio/tests/stores/base.py
+++ b/key-value/key-value-aio/tests/stores/base.py
@@ -30,7 +30,7 @@ async def eventually_consistent(self) -> None:  # noqa: B027
     @abstractmethod
     async def store(self) -> BaseStore | AsyncGenerator[BaseStore, None]: ...
 
-    @pytest.mark.timeout(60)
+    @pytest.mark.timeout(90)
     async def test_store(self, store: BaseStore):
         """Tests that the store is a valid AsyncKeyValueProtocol."""
         assert isinstance(store, AsyncKeyValueProtocol) is True
diff --git a/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py b/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
index d47d6893..fc89561d 100644
--- a/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
+++ b/key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py
@@ -1,5 +1,6 @@
 from collections.abc import AsyncGenerator
 from datetime import datetime, timedelta, timezone
+from typing import TYPE_CHECKING, Any
 
 import pytest
 from dirty_equals import IsFloat, IsStr
@@ -19,6 +20,9 @@
 from tests.conftest import docker_container, should_skip_docker_tests
 from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin
 
+if TYPE_CHECKING:
+    from elastic_transport._response import ObjectApiResponse
+
 TEST_SIZE_LIMIT = 1 * 1024 * 1024  # 1MB
 ES_HOST = "localhost"
 ES_PORT = 9200
@@ -41,7 +45,12 @@ async def ping_elasticsearch() -> bool:
     es_client: AsyncElasticsearch = get_elasticsearch_client()
 
     async with es_client:
-        return await es_client.ping()
+        if not await es_client.ping():
+            return False
+
+        status: ObjectApiResponse[dict[str, Any]] = await es_client.options(ignore_status=404).cluster.health(wait_for_status="green")
+
+        return status.body.get("status") == "green"
 
 
 async def cleanup_elasticsearch_indices(elasticsearch_client: AsyncElasticsearch):
@@ -102,10 +111,7 @@ async def setup_elasticsearch(self, request: pytest.FixtureRequest) -> AsyncGene
     @pytest.fixture
     async def es_client(self) -> AsyncGenerator[AsyncElasticsearch, None]:
         async with AsyncElasticsearch(hosts=[ES_URL]) as es_client:
-            try:
-                yield es_client
-            finally:
-                await es_client.close()
+            yield es_client
 
     @override
     @pytest.fixture
diff --git a/key-value/key-value-aio/tests/stores/opensearch/__init__.py b/key-value/key-value-aio/tests/stores/opensearch/__init__.py
new file mode 100644
index 00000000..21c381ad
--- /dev/null
+++ b/key-value/key-value-aio/tests/stores/opensearch/__init__.py
@@ -0,0 +1 @@
+# OpenSearch store tests
diff --git a/key-value/key-value-aio/tests/stores/opensearch/test_opensearch.py b/key-value/key-value-aio/tests/stores/opensearch/test_opensearch.py
new file mode 100644
index 00000000..5bdda247
--- /dev/null
+++ b/key-value/key-value-aio/tests/stores/opensearch/test_opensearch.py
@@ -0,0 +1,220 @@
+import contextlib
+from collections.abc import AsyncGenerator
+from datetime import datetime, timedelta, timezone
+from typing import Any
+
+import pytest
+from dirty_equals import IsFloat, IsStr
+from inline_snapshot import snapshot
+from key_value.shared.stores.wait import async_wait_for_true
+from key_value.shared.utils.managed_entry import ManagedEntry
+from opensearchpy import AsyncOpenSearch
+from typing_extensions import override
+
+from key_value.aio.stores.base import BaseStore
+from key_value.aio.stores.opensearch import OpenSearchStore
+from key_value.aio.stores.opensearch.store import (
+    OpenSearchSerializationAdapter,
+    OpenSearchV1CollectionSanitizationStrategy,
+    OpenSearchV1KeySanitizationStrategy,
+)
+from tests.conftest import docker_container, should_skip_docker_tests
+from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin
+
+TEST_SIZE_LIMIT = 1 * 1024 * 1024  # 1MB
+LOCALHOST = "localhost"
+
+CONTAINER_PORT = 9200
+HOST_PORT = 19201
+
+OPENSEARCH_URL = f"http://{LOCALHOST}:{HOST_PORT}"
+
+
+WAIT_FOR_OPENSEARCH_TIMEOUT = 30
+
+OPENSEARCH_VERSIONS_TO_TEST = [
+    "2.11.0",  # Released 2023
+    "2.18.0",  # Recent stable version
+]
+
+
+def get_opensearch_client() -> AsyncOpenSearch:
+    return AsyncOpenSearch(hosts=[OPENSEARCH_URL], use_ssl=False, verify_certs=False)
+
+
+async def ping_opensearch() -> bool:
+    opensearch_client: AsyncOpenSearch = get_opensearch_client()
+
+    async with opensearch_client:
+        try:
+            return await opensearch_client.ping()
+        except Exception:
+            return False
+
+
+async def cleanup_opensearch_indices(opensearch_client: AsyncOpenSearch):
+    with contextlib.suppress(Exception):
+        indices = await opensearch_client.indices.get(index="opensearch-kv-store-e2e-test-*")
+        for index in indices:
+            _ = await opensearch_client.indices.delete(index=index)
+
+
+class OpenSearchFailedToStartError(Exception):
+    pass
+
+
+def test_managed_entry_document_conversion():
+    created_at = datetime(year=2025, month=1, day=1, hour=0, minute=0, second=0, tzinfo=timezone.utc)
+    expires_at = created_at + timedelta(seconds=10)
+
+    managed_entry = ManagedEntry(value={"test": "test"}, created_at=created_at, expires_at=expires_at)
+    adapter = OpenSearchSerializationAdapter()
+    document = adapter.dump_dict(entry=managed_entry)
+
+    assert document == snapshot(
+        {
+            "version": 1,
+            "value": {"flat": {"test": "test"}},
+            "created_at": "2025-01-01T00:00:00+00:00",
+            "expires_at": "2025-01-01T00:00:10+00:00",
+        }
+    )
+
+    round_trip_managed_entry = adapter.load_dict(data=document)
+
+    assert round_trip_managed_entry.value == managed_entry.value
+    assert round_trip_managed_entry.created_at == created_at
+    assert round_trip_managed_entry.ttl == IsFloat(lt=0)
+    assert round_trip_managed_entry.expires_at == expires_at
+
+
+@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available")
+@pytest.mark.filterwarnings("ignore:A configured store is unstable and may change in a backwards incompatible way. Use at your own risk.")
+class TestOpenSearchStore(ContextManagerStoreTestMixin, BaseStoreTests):
+    @pytest.fixture(autouse=True, scope="session", params=OPENSEARCH_VERSIONS_TO_TEST)
+    async def setup_opensearch(self, request: pytest.FixtureRequest) -> AsyncGenerator[None, None]:
+        version = request.param
+        os_image = f"opensearchproject/opensearch:{version}"
+
+        with docker_container(
+            f"opensearch-test-{version}",
+            os_image,
+            {str(CONTAINER_PORT): HOST_PORT},
+            {
+                "discovery.type": "single-node",
+                "DISABLE_SECURITY_PLUGIN": "true",
+                "OPENSEARCH_INITIAL_ADMIN_PASSWORD": "TestPassword123!",
+            },
+        ):
+            if not await async_wait_for_true(bool_fn=ping_opensearch, tries=WAIT_FOR_OPENSEARCH_TIMEOUT, wait_time=2):
+                msg = f"OpenSearch {version} failed to start"
+                raise OpenSearchFailedToStartError(msg)
+
+            yield
+
+    @pytest.fixture
+    async def opensearch_client(self, setup_opensearch: None) -> AsyncGenerator[AsyncOpenSearch, None]:
+        opensearch_client = get_opensearch_client()
+
+        async with opensearch_client:
+            await cleanup_opensearch_indices(opensearch_client=opensearch_client)
+
+            yield opensearch_client
+
+    @override
+    @pytest.fixture
+    async def store(self, opensearch_client: AsyncOpenSearch) -> AsyncGenerator[BaseStore, None]:
+        store = OpenSearchStore(
+            opensearch_client=opensearch_client,
+            index_prefix="opensearch-kv-store-e2e-test",
+            default_collection="test-collection",
+        )
+
+        async with store:
+            yield store
+
+    @override
+    @pytest.fixture
+    async def sanitizing_store(self, opensearch_client: AsyncOpenSearch) -> AsyncGenerator[BaseStore, None]:
+        store = OpenSearchStore(
+            opensearch_client=opensearch_client,
+            index_prefix="opensearch-kv-store-e2e-test",
+            default_collection="test-collection",
+            key_sanitization_strategy=OpenSearchV1KeySanitizationStrategy(),
+            collection_sanitization_strategy=OpenSearchV1CollectionSanitizationStrategy(),
+        )
+
+        async with store:
+            yield store
+
+    @pytest.mark.skip(reason="Distributed Caches are unbounded")
+    @override
+    async def test_not_unbounded(self, store: BaseStore): ...
+
+    @pytest.mark.skip(reason="Skip concurrent tests on distributed caches")
+    @override
+    async def test_concurrent_operations(self, store: BaseStore): ...
+
+    @override
+    async def test_long_collection_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore):  # pyright: ignore[reportIncompatibleMethodOverride]
+        with pytest.raises(Exception):  # noqa: B017, PT011
+            await store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
+
+        await sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
+        assert await sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"}
+
+    @override
+    async def test_long_key_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore):  # pyright: ignore[reportIncompatibleMethodOverride]
+        """Tests that a long key name will not raise an error."""
+        with pytest.raises(Exception):  # noqa: B017, PT011
+            await store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"})
+
+        await sanitizing_store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"})
+        assert await sanitizing_store.get(collection="test_collection", key="test_key" * 100) == {"test": "test"}
+
+    async def test_put_put_two_indices(self, store: OpenSearchStore, opensearch_client: AsyncOpenSearch):
+        await store.put(collection="test_collection", key="test_key", value={"test": "test"})
+        await store.put(collection="test_collection_2", key="test_key", value={"test": "test"})
+        assert await store.get(collection="test_collection", key="test_key") == {"test": "test"}
+        assert await store.get(collection="test_collection_2", key="test_key") == {"test": "test"}
+
+        indices: dict[str, Any] = await opensearch_client.indices.get(index="opensearch-kv-store-e2e-test-*")
+        index_names: list[str] = list(indices.keys())
+        assert index_names == snapshot(["opensearch-kv-store-e2e-test-test_collection", "opensearch-kv-store-e2e-test-test_collection_2"])
+
+    async def test_value_stored_as_f_object(self, store: OpenSearchStore, opensearch_client: AsyncOpenSearch):
+        """Verify values are stored as flat objects, not JSON strings"""
+        await store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30})
+
+        index_name = store._get_index_name(collection="test")  # pyright: ignore[reportPrivateUsage]
+        doc_id = store._get_document_id(key="test_key")  # pyright: ignore[reportPrivateUsage]
+
+        response = await opensearch_client.get(index=index_name, id=doc_id)
+        assert response["_source"] == snapshot(
+            {
+                "version": 1,
+                "key": "test_key",
+                "collection": "test",
+                "value": {"flat": {"name": "Alice", "age": 30}},
+                "created_at": IsStr(min_length=20, max_length=40),
+            }
+        )
+
+        # Test with TTL
+        await store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10)
+        response = await opensearch_client.get(index=index_name, id=doc_id)
+        assert response["_source"] == snapshot(
+            {
+                "version": 1,
+                "key": "test_key",
+                "collection": "test",
+                "value": {"flat": {"name": "Bob", "age": 25}},
+                "created_at": IsStr(min_length=20, max_length=40),
+                "expires_at": IsStr(min_length=20, max_length=40),
+            }
+        )
+
+    @override
+    async def test_special_characters_in_collection_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore):  # pyright: ignore[reportIncompatibleMethodOverride]
+        """Tests that special characters in the collection name will not raise an error."""
+        await super().test_special_characters_in_collection_name(store=sanitizing_store)
diff --git a/key-value/key-value-sync/pyproject.toml b/key-value/key-value-sync/pyproject.toml
index 50770808..9bf26899 100644
--- a/key-value/key-value-sync/pyproject.toml
+++ b/key-value/key-value-sync/pyproject.toml
@@ -41,6 +41,7 @@ valkey = ["valkey-glide-sync>=2.1.0"]
 vault = ["hvac>=2.3.0", "types-hvac>=2.3.0"]
 memcached = ["aiomcache>=0.8.0"]
 elasticsearch = ["elasticsearch>=8.0.0", "aiohttp>=3.12"]
+opensearch = ["opensearch-py[async]>=2.0.0"]
 pydantic = ["pydantic>=2.11.9"]
 keyring = ["keyring>=25.6.0"]
 keyring-linux = ["keyring>=25.6.0", "dbus-python>=1.4.0"]
@@ -68,7 +69,7 @@ env_files = [".env"]
 
 [dependency-groups]
 dev = [
-    "py-key-value-sync[memory,disk,filetree,redis,elasticsearch,memcached,mongodb,vault,rocksdb,duckdb]",
+    "py-key-value-sync[memory,disk,filetree,redis,elasticsearch,opensearch,memcached,mongodb,vault,rocksdb,duckdb]",
     "py-key-value-sync[valkey]; platform_system != 'Windows'",
     "py-key-value-sync[pydantic]",
    "py-key-value-sync[keyring]",
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py
index 9bcc33c2..639c7d40 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/multi_store.py
@@ -16,7 +16,7 @@
     from diskcache import Cache
     from pathvalidate import sanitize_filename
 except ImportError as e:
-    msg = "DiskStore requires py-key-value-aio[disk]"
+    msg = "DiskStore requires py-key-value-sync[disk]"
     raise ImportError(msg) from e
 
 CacheFactory = Callable[[str], Cache]
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py
index 07c0b929..2fbedb6c 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/disk/store.py
@@ -14,7 +14,7 @@
 try:
     from diskcache import Cache
 except ImportError as e:
-    msg = "DiskStore requires py-key-value-aio[disk]"
+    msg = "DiskStore requires py-key-value-sync[disk]"
     raise ImportError(msg) from e
 
 
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py
index 15c7713f..c4b0670b 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/duckdb/store.py
@@ -16,7 +16,7 @@
 try:
     import duckdb
 except ImportError as e:
-    msg = "DuckDBStore requires the duckdb extra from py-key-value-aio or py-key-value-sync"
+    msg = "DuckDBStore requires the duckdb extra from py-key-value-sync or py-key-value-aio"
     raise ImportError(msg) from e
 
 
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/keyring/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/keyring/store.py
index 5ff8a812..5c1f1180 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/keyring/store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/keyring/store.py
@@ -18,7 +18,7 @@
     import keyring
     from keyring.errors import PasswordDeleteError
 except ImportError as e:
-    msg = "KeyringStore requires py-key-value-aio[keyring]"
+    msg = "KeyringStore requires py-key-value-sync[keyring]"
     raise ImportError(msg) from e
 
 DEFAULT_KEYCHAIN_SERVICE = "py-key-value"
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py
index 89575d86..41c2fa49 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/memory/store.py
@@ -21,7 +21,7 @@
 try:
     from cachetools import TLRUCache
 except ImportError as e:
-    msg = "MemoryStore requires py-key-value-aio[memory]"
+    msg = "MemoryStore requires py-key-value-sync[memory]"
     raise ImportError(msg) from e
 
 
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py
index a66c7178..7b016de9 100644
--- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/mongodb/store.py
@@ -21,7 +21,7 @@
     from pymongo.database import Database
     from pymongo.results import DeleteResult  # noqa: TC002
 except ImportError as e:
-    msg = "MongoDBStore requires py-key-value-aio[mongodb]"
+    msg = "MongoDBStore requires py-key-value-sync[mongodb]"
     raise ImportError(msg) from e
 
 DEFAULT_DB = "kv-store-adapter"
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/__init__.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/__init__.py
new file mode 100644
index 00000000..52af1c01
--- /dev/null
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/__init__.py
@@ -0,0 +1,6 @@
+# WARNING: this file is auto-generated by 'build_sync_library.py'
+# from the original file '__init__.py'
+# DO NOT CHANGE! Change the original file instead.
+from key_value.sync.code_gen.stores.opensearch.store import OpenSearchStore
+
+__all__ = ["OpenSearchStore"]
diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/store.py
new file mode 100644
index 00000000..fc023afe
--- /dev/null
+++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/store.py
@@ -0,0 +1,420 @@
+# WARNING: this file is auto-generated by 'build_sync_library.py'
+# from the original file 'store.py'
+# DO NOT CHANGE! Change the original file instead.
+import contextlib +import logging +from collections.abc import Sequence +from typing import Any, overload + +from key_value.shared.errors import DeserializationError, SerializationError +from key_value.shared.utils.managed_entry import ManagedEntry +from key_value.shared.utils.sanitization import AlwaysHashStrategy, HashFragmentMode, HybridSanitizationStrategy, SanitizationStrategy +from key_value.shared.utils.sanitize import ALPHANUMERIC_CHARACTERS, LOWERCASE_ALPHABET, NUMBERS, UPPERCASE_ALPHABET +from key_value.shared.utils.serialization import SerializationAdapter +from key_value.shared.utils.time_to_live import now_as_epoch +from typing_extensions import override + +from key_value.sync.code_gen.stores.base import ( + BaseContextManagerStore, + BaseCullStore, + BaseDestroyCollectionStore, + BaseEnumerateCollectionsStore, + BaseEnumerateKeysStore, + BaseStore, +) +from key_value.sync.code_gen.stores.opensearch.utils import LessCapableJsonSerializer + +try: + from opensearchpy import OpenSearch + from opensearchpy.exceptions import RequestError + + from key_value.sync.code_gen.stores.opensearch.utils import ( + get_aggregations_from_body, + get_body_from_response, + get_first_value_from_field_in_hit, + get_hits_from_response, + get_source_from_body, + ) +except ImportError as e: + msg = "OpenSearchStore requires opensearch-py>=2.0.0. Install with: pip install 'py-key-value-sync[opensearch]'" + raise ImportError(msg) from e + +logger = logging.getLogger(__name__) + +DEFAULT_INDEX_PREFIX = "opensearch_kv_store" + +DEFAULT_MAPPING = { + "properties": { + "created_at": {"type": "date"}, + "expires_at": {"type": "date"}, + "collection": {"type": "keyword"}, + "key": {"type": "keyword"}, + "value": {"properties": {"flat": {"type": "flat_object"}}}, + } +} + +DEFAULT_PAGE_SIZE = 10000 +PAGE_LIMIT = 10000 + +MAX_KEY_LENGTH = 256 +ALLOWED_KEY_CHARACTERS: str = ALPHANUMERIC_CHARACTERS + +MAX_INDEX_LENGTH = 200 +ALLOWED_INDEX_CHARACTERS: str = LOWERCASE_ALPHABET + NUMBERS + "_" + "-" + "." + + +class OpenSearchSerializationAdapter(SerializationAdapter): + """Adapter for OpenSearch.""" + + def __init__(self) -> None: + """Initialize the OpenSearch adapter""" + super().__init__() + + self._date_format = "isoformat" + self._value_format = "dict" + + @override + def prepare_dump(self, data: dict[str, Any]) -> dict[str, Any]: + value = data.pop("value") + + data["value"] = {"flat": value} + + return data + + @override + def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]: + data["value"] = data.pop("value").get("flat") + + return data + + +class OpenSearchV1KeySanitizationStrategy(AlwaysHashStrategy): + def __init__(self) -> None: + super().__init__(hash_length=64) + + +class OpenSearchV1CollectionSanitizationStrategy(HybridSanitizationStrategy): + def __init__(self) -> None: + super().__init__( + replacement_character="_", + max_length=MAX_INDEX_LENGTH, + allowed_characters=UPPERCASE_ALPHABET + ALLOWED_INDEX_CHARACTERS, + hash_fragment_mode=HashFragmentMode.ALWAYS, + ) + + +class OpenSearchStore( + BaseEnumerateCollectionsStore, BaseEnumerateKeysStore, BaseDestroyCollectionStore, BaseCullStore, BaseContextManagerStore, BaseStore +): + """An OpenSearch-based store. + + Stores collections in their own indices and stores values in Flattened fields. + + This store has specific restrictions on what is allowed in keys and collections. Keys and collections are not sanitized + by default which may result in errors when using the store. 
+ + To avoid issues, you may want to consider leveraging the `OpenSearchV1KeySanitizationStrategy` and + `OpenSearchV1CollectionSanitizationStrategy` strategies. + """ + + _client: OpenSearch + + _index_prefix: str + + _default_collection: str | None + + _serializer: SerializationAdapter + + _key_sanitization_strategy: SanitizationStrategy + _collection_sanitization_strategy: SanitizationStrategy + + @overload + def __init__( + self, + *, + opensearch_client: OpenSearch, + index_prefix: str, + default_collection: str | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the opensearch store. + + Args: + opensearch_client: The opensearch client to use. + index_prefix: The index prefix to use. Collections will be prefixed with this prefix. + default_collection: The default collection to use if no collection is provided. + key_sanitization_strategy: The sanitization strategy to use for keys. + collection_sanitization_strategy: The sanitization strategy to use for collections. + """ + + @overload + def __init__( + self, + *, + url: str, + api_key: str | None = None, + index_prefix: str, + default_collection: str | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the opensearch store. + + Args: + url: The url of the opensearch cluster. + api_key: The api key to use. + index_prefix: The index prefix to use. Collections will be prefixed with this prefix. + default_collection: The default collection to use if no collection is provided. + """ + + def __init__( + self, + *, + opensearch_client: OpenSearch | None = None, + url: str | None = None, + api_key: str | None = None, + index_prefix: str, + default_collection: str | None = None, + key_sanitization_strategy: SanitizationStrategy | None = None, + collection_sanitization_strategy: SanitizationStrategy | None = None, + ) -> None: + """Initialize the opensearch store. + + Args: + opensearch_client: The opensearch client to use. + url: The url of the opensearch cluster. + api_key: The api key to use. + index_prefix: The index prefix to use. Collections will be prefixed with this prefix. + default_collection: The default collection to use if no collection is provided. + key_sanitization_strategy: The sanitization strategy to use for keys. + collection_sanitization_strategy: The sanitization strategy to use for collections. 
+ """ + if opensearch_client is None and url is None: + msg = "Either opensearch_client or url must be provided" + raise ValueError(msg) + + if opensearch_client: + self._client = opensearch_client + elif url: + client_kwargs: dict[str, Any] = {"hosts": [url], "http_compress": True, "timeout": 10, "max_retries": 3} + if api_key: + client_kwargs["api_key"] = api_key + + self._client = OpenSearch(**client_kwargs) + else: + msg = "Either opensearch_client or url must be provided" + raise ValueError(msg) + + LessCapableJsonSerializer.install_serializer(client=self._client) + + self._index_prefix = index_prefix.lower() + + self._serializer = OpenSearchSerializationAdapter() + + super().__init__( + default_collection=default_collection, + collection_sanitization_strategy=collection_sanitization_strategy, + key_sanitization_strategy=key_sanitization_strategy, + ) + + @override + def _setup(self) -> None: + # OpenSearch doesn't have serverless mode, so we can skip the cluster info check + pass + + @override + def _setup_collection(self, *, collection: str) -> None: + index_name = self._get_index_name(collection=collection) + + if self._client.indices.exists(index=index_name): + return + + try: + _ = self._client.indices.create(index=index_name, body={"mappings": DEFAULT_MAPPING, "settings": {}}) + except RequestError as e: + if "resource_already_exists_exception" in str(e).lower(): + return + raise + + def _get_index_name(self, collection: str) -> str: + return self._index_prefix + "-" + self._sanitize_collection(collection=collection).lower() + + def _get_document_id(self, key: str) -> str: + return self._sanitize_key(key=key) + + def _get_destination(self, *, collection: str, key: str) -> tuple[str, str]: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + return (index_name, document_id) + + @override + def _get_managed_entry(self, *, key: str, collection: str) -> ManagedEntry | None: + (index_name, document_id) = self._get_destination(collection=collection, key=key) + + try: + opensearch_response = self._client.get(index=index_name, id=document_id) + except Exception: + return None + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + + if not (source := get_source_from_body(body=body)): + return None + + try: + return self._serializer.load_dict(data=source) + except DeserializationError: + return None + + @override + def _get_managed_entries(self, *, collection: str, keys: Sequence[str]) -> list[ManagedEntry | None]: + if not keys: + return [] + + # Use mget for efficient batch retrieval + index_name = self._get_index_name(collection=collection) + document_ids = [self._get_document_id(key=key) for key in keys] + docs = [{"_id": document_id} for document_id in document_ids] + + try: + opensearch_response = self._client.mget(index=index_name, body={"docs": docs}) + except Exception: + return [None] * len(keys) + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + docs_result = body.get("docs", []) + + entries_by_id: dict[str, ManagedEntry | None] = {} + for doc in docs_result: + if not (doc_id := doc.get("_id")): + continue + + if "found" not in doc or not doc.get("found"): + entries_by_id[doc_id] = None + continue + + if not (source := doc.get("_source")): + entries_by_id[doc_id] = None + continue + + try: + entries_by_id[doc_id] = self._serializer.load_dict(data=source) + except DeserializationError as e: + logger.error( + "Failed to deserialize OpenSearch document in 
batch operation", + extra={"collection": collection, "document_id": doc_id, "error": str(e)}, + exc_info=True, + ) + entries_by_id[doc_id] = None + + # Return entries in the same order as input keys + return [entries_by_id.get(document_id) for document_id in document_ids] + + @override + def _put_managed_entry(self, *, key: str, collection: str, managed_entry: ManagedEntry) -> None: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + document: dict[str, Any] = self._serializer.dump_dict(entry=managed_entry, key=key, collection=collection) + + try: # type: ignore[reportUnknownVariableType] + _ = self._client.index(index=index_name, id=document_id, body=document, params={"refresh": "true"}) + except Exception as e: + msg = f"Failed to serialize document: {e}" + raise SerializationError(message=msg) from e + + @override + def _delete_managed_entry(self, *, key: str, collection: str) -> bool: + index_name: str = self._get_index_name(collection=collection) + document_id: str = self._get_document_id(key=key) + + try: + opensearch_response = self._client.delete(index=index_name, id=document_id) + except Exception: + return False + + body: dict[str, Any] = get_body_from_response(response=opensearch_response) + + if not (result := body.get("result")) or not isinstance(result, str): + return False + + return result == "deleted" + + @override + def _get_collection_keys(self, *, collection: str, limit: int | None = None) -> list[str]: + """Get up to 10,000 keys in the specified collection (eventually consistent).""" + + limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT) + + try: + result = self._client.search( + index=self._get_index_name(collection=collection), + body={"query": {"term": {"collection": collection}}, "_source": False, "fields": ["key"], "size": limit}, + ) + except Exception: + return [] + + if not (hits := get_hits_from_response(response=result)): + return [] + + all_keys: list[str] = [] + + for hit in hits: + if not (key := get_first_value_from_field_in_hit(hit=hit, field="key", value_type=str)): + continue + + all_keys.append(key) + + return all_keys + + @override + def _get_collection_names(self, *, limit: int | None = None) -> list[str]: + """List up to 10,000 collections in the opensearch store (eventually consistent).""" + + limit = min(limit or DEFAULT_PAGE_SIZE, PAGE_LIMIT) + + try: + search_response = self._client.search( + index=f"{self._index_prefix}-*", + body={"aggs": {"collections": {"terms": {"field": "collection", "size": limit}}}, "size": 0}, + ) + except Exception: + return [] + + body: dict[str, Any] = get_body_from_response(response=search_response) + aggregations: dict[str, Any] = get_aggregations_from_body(body=body) + + if not aggregations or "collections" not in aggregations: + return [] + + buckets: list[Any] = aggregations["collections"].get("buckets", []) + + return [bucket["key"] for bucket in buckets if isinstance(bucket, dict) and "key" in bucket] + + @override + def _delete_collection(self, *, collection: str) -> bool: + try: + result = self._client.delete_by_query( + index=self._get_index_name(collection=collection), body={"query": {"term": {"collection": collection}}} + ) + except Exception: + return False + + body: dict[str, Any] = get_body_from_response(response=result) + + if not (deleted := body.get("deleted")) or not isinstance(deleted, int): + return False + + return deleted > 0 + + @override + def _cull(self) -> None: + ms_epoch = int(now_as_epoch() * 1000) + with 
contextlib.suppress(Exception): + _ = self._client.delete_by_query(index=f"{self._index_prefix}-*", body={"query": {"range": {"expires_at": {"lt": ms_epoch}}}}) + + @override + def _close(self) -> None: + self._client.close() diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/utils.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/utils.py new file mode 100644 index 00000000..5f0490df --- /dev/null +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/opensearch/utils.py @@ -0,0 +1,133 @@ +# WARNING: this file is auto-generated by 'build_sync_library.py' +# from the original file 'utils.py' +# DO NOT CHANGE! Change the original file instead. +from typing import Any, TypeVar, cast + +from opensearchpy import OpenSearch +from opensearchpy.serializer import JSONSerializer + + +def get_body_from_response(response: Any) -> dict[str, Any]: + if not response: + return {} + + if isinstance(response, dict): + return cast("dict[str, Any]", response) + + # OpenSearch response objects might have a body attribute + if hasattr(response, "body"): + body = response.body + if not body: + return {} + if isinstance(body, dict): + return cast("dict[str, Any]", body) + + return {} + + +def get_source_from_body(body: dict[str, Any]) -> dict[str, Any]: + if not (source := body.get("_source")): + return {} + + if not isinstance(source, dict) or not all(isinstance(key, str) for key in source): # pyright: ignore[reportUnknownVariableType] + return {} + + return cast("dict[str, Any]", source) + + +def get_aggregations_from_body(body: dict[str, Any]) -> dict[str, Any]: + if not (aggregations := body.get("aggregations")): + return {} + + if not isinstance(aggregations, dict) or not all(isinstance(key, str) for key in aggregations): # pyright: ignore[reportUnknownVariableType] + return {} + + return cast("dict[str, Any]", aggregations) + + +def get_hits_from_response(response: Any) -> list[dict[str, Any]]: + body = get_body_from_response(response=response) + + if not body: + return [] + + if not (hits := body.get("hits")): + return [] + + hits_dict: dict[str, Any] = cast("dict[str, Any]", hits) + + if not (hits_list := hits_dict.get("hits")): + return [] + + if not all(isinstance(hit, dict) for hit in hits_list): # pyright: ignore[reportAny] + return [] + + hits_list_dict: list[dict[str, Any]] = cast("list[dict[str, Any]]", hits_list) + + return hits_list_dict + + +T = TypeVar("T") + + +def get_fields_from_hit(hit: dict[str, Any]) -> dict[str, list[Any]]: + if not (fields := hit.get("fields")): + return {} + + if not isinstance(fields, dict) or not all(isinstance(key, str) for key in fields): # pyright: ignore[reportUnknownVariableType] + msg = f"Fields in hit {hit} is not a dict" + raise TypeError(msg) + + if not all(isinstance(value, list) for value in fields.values()): # pyright: ignore[reportUnknownVariableType] + msg = f"Fields in hit {hit} is not a dict of lists" + raise TypeError(msg) + + return cast("dict[str, list[Any]]", fields) + + +def get_field_from_hit(hit: dict[str, Any], field: str) -> list[Any]: + if not (fields := get_fields_from_hit(hit=hit)): + return [] + + if not (value := fields.get(field)): + msg = f"Field {field} is not in hit {hit}" + raise TypeError(msg) + + return value + + +def get_values_from_field_in_hit(hit: dict[str, Any], field: str, value_type: type[T]) -> list[T]: + if not (value := get_field_from_hit(hit=hit, field=field)): + msg = f"Field {field} is not in hit {hit}" + raise TypeError(msg) + + if not 
all(isinstance(item, value_type) for item in value): # pyright: ignore[reportAny] + msg = f"Field {field} in hit {hit} is not a list of {value_type}" + raise TypeError(msg) + + return cast("list[T]", value) + + +def get_first_value_from_field_in_hit(hit: dict[str, Any], field: str, value_type: type[T]) -> T: + values: list[T] = get_values_from_field_in_hit(hit=hit, field=field, value_type=value_type) + if len(values) != 1: + msg: str = f"Field {field} in hit {hit} is not a single value" + raise TypeError(msg) + return values[0] + + +def new_bulk_action(action: str, index: str, document_id: str) -> dict[str, Any]: + return {action: {"_index": index, "_id": document_id}} + + +class LessCapableJsonSerializer(JSONSerializer): + """A JSON Serializer that doesnt try to be smart with datetime, floats, etc.""" + + def default(self, data: Any) -> Any: # type: ignore[reportIncompatibleMethodOverride] + msg = f"Unable to serialize to JSON: {data!r} (type: {type(data).__name__})" + raise TypeError(msg) + + @classmethod + def install_serializer(cls, client: OpenSearch) -> None: + # OpenSearch uses a different serializer architecture + client.transport.serializer = cls() # type: ignore[reportUnknownMemberType] diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py index 6885cc4e..6b167d0b 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/redis/store.py @@ -18,7 +18,7 @@ try: from redis import Redis except ImportError as e: - msg = "RedisStore requires py-key-value-aio[redis]" + msg = "RedisStore requires py-key-value-sync[redis]" raise ImportError(msg) from e DEFAULT_PAGE_SIZE = 10000 diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py index 5b1535eb..a8ba929c 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/rocksdb/store.py @@ -16,7 +16,7 @@ try: from rocksdict import Options, Rdict, WriteBatch except ImportError as e: - msg = "RocksDBStore requires py-key-value-aio[rocksdb]" + msg = "RocksDBStore requires py-key-value-sync[rocksdb]" raise ImportError(msg) from e diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py index 888029d0..dc3bc09b 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/valkey/store.py @@ -15,7 +15,7 @@ from glide_sync.config import GlideClientConfiguration, NodeAddress, ServerCredentials from glide_sync.glide_client import BaseClient, GlideClient except ImportError as e: - msg = "ValkeyStore requires py-key-value-aio[valkey]" + msg = "ValkeyStore requires py-key-value-sync[valkey]" raise ImportError(msg) from e DEFAULT_PAGE_SIZE = 10000 diff --git a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/vault/store.py b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/vault/store.py index b96d6af9..d7154dc6 100644 --- a/key-value/key-value-sync/src/key_value/sync/code_gen/stores/vault/store.py +++ b/key-value/key-value-sync/src/key_value/sync/code_gen/stores/vault/store.py @@ -16,7 +16,7 @@ from 
hvac.api.secrets_engines.kv_v2 import KvV2 from hvac.exceptions import InvalidPath except ImportError as e: - msg = "VaultStore requires py-key-value-aio[vault]" + msg = "VaultStore requires py-key-value-sync[vault]" raise ImportError(msg) from e diff --git a/key-value/key-value-sync/src/key_value/sync/stores/opensearch/__init__.py b/key-value/key-value-sync/src/key_value/sync/stores/opensearch/__init__.py new file mode 100644 index 00000000..52af1c01 --- /dev/null +++ b/key-value/key-value-sync/src/key_value/sync/stores/opensearch/__init__.py @@ -0,0 +1,6 @@ +# WARNING: this file is auto-generated by 'build_sync_library.py' +# from the original file '__init__.py' +# DO NOT CHANGE! Change the original file instead. +from key_value.sync.code_gen.stores.opensearch.store import OpenSearchStore + +__all__ = ["OpenSearchStore"] diff --git a/key-value/key-value-sync/tests/code_gen/stores/base.py b/key-value/key-value-sync/tests/code_gen/stores/base.py index b78e279d..9c6303b1 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/base.py +++ b/key-value/key-value-sync/tests/code_gen/stores/base.py @@ -27,7 +27,7 @@ def eventually_consistent(self) -> None: # noqa: B027 @abstractmethod def store(self) -> BaseStore | Generator[BaseStore, None, None]: ... - @pytest.mark.timeout(60) + @pytest.mark.timeout(90) def test_store(self, store: BaseStore): """Tests that the store is a valid KeyValueProtocol.""" assert isinstance(store, KeyValueProtocol) is True diff --git a/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py b/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py index 2bdf1274..778178a8 100644 --- a/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py +++ b/key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py @@ -3,6 +3,7 @@ # DO NOT CHANGE! Change the original file instead. 
from collections.abc import Generator from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any import pytest from dirty_equals import IsFloat, IsStr @@ -22,6 +23,9 @@ from tests.code_gen.conftest import docker_container, should_skip_docker_tests from tests.code_gen.stores.base import BaseStoreTests, ContextManagerStoreTestMixin +if TYPE_CHECKING: + from elastic_transport._response import ObjectApiResponse + TEST_SIZE_LIMIT = 1 * 1024 * 1024 # 1MB ES_HOST = "localhost" ES_PORT = 9200 @@ -42,7 +46,12 @@ def ping_elasticsearch() -> bool: es_client: Elasticsearch = get_elasticsearch_client() with es_client: - return es_client.ping() + if not es_client.ping(): + return False + + status: ObjectApiResponse[dict[str, Any]] = es_client.options(ignore_status=404).cluster.health(wait_for_status="green") + + return status.body.get("status") == "green" def cleanup_elasticsearch_indices(elasticsearch_client: Elasticsearch): @@ -103,10 +112,7 @@ def setup_elasticsearch(self, request: pytest.FixtureRequest) -> Generator[None, @pytest.fixture def es_client(self) -> Generator[Elasticsearch, None, None]: with Elasticsearch(hosts=[ES_URL]) as es_client: - try: - yield es_client - finally: - es_client.close() + yield es_client @override @pytest.fixture diff --git a/key-value/key-value-sync/tests/code_gen/stores/opensearch/__init__.py b/key-value/key-value-sync/tests/code_gen/stores/opensearch/__init__.py new file mode 100644 index 00000000..7f876cc2 --- /dev/null +++ b/key-value/key-value-sync/tests/code_gen/stores/opensearch/__init__.py @@ -0,0 +1,4 @@ +# WARNING: this file is auto-generated by 'build_sync_library.py' +# from the original file '__init__.py' +# DO NOT CHANGE! Change the original file instead. +# OpenSearch store tests diff --git a/key-value/key-value-sync/tests/code_gen/stores/opensearch/test_opensearch.py b/key-value/key-value-sync/tests/code_gen/stores/opensearch/test_opensearch.py new file mode 100644 index 00000000..d1527d4f --- /dev/null +++ b/key-value/key-value-sync/tests/code_gen/stores/opensearch/test_opensearch.py @@ -0,0 +1,214 @@ +# WARNING: this file is auto-generated by 'build_sync_library.py' +# from the original file 'test_opensearch.py' +# DO NOT CHANGE! Change the original file instead. 
+import contextlib +from collections.abc import Generator +from datetime import datetime, timedelta, timezone +from typing import Any + +import pytest +from dirty_equals import IsFloat, IsStr +from inline_snapshot import snapshot +from key_value.shared.stores.wait import wait_for_true +from key_value.shared.utils.managed_entry import ManagedEntry +from opensearchpy import OpenSearch +from typing_extensions import override + +from key_value.sync.code_gen.stores.base import BaseStore +from key_value.sync.code_gen.stores.opensearch import OpenSearchStore +from key_value.sync.code_gen.stores.opensearch.store import ( + OpenSearchSerializationAdapter, + OpenSearchV1CollectionSanitizationStrategy, + OpenSearchV1KeySanitizationStrategy, +) +from tests.code_gen.conftest import docker_container, should_skip_docker_tests +from tests.code_gen.stores.base import BaseStoreTests, ContextManagerStoreTestMixin + +TEST_SIZE_LIMIT = 1 * 1024 * 1024 # 1MB +LOCALHOST = "localhost" + +CONTAINER_PORT = 9200 +HOST_PORT = 19201 + +OPENSEARCH_URL = f"http://{LOCALHOST}:{HOST_PORT}" + +WAIT_FOR_OPENSEARCH_TIMEOUT = 30 +# Released 2023 +# Recent stable version +OPENSEARCH_VERSIONS_TO_TEST = ["2.11.0", "2.18.0"] + + +def get_opensearch_client() -> OpenSearch: + return OpenSearch(hosts=[OPENSEARCH_URL], use_ssl=False, verify_certs=False) + + +def ping_opensearch() -> bool: + opensearch_client: OpenSearch = get_opensearch_client() + + with opensearch_client: + try: + return opensearch_client.ping() + except Exception: + return False + + +def cleanup_opensearch_indices(opensearch_client: OpenSearch): + with contextlib.suppress(Exception): + indices = opensearch_client.indices.get(index="opensearch-kv-store-e2e-test-*") + for index in indices: + _ = opensearch_client.indices.delete(index=index) + + +class OpenSearchFailedToStartError(Exception): + pass + + +def test_managed_entry_document_conversion(): + created_at = datetime(year=2025, month=1, day=1, hour=0, minute=0, second=0, tzinfo=timezone.utc) + expires_at = created_at + timedelta(seconds=10) + + managed_entry = ManagedEntry(value={"test": "test"}, created_at=created_at, expires_at=expires_at) + adapter = OpenSearchSerializationAdapter() + document = adapter.dump_dict(entry=managed_entry) + + assert document == snapshot( + { + "version": 1, + "value": {"flat": {"test": "test"}}, + "created_at": "2025-01-01T00:00:00+00:00", + "expires_at": "2025-01-01T00:00:10+00:00", + } + ) + + round_trip_managed_entry = adapter.load_dict(data=document) + + assert round_trip_managed_entry.value == managed_entry.value + assert round_trip_managed_entry.created_at == created_at + assert round_trip_managed_entry.ttl == IsFloat(lt=0) + assert round_trip_managed_entry.expires_at == expires_at + + +@pytest.mark.skipif(should_skip_docker_tests(), reason="Docker is not available") +@pytest.mark.filterwarnings("ignore:A configured store is unstable and may change in a backwards incompatible way. 
Use at your own risk.") +class TestOpenSearchStore(ContextManagerStoreTestMixin, BaseStoreTests): + @pytest.fixture(autouse=True, scope="session", params=OPENSEARCH_VERSIONS_TO_TEST) + def setup_opensearch(self, request: pytest.FixtureRequest) -> Generator[None, None, None]: + version = request.param + os_image = f"opensearchproject/opensearch:{version}" + + with docker_container( + f"opensearch-test-{version}", + os_image, + {str(CONTAINER_PORT): HOST_PORT}, + {"discovery.type": "single-node", "DISABLE_SECURITY_PLUGIN": "true", "OPENSEARCH_INITIAL_ADMIN_PASSWORD": "TestPassword123!"}, + ): + if not wait_for_true(bool_fn=ping_opensearch, tries=WAIT_FOR_OPENSEARCH_TIMEOUT, wait_time=2): + msg = f"OpenSearch {version} failed to start" + raise OpenSearchFailedToStartError(msg) + + yield + + @pytest.fixture + def opensearch_client(self, setup_opensearch: None) -> Generator[OpenSearch, None, None]: + opensearch_client = get_opensearch_client() + + with opensearch_client: + cleanup_opensearch_indices(opensearch_client=opensearch_client) + + yield opensearch_client + + @override + @pytest.fixture + def store(self, opensearch_client: OpenSearch) -> Generator[BaseStore, None, None]: + store = OpenSearchStore( + opensearch_client=opensearch_client, index_prefix="opensearch-kv-store-e2e-test", default_collection="test-collection" + ) + + with store: + yield store + + @override + @pytest.fixture + def sanitizing_store(self, opensearch_client: OpenSearch) -> Generator[BaseStore, None, None]: + store = OpenSearchStore( + opensearch_client=opensearch_client, + index_prefix="opensearch-kv-store-e2e-test", + default_collection="test-collection", + key_sanitization_strategy=OpenSearchV1KeySanitizationStrategy(), + collection_sanitization_strategy=OpenSearchV1CollectionSanitizationStrategy(), + ) + + with store: + yield store + + @pytest.mark.skip(reason="Distributed Caches are unbounded") + @override + def test_not_unbounded(self, store: BaseStore): ... + + @pytest.mark.skip(reason="Skip concurrent tests on distributed caches") + @override + def test_concurrent_operations(self, store: BaseStore): ... + + @override + def test_long_collection_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore): # pyright: ignore[reportIncompatibleMethodOverride] + with pytest.raises(Exception): # noqa: B017, PT011 + store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) + + sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) + assert sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"} + + @override + def test_long_key_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore): # pyright: ignore[reportIncompatibleMethodOverride] + "Tests that a long key name will not raise an error." 
+ with pytest.raises(Exception): # noqa: B017, PT011 + store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"}) + + sanitizing_store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"}) + assert sanitizing_store.get(collection="test_collection", key="test_key" * 100) == {"test": "test"} + + def test_put_put_two_indices(self, store: OpenSearchStore, opensearch_client: OpenSearch): + store.put(collection="test_collection", key="test_key", value={"test": "test"}) + store.put(collection="test_collection_2", key="test_key", value={"test": "test"}) + assert store.get(collection="test_collection", key="test_key") == {"test": "test"} + assert store.get(collection="test_collection_2", key="test_key") == {"test": "test"} + + indices: dict[str, Any] = opensearch_client.indices.get(index="opensearch-kv-store-e2e-test-*") + index_names: list[str] = list(indices.keys()) + assert index_names == snapshot(["opensearch-kv-store-e2e-test-test_collection", "opensearch-kv-store-e2e-test-test_collection_2"]) + + def test_value_stored_as_f_object(self, store: OpenSearchStore, opensearch_client: OpenSearch): + """Verify values are stored as flat_object fields (under value.flat), not as JSON strings.""" + store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30}) + + index_name = store._get_index_name(collection="test") # pyright: ignore[reportPrivateUsage] + doc_id = store._get_document_id(key="test_key") # pyright: ignore[reportPrivateUsage] + + response = opensearch_client.get(index=index_name, id=doc_id) + assert response["_source"] == snapshot( + { + "version": 1, + "key": "test_key", + "collection": "test", + "value": {"flat": {"name": "Alice", "age": 30}}, + "created_at": IsStr(min_length=20, max_length=40), + } + ) + + # Test with TTL + store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10) + response = opensearch_client.get(index=index_name, id=doc_id) + assert response["_source"] == snapshot( + { + "version": 1, + "key": "test_key", + "collection": "test", + "value": {"flat": {"name": "Bob", "age": 25}}, + "created_at": IsStr(min_length=20, max_length=40), + "expires_at": IsStr(min_length=20, max_length=40), + } + ) + + @override + def test_special_characters_in_collection_name(self, store: OpenSearchStore, sanitizing_store: OpenSearchStore): # pyright: ignore[reportIncompatibleMethodOverride] + "Tests that special characters in the collection name will not raise an error." 
+ super().test_special_characters_in_collection_name(store=sanitizing_store) diff --git a/scripts/build_sync_library.py b/scripts/build_sync_library.py index e1f6d9a4..3bfbeab6 100644 --- a/scripts/build_sync_library.py +++ b/scripts/build_sync_library.py @@ -221,6 +221,7 @@ class RenameAsyncToSync(ast.NodeTransformer): # type: ignore "__aiter__": "__iter__", "asyncio.locks": "threading", "AsyncElasticsearch": "Elasticsearch", + "AsyncOpenSearch": "OpenSearch", "AsyncDatabase": "Database", "AsyncCollection": "Collection", "AsyncMongoClient": "MongoClient", @@ -408,6 +409,16 @@ def _manage_async_generator(self, node: ast.Subscript) -> ast.AST | None: pass return None + def visit_Constant(self, node: ast.Constant) -> ast.AST: + # Transform string literals containing package names + if isinstance(node.value, str): + # Replace py-key-value-aio with py-key-value-sync + node.value = node.value.replace("py-key-value-aio", "py-key-value-sync") + # Remove [async] extras from package install instructions + node.value = node.value.replace("opensearch-py[async]", "opensearch-py") + self.generic_visit(node) + return node + class BlanksInserter(ast.NodeTransformer): # type: ignore """ diff --git a/uv.lock b/uv.lock index 5eff3617..a143b65c 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.12' and sys_platform != 'win32'", @@ -801,6 +801,14 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/4c/c0c95d3d881732a5d1b28e12c9be4dea5953ade71810f94565bd5bd2101a/elasticsearch-9.1.1-py3-none-any.whl", hash = "sha256:2a5c27c57ca3dd3365f665c82c9dcd8666ccfb550d5b07c688c21ec636c104e5", size = 937483, upload-time = "2025-09-12T13:27:34.948Z" }, ] +[[package]] +name = "events" +version = "0.5" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/ed/e47dec0626edd468c84c04d97769e7ab4ea6457b7f54dcb3f72b17fcd876/Events-0.5-py3-none-any.whl", hash = "sha256:a7286af378ba3e46640ac9825156c93bdba7502174dd696090fdfcd4d80a1abd", size = 6758, upload-time = "2023-07-31T08:23:13.645Z" }, +] + [[package]] name = "exceptiongroup" version = "1.3.0" @@ -1525,6 +1533,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/42/b1/6a4eb2c6e9efa028074b0001b61008c9d202b6b46caee9e5d1b18c088216/nodejs_wheel_binaries-22.20.0-py2.py3-none-win_arm64.whl", hash = "sha256:1fccac931faa210d22b6962bcdbc99269d16221d831b9a118bbb80fe434a60b8", size = 38844133, upload-time = "2025-09-26T09:47:57.357Z" }, ] +[[package]] +name = "opensearch-py" +version = "3.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "events" }, + { name = "python-dateutil" }, + { name = "requests" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b8/58/ecec7f855aae7bcfb08f570088c6cb993f68c361a0727abab35dbf021acb/opensearch_py-3.0.0.tar.gz", hash = "sha256:ebb38f303f8a3f794db816196315bcddad880be0dc75094e3334bc271db2ed39", size = 248890, upload-time = "2025-06-17T05:39:48.453Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/e0/69fd114c607b0323d3f864ab4a5ecb87d76ec5a172d2e36a739c8baebea1/opensearch_py-3.0.0-py3-none-any.whl", hash = "sha256:842bf5d56a4a0d8290eda9bb921c50f3080e5dc4e5fefb9c9648289da3f6a8bb", size = 371491, upload-time = "2025-06-17T05:39:46.539Z" }, +] + +[package.optional-dependencies] +async = [ + { name = "aiohttp" }, +] + [[package]] name = 
"packaging" version = "25.0" @@ -1806,6 +1835,9 @@ memory = [ mongodb = [ { name = "pymongo" }, ] +opensearch = [ + { name = "opensearch-py", extra = ["async"] }, +] pydantic = [ { name = "pydantic" }, ] @@ -1829,7 +1861,7 @@ wrappers-encryption = [ [package.dev-dependencies] dev = [ { name = "py-key-value", extra = ["dev"] }, - { name = "py-key-value-aio", extra = ["disk", "duckdb", "dynamodb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, + { name = "py-key-value-aio", extra = ["disk", "duckdb", "dynamodb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "opensearch", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, { name = "py-key-value-aio", extra = ["valkey"], marker = "sys_platform != 'win32'" }, ] @@ -1850,6 +1882,7 @@ requires-dist = [ { name = "hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "keyring", marker = "extra == 'keyring'", specifier = ">=25.6.0" }, { name = "keyring", marker = "extra == 'keyring-linux'", specifier = ">=25.6.0" }, + { name = "opensearch-py", extras = ["async"], marker = "extra == 'opensearch'", specifier = ">=2.0.0" }, { name = "pathvalidate", marker = "extra == 'disk'", specifier = ">=3.3.1" }, { name = "py-key-value-shared", editable = "key-value/key-value-shared" }, { name = "pydantic", marker = "extra == 'pydantic'", specifier = ">=2.11.9" }, @@ -1862,13 +1895,13 @@ requires-dist = [ { name = "types-hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "valkey-glide", marker = "extra == 'valkey'", specifier = ">=2.1.0" }, ] -provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "dynamodb", "keyring", "keyring-linux", "pydantic", "rocksdb", "duckdb", "wrappers-encryption"] +provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "opensearch", "dynamodb", "keyring", "keyring-linux", "pydantic", "rocksdb", "duckdb", "wrappers-encryption"] [package.metadata.requires-dev] dev = [ { name = "py-key-value", extras = ["dev"], editable = "." 
}, { name = "py-key-value-aio", extras = ["keyring"] }, - { name = "py-key-value-aio", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "memcached", "mongodb", "vault", "dynamodb", "rocksdb", "duckdb"] }, + { name = "py-key-value-aio", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "opensearch", "memcached", "mongodb", "vault", "dynamodb", "rocksdb", "duckdb"] }, { name = "py-key-value-aio", extras = ["pydantic"] }, { name = "py-key-value-aio", extras = ["valkey"], marker = "sys_platform != 'win32'" }, { name = "py-key-value-aio", extras = ["wrappers-encryption"] }, @@ -1975,6 +2008,9 @@ memory = [ mongodb = [ { name = "pymongo" }, ] +opensearch = [ + { name = "opensearch-py", extra = ["async"] }, +] pydantic = [ { name = "pydantic" }, ] @@ -1998,7 +2034,7 @@ wrappers-encryption = [ [package.dev-dependencies] dev = [ { name = "py-key-value", extra = ["dev"] }, - { name = "py-key-value-sync", extra = ["disk", "duckdb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, + { name = "py-key-value-sync", extra = ["disk", "duckdb", "elasticsearch", "filetree", "keyring", "memcached", "memory", "mongodb", "opensearch", "pydantic", "redis", "rocksdb", "vault", "wrappers-encryption"] }, { name = "py-key-value-sync", extra = ["valkey"], marker = "sys_platform != 'win32'" }, ] @@ -2017,6 +2053,7 @@ requires-dist = [ { name = "hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "keyring", marker = "extra == 'keyring'", specifier = ">=25.6.0" }, { name = "keyring", marker = "extra == 'keyring-linux'", specifier = ">=25.6.0" }, + { name = "opensearch-py", extras = ["async"], marker = "extra == 'opensearch'", specifier = ">=2.0.0" }, { name = "pathvalidate", marker = "extra == 'disk'", specifier = ">=3.3.1" }, { name = "py-key-value-shared", editable = "key-value/key-value-shared" }, { name = "pydantic", marker = "extra == 'pydantic'", specifier = ">=2.11.9" }, @@ -2028,13 +2065,13 @@ requires-dist = [ { name = "types-hvac", marker = "extra == 'vault'", specifier = ">=2.3.0" }, { name = "valkey-glide-sync", marker = "extra == 'valkey'", specifier = ">=2.1.0" }, ] -provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "pydantic", "keyring", "keyring-linux", "rocksdb", "duckdb", "wrappers-encryption"] +provides-extras = ["memory", "disk", "filetree", "redis", "mongodb", "valkey", "vault", "memcached", "elasticsearch", "opensearch", "pydantic", "keyring", "keyring-linux", "rocksdb", "duckdb", "wrappers-encryption"] [package.metadata.requires-dev] dev = [ { name = "py-key-value", extras = ["dev"], editable = "." }, { name = "py-key-value-sync", extras = ["keyring"] }, - { name = "py-key-value-sync", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "memcached", "mongodb", "vault", "rocksdb", "duckdb"] }, + { name = "py-key-value-sync", extras = ["memory", "disk", "filetree", "redis", "elasticsearch", "opensearch", "memcached", "mongodb", "vault", "rocksdb", "duckdb"] }, { name = "py-key-value-sync", extras = ["pydantic"] }, { name = "py-key-value-sync", extras = ["valkey"], marker = "sys_platform != 'win32'" }, { name = "py-key-value-sync", extras = ["wrappers-encryption"] },