Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix export cur #1083

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions cid/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,7 @@ def escape_id(id_):
def choose_analysis(qs):
""" Choose analysis """
try:
analyzes = []
logger.info("Discovering analyses")
paginator = qs.client.get_paginator('list_analyses')
response_iterator = paginator.paginate(
AwsAccountId=qs.account_id,
PaginationConfig={'MaxItems': 100}
)
for page in response_iterator:
analyzes.extend(page.get('AnalysisSummaryList'))
if len(analyzes) == 100:
logger.info('Too many analyses. Will show first 100')
analyzes = qs.client.get_paginator('list_analyses').paginate(AwsAccountId=qs.account_id).search('AnalysisSummaryList')
except qs.client.exceptions.AccessDeniedException:
logger.info("AccessDeniedException while discovering analyses")
return None
Expand Down Expand Up @@ -147,7 +137,7 @@ def export_analysis(qs, athena, glue):
"ImportMode": dataset.raw['ImportMode'],
}

for key, value in dataset_data['PhysicalTableMap'].items():
for key, value in dataset_data['PhysicalTableMap'].items(): # iterate all sub tables
if 'RelationalTable' in value \
and 'DataSourceArn' in value['RelationalTable'] \
and 'Schema' in value['RelationalTable']:
Expand Down Expand Up @@ -183,22 +173,26 @@ def export_analysis(qs, athena, glue):
#FIXME add value['Source']['DataSetArn'] to the list of dataset_arn
raise CidCritical(f"DataSet {dataset.raw['Name']} contains unsupported join. Please replace join of {value.get('Alias')} from DataSet to DataSource")

# Check whether the dataset is based directly on CUR. This is a rather rare case, as typically views depend on CUR.
cur_version = False
cur_fields = None
for dep_view in dependency_views[:]:
version = cur_helper.table_is_cur(name=dep_view)
if version:
dependency_views.remove(dep_view)
cur_version = True
cur_version = version
cur_fields = True # FIXME: check the list of fields in datasets and fields in cur. Put a list instead of 'cur2: True'

datasets[dataset_name] = {
'data': dataset_data,
'dependsOn': {'views': dependency_views},
'schedules': ['default'], #FIXME: need to read a real schedule
}
# FIXME: add a list of all columns used in the view

if cur_version == '1':
datasets[dataset_name]['dependsOn']['cur'] = True
datasets[dataset_name]['dependsOn']['cur'] = cur_fields
elif cur_version == '2':
datasets[dataset_name]['dependsOn']['cur2'] = True
datasets[dataset_name]['dependsOn']['cur2'] = cur_fields

all_views = [view_and_database[0] for view_and_database in all_views_and_databases]
all_databases = [view_and_database[1] for view_and_database in all_views_and_databases]
Expand Down Expand Up @@ -227,19 +221,30 @@ def export_analysis(qs, athena, glue):
deps = view_data.get('dependsOn', {})
non_cur_dep_views = []
for dep_view in deps.get('views', []):
dep_view_name = dep_view.split('.')[-1]
if dep_view_name in cur_tables or cur_helper.table_is_cur(name=dep_view_name):
cur_version = cur_helper.table_is_cur(name=dep_view_name)
logger.debug(f'{dep_view_name} is cur')
view_data['dependsOn']['cur'] = True
if '.' in dep_view:
dep_view_name = dep_view.split('.')[1].replace('"','')
dep_view_database = dep_view.split('.')[0].replace('"','')
else:
dep_view_name = dep_view
dep_view_database = athena.DatabaseName

if dep_view_name in cur_tables or cur_helper.table_is_cur(name=dep_view_name, database=dep_view_database):
cur_helper.set_cur(table=dep_view_name, database=dep_view_database)
cid_print(f' {dep_view_name} is CUR {cur_helper.version}')
# replace cur table name with a variable
if isinstance(view_data.get('data'), str):
# cur tables treated separately as we don't manage CUR table here
if dep_view_name != 'cur':
backslash = "\\" # workaround f-string limitation
view_data['data'] = view_data['data'].replace(f'{dep_view_name}', f'"${backslash}cur{cur_version}_database{backslash}"."${backslash}cur{cur_version}_table_name{backslash}"')
else:
pass # FIXME: this replace is too dangerous as cur can be a part of other words. Need to find some other way
cur_replacement = {
'2': ["${cur2_database}","${cur2_table_name}"],
'1': ["${cur_database}","${cur_table_name}"],
}[cur_helper.version]
view_data['data'] = re.sub(r'\b' + re.escape(dep_view) + r'\b', cur_replacement[1], view_data['data'])
view_data['data'] = re.sub(r'\b' + re.escape(dep_view_database) + r'\b', cur_replacement[0], view_data['data'])
fields = []
for field in cur_helper.fields:
if field in view_data['data']:
fields.append(field)
view_data['dependsOn'][f'cur{cur_helper.version}'] = fields or True
cur_tables.append(dep_view_name)
else:
logger.debug(f'{dep_view_name} is not cur')
Expand Down
20 changes: 13 additions & 7 deletions cid/helpers/cur.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ def ensure_column(self, column: str, column_type: str=None):
""" Ensure column is in the cur. If it is not there - add column """
pass

def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False) -> bool:
def table_is_cur(self, table: dict=None, name: str=None, return_reason: bool=False, database: str=None) -> bool:
""" return cur version if table metadata fits CUR definition. """
try:
table = table or self.athena.get_table_metadata(name)
table = table or self.athena.get_table_metadata(name, database)
except Exception as exc: #pylint: disable=broad-exception-caught
logger.warning(exc)
return False if not return_reason else (False, f'cannot get table {name}. {exc}.')
Expand Down Expand Up @@ -201,8 +201,10 @@ def tag_and_cost_category_fields(self) -> list:
class CUR(AbstractCUR):
"""This Class represents CUR table (1 or 2 versions)"""

def __init__(self, athena, glue):
def __init__(self, athena, glue, database: str=None, table: str=None):
super().__init__(athena, glue)
if database and table:
self.set_cur(database, table)

@property
def metadata(self) -> dict:
Expand All @@ -213,12 +215,16 @@ def metadata(self) -> dict:
# good place to set a database for athena
return self._metadata

def find_cur(self):

def set_cur(self, database: str=None, table: str=None):
self._database, self._metadata = self.find_cur(database, table)

def find_cur(self, database: str=None, table: str=None):
"""Choose CUR"""
metadata = None
cur_database = get_parameters().get('cur-database')
if get_parameters().get('cur-table-name'):
table_name = get_parameters().get('cur-table-name')
cur_database = database or get_parameters().get('cur-database')
if table or get_parameters().get('cur-table-name'):
table_name = table or get_parameters().get('cur-table-name')
try:
metadata = self.athena.get_table_metadata(table_name, cur_database)
except self.athena.client.exceptions.MetadataException as exc:
Expand Down
Loading