diff --git a/CHANGELOG.md b/CHANGELOG.md index 833900d4..a11c33ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Added dynamic queryables mapping for search and aggregations [#375](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/375) - Added configurable landing page ID `STAC_FASTAPI_LANDING_PAGE_ID` [#352](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/352) - Added support for `S_CONTAINS`, `S_WITHIN`, `S_DISJOINT` spatial filter operations [#371](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/issues/371) - Introduced the `DATABASE_REFRESH` environment variable to control whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. [#370](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/370) diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 987acdf6..05212f5b 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -607,7 +607,7 @@ async def post_search( if hasattr(search_request, "filter_expr"): cql2_filter = getattr(search_request, "filter_expr", None) try: - search = self.database.apply_cql2_filter(search, cql2_filter) + search = await self.database.apply_cql2_filter(search, cql2_filter) except Exception as e: raise HTTPException( status_code=400, detail=f"Error with cql2_json filter: {e}" diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py index 43bd543c..d41d763c 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/aggregation.py @@ -467,7 +467,7 @@ async def aggregate( if aggregate_request.filter_expr: try: - search = self.database.apply_cql2_filter( + search = await self.database.apply_cql2_filter( search, aggregate_request.filter_expr ) except Exception as e: diff --git a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py index a74eff99..078e7fbf 100644 --- a/stac_fastapi/core/stac_fastapi/core/extensions/filter.py +++ b/stac_fastapi/core/stac_fastapi/core/extensions/filter.py @@ -91,20 +91,7 @@ class SpatialOp(str, Enum): S_DISJOINT = "s_disjoint" -queryables_mapping = { - "id": "id", - "collection": "collection", - "geometry": "geometry", - "datetime": "properties.datetime", - "created": "properties.created", - "updated": "properties.updated", - "cloud_cover": "properties.eo:cloud_cover", - "cloud_shadow_percentage": "properties.s2:cloud_shadow_percentage", - "nodata_pixel_percentage": "properties.s2:nodata_pixel_percentage", -} - - -def to_es_field(field: str) -> str: +def to_es_field(queryables_mapping: Dict[str, Any], field: str) -> str: """ Map a given field to its corresponding Elasticsearch field according to a predefined mapping. @@ -117,7 +104,7 @@ def to_es_field(field: str) -> str: return queryables_mapping.get(field, field) -def to_es(query: Dict[str, Any]) -> Dict[str, Any]: +def to_es(queryables_mapping: Dict[str, Any], query: Dict[str, Any]) -> Dict[str, Any]: """ Transform a simplified CQL2 query structure to an Elasticsearch compatible query DSL. @@ -133,7 +120,13 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: LogicalOp.OR: "should", LogicalOp.NOT: "must_not", }[query["op"]] - return {"bool": {bool_type: [to_es(sub_query) for sub_query in query["args"]]}} + return { + "bool": { + bool_type: [ + to_es(queryables_mapping, sub_query) for sub_query in query["args"] + ] + } + } elif query["op"] in [ ComparisonOp.EQ, @@ -150,7 +143,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: ComparisonOp.GTE: "gte", } - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) value = query["args"][1] if isinstance(value, dict) and "timestamp" in value: value = value["timestamp"] @@ -173,11 +166,11 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: return {"range": {field: {range_op[query["op"]]: value}}} elif query["op"] == ComparisonOp.IS_NULL: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) return {"bool": {"must_not": {"exists": {"field": field}}}} elif query["op"] == AdvancedComparisonOp.BETWEEN: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) gte, lte = query["args"][1], query["args"][2] if isinstance(gte, dict) and "timestamp" in gte: gte = gte["timestamp"] @@ -186,14 +179,14 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: return {"range": {field: {"gte": gte, "lte": lte}}} elif query["op"] == AdvancedComparisonOp.IN: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) values = query["args"][1] if not isinstance(values, list): raise ValueError(f"Arg {values} is not a list") return {"terms": {field: values}} elif query["op"] == AdvancedComparisonOp.LIKE: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) pattern = cql2_like_to_es(query["args"][1]) return {"wildcard": {field: {"value": pattern, "case_insensitive": True}}} @@ -203,7 +196,7 @@ def to_es(query: Dict[str, Any]) -> Dict[str, Any]: SpatialOp.S_WITHIN, SpatialOp.S_DISJOINT, ]: - field = to_es_field(query["args"][0]["property"]) + field = to_es_field(queryables_mapping, query["args"][0]["property"]) geometry = query["args"][1] relation_mapping = { diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 7afbb58d..958ee597 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -290,6 +290,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) return item["_source"] + async def get_queryables_mapping(self, collection_id: str = "*") -> dict: + """Retrieve mapping of Queryables for search. + + Args: + collection_id (str, optional): The id of the Collection the Queryables + belongs to. Defaults to "*". + + Returns: + dict: A dictionary containing the Queryables mappings. + """ + queryables_mapping = {} + + mappings = await self.client.indices.get_mapping( + index=f"{ITEMS_INDEX_PREFIX}{collection_id}", + ) + + for mapping in mappings.values(): + fields = mapping["mappings"].get("properties", {}) + properties = fields.pop("properties", {}).get("properties", {}).keys() + + for field_key in fields: + queryables_mapping[field_key] = field_key + + for property_key in properties: + queryables_mapping[property_key] = f"properties.{property_key}" + + return queryables_mapping + @staticmethod def make_search(): """Database logic to create a Search instance.""" @@ -518,8 +546,9 @@ def apply_free_text_filter(search: Search, free_text_queries: Optional[List[str] return search - @staticmethod - def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): + async def apply_cql2_filter( + self, search: Search, _filter: Optional[Dict[str, Any]] + ): """ Apply a CQL2 filter to an Elasticsearch Search object. @@ -539,7 +568,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): otherwise the original Search object. """ if _filter is not None: - es_query = filter.to_es(_filter) + es_query = filter.to_es(await self.get_queryables_mapping(), _filter) search = search.query(es_query) return search diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 5b9510f3..71ab9275 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -307,6 +307,34 @@ async def get_one_item(self, collection_id: str, item_id: str) -> Dict: ) return item["_source"] + async def get_queryables_mapping(self, collection_id: str = "*") -> dict: + """Retrieve mapping of Queryables for search. + + Args: + collection_id (str, optional): The id of the Collection the Queryables + belongs to. Defaults to "*". + + Returns: + dict: A dictionary containing the Queryables mappings. + """ + queryables_mapping = {} + + mappings = await self.client.indices.get_mapping( + index=f"{ITEMS_INDEX_PREFIX}{collection_id}", + ) + + for mapping in mappings.values(): + fields = mapping["mappings"].get("properties", {}) + properties = fields.pop("properties", {}).get("properties", {}).keys() + + for field_key in fields: + queryables_mapping[field_key] = field_key + + for property_key in properties: + queryables_mapping[property_key] = f"properties.{property_key}" + + return queryables_mapping + @staticmethod def make_search(): """Database logic to create a Search instance.""" @@ -535,8 +563,9 @@ def apply_stacql_filter(search: Search, op: str, field: str, value: float): return search - @staticmethod - def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): + async def apply_cql2_filter( + self, search: Search, _filter: Optional[Dict[str, Any]] + ): """ Apply a CQL2 filter to an Opensearch Search object. @@ -556,7 +585,7 @@ def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): otherwise the original Search object. """ if _filter is not None: - es_query = filter.to_es(_filter) + es_query = filter.to_es(await self.get_queryables_mapping(), _filter) search = search.filter(es_query) return search diff --git a/stac_fastapi/tests/extensions/test_filter.py b/stac_fastapi/tests/extensions/test_filter.py index ae355c3a..fb6bc850 100644 --- a/stac_fastapi/tests/extensions/test_filter.py +++ b/stac_fastapi/tests/extensions/test_filter.py @@ -163,7 +163,7 @@ async def test_search_filter_ext_and_get_cql2text_id(app_client, ctx): async def test_search_filter_ext_and_get_cql2text_cloud_cover(app_client, ctx): collection = ctx.item["collection"] cloud_cover = ctx.item["properties"]["eo:cloud_cover"] - filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'" resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") assert resp.status_code == 200 @@ -176,7 +176,7 @@ async def test_search_filter_ext_and_get_cql2text_cloud_cover_no_results( ): collection = ctx.item["collection"] cloud_cover = ctx.item["properties"]["eo:cloud_cover"] + 1 - filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + filter = f"eo:cloud_cover={cloud_cover} AND collection='{collection}'" resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") assert resp.status_code == 200