Ecs processing #223
base: main
Conversation
Pull request overview
This PR adds support for ECS (Elastic Container Service) based processing workflows to the seafloor geodesy tools. It introduces a new working environment type (ECS) that enables cloud-based data processing with an RDS database backend and S3 storage integration; a sketch of the environment-matching pattern follows the key-changes list below.
Key Changes:
- Adds ECS environment support with RDS database connectivity for asset catalog management
- Implements SV3PipelineECS class for cloud-based SV3 data processing workflows
- Extends TileDB operations to support time-based filtering via Unix timestamps in RINEX generation
- Updates timezone handling to use UTC consistently across shotdata and observation processing
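Much of the diff is environment-aware initialization: handlers match on the new working-environment type and select S3 or RDS backends instead of local paths. Below is a minimal standalone sketch of that pattern; the enum members, values, and helper function are illustrative assumptions, not the PR's actual code.

```python
from enum import Enum

# Hypothetical stand-in for the PR's WorkingEnvironment type; the member
# names and values here are assumptions for illustration only.
class WorkingEnvironment(Enum):
    LOCAL = "local"
    ECS = "ecs"

def build_tiledb_root(env: WorkingEnvironment, bucket: str, local_dir: str) -> str:
    """Pick a TileDB root URI based on the working environment."""
    match env:
        case WorkingEnvironment.ECS:
            # ECS tasks read and write TileDB arrays directly from S3
            return f"s3://{bucket}/tiledb"
        case WorkingEnvironment.LOCAL:
            return f"{local_dir}/tiledb"

print(build_tiledb_root(WorkingEnvironment.ECS, "my-bucket", "/data"))
# -> s3://my-bucket/tiledb
```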
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 32 comments.
| File | Description |
|---|---|
| src/golangtools/cmd/tdb2rnx/main.go | Adds Unix timestamp filtering to FilterDaySlices and main functions for time-range based RINEX generation |
| src/es_sfgtools/workflows/workflow_handler.py | Extends directory initialization to support ECS environment using match statement |
| src/es_sfgtools/workflows/utils/protocols.py | Updates asset catalog initialization for ECS with RDS backend |
| src/es_sfgtools/workflows/preprocess_ingest/data_handler.py | Adds S3-based TileDB array creation and conditional consolidation for ECS |
| src/es_sfgtools/workflows/pipelines/sv3_pipeline.py | Introduces new SV3PipelineECS class with cloud-based processing capabilities and multi-threaded operations |
| src/es_sfgtools/workflows/modeling/garpos_handler.py | Fixes timezone handling in garpos plotting to use UTC explicitly |
| src/es_sfgtools/tiledb_tools/tiledb_schemas.py | Updates datetime handling to ensure UTC timezone in read operations |
| src/es_sfgtools/tiledb_tools/tiledb_operations.py | Adds tile2rinex_ecs function with Unix timestamp parameters for ECS workflows |
| src/es_sfgtools/prefiltering/utils.py | Adds time-based filtering to shotdata using Unix timestamps |
| src/es_sfgtools/data_mgmt/ingestion/archive_pull.py | Fixes URL path construction with proper separator |
| src/es_sfgtools/data_mgmt/directorymgmt/schemas.py | Extends TileDB directory building to support ECS with S3 paths |
| src/es_sfgtools/data_mgmt/directorymgmt/handler.py | Updates directory loading logic for ECS environment |
| src/es_sfgtools/data_mgmt/assetcatalog/utils.py | Adds AWS Secrets Manager integration for RDS credentials |
| src/es_sfgtools/data_mgmt/assetcatalog/schemas.py | Adds ConnectionInfo model for RDS database configuration |
| src/es_sfgtools/data_mgmt/assetcatalog/handler.py | Extends PreProcessCatalogHandler with RDS support and environment-aware database selection |
| src/es_sfgtools/config/env_config.py | Adds ECS working environment type and configuration |
| src/es_sfgtools/__init__.py | Updates version to 0.5.3 |
| src/es_sfgtools/._version.py | Creates version file with 0.5.3 |
| mac_environment.yml | Adds hatanaka dependency for RINEX compression |
| linux_environment.yml | Adds hatanaka dependency for RINEX compression |
| dev/dev_ecs.py | Adds development script demonstrating ECS workflow usage |
| dev/DEV_GARPOS_QUIRKS_DEMO.py | Updates demo with override flag changes |
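Several rows above (tiledb_schemas.py, garpos_handler.py, prefiltering/utils.py) converge on the same fix: pin timestamps to UTC explicitly rather than passing naive datetimes around. A generic standard-library illustration of that pattern follows; it is not the PR's actual code.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# A naive datetime carries no timezone and is ambiguous across machines
naive = datetime(2025, 12, 19, 12, 0, 0)

# Declare a naive value that is already UTC as explicitly UTC-aware
aware = naive.replace(tzinfo=timezone.utc)
print(aware.isoformat())  # 2025-12-19T12:00:00+00:00

# Normalize an aware datetime from another zone into UTC
local = datetime(2025, 12, 19, 4, 0, tzinfo=ZoneInfo("America/Los_Angeles"))
print(local.astimezone(timezone.utc).isoformat())  # 2025-12-19T12:00:00+00:00
```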
Comments suppressed due to low confidence (1)
src/es_sfgtools/workflows/pipelines/sv3_pipeline.py:12: Import of 'pd' is not used.

```python
from turtle import pd
```
```python
year_int = int(year)
campaign_dates = [d for d in unique_dates if d.year == year_int]


def process_date(date:datetime.date,self=self) -> None:
```
Copilot AI · Dec 19, 2025
Unconventional closure signature. The parameter on line 1422 is named 'date', shadowing the 'datetime.date' type used in its own annotation, and the default parameter 'self=self' is unconventional. The closure captures 'self' automatically, so the explicit 'self=self' parameter is unnecessary and non-idiomatic. Consider removing this parameter or restructuring the function as a class method.
Suggested change:
```diff
-def process_date(date:datetime.date,self=self) -> None:
+def process_date(date: datetime.date) -> None:
```
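For context: an inner function defined inside a method already sees the enclosing 'self' through its closure, so no default argument is needed. A minimal, self-contained demonstration (the class and names are illustrative only):

```python
import datetime

class Pipeline:
    def __init__(self) -> None:
        self.name = "sv3"

    def run(self, dates: list[datetime.date]) -> None:
        # `self` is captured from the enclosing scope; no `self=self` needed
        def process_date(date: datetime.date) -> None:
            print(f"{self.name}: processing {date}")

        for d in dates:
            process_date(d)

Pipeline().run([datetime.date(2025, 12, 19)])  # sv3: processing 2025-12-19
```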
```go
if startUnix > 0 && endUnix > 0 {
    filterStart = time.Unix(startUnix, 0).UTC()
    filterEnd = time.Unix(endUnix, 0).UTC()
    useTimeFilter = true
    log.Infof("Filtering day slices between %s and %s", filterStart, filterEnd)
}
```
Copilot AI · Dec 19, 2025
Missing validation for time range parameters. The function accepts startUnix and endUnix parameters but only validates that both are greater than 0 (line 168). It does not check that startUnix is less than endUnix, which could allow invalid time ranges to be processed. Consider adding validation that startUnix < endUnix.
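A self-contained sketch of the suggested check, hoisted into a small helper; the function name, messages, and example timestamps below are illustrative assumptions, not code from this PR:

```go
package main

import (
    "fmt"
    "time"
)

// validateRange is a hypothetical helper showing the suggested validation:
// both timestamps must be positive and the start must precede the end.
func validateRange(startUnix, endUnix int64) (time.Time, time.Time, error) {
    if startUnix <= 0 || endUnix <= 0 {
        return time.Time{}, time.Time{}, fmt.Errorf("timestamps must be positive, got start=%d end=%d", startUnix, endUnix)
    }
    if startUnix >= endUnix {
        return time.Time{}, time.Time{}, fmt.Errorf("start %d must be before end %d", startUnix, endUnix)
    }
    return time.Unix(startUnix, 0).UTC(), time.Unix(endUnix, 0).UTC(), nil
}

func main() {
    filterStart, filterEnd, err := validateRange(1700000000, 1700086400)
    if err != nil {
        panic(err)
    }
    fmt.Println("Filtering day slices between", filterStart, "and", filterEnd)
}
```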
```python
# Compress RINEX file with Hatanaka
# Path('1lsu0010.21d.gz').write_bytes(hatanaka.compress(rinex_data))
compressed_rinex_path = rinex_file.with_suffix(rinex_file.suffix + ".gz")
compressed_rinex_path = hatanaka.compress_on_disk(str(rinex_file),delete=True)
```
Copilot AI · Dec 19, 2025
Missing error handling. Line 1454 assigns the result of hatanaka.compress_on_disk but doesn't check if an error occurred. If compression fails, the subsequent operations (upload, cataloging) will operate on an invalid path, potentially causing cryptic errors downstream.
Suggested change:
```diff
-compressed_rinex_path = hatanaka.compress_on_disk(str(rinex_file),delete=True)
+try:
+    compressed_rinex_result = hatanaka.compress_on_disk(
+        str(rinex_file),
+        delete=True,
+    )
+except Exception as exc:
+    ProcessLogger.logerror(
+        f"Failed to Hatanaka-compress RINEX file {rinex_file} for "
+        f"{self.current_station_name} on {date.strftime('%Y-%m-%d')}: {exc}"
+    )
+    return
+if not compressed_rinex_result:
+    ProcessLogger.logerror(
+        f"Hatanaka compression returned no output path for RINEX file {rinex_file} "
+        f"for {self.current_station_name} on {date.strftime('%Y-%m-%d')}"
+    )
+    return
+compressed_rinex_path = Path(compressed_rinex_result)
+if not compressed_rinex_path.exists():
+    ProcessLogger.logerror(
+        f"Hatanaka-compressed RINEX file not found at {compressed_rinex_path} "
+        f"for {self.current_station_name} on {date.strftime('%Y-%m-%d')}"
+    )
+    return
```
```python
    network_id, station_id, campaign_id,use_local=False
)
if dtype_counts == {}:
    message = f"No catalogged files found for {network_id}/{station_id}/{campaign_id}. Ensure data is ingested before processing."
```
Copilot AI · Dec 19, 2025
Typo in log message: "catalogged" should be "cataloged". The double 'g' is a spelling error.
| message = f"No catalogged files found for {network_id}/{station_id}/{campaign_id}. Ensure data is ingested before processing." | |
| message = f"No cataloged files found for {network_id}/{station_id}/{campaign_id}. Ensure data is ingested before processing." |
| """ | ||
| Process Novatel 770 files | ||
| 1. Query asset catalog for Novatel 770 files for current context | ||
| 2. If files exist, check if processing is needed (override or not merged) | ||
| 3. Call novatel_770_2tile to process files into TileDB GNSS observation array | ||
| 4. Update asset catalog with merge job | ||
| """ |
Copilot AI · Dec 19, 2025
Duplicate docstring. Lines 1261-1268 contain a docstring that duplicates the processing steps already documented in lines 1248-1256. The duplicate docstring should be removed to avoid confusion and maintain clean documentation.
| """ | |
| Process Novatel 770 files | |
| 1. Query asset catalog for Novatel 770 files for current context | |
| 2. If files exist, check if processing is needed (override or not merged) | |
| 3. Call novatel_770_2tile to process files into TileDB GNSS observation array | |
| 4. Update asset catalog with merge job | |
| """ |
```python
    return False


def get_assets(self,
```
Copilot AI · Dec 19, 2025
This assignment to 'get_assets' is unnecessary as it is redefined before this value is used.
```python
if rinex_file is not None:
    # Compress RINEX file with Hatanaka
    # Path('1lsu0010.21d.gz').write_bytes(hatanaka.compress(rinex_data))
    compressed_rinex_path = rinex_file.with_suffix(rinex_file.suffix + ".gz")
```
Copilot AI · Dec 19, 2025
This assignment to 'compressed_rinex_path' is unnecessary as it is redefined before this value is used.
Suggested change:
```diff
-compressed_rinex_path = rinex_file.with_suffix(rinex_file.suffix + ".gz")
```
```python
from es_sfgtools.workflows.workflow_handler import WorkflowHandler
from es_sfgtools.workflows.midprocess import IntermediateDataProcessor
from es_sfgtools.workflows.pipelines.sv3_pipeline import SV3PipelineECS
from es_sfgtools.data_mgmt.assetcatalog.schemas import AssetEntry
```
Copilot AI · Dec 19, 2025
Import of 'AssetEntry' is not used.
Suggested change:
```diff
-from es_sfgtools.data_mgmt.assetcatalog.schemas import AssetEntry
```
```python
import threading
from turtle import pd
from typing import List, Optional,Tuple
import os
```
Copilot AI · Dec 19, 2025
Import of 'os' is not used.
Suggested change:
```diff
-import os
```
```python
    TDBKinPositionArray,
    TDBShotDataArray,
)
from es_sfgtools.config.env_config import Environment,WorkingEnvironment
```
Copilot AI · Dec 19, 2025
Import of 'WorkingEnvironment' is not used.
Suggested change:
```diff
-from es_sfgtools.config.env_config import Environment,WorkingEnvironment
+from es_sfgtools.config.env_config import Environment
```