aws/lambda/clickhouse-replicator-s3/lambda_function.py (108 additions, 0 deletions)
@@ -54,6 +54,112 @@ def encode_url_component(url):
return urllib.parse.quote(url)


def handle_test_run_s3_small(table, bucket, key) -> None:
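    """Replicate one test-run JSON object from S3 into ClickHouse.

    Reads the gzipped JSONEachRow file at s3://{bucket}/{key} through the
    s3() table function and inserts it into `table`, normalizing the
    semi-structured error/failure/rerun/skipped columns into arrays of
    tuples. Failures are logged via log_failure_to_clickhouse rather than
    raised.
    """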
def clean_up_query(query):
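        # Collapse the multi-line query into a single line, stripping the
        # indentation from each line.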
return " ".join([line.strip() for line in query.split("\n")])

def get_sys_err_out_parser(name):
        # system-err and system-out generally have one of two formats:
        # Tuple(text String) or Array(Tuple(text String)).
        # This returns a SQL expression that parses the text field(s) out of
        # either shape into an array of strings.
return f"""
if(
JSONArrayLength(`{name}`) is null,
if(
JSONHas(`{name}`, 'text'),
array(JSONExtractString(`{name}`, 'text')),
[ ]
),
JSONExtractArrayRaw(JSON_QUERY(`{name}`, '$[*].text'))
) as `{name}`
"""

def get_skipped_failure_parser_helper(name, type, field_to_check_for_existence):
        # skipped and failure generally have one of two formats:
        # Tuple(...) or Array(Tuple(...)). The tuple fields vary, so the
        # `type` argument should be the full string `Tuple(...)`.
        # `field_to_check_for_existence` is a dictionary key in the tuple; it
        # is checked to decide whether the skip/failure/rerun exists or the
        # column should become an empty array.
return f"""
if(
JSONArrayLength({name}) is null,
if(
JSONHas({name}, '{field_to_check_for_existence}'),
array(
JSONExtract(
{name},
'{type}'
)
),
[ ]
),
JSONExtract(
{name},
'Array({type})'
)
) as {name}
"""

    # Cannot use general_adapter because of the custom
    # now()::DateTime64(9) time_inserted field.
query = f"""
insert into {table}
select
classname,
duration,
{get_skipped_failure_parser_helper('error', 'Tuple(type String, message String, text String)', 'message')},
{get_skipped_failure_parser_helper('failure', 'Tuple(type String, message String, text String)', 'message')},
file,
invoking_file,
job_id,
line::Int64,
name,
{get_skipped_failure_parser_helper('rerun', 'Tuple(message String, text String)', 'message')},
result,
{get_skipped_failure_parser_helper('skipped', 'Tuple(type String, message String, text String)', 'message')},
status,
time,
now()::DateTime64(9) as time_inserted,
workflow_id,
workflow_run_attempt,
('{bucket}', '{key}')
from
s3(
'https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}',
'JSONEachRow',
'
`classname` String,
`duration` Float32,
`error` String,
`failure` String,
`file` String,
`invoking_file` String,
`job_id` Int64,
`line` Float32,
`name` String,
`properties` Tuple(property Tuple(name String, value String)),
`rerun` String,
`result` String,
`skipped` String,
`status` String,
`system-err` String,
`system-out` String,
`time` Float32,
`type_param` String,
`value_param` String,
`workflow_id` Int64,
`workflow_run_attempt` Int32',
'gzip'
)
"""
query = clean_up_query(query)
try:
get_clickhouse_client().query(query)
except Exception as e:
log_failure_to_clickhouse(table, bucket, key, e)


def handle_test_run_s3(table, bucket, key) -> List[Dict[str, Any]]:
def clean_up_query(query):
return " ".join([line.strip() for line in query.split("\n")])
@@ -585,12 +691,14 @@ def cloudwatch_metrics_adapter(table, bucket, key):
"disabled_tests_historical": "misc.disabled_tests_historical",
# fbossci-cloudwatch-metrics bucket
"ghci-related": "infra_metrics.cloudwatch_metrics",
"all_test_runs": "fortesting.all_test_runs",
}

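# Maps destination table -> adapter that reshapes the S3 object before
# insertion; tables not listed here are presumably handled by the generic
# path (general_adapter, referenced above).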
OBJECT_CONVERTER = {
"default.merges": merges_adapter,
"default.test_run_s3": handle_test_run_s3,
"default.failed_test_runs": handle_test_run_s3,
"fortesting.all_test_runs": handle_test_run_s3_small,
"default.test_run_summary": handle_test_run_summary,
"default.merge_bases": merge_bases_adapter,
"default.rerun_disabled_tests": rerun_disabled_tests_adapter,