-from impc_etl.jobs.load.impc_web_api import (
-    ImpcConfig,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    col,
-    collect_set,
-    explode_outer,
-    luigi,
-    phenotype_term_zip_udf,
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+from impc_etl.utils.impc_web_api import phenotype_term_zip_udf
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
+mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv")
+gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json")
+
+batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet")
+
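+# The DAG is asset-scheduled: it runs when the three input assets above are
+# updated, and declares the batch query parquet asset as its outlet.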
+@asset.multi(
+    schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset],
+    outlets=[batch_query_data_parquet_asset],
+    dag_id=f"{dr_tag}_impc_batch_query_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API batch query mapper DAG."""
+    ),
+    tags=["impc_web_api", "batch query"],
 )
-
-
-class ImpcBatchQueryMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcBatchQueryMapper"
-
-    ortholog_mapping_report_tsv_path = luigi.Parameter()
-    mp_hp_matches_csv_path = luigi.Parameter()
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [ImpcGeneStatsResultsMapper()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/batch_query_data_parquet"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.ortholog_mapping_report_tsv_path,
-            self.mp_hp_matches_csv_path,
-            self.input()[0].path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        ortholog_mapping_report_tsv_path = args[0]
-        mp_hp_matches_csv_path = args[1]
-        gene_stats_results_json_path = args[2]
-        output_path = args[3]
-
-        ortholog_mapping_df = spark.read.csv(
-            ortholog_mapping_report_tsv_path, sep="\t", header=True
-        )
-        stats_results = spark.read.json(gene_stats_results_json_path)
-
-        ortholog_mapping_df = ortholog_mapping_df.select(
-            col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
-            col("Human Gene Symbol").alias("humanGeneSymbol"),
-            col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
-        ).distinct()
-
-        stats_results = stats_results.join(
-            ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
-        )
-
-        mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
-        mp_matches_df = mp_matches_df.select(
-            col("curie_x").alias("id"),
-            col("curie_y").alias("hp_term_id"),
-            col("label_y").alias("hp_term_name"),
-        ).distinct()
-
-        stats_mp_hp_df = stats_results.select(
-            "statisticalResultId",
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-            "significantPhenotype",
+@with_spark_session
+def impc_batch_query_mapper():
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit
+
+    spark = SparkSession.builder.getOrCreate()
+
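+    # Resolve concrete input/output locations from the declared assets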
+    ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri
+    mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri
+    gene_stats_results_json_path = gene_stats_results_json_path_asset.uri
+    output_path = batch_query_data_parquet_asset.uri
+
+    ortholog_mapping_df = spark.read.csv(
+        ortholog_mapping_report_tsv_path, sep="\t", header=True
+    )
+    stats_results = spark.read.json(gene_stats_results_json_path)
+
+    ortholog_mapping_df = ortholog_mapping_df.select(
+        col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
+        col("Human Gene Symbol").alias("humanGeneSymbol"),
+        col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
+    ).distinct()
+
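+    # Left outer join keeps statistical results for genes with no human ortholog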
+    stats_results = stats_results.join(
+        ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
+    )
+
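+    # MP-to-HP term matches: curie_x is the MP term id, curie_y/label_y the matched HP term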
+    mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
+    mp_matches_df = mp_matches_df.select(
+        col("curie_x").alias("id"),
+        col("curie_y").alias("hp_term_id"),
+        col("label_y").alias("hp_term_name"),
+    ).distinct()
+
+    stats_mp_hp_df = stats_results.select(
+        "statisticalResultId",
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+        "significantPhenotype",
+    )
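+    # Explode each plural phenotype array into one row per term; [:-1] drops the
+    # trailing "s" to name the singular column (potentialPhenotypes -> potentialPhenotype)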
+    for phenotype_list_col in [
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+    ]:
+        stats_mp_hp_df = stats_mp_hp_df.withColumn(
+            phenotype_list_col[:-1], explode_outer(phenotype_list_col)
         )
-        for phenotype_list_col in [
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-        ]:
-            stats_mp_hp_df = stats_mp_hp_df.withColumn(
-                phenotype_list_col[:-1], explode_outer(phenotype_list_col)
-            )

-        stats_mp_hp_df = stats_mp_hp_df.join(
-            mp_matches_df,
-            (
+    stats_mp_hp_df = stats_mp_hp_df.join(
+        mp_matches_df,
+        (
             (col("significantPhenotype.id") == col("id"))
             | (col("potentialPhenotype.id") == col("id"))
             | (col("intermediatePhenotype.id") == col("id"))
             | (col("topLevelPhenotype.id") == col("id"))
-            ),
-            how="left_outer",
-        )
-        stats_mp_hp_df = stats_mp_hp_df.withColumn(
-            "humanPhenotype",
-            phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
-        )
-        stats_mp_hp_df = (
-            stats_mp_hp_df.groupBy("statisticalResultId")
-            .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
-            .select("statisticalResultId", "humanPhenotypes")
-            .distinct()
-        )
-
-        stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
-
-        stats_results.write.parquet(output_path)
+        ),
+        how="left_outer",
+    )
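+    # Zip the matched HP term id and name into a single humanPhenotype value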
+    stats_mp_hp_df = stats_mp_hp_df.withColumn(
+        "humanPhenotype",
+        phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
+    )
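+    # Collapse back to one row per statistical result, collecting the set of matched human phenotypes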
+    stats_mp_hp_df = (
+        stats_mp_hp_df.groupBy("statisticalResultId")
+        .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
+        .select("statisticalResultId", "humanPhenotypes")
+        .distinct()
+    )
+
+    stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
+
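+    # coalesce(100) caps the number of parquet part files; overwrite makes reruns idempotent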
+    stats_results.coalesce(100).write.parquet(output_path, mode="overwrite")
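
Note: phenotype_term_zip_udf is imported from impc_etl.utils.impc_web_api and its
body is not part of this diff. As a rough sketch (an assumption, not the actual
helper): it plausibly pairs an HP term id and name into a single {id, name}
struct, returning null when the left join found no match so that collect_set
skips the row:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, StructField, StructType

    phenotype_term_schema = StructType([
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
    ])

    @udf(returnType=phenotype_term_schema)
    def phenotype_term_zip_udf(term_id, term_name):
        # Hypothetical implementation: null-safe pairing of term id and name
        if term_id is None:
            return None
        return {"id": term_id, "name": term_name}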