Skip to content

Commit 7574013

Browse files
authored
Adding backwards compatibility for asset indexing (stac-utils#433)
**Related Issue(s):** - ~stac-utils#425~ - stac-utils#428 **Description:** - Adds flag for indexing assets. - ~Adds tests for serializers.~ - Adds scripts for reindexing for mapping changes. **PR Checklist:** - [x] Code is formatted and linted (run `pre-commit run --all-files`) - [ ] Tests pass (run `make test`) - [ ] Documentation has been updated to reflect changes, if applicable - [ ] Changes are added to the changelog
1 parent 5723bba commit 7574013

File tree

6 files changed

+244
-32
lines changed

6 files changed

+244
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1010

1111
### Added
1212

13+
- `STAC_INDEX_ASSETS` environment variable to allow asset serialization to be configurable. [#433](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/433)
1314
- Added the `ENV_MAX_LIMIT` environment variable to SFEOS, allowing overriding of the `MAX_LIMIT`, which controls the `?limit` parameter for returned items and STAC collections. [#434](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/434)
1415
- Updated the `format_datetime_range` function to support milliseconds. [#423](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/423)
1516

README.md

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -66,26 +66,43 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI
6666

6767
## Table of Contents
6868

69-
- [Documentation & Resources](#documentation--resources)
70-
- [Package Structure](#package-structure)
71-
- [Examples](#examples)
72-
- [Performance](#performance)
73-
- [Quick Start](#quick-start)
74-
- [Installation](#installation)
75-
- [Running Locally](#running-locally)
76-
- [Configuration reference](#configuration-reference)
77-
- [Interacting with the API](#interacting-with-the-api)
78-
- [Configure the API](#configure-the-api)
79-
- [Collection pagination](#collection-pagination)
80-
- [Ingesting Sample Data CLI Tool](#ingesting-sample-data-cli-tool)
81-
- [Elasticsearch Mappings](#elasticsearch-mappings)
82-
- [Managing Elasticsearch Indices](#managing-elasticsearch-indices)
83-
- [Snapshots](#snapshots)
84-
- [Reindexing](#reindexing)
85-
- [Auth](#auth)
86-
- [Aggregation](#aggregation)
87-
- [Rate Limiting](#rate-limiting)
88-
- [Datetime-Based Index Management](#datetime-based-index-management)
69+
- [stac-fastapi-elasticsearch-opensearch](#stac-fastapi-elasticsearch-opensearch)
70+
- [Sponsors \& Supporters](#sponsors--supporters)
71+
- [Project Introduction - What is SFEOS?](#project-introduction---what-is-sfeos)
72+
- [Common Deployment Patterns](#common-deployment-patterns)
73+
- [Technologies](#technologies)
74+
- [Table of Contents](#table-of-contents)
75+
- [Documentation \& Resources](#documentation--resources)
76+
- [Package Structure](#package-structure)
77+
- [Examples](#examples)
78+
- [Performance](#performance)
79+
- [Direct Response Mode](#direct-response-mode)
80+
- [Quick Start](#quick-start)
81+
- [Installation](#installation)
82+
- [Running Locally](#running-locally)
83+
- [Using Pre-built Docker Images](#using-pre-built-docker-images)
84+
- [Using Docker Compose](#using-docker-compose)
85+
- [Configuration Reference](#configuration-reference)
86+
- [Datetime-Based Index Management](#datetime-based-index-management)
87+
- [Overview](#overview)
88+
- [When to Use](#when-to-use)
89+
- [Configuration](#configuration)
90+
- [Enabling Datetime-Based Indexing](#enabling-datetime-based-indexing)
91+
- [Related Configuration Variables](#related-configuration-variables)
92+
- [How Datetime-Based Indexing Works](#how-datetime-based-indexing-works)
93+
- [Index and Alias Naming Convention](#index-and-alias-naming-convention)
94+
- [Index Size Management](#index-size-management)
95+
- [Interacting with the API](#interacting-with-the-api)
96+
- [Configure the API](#configure-the-api)
97+
- [Collection Pagination](#collection-pagination)
98+
- [Ingesting Sample Data CLI Tool](#ingesting-sample-data-cli-tool)
99+
- [Elasticsearch Mappings](#elasticsearch-mappings)
100+
- [Managing Elasticsearch Indices](#managing-elasticsearch-indices)
101+
- [Snapshots](#snapshots)
102+
- [Reindexing](#reindexing)
103+
- [Auth](#auth)
104+
- [Aggregation](#aggregation)
105+
- [Rate Limiting](#rate-limiting)
89106

90107
## Documentation & Resources
91108

@@ -228,6 +245,7 @@ You can customize additional settings in your `.env` file:
228245
| `DATABASE_REFRESH` | Controls whether database operations refresh the index immediately after changes. If set to `true`, changes will be immediately searchable. If set to `false`, changes may not be immediately visible but can improve performance for bulk operations. If set to `wait_for`, changes will wait for the next refresh cycle to become visible. | `false` | Optional |
229246
| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. If set to `false`, the POST `/collections` route and related transaction endpoints (including bulk transaction operations) will be unavailable in the API. This is useful for deployments where mutating the catalog via the API should be prevented. | `true` | Optional |
230247
| `STAC_ITEM_LIMIT` | Sets the environment variable for result limiting to SFEOS for the number of returned items and STAC collections. | `10` | Optional |
248+
| `STAC_INDEX_ASSETS` | Controls if Assets are indexed when added to Elasticsearch/Opensearch. This allows asset fields to be included in search queries. | `false` | Optional |
231249
| `ENV_MAX_LIMIT` | Configures the environment variable in SFEOS to override the default `MAX_LIMIT`, which controls the limit parameter for returned items and STAC collections. | `10,000` | Optional |
232250

233251
> [!NOTE]

scripts/reindex_elasticsearch.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import time
3+
4+
from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
5+
from stac_fastapi.elasticsearch.database_logic import create_index_templates
6+
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
7+
8+
9+
async def reindex(client, index, new_index, aliases):
10+
"""Reindex STAC index"""
11+
print(f"reindexing {index} to {new_index}")
12+
13+
await client.options(ignore_status=400).indices.create(index=new_index)
14+
15+
reindex_resp = await client.reindex(
16+
dest={"index": new_index},
17+
source={"index": [index]},
18+
wait_for_completion=False,
19+
script={
20+
"source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
21+
"lang": "painless",
22+
},
23+
)
24+
25+
task_id = reindex_resp["task"]
26+
27+
reindex_complete = False
28+
while not reindex_complete:
29+
task_resp = await client.tasks.get(task_id=task_id)
30+
31+
if "completed" in task_resp and task_resp["completed"]:
32+
reindex_complete = True
33+
34+
elif "error" in task_resp:
35+
reindex_complete = True
36+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
37+
38+
else:
39+
time.sleep(60)
40+
41+
actions = []
42+
for alias in aliases["aliases"]:
43+
actions.extend(
44+
[
45+
{"add": {"index": new_index, "alias": alias}},
46+
{"remove": {"index": index, "alias": alias}},
47+
]
48+
)
49+
50+
await client.indices.update_aliases(actions=actions)
51+
52+
53+
async def run():
54+
"""Reindex all STAC indexes for mapping update"""
55+
client = AsyncElasticsearchSettings().create_client
56+
57+
await create_index_templates()
58+
59+
collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
60+
collections = await client.search(index=COLLECTIONS_INDEX)
61+
62+
collection_index, collection_aliases = next(iter(collection_response.items()))
63+
collection_index_name, version = collection_index.rsplit("-", 1)
64+
new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"
65+
66+
await reindex(client, collection_index, new_collection_index, collection_aliases)
67+
68+
for collection in collections["hits"]["hits"]:
69+
70+
item_indexes = await client.indices.get_alias(
71+
name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*"
72+
)
73+
74+
for item_index, aliases in item_indexes.items():
75+
item_index_name, version = item_index.rsplit("-", 1)
76+
new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"
77+
78+
await reindex(client, item_index, new_item_index, aliases)
79+
80+
await client.close()
81+
82+
83+
if __name__ == "__main__":
84+
asyncio.run(run())

scripts/reindex_opensearch.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import asyncio
2+
import time
3+
4+
from stac_fastapi.opensearch.config import AsyncOpensearchSettings
5+
from stac_fastapi.opensearch.database_logic import create_index_templates
6+
from stac_fastapi.sfeos_helpers.mappings import COLLECTIONS_INDEX, ITEMS_INDEX_PREFIX
7+
8+
9+
async def reindex(client, index, new_index, aliases):
10+
"""Reindex STAC index"""
11+
print(f"reindexing {index} to {new_index}")
12+
13+
await client.options(ignore_status=400).indices.create(index=new_index)
14+
15+
reindex_resp = await client.reindex(
16+
dest={"index": new_index},
17+
source={"index": [index]},
18+
wait_for_completion=False,
19+
script={
20+
"source": "if (ctx._source.containsKey('assets')){List l = new ArrayList();for (key in ctx._source.assets.keySet()) {def item = ctx._source.assets[key]; item['es_key'] = key; l.add(item)}ctx._source.assets=l} if (ctx._source.containsKey('item_assets')){ List a = new ArrayList(); for (key in ctx._source.item_assets.keySet()) {def item = ctx._source.item_assets[key]; item['es_key'] = key; a.add(item)}ctx._source.item_assets=a}",
21+
"lang": "painless",
22+
},
23+
)
24+
25+
task_id = reindex_resp["task"]
26+
27+
reindex_complete = False
28+
while not reindex_complete:
29+
task_resp = await client.tasks.get(task_id=task_id)
30+
31+
if "completed" in task_resp and task_resp["completed"]:
32+
reindex_complete = True
33+
34+
elif "error" in task_resp:
35+
reindex_complete = True
36+
print(f"Reindex failed for {index} with error: {task_resp['error']}")
37+
38+
else:
39+
time.sleep(60)
40+
41+
actions = []
42+
for alias in aliases["aliases"]:
43+
actions.extend(
44+
[
45+
{"add": {"index": new_index, "alias": alias}},
46+
{"remove": {"index": index, "alias": alias}},
47+
]
48+
)
49+
50+
await client.indices.update_aliases(actions=actions)
51+
52+
53+
async def run():
54+
"""Reindex all STAC indexes for mapping update"""
55+
client = AsyncOpensearchSettings().create_client
56+
57+
await create_index_templates()
58+
59+
collection_response = await client.indices.get_alias(name=COLLECTIONS_INDEX)
60+
collections = await client.search(index=COLLECTIONS_INDEX)
61+
62+
collection_index, collection_aliases = next(iter(collection_response.items()))
63+
collection_index_name, version = collection_index.rsplit("-", 1)
64+
new_collection_index = f"{collection_index_name}-{str(int(version) + 1).zfill(6)}"
65+
66+
await reindex(client, collection_index, new_collection_index, collection_aliases)
67+
68+
for collection in collections["hits"]["hits"]:
69+
70+
item_indexes = await client.indices.get_alias(
71+
name=f"{ITEMS_INDEX_PREFIX}{collection['_id']}*"
72+
)
73+
74+
for item_index, aliases in item_indexes.items():
75+
item_index_name, version = item_index.rsplit("-", 1)
76+
new_item_index = f"{item_index_name}-{str(int(version) + 1).zfill(6)}"
77+
78+
await reindex(client, item_index, new_item_index, aliases)
79+
80+
await client.close()
81+
82+
83+
if __name__ == "__main__":
84+
asyncio.run(run())

stac_fastapi/core/stac_fastapi/core/serializers.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from stac_fastapi.core.datetime_utils import now_to_rfc3339_str
1111
from stac_fastapi.core.models.links import CollectionLinks
12+
from stac_fastapi.core.utilities import get_bool_env
1213
from stac_fastapi.types import stac as stac_types
1314
from stac_fastapi.types.links import ItemLinks, resolve_links
1415

@@ -66,9 +67,10 @@ def stac_to_db(cls, stac_data: stac_types.Item, base_url: str) -> stac_types.Ite
6667
item_links = resolve_links(stac_data.get("links", []), base_url)
6768
stac_data["links"] = item_links
6869

69-
stac_data["assets"] = [
70-
{"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
71-
]
70+
if get_bool_env("STAC_INDEX_ASSETS"):
71+
stac_data["assets"] = [
72+
{"es_key": k, **v} for k, v in stac_data.get("assets", {}).items()
73+
]
7274

7375
now = now_to_rfc3339_str()
7476
if "created" not in stac_data["properties"]:
@@ -97,6 +99,12 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
9799
if original_links:
98100
item_links += resolve_links(original_links, base_url)
99101

102+
if get_bool_env("STAC_INDEX_ASSETS"):
103+
assets = {a.pop("es_key"): a for a in item.get("assets", [])}
104+
105+
else:
106+
assets = item.get("assets", {})
107+
100108
return stac_types.Item(
101109
type="Feature",
102110
stac_version=item.get("stac_version", ""),
@@ -107,7 +115,7 @@ def db_to_stac(cls, item: dict, base_url: str) -> stac_types.Item:
107115
bbox=item.get("bbox", []),
108116
properties=item.get("properties", {}),
109117
links=item_links,
110-
assets={a.pop("es_key"): a for a in item.get("assets", [])},
118+
assets=assets,
111119
)
112120

113121

@@ -132,9 +140,15 @@ def stac_to_db(
132140
collection["links"] = resolve_links(
133141
collection.get("links", []), str(request.base_url)
134142
)
135-
collection["assets"] = [
136-
{"es_key": k, **v} for k, v in collection.get("assets", {}).items()
137-
]
143+
144+
if get_bool_env("STAC_INDEX_ASSETS"):
145+
collection["assets"] = [
146+
{"es_key": k, **v} for k, v in collection.get("assets", {}).items()
147+
]
148+
collection["item_assets"] = [
149+
{"es_key": k, **v} for k, v in collection.get("item_assets", {}).items()
150+
]
151+
138152
return collection
139153

140154
@classmethod
@@ -181,9 +195,18 @@ def db_to_stac(
181195
collection_links += resolve_links(original_links, str(request.base_url))
182196
collection["links"] = collection_links
183197

184-
collection["assets"] = {
185-
a.pop("es_key"): a for a in collection.get("assets", [])
186-
}
198+
if get_bool_env("STAC_INDEX_ASSETS"):
199+
collection["assets"] = {
200+
a.pop("es_key"): a for a in collection.get("assets", [])
201+
}
202+
collection["item_assets"] = {
203+
i.pop("es_key"): i for i in collection.get("item_assets", [])
204+
}
205+
206+
else:
207+
collection["assets"] = collection.get("assets", {})
208+
if item_assets := collection.get("item_assets"):
209+
collection["item_assets"] = item_assets
187210

188211
# Return the stac_types.Collection object
189212
return stac_types.Collection(**collection)

stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import os
2929
from typing import Any, Dict, Literal, Protocol
3030

31+
from stac_fastapi.core.utilities import get_bool_env
32+
3133

3234
# stac_pydantic classes extend _GeometryBase, which doesn't have a type field,
3335
# So create our own Protocol for typing
@@ -134,7 +136,7 @@ class Geometry(Protocol): # noqa
134136
"id": {"type": "keyword"},
135137
"collection": {"type": "keyword"},
136138
"geometry": {"type": "geo_shape"},
137-
"assets": {"type": "object"},
139+
"assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
138140
"links": {"type": "object", "enabled": False},
139141
"properties": {
140142
"type": "object",
@@ -162,7 +164,7 @@ class Geometry(Protocol): # noqa
162164
"extent.temporal.interval": {"type": "date"},
163165
"providers": {"type": "object", "enabled": False},
164166
"links": {"type": "object", "enabled": False},
165-
"item_assets": {"type": "object", "enabled": False},
167+
"item_assets": {"type": "object", "enabled": get_bool_env("STAC_INDEX_ASSETS")},
166168
},
167169
}
168170

0 commit comments

Comments
 (0)