1 change: 1 addition & 0 deletions requirements.txt
@@ -2,6 +2,7 @@ boto3
botocore
numpy
pandas
scipy
julian
pyarrow
pandera
2 changes: 1 addition & 1 deletion src/es_sfgtools/pipeline/__init__.py
@@ -1 +1 @@
from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog
from .temp import DataHandler,DATA_TYPE,FILE_TYPE,DataCatalog, SOURCE_MAP, TARGET_MAP
218 changes: 147 additions & 71 deletions src/es_sfgtools/pipeline/temp.py
@@ -11,12 +11,12 @@
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn
import tqdm
from tqdm.autonotebook import tqdm
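# tqdm.autonotebook auto-selects the notebook progress bar under Jupyter and
# falls back to the console bar otherwise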
from multiprocessing import Pool, cpu_count
from functools import partial
import numpy as np
import warnings
import folium
import json
warnings.filterwarnings("ignore")
seaborn.set_theme(style="whitegrid")
@@ -36,11 +36,13 @@ class FILE_TYPE(Enum):
SEABIRD = "svpavg"
NOVATEL770 = "novatel770"
DFPO00 = "dfpo00"
OFFLOAD = "offload"

@classmethod
def to_schema(cls):
return [x.name for x in cls]


FILE_TYPES = [x.value for x in FILE_TYPE]

class DATA_TYPE(Enum):
@@ -170,6 +172,7 @@ def __init__(self,working_dir:Path) -> None:
self.catalog_data = DataCatalog.validate(pd.read_csv(self.catalog))
else:
self.catalog_data = pd.DataFrame()
logger.info(f"Data Handler initialized, data will be stored in {self.working_dir}")

def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
"""
@@ -189,6 +192,21 @@ def _get_timestamp(self,remote_prefix:str) -> pd.Timestamp:
except Exception:
return None

def get_local_counts(self):
try:
local_files = self.catalog_data[self.catalog_data['local_location'].notnull()]
data_type_counts = local_files.type.value_counts()
except (AttributeError, KeyError):
data_type_counts = pd.Series(dtype="int64")
return data_type_counts

def get_dtype_counts(self):
try:
data_type_counts = self.catalog_data[self.catalog_data.type.isin(FILE_TYPES)].type.value_counts()
except AttributeError:
    # Empty catalog: return an empty Series rather than a message string
    data_type_counts = pd.Series(dtype="int64")
return data_type_counts
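# Illustration: with a populated catalog, get_local_counts() returns a pandas
# Series keyed by file type, e.g. {"novatel770": 12, "dfpo00": 3} (values
# hypothetical), counting only rows with a local_location set, while
# get_dtype_counts() counts every cataloged row of a recognized FILE_TYPE.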

def add_campaign_data(self,
network: str,
station: str,
@@ -215,7 +233,7 @@ def add_campaign_data(self,
if file_type in file.replace("_", ""):
discovered_file_type = file_type
break

if discovered_file_type is None:
logger.error(f"File type not recognized for {file}")
continue
@@ -254,43 +272,22 @@ def add_campaign_data(self,
self.catalog_data = pd.concat([self.catalog_data, incoming_df])

self.catalog_data.to_csv(self.catalog,index=False)
# Get count of each data type in the catalog
return self.get_dtype_counts()

def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'):
"""
Downloads a file from the specified HTTPS URL on gage-data.

Args:
remote_url (Path): The path of the file in the gage-data storage.
destination_dir (Path): The local directory where the file will be downloaded.

Returns:
bool: True if the file was downloaded successfully, False otherwise.
"""
try:
local_location = destination_dir / Path(remote_url).name
get_file_from_gage_data(url=remote_url,
dest_dir=destination_dir,
token_path=token_path)
logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}")
return True
except Exception as e:
response = f"Error downloading {str(remote_url)} \n {e}"
response += "\n HINT: Check authentication credentials"
logger.error(response)
print(response)
return False

def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False):
def download_campaign_data(self,
network:str,
station:str,
survey:str,
file_type: str,
override:bool=False,
from_s3:bool=False):
"""
Retrieves and catalogs data from the remote locations stored in the catalog.

Args:
network (str): The network name.
station (str): The station name.
survey (str): The survey name.
file_type (str): The type of file to download
override (bool): Whether to download the data even if it already exists
from_s3 (bool): Use S3 download functionality if remote resources are in an S3 bucket

@@ -299,12 +296,19 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False):
"""
# TODO make multithreaded
# Find all entries in the catalog that match the params
local_files_of_type = self.get_local_counts().get(file_type, 0)
logger.info(f"Data directory currently contains {local_files_of_type} files of type {file_type}")
entries = self.catalog_data[
(self.catalog_data.network == network)
& (self.catalog_data.station == station)
& (self.catalog_data.survey == survey)
& (self.catalog_data.type == file_type)
]

logger.info(f"Downloading {entries.shape[0]-local_files_of_type} missing files of type {file_type}")
if entries.shape[0] < 1:
raise Exception('No matching data found in catalog')
if from_s3:
@@ -324,7 +328,7 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False):
remote_url=entry.remote_prefix,
destination=local_location,
)
# Check if the entry is from an S3 location or gage/sage
# Check if the entry is from an S3 location or gage-data
else:
is_download = self._download_https(
remote_url=entry.remote_filepath, destination_dir=self.raw_dir
@@ -340,9 +344,9 @@ def download_campaign_data(self,network:str,station:str,survey:str,override:bool=False,from_s3:bool=False):
if count == 0:
response = f"No files downloaded"
logger.error(response)
print(response)

logger.info(f"Downloaded {count} files to {str(self.raw_dir)}")
#print(response)
else:
logger.info(f"Downloaded {count} files")

self.catalog_data.to_csv(self.catalog,index=False)

@@ -412,30 +416,6 @@ def add_campaign_data_s3(self, network: str, station: str, survey: str, bucket:
return self.get_dtype_counts()

def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path):
"""
Downloads a file from the specified S3 bucket.

Args:
client (boto3.client): The Boto3 client object for S3.
bucket (str): The name of the S3 bucket.
remote_url (Path): The path of the file in the S3 bucket.
destination (Path): The local path where the file will be downloaded.

Returns:
bool: True if the file was downloaded successfully, False otherwise.
"""
try:
client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination))
logger.info(f"Downloaded {str(remote_url)} to {str(destination)}")
return True
except Exception as e:
response = f"Error downloading {str(remote_url)} \n {e}"
response += "\n HINT: $ aws sso login"
logger.error(response)
print(response)
return False

def download_campaign_data_s3(self,network:str,station:str,survey:str,override:bool=False):
"""
Retrieves and catalogs data from the S3 locations stored in the catalog.
@@ -480,6 +460,55 @@ def download_campaign_data_s3(self,network:str,station:str,survey:str,override:bool=False):
logger.info(f"Downloaded {count} files to {str(self.raw_dir)}")

self.catalog_data.to_csv(self.catalog,index=False)

def _download_https(self, remote_url: Path, destination_dir: Path, token_path='.'):
"""
Downloads a file from the specified HTTPS URL on gage-data.

Args:
remote_url (Path): The path of the file in the gage-data storage.
destination_dir (Path): The local directory where the file will be downloaded.

Returns:
bool: True if the file was downloaded successfully, False otherwise.
"""
try:
#local_location = destination_dir / Path(remote_url).name
get_file_from_gage_data(url=remote_url,
dest_dir=destination_dir,
token_path=token_path)
#logger.info(f"Downloaded {str(remote_url)} to {str(local_location)}")
return True
except Exception as e:
response = f"Error downloading {str(remote_url)} \n {e}"
response += "\n HINT: Check authentication credentials"
logger.error(response)
print(response)
return False
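# token_path is assumed to point at the directory holding the gage-data
# access token consumed by get_file_from_gage_data (it defaults to the
# current working directory).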

def _download_boto(self, client: boto3.client, bucket: str, remote_url: Path, destination: Path):
"""
Downloads a file from the specified S3 bucket.

Args:
client (boto3.client): The Boto3 client object for S3.
bucket (str): The name of the S3 bucket.
remote_url (Path): The path of the file in the S3 bucket.
destination (Path): The local path where the file will be downloaded.

Returns:
bool: True if the file was downloaded successfully, False otherwise.
"""
try:
client.download_file(Bucket=bucket, Key=str(remote_url), Filename=str(destination))
logger.info(f"Downloaded {str(remote_url)} to {str(destination)}")
return True
except Exception as e:
response = f"Error downloading {str(remote_url)} \n {e}"
response += "\n HINT: $ aws sso login"
logger.error(response)
print(response)
return False
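# Usage sketch (assumes AWS credentials are available, e.g. via `aws sso login`;
# the bucket and key below are hypothetical):
#   client = boto3.client("s3")
#   self._download_boto(client, bucket="gage-data-bucket",
#                       remote_url=Path("network/station/file.raw"),
#                       destination=self.raw_dir / "file.raw")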

def clear_raw_processed_data(self, network: str, station: str, survey: str):
"""
@@ -551,6 +580,45 @@ def clear_raw_processed_data(self, network: str, station: str, survey: str):

pbar.close()

def add_entry(self, entry: dict):
    """
    Add an entry to the catalog. This may result in duplicates, which need to be cleaned up via
    consolidate_entries()

    Args:
        entry (dict): The new entry.

    Returns:
        None
    """
    with self.catalog.open("r") as f:
        keys = f.readline().rstrip().split(",")
    # Build the row in header order; keys missing from the entry become empty
    # fields so columns stay aligned. NOTE: values are written verbatim, so a
    # value containing a comma would need csv.writer instead.
    entry_str = "\n" + ",".join(str(entry.get(key, "")) for key in keys)

    with self.catalog.open("a") as f:
        f.write(entry_str)
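# Usage sketch (field values are hypothetical; columns absent from the dict are
# written as empty fields):
#   handler.add_entry({"uuid": "1234", "network": "NET", "type": "dfpo00"})
#   handler.consolidate_entries()  # collapse any duplicate uuids afterwards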

def consolidate_entries(self):
"""
Remove any duplicate entries, keeping the most complete.

Args:
None
Returns:
None
"""
df = pd.read_csv(str(self.catalog))
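# pd.isnull(df).sum(1) counts missing fields per row; sorting by that count
# and keeping the first row per uuid retains the most complete duplicate,
# while the final sort_index() restores the original row order.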
df['count'] = pd.isnull(df).sum(1)
df=df.sort_values(['count']).drop_duplicates(subset=['uuid'],keep='first').drop(labels='count',axis=1)
df=df.sort_index()
df.to_csv(self.catalog,index=False)

def update_entry(self,entry:dict):
"""
Replace an entry in the catalog with a new entry.
@@ -599,23 +667,24 @@ def get_parent_stack(
parents: List[Union[FILE_TYPE, DATA_TYPE]] = SOURCE_MAP.get(
stack[pointer], []
)
while parents:
parent = parents.pop()
for parent in parents:
stack.append(parent)
pointer += 1
return stack[::-1]
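# Illustration (assuming SOURCE_MAP maps each type to the types it is built
# from): if child_type C derives from B and B derives from A, the stack is
# built as [C, B, A] and returned reversed as [A, B, C], the order in which
# the pipeline must process them.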

def _process_targeted(
self, parent: dict, child_type: Union[FILE_TYPE, DATA_TYPE]
) -> dict:

#TODO: implement multithreaded logging, had to switch to print statement below
if isinstance(parent, dict):
parent = pd.Series(parent)

# if parent.processed:
# return None
logger.info(
f"Attemping to process {parent.uuid} of Type {parent.type} to {child_type.value}"
print(
f"Attemping to process {os.path.basename(parent.local_location)} ({parent.uuid}) of Type {parent.type} to {child_type.value}"
)
child_map = TARGET_MAP.get(FILE_TYPE(parent.type))
if child_map is None:
@@ -688,7 +757,10 @@ def _process_targeted(
"source_uuid": parent.uuid,
"processed": is_processed,
}
logger.info(f"\n Successful Processing: \n{str(processed_meta)}")
print(f"Successful Processing: {str(processed_meta)}")
if is_processed:
self.update_entry(processed_meta)
#self.add_entry(processed_meta)
return processed_meta

def _process_data_link(self,
@@ -743,7 +815,7 @@ def _process_data_link(self,

meta_data_list = []
with Pool(processes=cpu_count()) as pool:
for meta_data in tqdm.tqdm(
for meta_data in tqdm(
pool.map(
process_func_partial,
parent_entries_to_process.to_dict(orient="records"),
@@ -753,6 +825,7 @@
):
if meta_data is not None:
self.update_entry(meta_data)
#self.add_entry(meta_data)
meta_data_list.append(meta_data)

parent_entries_processed = parent_entries_to_process[
Expand All @@ -761,18 +834,21 @@ def _process_data_link(self,
)
]
logger.info(f"Processed {len(meta_data_list)} Out of {parent_entries_processed.shape[0]} For {target.value} Files from {parent_entries_processed.shape[0]} Parent Files")
self.consolidate_entries()
return parent_entries_processed

def _process_data_graph(self, network: str, station: str, survey: str,child_type:Union[FILE_TYPE,DATA_TYPE],override:bool=False):
processing_queue = self.get_parent_stack(child_type=child_type)

#logger.info(f"processing queue: {processing_queue}")
while processing_queue:
parent = processing_queue.pop(0)
#logger.info(f"parent: {parent}")
children:dict = TARGET_MAP.get(parent,{})
#logger.info(f"children: {children}")
children_to_process = [k for k in children.keys() if k in processing_queue]

#logger.info(f"children to process: {children_to_process}")
for child in children_to_process:

#logger.info(f"child:{child}")
processed_parents:pd.DataFrame = self._process_data_link(network,station,survey,target=child,source=[parent],override=override)
# Check if all children of this parent have been processed
if processed_parents is not None:
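A minimal end-to-end usage sketch of the updated API (the working directory and
the network/station/survey values are hypothetical; add_campaign_data is
omitted because its full argument list is truncated above):

    from pathlib import Path
    from es_sfgtools.pipeline import DataHandler

    dh = DataHandler(working_dir=Path("./seafloor_data"))
    dh.download_campaign_data(network="NET", station="STA1", survey="2023_A",
                              file_type="novatel770", override=False, from_s3=False)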