diff --git a/aws/lambda/clickhouse-replicator-s3/lambda_function.py b/aws/lambda/clickhouse-replicator-s3/lambda_function.py
index 025b95f6bb..30e7af2184 100644
--- a/aws/lambda/clickhouse-replicator-s3/lambda_function.py
+++ b/aws/lambda/clickhouse-replicator-s3/lambda_function.py
@@ -54,6 +54,112 @@ def encode_url_component(url):
     return urllib.parse.quote(url)
 
 
+def handle_test_run_s3_small(table, bucket, key) -> List[Dict[str, Any]]:
+    def clean_up_query(query):
+        return " ".join([line.strip() for line in query.split("\n")])
+
+    def get_sys_err_out_parser(name):
+        # system-err and system-out generally have either the format:
+        # Tuple(text String) or Array(Tuple(text String))
+        # This function returns a query that will parse out the text field into an array of strings
+        return f"""
+        if(
+            JSONArrayLength(`{name}`) is null,
+            if(
+                JSONHas(`{name}`, 'text'),
+                array(JSONExtractString(`{name}`, 'text')),
+                [ ]
+            ),
+            JSONExtractArrayRaw(JSON_QUERY(`{name}`, '$[*].text'))
+        ) as `{name}`
+        """
+
+    def get_skipped_failure_parser_helper(name, type, field_to_check_for_existence):
+        # skipped and failure generally have either the format:
+        # Tuple(stuff) or Array(Tuple(stuff)).
+        # The stuff varies. The type input should be the string `Tuple(stuff)`
+        # The field_to_check_for_existence is the field that is checked to see
+        # if the skip/rerun exists or if it should be an empty array. It is a
+        # dictionary key in the tuple
+        return f"""
+        if(
+            JSONArrayLength({name}) is null,
+            if(
+                JSONHas({name}, '{field_to_check_for_existence}'),
+                array(
+                    JSONExtract(
+                        {name},
+                        '{type}'
+                    )
+                ),
+                [ ]
+            ),
+            JSONExtract(
+                {name},
+                'Array({type})'
+            )
+        ) as {name}
+        """
+
+    # Cannot use general_adapter due to custom field for now()::DateTime64(9)
+    # time_inserted
+    query = f"""
+    insert into {table}
+    select
+        classname,
+        duration,
+        {get_skipped_failure_parser_helper('error', 'Tuple(type String, message String, text String)', 'message')},
+        {get_skipped_failure_parser_helper('failure', 'Tuple(type String, message String, text String)', 'message')},
+        file,
+        invoking_file,
+        job_id,
+        line::Int64,
+        name,
+        {get_skipped_failure_parser_helper('rerun', 'Tuple(message String, text String)', 'message')},
+        result,
+        {get_skipped_failure_parser_helper('skipped', 'Tuple(type String, message String, text String)', 'message')},
+        status,
+        time,
+        now()::DateTime64(9) as time_inserted,
+        workflow_id,
+        workflow_run_attempt,
+        ('{bucket}', '{key}')
+    from
+        s3(
+            'https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}',
+            'JSONEachRow',
+            '
+            `classname` String,
+            `duration` Float32,
+            `error` String,
+            `failure` String,
+            `file` String,
+            `invoking_file` String,
+            `job_id` Int64,
+            `line` Float32,
+            `name` String,
+            `properties` Tuple(property Tuple(name String, value String)),
+            `rerun` String,
+            `result` String,
+            `skipped` String,
+            `status` String,
+            `system-err` String,
+            `system-out` String,
+            `time` Float32,
+            `type_param` String,
+            `value_param` String,
+            `workflow_id` Int64,
+            `workflow_run_attempt` Int32',
+            'gzip'
+        )
+    """
+    query = clean_up_query(query)
+    try:
+        get_clickhouse_client().query(query)
+    except Exception as e:
+        log_failure_to_clickhouse(table, bucket, key, e)
+
+
 def handle_test_run_s3(table, bucket, key) -> List[Dict[str, Any]]:
     def clean_up_query(query):
         return " ".join([line.strip() for line in query.split("\n")])
@@ -585,12 +691,14 @@ def cloudwatch_metrics_adapter(table, bucket, key):
     "disabled_tests_historical": "misc.disabled_tests_historical",
     # fbossci-cloudwatch-metrics bucket
     "ghci-related": "infra_metrics.cloudwatch_metrics",
+    "all_test_runs": "fortesting.all_test_runs",
 }
 
 OBJECT_CONVERTER = {
     "default.merges": merges_adapter,
     "default.test_run_s3": handle_test_run_s3,
     "default.failed_test_runs": handle_test_run_s3,
+    "fortesting.all_test_runs": handle_test_run_s3_small,
     "default.test_run_summary": handle_test_run_summary,
     "default.merge_bases": merge_bases_adapter,
     "default.rerun_disabled_tests": rerun_disabled_tests_adapter,
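
Note on the first hunk: the `if(JSONArrayLength(...) is null, ...)` pattern built by get_skipped_failure_parser_helper normalizes a field that may arrive either as a single JSON object (Tuple) or as a JSON array (Array(Tuple)). A minimal Python sketch of the same rule, for readers who would rather not trace the ClickHouse SQL; the function name and sample payloads below are illustrative only and are not part of the lambda:

    import json

    def normalize_tuple_or_array(raw: str, existence_field: str) -> list:
        # Mirrors the SQL: JSONArrayLength(x) is null means "not an array";
        # JSONHas(x, field) then decides between a one-element list and [].
        parsed = json.loads(raw) if raw else None
        if isinstance(parsed, list):
            return parsed                   # already Array(Tuple(...)); pass through
        if isinstance(parsed, dict) and existence_field in parsed:
            return [parsed]                 # a single Tuple(...); wrap in an array
        return []                           # field absent or empty; empty array

    # A single skip serialized as one object gets wrapped:
    print(normalize_tuple_or_array(
        '{"type": "pytest.skip", "message": "slow", "text": "..."}', "message"))
    # Reruns already serialized as an array pass through unchanged:
    print(normalize_tuple_or_array(
        '[{"message": "flaky", "text": "..."}]', "message"))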
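
Note on the second hunk: the two dicts work as a pair, with SUPPORTED_PATHS resolving an S3 path prefix to a ClickHouse table and OBJECT_CONVERTER resolving that table to a handler, so objects destined for fortesting.all_test_runs end up in handle_test_run_s3_small. A hedged sketch of that dispatch, assuming the module-level dicts above; route_s3_object and the prefix-extraction line are assumptions for illustration, not the lambda's actual entry point:

    def route_s3_object(bucket: str, key: str) -> None:
        # Hypothetical dispatch: resolve the table from the object's
        # top-level key prefix, then hand off to the registered converter.
        prefix = key.split("/", 1)[0]
        table = SUPPORTED_PATHS[prefix]        # e.g. "fortesting.all_test_runs"
        converter = OBJECT_CONVERTER[table]    # e.g. handle_test_run_s3_small
        converter(table, bucket, key)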