From 59a88fd59efd497e5edb84dad92f8dfd66af7ebb Mon Sep 17 00:00:00 2001 From: Favyen Bastani Date: Tue, 4 Mar 2025 14:45:21 -0800 Subject: [PATCH 1/2] Support retry loop for prepare/ingest/materialize. Some data sources have transient errors especially when making lots of API calls, e.g. we have had issues with the PlanetBasemap and PlanetaryComputer data sources. So it can be useful to have a general-purpose retry option rather than implementing retry loop inside every problematic data source. --- rslearn/dataset/manage.py | 130 ++++++++++++++++++++++++++++++++------ rslearn/main.py | 121 +++++++++++++++++++++++++++++++---- 2 files changed, 219 insertions(+), 32 deletions(-) diff --git a/rslearn/dataset/manage.py b/rslearn/dataset/manage.py index 5778c48..959f91f 100644 --- a/rslearn/dataset/manage.py +++ b/rslearn/dataset/manage.py @@ -1,5 +1,11 @@ """Functions to manage datasets.""" +import random +import time +from collections.abc import Callable +from datetime import timedelta +from typing import Any + import rslearn.data_sources from rslearn.config import ( LayerConfig, @@ -17,8 +23,39 @@ logger = get_logger(__name__) +def retry(fn: Callable, retry_max_attempts: int, retry_backoff: timedelta) -> Any: + """Retry the function multiple times in case of error. + + The function is retried until either the attempts are exhausted, or the function + runs successfully without raising an Exception. + + Args: + fn: the function to call. + retry_max_attempts: retry this many times (plus the original attempt) before + giving up (and raising Exception). + retry_backoff: the base backoff time used to compute how long to wait between + retries. The actual time is (retry_backoff * attempts) * r, where r is a + random number between 1 and 2, and attempts is the number of attempts tried + so far. + """ + for attempt_idx in range(retry_max_attempts): + try: + return fn() + except Exception as e: + logger.debug(f"Retrying after catching error in retry loop: {e}") + sleep_base_seconds = retry_backoff.total_seconds() * (attempt_idx + 1) + time.sleep(sleep_base_seconds * (1 + random.random())) + + # Last attempt. This time we don't catch the exception. + return fn() + + def prepare_dataset_windows( - dataset: Dataset, windows: list[Window], force: bool = False + dataset: Dataset, + windows: list[Window], + force: bool = False, + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), ) -> None: """Prepare windows in a dataset. @@ -30,11 +67,15 @@ def prepare_dataset_windows( windows: the windows to prepare force: whether to prepare windows even if they were previously prepared (default false) + retry_max_attempts: set greater than zero to retry for this many attempts in + case of error. + retry_backoff: how long to wait before retrying (see retry). """ # Iterate over retrieved layers, and prepare each one. for layer_name, layer_cfg in dataset.layers.items(): if not layer_cfg.data_source: continue + data_source_cfg = layer_cfg.data_source # Get windows that need to be prepared for this layer. needed_windows = [] @@ -59,13 +100,13 @@ def prepare_dataset_windows( geometry = window.get_geometry() # Apply temporal modifiers. - time_offset = layer_cfg.data_source.time_offset + time_offset = data_source_cfg.time_offset if geometry.time_range and time_offset: geometry.time_range = ( geometry.time_range[0] + time_offset, geometry.time_range[1] + time_offset, ) - duration = layer_cfg.data_source.duration + duration = data_source_cfg.duration if geometry.time_range and duration: geometry.time_range = ( geometry.time_range[0], @@ -74,7 +115,12 @@ def prepare_dataset_windows( geometries.append(geometry) - results = data_source.get_items(geometries, layer_cfg.data_source.query_config) + results = retry( + fn=lambda: data_source.get_items(geometries, data_source_cfg.query_config), + retry_max_attempts=retry_max_attempts, + retry_backoff=retry_backoff, + ) + for window, result in zip(needed_windows, results): layer_datas = window.load_layer_datas() layer_datas[layer_name] = WindowLayerData( @@ -86,7 +132,12 @@ def prepare_dataset_windows( window.save_layer_datas(layer_datas) -def ingest_dataset_windows(dataset: Dataset, windows: list[Window]) -> None: +def ingest_dataset_windows( + dataset: Dataset, + windows: list[Window], + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), +) -> None: """Ingest items for retrieved layers in a dataset. The items associated with the specified windows are downloaded and divided into @@ -95,6 +146,9 @@ def ingest_dataset_windows(dataset: Dataset, windows: list[Window]) -> None: Args: dataset: the dataset windows: the windows to ingest + retry_max_attempts: set greater than zero to retry for this many attempts in + case of error. + retry_backoff: how long to wait before retrying (see retry). """ tile_store = dataset.get_tile_store() for layer_name, layer_cfg in dataset.layers.items(): @@ -123,10 +177,19 @@ def ingest_dataset_windows(dataset: Dataset, windows: list[Window]) -> None: print(f"Ingesting {len(geometries_by_item)} items in layer {layer_name}") geometries_and_items = list(geometries_by_item.items()) - data_source.ingest( - tile_store=get_tile_store_with_layer(tile_store, layer_name, layer_cfg), - items=[item for item, _ in geometries_and_items], - geometries=[geometries for _, geometries in geometries_and_items], + + # Use retry loop for the actual data source ingest call. + def ingest() -> None: + data_source.ingest( + tile_store=get_tile_store_with_layer(tile_store, layer_name, layer_cfg), + items=[item for item, _ in geometries_and_items], + geometries=[geometries for _, geometries in geometries_and_items], + ) + + retry( + fn=ingest, + retry_max_attempts=retry_max_attempts, + retry_backoff=retry_backoff, ) @@ -186,6 +249,8 @@ def materialize_window( tile_store: TileStore, layer_name: str, layer_cfg: LayerConfig, + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), ) -> None: """Materialize a window. @@ -196,6 +261,9 @@ def materialize_window( tile_store: tile store of the dataset to materialize from layer_name: the layer name layer_cfg: the layer config + retry_max_attempts: set greater than zero to retry for this many attempts in + case of error. + retry_backoff: how long to wait before retrying (see retry). """ # Check if layer is materialized already. if window.is_layer_completed(layer_name): @@ -237,12 +305,17 @@ def materialize_window( materializer = Materializers[dataset.materializer_name] else: materializer = Materializers[layer_cfg.layer_type.value] - materializer.materialize( - get_tile_store_with_layer(tile_store, layer_name, layer_cfg), - window, - layer_name, - layer_cfg, - item_groups, + + retry( + fn=lambda: materializer.materialize( + get_tile_store_with_layer(tile_store, layer_name, layer_cfg), + window, + layer_name, + layer_cfg, + item_groups, + ), + retry_max_attempts=retry_max_attempts, + retry_backoff=retry_backoff, ) else: @@ -250,10 +323,21 @@ def materialize_window( print( f"Materializing {len(item_groups)} item groups in layer {layer_name} via data source" ) - data_source.materialize(window, item_groups, layer_name, layer_cfg) + retry( + fn=lambda: data_source.materialize( + window, item_groups, layer_name, layer_cfg + ), + retry_max_attempts=retry_max_attempts, + retry_backoff=retry_backoff, + ) -def materialize_dataset_windows(dataset: Dataset, windows: list[Window]) -> None: +def materialize_dataset_windows( + dataset: Dataset, + windows: list[Window], + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), +) -> None: """Materialize items for retrieved layers in a dataset. The portions of items corresponding to dataset windows are extracted from the tile @@ -262,6 +346,9 @@ def materialize_dataset_windows(dataset: Dataset, windows: list[Window]) -> None Args: dataset: the dataset windows: the windows to materialize + retry_max_attempts: set greater than zero to retry for this many attempts in + case of error. + retry_backoff: how long to wait before retrying (see retry). """ tile_store = dataset.get_tile_store() for layer_name, layer_cfg in dataset.layers.items(): @@ -274,5 +361,12 @@ def materialize_dataset_windows(dataset: Dataset, windows: list[Window]) -> None for window in windows: materialize_window( - window, dataset, data_source, tile_store, layer_name, layer_cfg + window=window, + dataset=dataset, + data_source=data_source, + tile_store=tile_store, + layer_name=layer_name, + layer_cfg=layer_cfg, + retry_max_attempts=retry_max_attempts, + retry_backoff=retry_backoff, ) diff --git a/rslearn/main.py b/rslearn/main.py index 07d9cd7..2535562 100644 --- a/rslearn/main.py +++ b/rslearn/main.py @@ -5,7 +5,7 @@ import random import sys from collections.abc import Callable -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any, TypeVar import tqdm @@ -18,7 +18,11 @@ from rslearn.data_sources import Item, data_source_from_config from rslearn.dataset import Dataset, Window, WindowLayerData from rslearn.dataset.add_windows import add_windows_from_box, add_windows_from_file -from rslearn.dataset.manage import materialize_dataset_windows, prepare_dataset_windows +from rslearn.dataset.manage import ( + materialize_dataset_windows, + prepare_dataset_windows, + retry, +) from rslearn.log_utils import get_logger from rslearn.tile_stores import get_tile_store_with_layer from rslearn.train.data_module import RslearnDataModule @@ -355,14 +359,24 @@ def apply_on_windows_args(f: Callable[..., None], args: argparse.Namespace) -> N class PrepareHandler: """apply_on_windows handler for the rslearn dataset prepare command.""" - def __init__(self, force: bool) -> None: + def __init__( + self, + force: bool, + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), + ) -> None: """Initialize a new PrepareHandler. Args: force: force prepare + retry_max_attempts: set greater than zero to retry for this many attempts in + case of error. + retry_backoff: how long to wait before retrying (see retry). """ self.force = force self.dataset: Dataset | None = None + self.retry_max_attempts = retry_max_attempts + self.retry_backoff = retry_backoff def set_dataset(self, dataset: Dataset) -> None: """Captures the dataset from apply_on_windows_args. @@ -377,7 +391,13 @@ def __call__(self, windows: list[Window]) -> None: logger.info(f"Running prepare on {len(windows)} windows") if self.dataset is None: raise ValueError("dataset not set") - prepare_dataset_windows(self.dataset, windows, self.force) + prepare_dataset_windows( + self.dataset, + windows, + self.force, + retry_max_attempts=self.retry_max_attempts, + retry_backoff=self.retry_backoff, + ) @register_handler("dataset", "prepare") @@ -400,10 +420,26 @@ def dataset_prepare() -> None: default="", help="List of layers to disable e.g 'layer1,layer2'", ) + parser.add_argument( + "--retry-max-attempts", + type=int, + default=0, + help="Retry for this many attempts", + ) + parser.add_argument( + "--retry-backoff-seconds", + type=int, + default=0, + help="Backoff time (seconds) between retries", + ) add_apply_on_windows_args(parser) args = parser.parse_args(args=sys.argv[3:]) - fn = PrepareHandler(args.force) + fn = PrepareHandler( + args.force, + retry_max_attempts=args.retry_max_attempts, + retry_backoff=timedelta(seconds=args.retry_backoff_seconds), + ) apply_on_windows_args(fn, args) @@ -417,10 +453,17 @@ def _load_window_layer_datas( class IngestHandler: """apply_on_windows handler for the rslearn dataset ingest command.""" - def __init__(self, ignore_errors: bool = False) -> None: + def __init__( + self, + ignore_errors: bool = False, + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), + ) -> None: """Initialize a new IngestHandler.""" self.dataset: Dataset | None = None self.ignore_errors = ignore_errors + self.retry_max_attempts = retry_max_attempts + self.retry_backoff = retry_backoff def set_dataset(self, dataset: Dataset) -> None: """Captures the dataset from apply_on_windows_args. @@ -464,10 +507,16 @@ def __call__( data_source = data_source_from_config(layer_cfg, self.dataset.path) try: - data_source.ingest( - tile_store=layer_tile_store, - items=[item for item, _ in items_and_geometries], - geometries=[geometries for _, geometries in items_and_geometries], + retry( + lambda: data_source.ingest( + tile_store=layer_tile_store, + items=[item for item, _ in items_and_geometries], + geometries=[ + geometries for _, geometries in items_and_geometries + ], + ), + retry_max_attempts=self.retry_max_attempts, + retry_backoff=self.retry_backoff, ) except Exception as e: if not self.ignore_errors: @@ -564,20 +613,43 @@ def dataset_ingest() -> None: help="Ignore ingestion errors in individual jobs", action=argparse.BooleanOptionalAction, ) + parser.add_argument( + "--retry-max-attempts", + type=int, + default=0, + help="Retry for this many attempts", + ) + parser.add_argument( + "--retry-backoff-seconds", + type=int, + default=0, + help="Backoff time (seconds) between retries", + ) add_apply_on_windows_args(parser) args = parser.parse_args(args=sys.argv[3:]) - fn = IngestHandler(ignore_errors=args.ignore_errors) + fn = IngestHandler( + ignore_errors=args.ignore_errors, + retry_max_attempts=args.retry_max_attempts, + retry_backoff=timedelta(seconds=args.retry_backoff_seconds), + ) apply_on_windows_args(fn, args) class MaterializeHandler: """apply_on_windows handler for the rslearn dataset materialize command.""" - def __init__(self, ignore_errors: bool = False) -> None: + def __init__( + self, + ignore_errors: bool = False, + retry_max_attempts: int = 0, + retry_backoff: timedelta = timedelta(minutes=1), + ) -> None: """Initialize a MaterializeHandler.""" self.dataset: Dataset | None = None self.ignore_errors = ignore_errors + self.retry_max_attempts = retry_max_attempts + self.retry_backoff = retry_backoff def set_dataset(self, dataset: Dataset) -> None: """Captures the dataset from apply_on_windows_args. @@ -593,7 +665,12 @@ def __call__(self, windows: list[Window]) -> None: if self.dataset is None: raise ValueError("dataset not set") try: - materialize_dataset_windows(self.dataset, windows) + materialize_dataset_windows( + self.dataset, + windows, + retry_max_attempts=self.retry_max_attempts, + retry_backoff=self.retry_backoff, + ) except Exception as e: if not self.ignore_errors: logger.error(f"Error materializing windows: {e}") @@ -624,9 +701,25 @@ def dataset_materialize() -> None: help="Ignore errors in individual jobs", action=argparse.BooleanOptionalAction, ) + parser.add_argument( + "--retry-max-attempts", + type=int, + default=0, + help="Retry for this many attempts", + ) + parser.add_argument( + "--retry-backoff-seconds", + type=int, + default=0, + help="Backoff time (seconds) between retries", + ) add_apply_on_windows_args(parser) args = parser.parse_args(args=sys.argv[3:]) - fn = MaterializeHandler(ignore_errors=args.ignore_errors) + fn = MaterializeHandler( + ignore_errors=args.ignore_errors, + retry_max_attempts=args.retry_max_attempts, + retry_backoff=timedelta(seconds=args.retry_backoff_seconds), + ) apply_on_windows_args(fn, args) From 91dafd2246fdd6391fa32643ad85f09b7cc262e7 Mon Sep 17 00:00:00 2001 From: Favyen Bastani Date: Tue, 4 Mar 2025 15:24:58 -0800 Subject: [PATCH 2/2] ensure planet_basemap can leverage retry for initializing mosaics --- rslearn/data_sources/planet_basemap.py | 45 ++++++++++++++++++-------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/rslearn/data_sources/planet_basemap.py b/rslearn/data_sources/planet_basemap.py index f3ea4d7..887fe85 100644 --- a/rslearn/data_sources/planet_basemap.py +++ b/rslearn/data_sources/planet_basemap.py @@ -83,6 +83,7 @@ def __init__( environmnet variable). """ self.config = config + self.series_id = series_id self.bands = bands self.session = requests.Session() @@ -90,18 +91,8 @@ def __init__( api_key = os.environ["PL_API_KEY"] self.session.auth = (api_key, "") - # List mosaics. - self.mosaics = {} - for mosaic_dict in self._api_get_paginate( - path=f"series/{series_id}/mosaics", list_key="mosaics" - ): - shp = shapely.box(*mosaic_dict["bbox"]) - time_range = ( - datetime.fromisoformat(mosaic_dict["first_acquired"]), - datetime.fromisoformat(mosaic_dict["last_acquired"]), - ) - geom = STGeometry(WGS84_PROJECTION, shp, time_range) - self.mosaics[mosaic_dict["id"]] = geom + # Lazily load mosaics. + self.mosaics: dict | None = None @staticmethod def from_config(config: LayerConfig, ds_path: UPath) -> "PlanetBasemap": @@ -123,6 +114,31 @@ def from_config(config: LayerConfig, ds_path: UPath) -> "PlanetBasemap": kwargs[optional_key] = d[optional_key] return PlanetBasemap(**kwargs) + def _load_mosaics(self) -> dict[str, STGeometry]: + """Lazily load mosaics in the configured series_id from Planet API. + + We don't load it when creating the data source because it takes time and caller + may not be calling get_items. Additionally, loading it during the get_items + call enables leveraging the retry loop functionality in + prepare_dataset_windows. + """ + if self.mosaics is not None: + return self.mosaics + + self.mosaics = {} + for mosaic_dict in self._api_get_paginate( + path=f"series/{self.series_id}/mosaics", list_key="mosaics" + ): + shp = shapely.box(*mosaic_dict["bbox"]) + time_range = ( + datetime.fromisoformat(mosaic_dict["first_acquired"]), + datetime.fromisoformat(mosaic_dict["last_acquired"]), + ) + geom = STGeometry(WGS84_PROJECTION, shp, time_range) + self.mosaics[mosaic_dict["id"]] = geom + + return self.mosaics + def _api_get( self, path: str | None = None, @@ -159,6 +175,7 @@ def _api_get( raise ApiError( f"{url}: got status code {response.status_code}: {response.text}" ) + return response.json() def _api_get_paginate( @@ -204,6 +221,8 @@ def get_items( Returns: List of groups of items that should be retrieved for each geometry. """ + mosaics = self._load_mosaics() + groups = [] for geometry in geometries: geom_bbox = geometry.to_projection(WGS84_PROJECTION).shp.bounds @@ -212,7 +231,7 @@ def get_items( # Find the relevant mosaics that the geometry intersects. # For each relevant mosaic, identify the intersecting quads. items = [] - for mosaic_id, mosaic_geom in self.mosaics.items(): + for mosaic_id, mosaic_geom in mosaics.items(): if not geometry.intersects(mosaic_geom): continue logger.info(f"found mosaic {mosaic_geom} for geom {geometry}")