diff --git a/requirements.txt b/requirements.txt
index 573b5521..0b92e230 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,6 +2,7 @@ boto3
 botocore
 numpy
 pandas
+scipy
 julian
 pyarrow
 pandera
diff --git a/src/es_sfgtools/pipeline/__init__.py b/src/es_sfgtools/pipeline/__init__.py
index 78371350..7eb021cb 100644
--- a/src/es_sfgtools/pipeline/__init__.py
+++ b/src/es_sfgtools/pipeline/__init__.py
@@ -1 +1 @@
-from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog
\ No newline at end of file
+from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog, SOURCE_MAP, TARGET_MAP
\ No newline at end of file
diff --git a/src/es_sfgtools/pipeline/temp.py b/src/es_sfgtools/pipeline/temp.py
index dcf0d482..3fe1de26 100644
--- a/src/es_sfgtools/pipeline/temp.py
+++ b/src/es_sfgtools/pipeline/temp.py
@@ -11,12 +11,12 @@
 import matplotlib.pyplot as plt
 import matplotlib.dates as mdates
 import seaborn
-import tqdm
+from tqdm.autonotebook import tqdm
 from multiprocessing import Pool, cpu_count
 from functools import partial
 import numpy as np
 import warnings
-import folium
+import folium
 import json
 warnings.filterwarnings("ignore")
 seaborn.set_theme(style="whitegrid")
@@ -36,11 +36,13 @@ class FILE_TYPE(Enum):
     SEABIRD = "svpavg"
     NOVATEL770 = "novatel770"
     DFPO00 = "dfpo00"
+    OFFLOAD = "offload"
 
     @classmethod
     def to_schema(cls):
         return [x.name for x in cls]
 
+
 FILE_TYPES = [x.value for x in FILE_TYPE]
 
 class DATA_TYPE(Enum):
@@ -170,6 +172,7 @@ def __init__(self,working_dir:Path) -> None:
             self.catalog_data = DataCatalog.validate(pd.read_csv(self.catalog))
         else:
             self.catalog_data = pd.DataFrame()
+        logger.info(f"Data Handler initialized, data will be stored in {self.working_dir}")
 
     def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
         """
@@ -189,6 +192,21 @@ def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
         except:
             return None
 
+    def get_local_counts(self):
+        try:
+            local_files = self.catalog_data[self.catalog_data['local_location'].notnull()]
+            data_type_counts = local_files.type.value_counts()
+        except (AttributeError, KeyError):
+            data_type_counts = pd.Series()
+        return data_type_counts
+
+    def get_dtype_counts(self):
+        try:
+            data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
+        except AttributeError:
+            data_type_counts = "No data types found"
+        return data_type_counts
+
     def add_campaign_data(self,
                           network: str,
                           station: str,
@@ -215,7 +233,7 @@ def add_campaign_data(self,
                 if file_type in file.replace("_", ""):
                     discovered_file_type = file_type
                     break
-
+
             if discovered_file_type is None:
                 logger.error(f"File type not recognized for {file}")
                 continue
@@ -254,36 +272,14 @@ def add_campaign_data(self,
         self.catalog_data = pd.concat([self.catalog_data, incoming_df])
         self.catalog_data.to_csv(self.catalog,index=False)
 
-        # Get count of each data type in the catalog
-        data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
-        return data_type_counts
-
-    def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'):
-        """
-        Downloads a file from the specified https url on gage-data
-
-        Args:
-            remote_url (Path): The path of the file in the gage-data storage.
-            destination (Path): The local path where the file will be downloaded.
-
-        Returns:
-            bool: True if the file was downloaded successfully, False otherwise.
- """ - try: - local_location = destination_dir / Path(remote_url).name - get_file_from_gage_data(url=remote_url, - dest_dir=destination_dir, - token_path=token_path) - logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}") - return True - except Exception as e: - response = f"Error downloading {str(remote_url)} \n {e}" - response += "\n HINT: Check authentication credentials" - logger.error(response) - print(response) - return False - def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False): + def download_campaign_data(self, + network:str, + station:str, + survey:str, + file_type: str, + override:bool=False, + from_s3:bool=False): """ Retrieves and catalogs data from the remote locations stored in the catalog. @@ -291,6 +287,7 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool network (str): The network name. station (str): The station name. survey (str): The survey name. + file_type (str): The type of file to download override (bool): Whether to download the data even if it already exists from_s3 (bool): Use S3 download functionality if remote resourses are in an s3 bucket @@ -299,12 +296,19 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool """ # TODO make multithreaded # Find all entries in the catalog that match the params + local_counts = self.get_local_counts() + try: + local_files_of_type = local_counts[file_type] + except KeyError: + local_files_of_type = 0 + logger.info(f"Data directory currently contains {local_files_of_type} files of type {file_type}") entries = self.catalog_data[ (self.catalog_data.network == network) & (self.catalog_data.station == station) & (self.catalog_data.survey == survey) + & (self.catalog_data.type == file_type) ] - + logger.info(f"Downloading {entries.shape[0]-local_files_of_type} missing files of type {file_type}") if entries.shape[0] < 1: raise Exception('No matching data found in catalog') if from_s3: @@ -324,7 +328,7 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool remote_url=entry.remote_prefix, destination=local_location, ) - # Check if the entry is from an S3 location or gage/sage + # Check if the entry is from an S3 location or gage-data else: is_download = self._download_https( remote_url=entry.remote_filepath, destination_dir=self.raw_dir @@ -340,9 +344,9 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool if count == 0: response = f"No files downloaded" logger.error(response) - print(response) - - logger.info(f"Downloaded {count} files to {str(self.raw_dir)}") + #print(response) + else: + logger.info(f"Downloaded {count} files") self.catalog_data.to_csv(self.catalog,index=False) @@ -412,30 +416,6 @@ def add_campaign_data_s3(self, network: str, station: str, survey: str, bucket: data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts() return data_type_counts - def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path): - """ - Downloads a file from the specified S3 bucket. - - Args: - client (boto3.client): The Boto3 client object for S3. - bucket (str): The name of the S3 bucket. - remote_url (Path): The path of the file in the S3 bucket. - destination (Path): The local path where the file will be downloaded. - - Returns: - bool: True if the file was downloaded successfully, False otherwise. 
- """ - try: - client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination)) - logger.info(f"Downloaded {str(remote_url)} to {str(destination)}") - return True - except Exception as e: - response = f"Error downloading {str(remote_url)} \n {e}" - response += "\n HINT: $ aws sso login" - logger.error(response) - print(response) - return False - def download_campaign_data_s3(self,network:str,station:str,survey:str,override:bool=False): """ Retrieves and catalogs data from the s3 locations stored in the catalog. @@ -480,6 +460,55 @@ def download_campaign_data_s3(self,network:str,station:str,survey:str,override:b logger.info(f"Downloaded {count} files to {str(self.raw_dir)}") self.catalog_data.to_csv(self.catalog,index=False) + + def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'): + """ + Downloads a file from the specified https url on gage-data + + Args: + remote_url (Path): The path of the file in the gage-data storage. + destination (Path): The local path where the file will be downloaded. + + Returns: + bool: True if the file was downloaded successfully, False otherwise. + """ + try: + #local_location = destination_dir / Path(remote_url).name + get_file_from_gage_data(url=remote_url, + dest_dir=destination_dir, + token_path=token_path) + #logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}") + return True + except Exception as e: + response = f"Error downloading {str(remote_url)} \n {e}" + response += "\n HINT: Check authentication credentials" + logger.error(response) + print(response) + return False + + def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path): + """ + Downloads a file from the specified S3 bucket. + + Args: + client (boto3.client): The Boto3 client object for S3. + bucket (str): The name of the S3 bucket. + remote_url (Path): The path of the file in the S3 bucket. + destination (Path): The local path where the file will be downloaded. + + Returns: + bool: True if the file was downloaded successfully, False otherwise. + """ + try: + client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination)) + logger.info(f"Downloaded {str(remote_url)} to {str(destination)}") + return True + except Exception as e: + response = f"Error downloading {str(remote_url)} \n {e}" + response += "\n HINT: $ aws sso login" + logger.error(response) + print(response) + return False def clear_raw_processed_data(self, network: str, station: str, survey: str): """ @@ -551,6 +580,45 @@ def clear_raw_processed_data(self, network: str, station: str, survey: str): pbar.close() + def add_entry(self, entry: dict): + """ + Add an entry in the catalog. This may result in duplicates, which need to be cleaned up via + consolidate_entries() + + Args: + entry (dict): The new entry. + + Returns: + None + """ + with self.catalog.open("r") as f: + keys=list(f.readline().rstrip().split(',')) + entry_str = "\n" + for key in keys: + if key in entry: + entry_str += f"{str(entry[key])}" + if key != keys[-1]: + entry_str += "," + + with self.catalog.open("a") as f: + print(entry_str) + f.write(entry_str) + + def consolidate_entries(self): + """ + Remove any duplicate entries, keeping the most complete. 
+
+        Args:
+            None
+        Returns:
+            None
+        """
+        df = pd.read_csv(str(self.catalog))
+        df['count'] = pd.isnull(df).sum(1)
+        df=df.sort_values(['count']).drop_duplicates(subset=['uuid'],keep='first').drop(labels='count',axis=1)
+        df=df.sort_index()
+        df.to_csv(self.catalog,index=False)
+
     def update_entry(self,entry:dict):
         """
         Replace an entry in the catalog with a new entry.
@@ -599,8 +667,9 @@ def get_parent_stack(
             parents: List[Union[FILE_TYPE, DATA_TYPE]] = SOURCE_MAP.get(
                 stack[pointer], []
             )
-            while parents:
-                parent = parents.pop()
+            #while parents:
+            #    parent = parents.pop()
+            for parent in parents:
                 stack.append(parent)
             pointer += 1
         return stack[::-1]
@@ -608,14 +677,14 @@ def get_parent_stack(
     def _process_targeted(
         self, parent: dict, child_type: Union[FILE_TYPE, DATA_TYPE]
     ) -> dict:
-
+        #TODO: implement multithreaded logging, had to switch to print statement below
         if isinstance(parent, dict):
             parent = pd.Series(parent)
 
         # if parent.processed:
         #     return None
-        logger.info(
-            f"Attemping to process {parent.uuid} of Type {parent.type} to {child_type.value}"
+        print(
+            f"Attempting to process {os.path.basename(parent.local_location)} ({parent.uuid}) of Type {parent.type} to {child_type.value}"
         )
         child_map = TARGET_MAP.get(FILE_TYPE(parent.type))
         if child_map is None:
@@ -688,7 +757,10 @@ def _process_targeted(
             "source_uuid": parent.uuid,
             "processed": is_processed,
         }
-        logger.info(f"\n Successful Processing: \n{str(processed_meta)}")
+        print(f"Successful Processing: {str(processed_meta)}")
+        if is_processed:
+            self.update_entry(processed_meta)
+            #self.add_entry(processed_meta)
         return processed_meta
 
     def _process_data_link(self,
@@ -743,7 +815,7 @@ def _process_data_link(self,
 
         meta_data_list = []
         with Pool(processes=cpu_count()) as pool:
-            for meta_data in tqdm.tqdm(
+            for meta_data in tqdm(
                 pool.map(
                     process_func_partial,
                     parent_entries_to_process.to_dict(orient="records"),
@@ -753,6 +825,7 @@ def _process_data_link(self,
             ):
                 if meta_data is not None:
                     self.update_entry(meta_data)
+                    #self.add_entry(meta_data)
                     meta_data_list.append(meta_data)
 
         parent_entries_processed = parent_entries_to_process[
@@ -761,18 +834,21 @@ def _process_data_link(self,
             )
         ]
         logger.info(f"Processed {len(meta_data_list)} Out of {parent_entries_processed.shape[0]} For {target.value} Files from {parent_entries_processed.shape[0]} Parent Files")
+        self.consolidate_entries()
         return parent_entries_processed
 
     def _process_data_graph(self, network: str, station: str, survey: str,child_type:Union[FILE_TYPE,DATA_TYPE],override:bool=False):
         processing_queue = self.get_parent_stack(child_type=child_type)
-
+        #logger.info(f"processing queue: {processing_queue}")
         while processing_queue:
             parent = processing_queue.pop(0)
+            #logger.info(f"parent: {parent}")
             children:dict = TARGET_MAP.get(parent,{})
+            #logger.info(f"children: {children}")
             children_to_process = [k for k in children.keys() if k in processing_queue]
-
+            #logger.info(f"children to process: {children_to_process}")
             for child in children_to_process:
-
+                #logger.info(f"child:{child}")
                 processed_parents:pd.DataFrame = self._process_data_link(network,station,survey,target=child,source=[parent],override=override)
                 # Check if all children of this parent have been processed
                 if processed_parents is not None:
diff --git a/src/es_sfgtools/utils/gage_data.py b/src/es_sfgtools/utils/gage_data.py
index 47bcd24b..0f0ff072 100644
--- a/src/es_sfgtools/utils/gage_data.py
+++ b/src/es_sfgtools/utils/gage_data.py
@@ -3,12 +3,15 @@
 import os
 import urllib.request
 import ssl
+import logging
 ssl._create_default_https_context = ssl._create_stdlib_context
 
 from earthscope_sdk.auth.device_code_flow import DeviceCodeFlowSimple
 from earthscope_sdk.auth.auth_flow import NoTokensError
 
+logger = logging.getLogger(__name__)
+
 def retrieve_token(token_path="."):
     """
     Retrieve or generate a token for the GAGE data archive using the EarthScope SDK.
@@ -140,12 +143,11 @@ def download_file_list_from_gage_data(file_urls: list, dest_dir='./files', token
         print(f"Failed to download {len(failed_files)} files.")
         print(failed_files)
 
-def generate_gage_data_survey_url(network, station, survey):
-    url = f'https://gage-data.earthscope.org/archive/seafloor/{network}/{station}/{survey}/raw'
+def generate_gage_data_survey_url(network, station, survey, level='raw'):
+    url = f'https://gage-data.earthscope.org/archive/seafloor/{network}/{station}/{survey}/{level}'
     return url
 
-def get_survey_file_dict(url):
-    file_list = list_files_from_gage_data(url)
+def list_file_counts_by_type(file_list, url=None):
     file_dict = {}
     for file in file_list:
         if 'master' in file:
@@ -153,24 +155,41 @@ def get_survey_file_dict(url):
         elif 'lever_arms' in file:
             file_dict.setdefault('lever_arms', []).append(file)
         elif 'bcsonardyne' in file:
-            file_dict.setdefault('bcsonardyne', []).append(file)
+            file_dict.setdefault('sonardyne', []).append(file)
         elif 'bcnovatel' in file:
-            file_dict.setdefault('bcnovatel', []).append(file)
+            file_dict.setdefault('novatel', []).append(file)
+        elif 'bcoffload' in file:
+            file_dict.setdefault('offload', []).append(file)
         elif file.endswith('NOV770.raw'):
             file_dict.setdefault('NOV770', []).append(file)
         elif file.endswith('DFOP00.raw'):
             file_dict.setdefault('DFOP00', []).append(file)
         elif file.endswith('NOV000.bin'):
             file_dict.setdefault('NOV000', []).append(file)
+        elif "ctd" in file:
+            file_dict.setdefault('ctd', []).append(file)
 
-    file_dict['ctd'] = list_files_from_gage_data(f'{url}/ctd')
-
-    print('Found:')
+    if url is not None:
+        logger.info(f'Found under {url}:')
+    else:
+        logger.info('Found:')
     for k,v in file_dict.items():
-        print(f'  {len(v)} {k} file(s)')
-
+        logger.info(f'  {len(v)} {k} file(s)')
     return file_dict
 
+def get_survey_file_dict(url):
+    file_list = list_files_from_gage_data(url)
+    return list_file_counts_by_type(file_list)
+
+def list_survey_files(network: str,
+                      station: str,
+                      survey: str) -> list:
+    url = generate_gage_data_survey_url(network, station, survey)
+    file_list = list_files_from_gage_data(url)
+    file_list += list_files_from_gage_data(f'{url}/ctd')
+    list_file_counts_by_type(file_list=file_list, url=url)
+    return file_list
+
 
 if __name__ == "__main__":
     # Example usage
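
For reviewers, a minimal usage sketch of the API after this change. The network/station/survey strings below are hypothetical placeholders, and the sketch assumes the catalog has already been populated via add_campaign_data() or add_campaign_data_s3() so that entries of the requested type exist:

    from pathlib import Path
    from es_sfgtools.pipeline import DataHandler, FILE_TYPE
    from es_sfgtools.utils.gage_data import list_survey_files

    # New helper: list a survey's raw/ and ctd/ files on gage-data and
    # log per-type counts.
    files = list_survey_files(network="cascadia", station="STN1", survey="2024_A_0001")

    handler = DataHandler(working_dir=Path("./sfg_data"))

    # download_campaign_data() now takes a required file_type, so each call
    # targets a single catalog type; values come from FILE_TYPE, e.g. "dfpo00".
    handler.download_campaign_data(
        network="cascadia",
        station="STN1",
        survey="2024_A_0001",
        file_type=FILE_TYPE.DFPO00.value,
        override=False,
        from_s3=False,
    )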