|
1 |
| -import logging |
| 1 | +import structlog |
2 | 2 | from pprint import pprint # noqa
|
3 | 3 | from banal import ensure_list, is_mapping
|
4 | 4 | from elasticsearch import TransportError
|
|
9 | 9 | from aleph.core import es
|
10 | 10 | from aleph.settings import SETTINGS
|
11 | 11 |
|
12 |
| -log = logging.getLogger(__name__) |
| 12 | +log = structlog.get_logger(__name__) |
13 | 13 |
|
14 | 14 | BULK_PAGE = 500
|
15 | 15 | # cf. https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-from-size.html # noqa
|
|
56 | 56 | }
|
57 | 57 |
|
58 | 58 |
|
| 59 | +class AlephOperationalException(Exception): |
| 60 | + pass |
| 61 | + |
| 62 | + |
59 | 63 | def get_shard_weight(schema):
|
60 | 64 | if SETTINGS.TESTING:
|
61 | 65 | return 1
|
@@ -180,7 +184,7 @@ def query_delete(index, query, sync=False, **kwargs):
|
180 | 184 | request_timeout=MAX_REQUEST_TIMEOUT,
|
181 | 185 | timeout=MAX_TIMEOUT,
|
182 | 186 | scroll_size=SETTINGS.INDEX_DELETE_BY_QUERY_BATCHSIZE,
|
183 |
| - **kwargs |
| 187 | + **kwargs, |
184 | 188 | )
|
185 | 189 | return
|
186 | 190 | except TransportError as exc:
|
@@ -237,8 +241,7 @@ def _check_response(index, res):
|
237 | 241 | if res.get("status", 0) > 399 and not res.get("acknowledged"):
|
238 | 242 | error = res.get("error", {}).get("reason")
|
239 | 243 | log.error("Index [%s] error: %s", index, error)
|
240 |
| - return False |
241 |
| - return True |
| 244 | + raise AlephOperationalException(f"Index {index} error: {error}") |
242 | 245 |
|
243 | 246 |
|
244 | 247 | def rewrite_mapping_safe(pending, existing):
|
@@ -286,26 +289,42 @@ def configure_index(index, mapping, settings):
|
286 | 289 | "timeout": MAX_TIMEOUT,
|
287 | 290 | "master_timeout": MAX_TIMEOUT,
|
288 | 291 | }
|
289 |
| - config = es.indices.get(index=index).get(index, {}) |
| 292 | + res = es.indices.get(index=index) |
| 293 | + |
| 294 | + if len(res) != 1: |
| 295 | + # This case should never occur. |
| 296 | + log.error("ES response", res=res) |
| 297 | + raise AlephOperationalException("ES response is empty or ambiguous.") |
| 298 | + |
| 299 | + # The ES response is an object with items for every requested index. As we only request |
| 300 | + # a single index, we extract the first and only item from the response. We cannot simply |
| 301 | + # extract the response data using the index name as the name we use to request the index |
| 302 | + # may be an alias whereas the response will always contain the actual un-aliased name. |
| 303 | + config = list(res.values())[0] |
| 304 | + |
290 | 305 | settings.get("index").pop("number_of_shards")
|
| 306 | + log.info( |
| 307 | + f"[{index}] Current settings.", index=index, settings=config.get("settings") |
| 308 | + ) |
291 | 309 | if check_settings_changed(settings, config.get("settings")):
|
292 |
| - res = es.indices.close(ignore_unavailable=True, **options) |
293 |
| - res = es.indices.put_settings(body=settings, **options) |
294 |
| - if not _check_response(index, res): |
295 |
| - return False |
| 310 | + log.info(f"[{index}] Updated settings.", index=index, settings=settings) |
| 311 | + _check_response(index, es.indices.close(ignore_unavailable=True, **options)) |
| 312 | + _check_response(index, es.indices.put_settings(body=settings, **options)) |
| 313 | + else: |
| 314 | + log.info(f"[{index}] No changes detected in settings.", index=index) |
| 315 | + log.info( |
| 316 | + f"[{index}] Current mappings.", index=index, mappings=config.get("mappings") |
| 317 | + ) |
| 318 | + log.info(f"[{index}] New mappings.", index=index, mappings=mapping) |
296 | 319 | mapping = rewrite_mapping_safe(mapping, config.get("mappings"))
|
297 |
| - res = es.indices.put_mapping(body=mapping, ignore=[400], **options) |
298 |
| - if not _check_response(index, res): |
299 |
| - return False |
300 |
| - res = es.indices.open(**options) |
301 |
| - return True |
| 320 | + _check_response( |
| 321 | + index, es.indices.put_mapping(body=mapping, ignore=[400], **options) |
| 322 | + ) |
| 323 | + _check_response(index, es.indices.open(**options)) |
302 | 324 | else:
|
303 |
| - log.info("Creating index: %s...", index) |
| 325 | + log.info(f"Creating index: {index}...", index=index) |
304 | 326 | body = {"settings": settings, "mappings": mapping}
|
305 |
| - res = es.indices.create(index, body=body) |
306 |
| - if not _check_response(index, res): |
307 |
| - return False |
308 |
| - return True |
| 327 | + _check_response(index, es.indices.create(index, body=body)) |
309 | 328 |
|
310 | 329 |
|
311 | 330 | def index_settings(shards=5, replicas=SETTINGS.INDEX_REPLICAS):
|
|
0 commit comments