From 3d0af689de709b5f3978ee0073d91af8a77b3a70 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Thu, 18 Sep 2025 13:09:55 +0100
Subject: [PATCH 01/14] feat: added impc_web_api utils module

---
 impc_etl/utils/impc_web_api.py | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)
 create mode 100644 impc_etl/utils/impc_web_api.py

diff --git a/impc_etl/utils/impc_web_api.py b/impc_etl/utils/impc_web_api.py
new file mode 100644
index 00000000..a5522a35
--- /dev/null
+++ b/impc_etl/utils/impc_web_api.py
@@ -0,0 +1,28 @@
+
+GENE_SUMMARY_MAPPINGS = {
+    "mgi_accession_id": "mgiGeneAccessionId",
+    "marker_symbol": "geneSymbol",
+    "marker_name": "geneName",
+    "marker_synonym": "synonyms",
+    "significant_top_level_mp_terms": "significantTopLevelPhenotypes",
+    "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes",
+    "embryo_data_available": "hasEmbryoImagingData",
+    "human_gene_symbol": "human_gene_symbols",
+    "human_symbol_synonym": "human_symbol_synonyms",
+    "production_centre": "production_centres",
+    "phenotyping_centre": "phenotyping_centres",
+    "allele_name": "allele_names",
+    "ensembl_gene_id": "ensembl_gene_ids",
+}
+
+def to_camel_case(snake_str):
+    components = snake_str.split("_")
+    # We capitalize the first letter of each component except the first one
+    # with the 'title' method and join them together.
+    return components[0] + "".join(x.title() for x in components[1:])
+
+def phenotype_term_zip_udf(x, y):
+    from pyspark.sql.functions import lit, struct, when
+    return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise(
+        lit(None)
+    )
\ No newline at end of file
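
Note: the two helpers above are shared by the patches that follow. A minimal
sketch of how they are meant to be used (the toy DataFrame, the local master
and the shown values are illustrative, not part of the patch):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import zip_with

    from impc_etl.utils.impc_web_api import phenotype_term_zip_udf, to_camel_case

    assert to_camel_case("marker_symbol") == "markerSymbol"

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame(
        [(["MP:0000001", "MP:0000002"], ["term a", "term b"])], ["ids", "names"]
    )
    # Despite the "_udf" suffix this is a plain Column-expression builder, so
    # it can be handed straight to zip_with, which pairs ids[i] with names[i]
    # into {id, name} structs (null whenever the id is null).
    df.select(zip_with("ids", "names", phenotype_term_zip_udf).alias("terms")).show()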
From e755ed4eedd13d6c2512d8d5cef5f5e5f218601b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Mon, 15 Sep 2025 12:01:54 +0100
Subject: [PATCH 02/14] WIP: gene disease mapper task migration to Airflow

---
 .../impc_web_api/impc_gene_diseases_mapper.py | 217 ++++++++----------
 1 file changed, 98 insertions(+), 119 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
index 888b0e28..a7b0d544 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
@@ -1,130 +1,109 @@
 from impc_etl.jobs.load.impc_web_api import (
-    BooleanType,
-    DoubleType,
-    ImpcConfig,
-    IntegerType,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
     Window,
-    col,
-    desc,
-    luigi,
-    row_number,
-    to_camel_case,
 )
-
-
-class ImpcGeneDiseasesMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcGeneDiseasesMapper"
-
-    #: Path to the CSV gene disease association report
-    disease_model_summary_csv_path = luigi.Parameter()
-
-    #: Path to the CSV gene disease association report
-    mouse_model_phenodigm_csv_path = luigi.Parameter()
-
-    #: Path to the CSV gene disease association report
-    disease_phenodigm_csv_path = luigi.Parameter()
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/gene_diseases_service_json"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.disease_model_summary_csv_path,
-            self.mouse_model_phenodigm_csv_path,
-            self.disease_phenodigm_csv_path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        disease_model_summary_csv_path = args[0]
-        mouse_model_phenodigm_csv_path = args[1]  # model_id, model_phenotypes
-        disease_phenodigm_csv_path = args[2]  # disease_id, disease_phenotypes
-        output_path = args[3]
-
-        disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop(
-            "disease_phenotypes", "model_phenotypes"
-        )
-        mouse_model_df = spark.read.csv(
-            mouse_model_phenodigm_csv_path, header=True
-        ).select("model_id", "model_phenotypes")
-        disease_phenodigm_df = spark.read.csv(
-            disease_phenodigm_csv_path, header=True
-        ).select("disease_id", "disease_phenotypes")
-
-        disease_df = disease_df.withColumn(
-            "phenodigm_score",
-            (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2,
-        )
-
-        disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer")
-        disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer")
-
-        window_spec = Window.partitionBy("disease_id", "marker_id").orderBy(
-            col("phenodigm_score").desc()
-        )
-
-        max_disease_df = disease_df.withColumn(
-            "row_number", row_number().over(window_spec)
-        )
-
-        max_disease_df = max_disease_df.withColumn(
-            "isMaxPhenodigmScore", col("row_number") == 1
-        ).drop("row_number")
-
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+disease_model_summary_csv_path_asset = create_input_asset("impc_web_api/disease_model_summary.csv")
+mouse_model_phenodigm_csv_path_asset = create_input_asset("impc_web_api/mouse_model_phenodigm.csv")
+disease_phenodigm_csv_path_asset = create_input_asset("impc_web_api/disease_phenodigm.csv")
+gene_diseases_service_json_asset = create_output_asset("impc_web_api/gene_diseases_service_json")
+
+@asset.multi(
+    schedule=[disease_model_summary_csv_path_asset, mouse_model_phenodigm_csv_path_asset, disease_phenodigm_csv_path_asset],
+    outlets=[gene_diseases_service_json_asset],
+    dag_id=f"{dr_tag}_impc_gene_diseases_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API gene diseases mapper DAG."""
+    ),
+    tags=["impc_web_api", "diseases"],
+)
+@with_spark_session
+def impc_gene_diseases_mapper():
+    from pyspark.sql import SparkSession, Window
+    from pyspark.sql.types import BooleanType, DoubleType, IntegerType
+    from pyspark.sql.functions import col, row_number
+
+    def to_camel_case(snake_str):
+        components = snake_str.split("_")
+        # We capitalize the first letter of each component except the first one
+        # with the 'title' method and join them together.
+        return components[0] + "".join(x.title() for x in components[1:])
+
+    spark = SparkSession.builder.getOrCreate()
+
+    # Parsing app options
+    disease_model_summary_csv_path = disease_model_summary_csv_path_asset.uri
+    mouse_model_phenodigm_csv_path = mouse_model_phenodigm_csv_path_asset.uri  # model_id, model_phenotypes
+    disease_phenodigm_csv_path = disease_phenodigm_csv_path_asset.uri  # disease_id, disease_phenotypes
+    output_path = gene_diseases_service_json_asset.uri
+
+    disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop(
+        "disease_phenotypes", "model_phenotypes"
+    )
+    mouse_model_df = spark.read.csv(
+        mouse_model_phenodigm_csv_path, header=True
+    ).select("model_id", "model_phenotypes")
+    disease_phenodigm_df = spark.read.csv(
+        disease_phenodigm_csv_path, header=True
+    ).select("disease_id", "disease_phenotypes")
+
+    disease_df = disease_df.withColumn(
+        "phenodigm_score",
+        (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2,
+    )
+
+    disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer")
+    disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer")
+
+    window_spec = Window.partitionBy("disease_id", "marker_id").orderBy(
+        col("phenodigm_score").desc()
+    )
+
+    max_disease_df = disease_df.withColumn(
+        "row_number", row_number().over(window_spec)
+    )
+
+    max_disease_df = max_disease_df.withColumn(
+        "isMaxPhenodigmScore", col("row_number") == 1
+    ).drop("row_number")
+
+    max_disease_df = max_disease_df.withColumnRenamed(
+        "marker_id", "mgiGeneAccessionId"
+    )
+
+    for col_name in max_disease_df.columns:
         max_disease_df = max_disease_df.withColumnRenamed(
-            "marker_id", "mgiGeneAccessionId"
+            col_name, to_camel_case(col_name)
         )
 
-        for col_name in max_disease_df.columns:
-            max_disease_df = max_disease_df.withColumnRenamed(
-                col_name, to_camel_case(col_name)
-            )
-
-        double_cols = [
-            "diseaseModelAvgNorm",
-            "diseaseModelAvgRaw",
-            "diseaseModelMaxRaw",
-            "diseaseModelMaxNorm",
-        ]
-
-        for col_name in double_cols:
-            max_disease_df = max_disease_df.withColumn(
-                col_name, col(col_name).astype(DoubleType())
-            )
+    double_cols = [
+        "diseaseModelAvgNorm",
+        "diseaseModelAvgRaw",
+        "diseaseModelMaxRaw",
+        "diseaseModelMaxNorm",
+    ]
 
+    for col_name in double_cols:
         max_disease_df = max_disease_df.withColumn(
-            "markerNumModels", col("markerNumModels").astype(IntegerType())
-        )
-        max_disease_df = max_disease_df.withColumn(
-            "associationCurated", col("associationCurated").astype(BooleanType())
+            col_name, col(col_name).astype(DoubleType())
         )
 
-        max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json(
-            output_path
-        )
\ No newline at end of file
+    max_disease_df = max_disease_df.withColumn(
+        "markerNumModels", col("markerNumModels").astype(IntegerType())
+    )
+    max_disease_df = max_disease_df.withColumn(
+        "associationCurated", col("associationCurated").astype(BooleanType())
+    )
+
+    max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json(
+        output_path
+    )
\ No newline at end of file
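
Note: the isMaxPhenodigmScore flag computed above is the usual "rank rows
within a group, keep the top one" window pattern. A condensed, self-contained
sketch of just that step (toy data and a local session, for illustration):

    from pyspark.sql import SparkSession, Window
    from pyspark.sql.functions import col, row_number

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame(
        [("D1", "M1", 0.9), ("D1", "M1", 0.7), ("D1", "M2", 0.8)],
        ["disease_id", "marker_id", "phenodigm_score"],
    )
    w = Window.partitionBy("disease_id", "marker_id").orderBy(
        col("phenodigm_score").desc()
    )
    # row_number() == 1 marks the best-scoring row per (disease, gene) pair;
    # rank() would instead flag ties as joint maxima.
    df = df.withColumn("isMaxPhenodigmScore", row_number().over(w) == 1)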
From 77215fb199b7f174a28ba9a972bd16a1668edb6a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Mon, 15 Sep 2025 13:15:58 +0100
Subject: [PATCH 03/14] feat: remove import from local module

---
 impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
index a7b0d544..ff1cd3a8 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
@@ -1,7 +1,3 @@
-from impc_etl.jobs.load.impc_web_api import (
-    Window,
-)
-
 import logging
 import textwrap
 from airflow.sdk import Variable, asset

From ec7b0e452b232860bf94f96a5642bd1003b666e9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Mon, 15 Sep 2025 15:12:13 +0100
Subject: [PATCH 04/14] fix: explicitly cast disease_model_avg_norm and
 disease_model_max_norm columns as Double

---
 impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
index ff1cd3a8..94676915 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
@@ -54,7 +54,7 @@ def to_camel_case(snake_str):
 
     disease_df = disease_df.withColumn(
         "phenodigm_score",
-        (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2,
+        (col("disease_model_avg_norm").cast(DoubleType()) + col("disease_model_max_norm").cast(DoubleType())) / 2,
     )
 
     disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer")

From 1804c51ceba5f594a67147d078d09b7ef7540dfb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Tue, 16 Sep 2025 12:22:42 +0100
Subject: [PATCH 05/14] feat: replace repartition with coalesce

---
 impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
index 94676915..c286c61a 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
@@ -100,6 +100,6 @@ def to_camel_case(snake_str):
         "associationCurated", col("associationCurated").astype(BooleanType())
     )
 
-    max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json(
+    max_disease_df.coalesce(100).write.option("ignoreNullFields", "false").json(
         output_path
     )
\ No newline at end of file
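
Note: the two fixes above target the same pipeline. spark.read.csv without an
explicit schema yields string columns, so the phenodigm inputs are cast before
the average is taken rather than relying on implicit string-to-double
coercion, and coalesce(100) only merges existing partitions (a narrow
dependency) where repartition(500) would force a full shuffle. A small sketch
of the distinction (toy data, illustrative only):

    from pyspark.sql import SparkSession
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([("0.5",), ("0.8",)], ["disease_model_avg_norm"])

    df = df.withColumn(
        "disease_model_avg_norm", df.disease_model_avg_norm.cast(DoubleType())
    )                     # explicit cast: arithmetic now operates on doubles
    df.repartition(500)   # wide: shuffles, can grow the partition count
    df.coalesce(1)        # narrow: merges partitions, never grows the count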
From b47898469da26a178439aecfcceb62f2587cf7d1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Thu, 18 Sep 2025 14:45:32 +0100
Subject: [PATCH 06/14] feat: remove to_camel_case function from
 impc_gene_diseases_mapper task, use shared one

---
 .../jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
index c286c61a..87dd658c 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py
@@ -4,6 +4,7 @@ from airflow.sdk import Variable, asset
 
 from impc_etl.utils.airflow import create_input_asset, create_output_asset
 from impc_etl.utils.spark import with_spark_session
+from impc_etl.utils.impc_web_api import to_camel_case
 
 task_logger = logging.getLogger("airflow.task")
 dr_tag = Variable.get("data_release_tag")
@@ -28,12 +29,6 @@ def impc_gene_diseases_mapper():
     from pyspark.sql.types import BooleanType, DoubleType, IntegerType
     from pyspark.sql.functions import col, row_number
 
-    def to_camel_case(snake_str):
-        components = snake_str.split("_")
-        # We capitalize the first letter of each component except the first one
-        # with the 'title' method and join them together.
-        return components[0] + "".join(x.title() for x in components[1:])
-
     spark = SparkSession.builder.getOrCreate()
 
     # Parsing app options

From d9797f23ee34aefd3384212395745f5cb5e26f34 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Wed, 17 Sep 2025 09:59:30 +0100
Subject: [PATCH 07/14] feat: initial impc_batch_query mapper task

---
 .../impc_web_api/impc_batch_query_mapper.py   | 210 ++++++++----------
 1 file changed, 95 insertions(+), 115 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py
index 409727bb..fa3e68f7 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py
@@ -1,124 +1,104 @@
-from impc_etl.jobs.load.impc_web_api import (
-    ImpcConfig,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    col,
-    collect_set,
-    explode_outer,
-    luigi,
-    phenotype_term_zip_udf,
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
+mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv")
+gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json")
+
+batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet")
+
+@asset.multi(
+    schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset],
+    outlets=[batch_query_data_parquet_asset],
+    dag_id=f"{dr_tag}_impc_batch_query_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API batch query mapper DAG."""
+    ),
+    tags=["impc_web_api", "batch query"],
 )
-
-
-class ImpcBatchQueryMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcBatchQueryMapper"
-
-    ortholog_mapping_report_tsv_path = luigi.Parameter()
-    mp_hp_matches_csv_path = luigi.Parameter()
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [ImpcGeneStatsResultsMapper()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/batch_query_data_parquet"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.ortholog_mapping_report_tsv_path,
-            self.mp_hp_matches_csv_path,
-            self.input()[0].path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        ortholog_mapping_report_tsv_path = args[0]
-        mp_hp_matches_csv_path = args[1]
-        gene_stats_results_json_path = args[2]
-        output_path = args[3]
-
-        ortholog_mapping_df = spark.read.csv(
-            ortholog_mapping_report_tsv_path, sep="\t", header=True
-        )
-        stats_results = spark.read.json(gene_stats_results_json_path)
-
-        ortholog_mapping_df = ortholog_mapping_df.select(
-            col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
-            col("Human Gene Symbol").alias("humanGeneSymbol"),
-            col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
-        ).distinct()
-
-        stats_results = stats_results.join(
-            ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
-        )
-
-        mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
-        mp_matches_df = mp_matches_df.select(
-            col("curie_x").alias("id"),
-            col("curie_y").alias("hp_term_id"),
-            col("label_y").alias("hp_term_name"),
-        ).distinct()
-
-        stats_mp_hp_df = stats_results.select(
-            "statisticalResultId",
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-            "significantPhenotype",
-        )
-        for phenotype_list_col in [
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-        ]:
-            stats_mp_hp_df = stats_mp_hp_df.withColumn(
-                phenotype_list_col[:-1], explode_outer(phenotype_list_col)
-            )
-
-        stats_mp_hp_df = stats_mp_hp_df.join(
-            mp_matches_df,
-            (
-                (col("significantPhenotype.id") == col("id"))
-                | (col("potentialPhenotype.id") == col("id"))
-                | (col("intermediatePhenotype.id") == col("id"))
-                | (col("topLevelPhenotype.id") == col("id"))
-            ),
-            how="left_outer",
-        )
-        stats_mp_hp_df = stats_mp_hp_df.withColumn(
-            "humanPhenotype",
-            phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
-        )
-        stats_mp_hp_df = (
-            stats_mp_hp_df.groupBy("statisticalResultId")
-            .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
-            .select("statisticalResultId", "humanPhenotypes")
-            .distinct()
-        )
-
-        stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
-
-        stats_results.write.parquet(output_path)
\ No newline at end of file
+@with_spark_session
+def impc_batch_query_mapper():
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit
+
+    def phenotype_term_zip_udf(x, y):
+        return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise(
+            lit(None)
+        )
+
+    spark = SparkSession.builder.getOrCreate()
+
+    ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri
+    mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri
+    gene_stats_results_json_path = gene_stats_results_json_path_asset.uri
+    output_path = batch_query_data_parquet_asset.uri
+
+    ortholog_mapping_df = spark.read.csv(
+        ortholog_mapping_report_tsv_path, sep="\t", header=True
+    )
+    stats_results = spark.read.json(gene_stats_results_json_path)
+
+    ortholog_mapping_df = ortholog_mapping_df.select(
+        col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
+        col("Human Gene Symbol").alias("humanGeneSymbol"),
+        col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
+    ).distinct()
+
+    stats_results = stats_results.join(
+        ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
+    )
+
+    mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
+    mp_matches_df = mp_matches_df.select(
+        col("curie_x").alias("id"),
+        col("curie_y").alias("hp_term_id"),
+        col("label_y").alias("hp_term_name"),
+    ).distinct()
+
+    stats_mp_hp_df = stats_results.select(
+        "statisticalResultId",
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+        "significantPhenotype",
+    )
+    for phenotype_list_col in [
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+    ]:
+        stats_mp_hp_df = stats_mp_hp_df.withColumn(
+            phenotype_list_col[:-1], explode_outer(phenotype_list_col)
+        )
+
+    stats_mp_hp_df = stats_mp_hp_df.join(
+        mp_matches_df,
+        (
+            (col("significantPhenotype.id") == col("id"))
+            | (col("potentialPhenotype.id") == col("id"))
+            | (col("intermediatePhenotype.id") == col("id"))
+            | (col("topLevelPhenotype.id") == col("id"))
+        ),
+        how="left_outer",
+    )
+    stats_mp_hp_df = stats_mp_hp_df.withColumn(
+        "humanPhenotype",
+        phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
+    )
+    stats_mp_hp_df = (
+        stats_mp_hp_df.groupBy("statisticalResultId")
+        .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
+        .select("statisticalResultId", "humanPhenotypes")
+        .distinct()
+    )
+
+    stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
+
+    stats_results.coalesce(100).write.parquet(output_path, mode="overwrite")
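
Note: the MP-to-HP bridge in the patch above is an explode-then-reaggregate
round trip: array columns of candidate phenotypes are exploded to one row per
term, joined to the MP/HP match table, then folded back with collect_set. A
reduced sketch of that shape (toy frames and names, illustrative only):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, collect_set, explode_outer

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    stats = spark.createDataFrame(
        [("SR1", ["MP:1", "MP:2"])], ["statisticalResultId", "mp_terms"]
    )
    matches = spark.createDataFrame([("MP:1", "HP:9")], ["mp_id", "hp_id"])

    bridged = (
        stats.withColumn("mp_term", explode_outer("mp_terms"))  # 1 row per term
        .join(matches, col("mp_term") == col("mp_id"), "left_outer")
        .groupBy("statisticalResultId")
        .agg(collect_set("hp_id").alias("humanPhenotypes"))  # fold back to arrays
    )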
- """ +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") - #: Name of the Spark task - name: str = "Impc_Gene_Search_Mapper" +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") +gene_search_service_json_asset = create_output_asset("impc_web_api/gene_search_service_json") - def requires(self): - return GeneLoader() +@asset.multi( + schedule=[gene_parquet_path_asset], + outlets=[gene_search_service_json_asset], + dag_id=f"{dr_tag}_impc_gene_search_mapper", + description=textwrap.dedent( + """IMPC Web API gene search mapper DAG.""" + ), + tags=["impc_web_api", "gene search"], +) +@with_spark_session +def impc_gene_search_mapper(): + from pyspark.sql import SparkSession - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/gene_search_service_json" - ) + GENE_SUMMARY_MAPPINGS = { + "mgi_accession_id": "mgiGeneAccessionId", + "marker_symbol": "geneSymbol", + "marker_name": "geneName", + "marker_synonym": "synonyms", + "significant_top_level_mp_terms": "significantTopLevelPhenotypes", + "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", + "embryo_data_available": "hasEmbryoImagingData", + "human_gene_symbol": "human_gene_symbols", + "human_symbol_synonym": "human_symbol_synonyms", + "production_centre": "production_centres", + "phenotyping_centre": "phenotyping_centres", + "allele_name": "allele_names", + "ensembl_gene_id": "ensembl_gene_ids", + } - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input().path, - self.output().path, - ] + def to_camel_case(snake_str): + components = snake_str.split("_") + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. + return components[0] + "".join(x.title() for x in components[1:]) - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) + spark = SparkSession.builder.getOrCreate() - # Parsing app options - gene_parquet_path = args[0] - gene_df = spark.read.parquet(gene_parquet_path) - output_path = args[1] + # Parsing app options + gene_parquet_path = gene_parquet_path_asset.uri + gene_df = spark.read.parquet(gene_parquet_path) + output_path = gene_search_service_json_asset.uri - for col_name in GENE_SUMMARY_MAPPINGS.keys(): - gene_df = gene_df.withColumnRenamed( - col_name, GENE_SUMMARY_MAPPINGS[col_name] - ) + for col_name in GENE_SUMMARY_MAPPINGS.keys(): + gene_df = gene_df.withColumnRenamed( + col_name, GENE_SUMMARY_MAPPINGS[col_name] + ) - for col_name in gene_df.columns: - gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) + for col_name in gene_df.columns: + gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) - gene_search_df = gene_df.select( - "mgiGeneAccessionId", - "geneName", - "geneSymbol", - "synonyms", - "humanGeneSymbols", - "humanSymbolSynonyms", - "esCellProductionStatus", - "mouseProductionStatus", - "phenotypeStatus", - "phenotypingDataAvailable", - ) - gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( - output_path - ) \ No newline at end of file + gene_search_df = gene_df.select( + "mgiGeneAccessionId", + "geneName", + "geneSymbol", + "synonyms", + "humanGeneSymbols", + "humanSymbolSynonyms", + "esCellProductionStatus", + "mouseProductionStatus", + "phenotypeStatus", + "phenotypingDataAvailable", + ) + gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( + output_path + ) From 7f8dda25144c32c7adf89eb641f1dec82cd8e233 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 15:09:49 +0100 Subject: [PATCH 10/14] feat: use GENE_SUMMARY_MAPPINGS from utils module --- .../impc_web_api/impc_gene_search_mapper.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py index cd162f6d..a56ea9e6 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py @@ -7,6 +7,7 @@ from impc_etl.utils.airflow import create_input_asset, create_output_asset from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import GENE_SUMMARY_MAPPINGS gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") gene_search_service_json_asset = create_output_asset("impc_web_api/gene_search_service_json") @@ -24,22 +25,6 @@ def impc_gene_search_mapper(): from pyspark.sql import SparkSession - GENE_SUMMARY_MAPPINGS = { - "mgi_accession_id": "mgiGeneAccessionId", - "marker_symbol": "geneSymbol", - "marker_name": "geneName", - "marker_synonym": "synonyms", - "significant_top_level_mp_terms": "significantTopLevelPhenotypes", - "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", - "embryo_data_available": "hasEmbryoImagingData", - "human_gene_symbol": "human_gene_symbols", - "human_symbol_synonym": "human_symbol_synonyms", - "production_centre": "production_centres", - "phenotyping_centre": "phenotyping_centres", - "allele_name": "allele_names", - "ensembl_gene_id": "ensembl_gene_ids", - } - def to_camel_case(snake_str): components = snake_str.split("_") # We capitalize the first letter of each component except the first one From b7f28f4e5c8fb025a462c7cacf1b90c378338091 Mon Sep 17 00:00:00 2001 
From b7f28f4e5e3092eccf927b9c8a1b8c572044 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Thu, 18 Sep 2025 15:16:07 +0100
Subject: [PATCH 11/14] feat: set overwrite mode when saving results

---
 impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py
index a56ea9e6..b12e1dd5 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py
@@ -58,6 +58,6 @@ def to_camel_case(snake_str):
         "phenotypeStatus",
         "phenotypingDataAvailable",
     )
-    gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json(
+    gene_search_df.repartition(1).write.option("ignoreNullFields", "false").mode("overwrite").json(
         output_path
     )

From ae47f383551e3e5092eccf927b9c8a1b8c572044 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Thu, 18 Sep 2025 10:32:50 +0100
Subject: [PATCH 12/14] feat: initial impc_idg_mapper airflow task

---
 .../jobs/load/impc_web_api/impc_idg_mapper.py | 160 ++++++++----------
 1 file changed, 69 insertions(+), 91 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py
index fe81d966..4f02eb59 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py
@@ -1,93 +1,71 @@
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet")
+idg_family_mapping_report_json_path_asset = create_input_asset("impc_web_api/IDG_TargetList_CurrentVersion.json")
+ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
+idg_landing_json_asset = create_output_asset("impc_web_api/idg_landing.json")
+
+@asset.multi(
+    schedule=[idg_family_mapping_report_json_path_asset],
+    outlets=[idg_landing_json_asset],
+    dag_id=f"{dr_tag}_impc_idg_landing_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API IDG landing page mapper DAG."""
+    ),
+    tags=["impc_web_api", "idg", "landing-page"],
+)
+@with_spark_session
+def impc_idg_mapper():
+    import json
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import col
+    from urllib.parse import unquote, urlparse
+
+    spark = SparkSession.builder.getOrCreate()
+
+    # Parsing app options
+    idg_family_mapping_report_json_path = idg_family_mapping_report_json_path_asset.uri
+    ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri
+    gene_parquet_path = gene_parquet_path_asset.uri
+
+    idg_family_df = spark.read.json(idg_family_mapping_report_json_path)
+    ortholog_mapping_df = spark.read.csv(
+        ortholog_mapping_report_tsv_path, sep="\t", header=True
+    )
+    gene_df = spark.read.parquet(gene_parquet_path)
+
+    gene_df = gene_df.select(
+        "mgi_accession_id",
+        "marker_symbol",
+        "significant_top_level_mp_terms",
+        "not_significant_top_level_mp_terms",
+        "phenotype_status",
+        "mouse_production_status",
+        "es_cell_production_status",
+    ).distinct()
+
+    ortholog_mapping_df = ortholog_mapping_df.select(
+        col("Mgi Gene Acc Id").alias("mgi_accession_id"),
+        col("Human Gene Symbol").alias("human_gene_symbol"),
+    ).distinct()
+
+    gene_df = gene_df.join(
+        ortholog_mapping_df,
+        "mgi_accession_id",
+    )
+    idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol")
+    idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family")
+    gene_df = gene_df.join(idg_family_df, "human_gene_symbol")
+
+    idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect()
+    output_path = unquote(urlparse(idg_landing_json_asset.uri).path)
+    with open(output_path, "w") as output_file:
+        output_file.write(json.dumps(idg_landing_json))
 
-from impc_etl.jobs.load.impc_web_api import (
-    GeneLoader,
-    ImpcConfig,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    col,
-    luigi,
-)
-
-
-class ImpcIDGMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcIDGMapper"
-
-    ortholog_mapping_report_tsv_path = luigi.Parameter()
-    idg_family_mapping_report_json_path = luigi.Parameter()
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [GeneLoader()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/idg_landing.json"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.idg_family_mapping_report_json_path,
-            self.ortholog_mapping_report_tsv_path,
-            self.input()[0].path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        idg_family_mapping_report_json_path = args[0]
-        ortholog_mapping_report_tsv_path = args[1]
-        gene_parquet_path = args[2]
-        output_path = args[3]
-
-        idg_family_df = spark.read.json(idg_family_mapping_report_json_path)
-        ortholog_mapping_df = spark.read.csv(
-            ortholog_mapping_report_tsv_path, sep="\t", header=True
-        )
-        gene_df = spark.read.parquet(gene_parquet_path)
-
-        gene_df = gene_df.select(
-            "mgi_accession_id",
-            "marker_symbol",
-            "significant_top_level_mp_terms",
-            "not_significant_top_level_mp_terms",
-            "phenotype_status",
-            "mouse_production_status",
-            "es_cell_production_status",
-        ).distinct()
-
-        ortholog_mapping_df = ortholog_mapping_df.select(
-            col("Mgi Gene Acc Id").alias("mgi_accession_id"),
-            col("Human Gene Symbol").alias("human_gene_symbol"),
-        ).distinct()
-
-        gene_df = gene_df.join(
-            ortholog_mapping_df,
-            "mgi_accession_id",
-        )
-        idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol")
-        idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family")
-        gene_df = gene_df.join(idg_family_df, "human_gene_symbol")
-
-        idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect()
-
-        with open(output_path, mode="w") as output_file:
-            output_file.write(json.dumps(idg_landing_json))
\ No newline at end of file
From d274972bda51dc2dfb2bfd20c239cf3e4d95f87b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Pen=CC=83a?=
Date: Thu, 18 Sep 2025 12:58:33 +0100
Subject: [PATCH 13/14] feat: initial impc_external_links_mapper airflow task

---
 .../impc_external_links_mapper.py             | 473 ++++++++----------
 1 file changed, 220 insertions(+), 253 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py
index 01f01362..a5d9f0c5 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py
@@ -1,259 +1,226 @@
-from impc_etl.jobs.load.impc_web_api import (
-    GeneLoader,
-    ImpcConfig,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    StringType,
-    StructField,
-    StructType,
-    col,
-    concat,
-    concat_ws,
-    lit,
-    lower,
-    luigi,
-    regexp_replace,
-    trim,
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+mouse_human_ortholog_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
+umass_early_lethal_report_csv_path_asset = create_input_asset("impc_web_api/umass_early_lethal_report.csv")
+uniprot_report_csv_path_asset = create_input_asset("impc_web_api/uniprot_report.tsv")
+morphic_report_csv_path_asset = create_input_asset("impc_web_api/morphic_report.csv")
+gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet")
+
+external_links_json_asset = create_output_asset("impc_web_api/external_links_json")
+
+@asset.multi(
+    schedule=[mouse_human_ortholog_report_tsv_path_asset, umass_early_lethal_report_csv_path_asset, uniprot_report_csv_path_asset, morphic_report_csv_path_asset, gene_parquet_path_asset],
+    outlets=[external_links_json_asset],
+    dag_id=f"{dr_tag}_impc_external_links_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API external links mapper DAG."""
+    ),
+    tags=["impc_web_api", "external links"],
 )
-
-
-class ImpcExternalLinksMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcExternalLinksMapper"
-
-    mouse_human_ortholog_report_tsv_path: luigi.Parameter = luigi.Parameter()
-    umass_early_lethal_report_csv_path: luigi.Parameter = luigi.Parameter()
-    uniprot_report_csv_path: luigi.Parameter = luigi.Parameter()
-    morphic_report_csv_path: luigi.Parameter = luigi.Parameter()
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [GeneLoader()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/external_links_json"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.input()[0].path,
-            self.mouse_human_ortholog_report_tsv_path,
-            self.umass_early_lethal_report_csv_path,
-            self.uniprot_report_csv_path,
-            self.morphic_report_csv_path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        gene_parquet_path = args[0]
-        mouse_human_ortholog_report_tsv_path = args[1]
-        umass_early_lethal_report_csv_path = args[2]
-        uniprot_report_csv_path = args[3]
-        morphic_report_csv_path = args[4]
-        output_path = args[5]
-
-        gene_df = spark.read.parquet(gene_parquet_path)
-        mouse_human_ortholog_report_df = spark.read.csv(
-            mouse_human_ortholog_report_tsv_path, sep="\t", header=True
-        )
-        umass_early_lethal_report_df = spark.read.csv(
-            umass_early_lethal_report_csv_path, header=True, multiLine=True
-        )
-        uniprot_report_df = spark.read.csv(
-            uniprot_report_csv_path,
-            sep="\t",
-            header=False,
-            schema=StructType(
-                [
-                    StructField("uniprot_id", StringType(), True),
-                    StructField("uniprot_db_id", StringType(), True),
-                    StructField("uniprot_external_id", StringType(), True),
-                ]
-            ),
-        )
-        morphic_report_df = spark.read.csv(
-            morphic_report_csv_path,
-            header=False,
-            schema=StructType(
-                [
-                    StructField("morphic_human_gene_symbol", StringType(), True),
-                ]
-            ),
-        )
-
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
-            "MGI Number", "mgi_accession_id"
-        )
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
-            "Description only", "description"
-        )
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
-            "Link", concat(lit("https://"), col("Link"))
-        )
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
-            "Link", "href"
-        )
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
-            "description", regexp_replace("description", "[\b\n\r]", " ")
-        )
-
-        umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
-            "mgi_accession_id",
-            concat_ws(":", lit("MGI"), trim("mgi_accession_id")),
-        ).select("mgi_accession_id", "description", "href")
-        for col_name in mouse_human_ortholog_report_df.columns:
-            mouse_human_ortholog_report_df = (
-                mouse_human_ortholog_report_df.withColumnRenamed(
-                    col_name, col_name.replace(" ", "_").lower()
-                )
-            )
-
+@with_spark_session
+def impc_external_links_mapper():
+    from pyspark.sql import SparkSession, Window
+    from pyspark.sql.types import StructType, StructField, StringType
+    from pyspark.sql.functions import col, concat, lit, regexp_replace, trim, concat_ws
+    spark = SparkSession.builder.getOrCreate()
+
+    # Parsing app options
+    gene_parquet_path = gene_parquet_path_asset.uri
+    mouse_human_ortholog_report_tsv_path = mouse_human_ortholog_report_tsv_path_asset.uri
+    umass_early_lethal_report_csv_path = umass_early_lethal_report_csv_path_asset.uri
+    uniprot_report_csv_path = uniprot_report_csv_path_asset.uri
+    morphic_report_csv_path = morphic_report_csv_path_asset.uri
+    output_path = external_links_json_asset.uri
+
+    gene_df = spark.read.parquet(gene_parquet_path)
+    mouse_human_ortholog_report_df = spark.read.csv(
+        mouse_human_ortholog_report_tsv_path, sep="\t", header=True
+    )
+    umass_early_lethal_report_df = spark.read.csv(
+        umass_early_lethal_report_csv_path, header=True, multiLine=True
+    )
+    uniprot_report_df = spark.read.csv(
+        uniprot_report_csv_path,
+        sep="\t",
+        header=False,
+        schema=StructType(
+            [
+                StructField("uniprot_id", StringType(), True),
+                StructField("uniprot_db_id", StringType(), True),
+                StructField("uniprot_external_id", StringType(), True),
+            ]
+        ),
+    )
+    morphic_report_df = spark.read.csv(
+        morphic_report_csv_path,
+        header=False,
+        schema=StructType(
+            [
+                StructField("morphic_human_gene_symbol", StringType(), True),
+            ]
+        ),
+    )
+
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
+        "MGI Number", "mgi_accession_id"
+    )
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
+        "Description only", "description"
+    )
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
+        "Link", concat(lit("https://"), col("Link"))
+    )
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed(
+        "Link", "href"
+    )
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
+        "description", regexp_replace("description", "[\b\n\r]", " ")
+    )
+
+    umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn(
+        "mgi_accession_id",
+        concat_ws(":", lit("MGI"), trim("mgi_accession_id")),
+    ).select("mgi_accession_id", "description", "href")
+    for col_name in mouse_human_ortholog_report_df.columns:
         mouse_human_ortholog_report_df = (
             mouse_human_ortholog_report_df.withColumnRenamed(
-                "mgi_gene_acc_id", "mgi_gene_accession_id"
+                col_name, col_name.replace(" ", "_").lower()
             )
         )
 
-        gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select(
-            "human_gene_symbol", "mgi_gene_accession_id"
-        ).distinct()
-
-        gene_mgi_accession_df = (
-            gene_df.select("mgi_accession_id")
-            .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id")
-            .dropDuplicates()
-        )
-
-        gwas_external_links_df = gene_mgi_accession_df.join(
-            gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id"
-        )
-
-        gwas_external_links_df = gwas_external_links_df.withColumnRenamed(
-            "mgi_gene_accession_id", "mgiGeneAccessionId"
-        )
-
-        gwas_external_links_df = gwas_external_links_df.withColumnRenamed(
-            "human_gene_symbol", "label"
-        )
-
-        gwas_external_links_df = gwas_external_links_df.withColumn(
-            "href",
-            concat(
-                lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label
-            ),
-        )
-        gwas_external_links_df = gwas_external_links_df.withColumn(
-            "providerName", lit("GWAS Catalog")
-        )
-
-        gwas_external_links_df = gwas_external_links_df.withColumn(
-            "description", lit(None)
-        )
-
-        embryo_data_df = gene_df.select(
-            "mgi_accession_id",
-            "marker_symbol",
-        ).distinct()
-        embryo_data_df = embryo_data_df.join(
-            umass_early_lethal_report_df, "mgi_accession_id"
-        )
-
-        umass_external_links_df = embryo_data_df.withColumnRenamed(
-            "mgi_accession_id", "mgiGeneAccessionId"
-        )
-        umass_external_links_df = umass_external_links_df.withColumnRenamed(
-            "marker_symbol", "label"
-        )
-        umass_external_links_df = umass_external_links_df.withColumn(
-            "providerName", lit("Mager Lab Early Lethal Phenotypes")
-        )
-        umass_external_links_df = umass_external_links_df.select(
-            "mgiGeneAccessionId", "label", "href", "providerName", "description"
-        )
-
-        uniprot_external_links_df = uniprot_report_df.where(
-            col("uniprot_db_id") == lit("MGI")
-        )
-        uniprot_external_links_df = uniprot_external_links_df.withColumn(
-            "mgiGeneAccessionId", col("uniprot_external_id")
-        )
-        uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed(
-            "uniprot_id", "label"
-        )
-        uniprot_external_links_df = uniprot_external_links_df.withColumn(
-            "providerName", lit("UniProt")
-        )
-        uniprot_external_links_df = uniprot_external_links_df.withColumn(
-            "href",
-            concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")),
-        )
-        uniprot_external_links_df = uniprot_external_links_df.withColumn(
-            "description", lit(None)
-        )
-        uniprot_external_links_df = uniprot_external_links_df.select(
-            "mgiGeneAccessionId", "label", "href", "providerName", "description"
-        ).distinct()
-
-        morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select(
-            "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id"
-        ).distinct()
-
-        morphic_external_links_df = gene_mgi_accession_df.join(
-            morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id"
-        )
-
-        morphic_external_links_df = morphic_external_links_df.join(
-            morphic_report_df,
-            col("human_gene_symbol") == col("morphic_human_gene_symbol"),
-        )
-
-        morphic_external_links_df = morphic_external_links_df.withColumnRenamed(
-            "mgi_gene_accession_id", "mgiGeneAccessionId"
-        )
-
-        morphic_external_links_df = morphic_external_links_df.withColumnRenamed(
-            "human_gene_symbol", "label"
-        )
-
-        morphic_external_links_df = morphic_external_links_df.withColumn(
-            "href",
-            concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")),
-        )
-        morphic_external_links_df = morphic_external_links_df.withColumn(
-            "providerName", lit("MorPhiC Program")
-        )
-
-        morphic_external_links_df = morphic_external_links_df.withColumn(
-            "description", lit(None)
-        )
-
-        morphic_external_links_df = morphic_external_links_df.select(
-            "mgiGeneAccessionId", "label", "href", "providerName", "description"
-        ).distinct()
-
-        external_links_df = (
-            gwas_external_links_df.union(umass_external_links_df)
-            .union(uniprot_external_links_df)
-            .union(morphic_external_links_df)
-        )
-        external_links_df.write.json(output_path, mode="overwrite")
\ No newline at end of file
+    mouse_human_ortholog_report_df = (
+        mouse_human_ortholog_report_df.withColumnRenamed(
+            "mgi_gene_acc_id", "mgi_gene_accession_id"
+        )
+    )
+
+    gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select(
+        "human_gene_symbol", "mgi_gene_accession_id"
+    ).distinct()
+
+    gene_mgi_accession_df = (
+        gene_df.select("mgi_accession_id")
+        .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id")
+        .dropDuplicates()
+    )
+
+    gwas_external_links_df = gene_mgi_accession_df.join(
+        gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id"
+    )
+
+    gwas_external_links_df = gwas_external_links_df.withColumnRenamed(
+        "mgi_gene_accession_id", "mgiGeneAccessionId"
+    )
+
+    gwas_external_links_df = gwas_external_links_df.withColumnRenamed(
+        "human_gene_symbol", "label"
+    )
+
+    gwas_external_links_df = gwas_external_links_df.withColumn(
+        "href",
+        concat(
+            lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label
+        ),
+    )
+    gwas_external_links_df = gwas_external_links_df.withColumn(
+        "providerName", lit("GWAS Catalog")
+    )
+
+    gwas_external_links_df = gwas_external_links_df.withColumn(
+        "description", lit(None)
+    )
+
+    embryo_data_df = gene_df.select(
+        "mgi_accession_id",
+        "marker_symbol",
+    ).distinct()
+    embryo_data_df = embryo_data_df.join(
+        umass_early_lethal_report_df, "mgi_accession_id"
+    )
+
+    umass_external_links_df = embryo_data_df.withColumnRenamed(
+        "mgi_accession_id", "mgiGeneAccessionId"
+    )
+    umass_external_links_df = umass_external_links_df.withColumnRenamed(
+        "marker_symbol", "label"
+    )
+    umass_external_links_df = umass_external_links_df.withColumn(
+        "providerName", lit("Mager Lab Early Lethal Phenotypes")
+    )
+    umass_external_links_df = umass_external_links_df.select(
+        "mgiGeneAccessionId", "label", "href", "providerName", "description"
+    )
+
+    uniprot_external_links_df = uniprot_report_df.where(
+        col("uniprot_db_id") == lit("MGI")
+    )
+    uniprot_external_links_df = uniprot_external_links_df.withColumn(
+        "mgiGeneAccessionId", col("uniprot_external_id")
+    )
+    uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed(
+        "uniprot_id", "label"
+    )
+    uniprot_external_links_df = uniprot_external_links_df.withColumn(
+        "providerName", lit("UniProt")
+    )
+    uniprot_external_links_df = uniprot_external_links_df.withColumn(
+        "href",
+        concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")),
+    )
+    uniprot_external_links_df = uniprot_external_links_df.withColumn(
+        "description", lit(None)
+    )
+    uniprot_external_links_df = uniprot_external_links_df.select(
+        "mgiGeneAccessionId", "label", "href", "providerName", "description"
+    ).distinct()
+
+    morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select(
+        "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id"
+    ).distinct()
+
+    morphic_external_links_df = gene_mgi_accession_df.join(
+        morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id"
+    )
+
+    morphic_external_links_df = morphic_external_links_df.join(
+        morphic_report_df,
+        col("human_gene_symbol") == col("morphic_human_gene_symbol"),
+    )
+
+    morphic_external_links_df = morphic_external_links_df.withColumnRenamed(
+        "mgi_gene_accession_id", "mgiGeneAccessionId"
+    )
+
+    morphic_external_links_df = morphic_external_links_df.withColumnRenamed(
+        "human_gene_symbol", "label"
+    )
+
+    morphic_external_links_df = morphic_external_links_df.withColumn(
+        "href",
+        concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")),
+    )
+    morphic_external_links_df = morphic_external_links_df.withColumn(
+        "providerName", lit("MorPhiC Program")
+    )
+
+    morphic_external_links_df = morphic_external_links_df.withColumn(
+        "description", lit(None)
+    )
+
+    morphic_external_links_df = morphic_external_links_df.select(
+        "mgiGeneAccessionId", "label", "href", "providerName", "description"
+    ).distinct()
+
+    external_links_df = (
+        gwas_external_links_df.union(umass_external_links_df)
+        .union(uniprot_external_links_df)
+        .union(morphic_external_links_df)
+    )
+    external_links_df.write.json(output_path, mode="overwrite")
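
Note: the external-links mapper above ends with DataFrame.union, which matches
columns by position, not by name; that is why all four link frames are first
projected to the same (mgiGeneAccessionId, label, href, providerName,
description) order. A minimal illustration of the pitfall (toy rows):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    a = spark.createDataFrame([("MGI:1", "Abc1")], ["mgiGeneAccessionId", "label"])
    b = spark.createDataFrame([("Xyz2", "MGI:2")], ["label", "mgiGeneAccessionId"])

    bad = a.union(b)          # positional: silently pairs ids with labels
    good = a.unionByName(b)   # name-based: keeps the columns aligned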
From 008e49a280dc4c5297d4025ef0af0e112030bcfe Mon Sep 17 00:00:00 2001
From: Robert Wilson
Date: Thu, 25 Sep 2025 10:17:57 +0100
Subject: [PATCH 14/14] Modified the phenotype_statistical_results_mapper for
 Airflow.

The feat/impc-web-api branch is merged into this branch, but no changes have
been made to impc_etl.utils.impc_web_api.
---
 ...pc_phenotype_statistical_results_mapper.py | 389 +++++++++---------
 1 file changed, 190 insertions(+), 199 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_phenotype_statistical_results_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_phenotype_statistical_results_mapper.py
index 857171cf..66a53652 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_phenotype_statistical_results_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_phenotype_statistical_results_mapper.py
@@ -1,228 +1,219 @@
-from impc_etl.jobs.load.impc_web_api import (
-    DoubleType,
-    GeneLoader,
-    ImpcConfig,
-    IntegerType,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    StatsResultsMapper,
-    array,
-    col,
-    collect_set,
-    concat,
-    explode,
-    lit,
-    luigi,
-    phenotype_term_zip_udf,
-    struct,
-    to_camel_case,
-    when,
-    zip_with,
-)
-
-
-class ImpcPhenotypeStatisticalResultsMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcPhenotypeStatisticalResultsMapper"
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [StatsResultsMapper(), GeneLoader()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/phenotype_stats_service_parquet"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.input()[0].path,
-            self.input()[1].path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        stats_results_parquet_path = args[0]
-        gene_parquet_path = args[1]
-        output_path = args[2]
-
-        stats_results_df = spark.read.parquet(stats_results_parquet_path)
-        gene_df = spark.read.parquet(gene_parquet_path)
-
-        gene_df = gene_df.select(
-            col("mgi_accession_id").alias("marker_accession_id"),
-            "seq_region_start",
-            "seq_region_end",
-            "seq_region_id",
-            "chr_name",
-            "chr_strand",
-            "marker_symbol",
-            "marker_name",
-        ).distinct()
-
-        phenotype_stats_df = stats_results_df.select(
-            "doc_id",
-            "marker_accession_id",
-            "p_value",
-            "effect_size",
-            "significant",
-            "mp_term_id",
-            "mp_term_id_options",
-            "intermediate_mp_term_id",
-            "top_level_mp_term_id",
-            "resource_fullname",
-            "mp_term_name",
-            "intermediate_mp_term_name",
-            "top_level_mp_term_name",
-            "mp_term_name_options",
-        )
-
-        phenotype_stats_df = phenotype_stats_df.withColumn(
-            "significantPhenotype",
-            when(
-                col("mp_term_id").isNotNull(),
-                struct(
-                    col("mp_term_id").alias("id"), col("mp_term_name").alias("name")
-                ),
-            ).otherwise(lit(None)),
-        )
-
-        phenotype_stats_df = phenotype_stats_df.withColumn(
-            "intermediatePhenotypes",
-            zip_with(
-                "intermediate_mp_term_id",
-                "intermediate_mp_term_name",
-                phenotype_term_zip_udf,
-            ),
-        )
-
-        phenotype_stats_df = phenotype_stats_df.withColumn(
-            "topLevelPhenotypes",
-            zip_with(
-                "top_level_mp_term_id",
-                "top_level_mp_term_name",
-                phenotype_term_zip_udf,
-            ),
-        )
-
-        phenotype_stats_df = phenotype_stats_df.withColumn(
-            "potentialPhenotypes",
-            zip_with(
-                "mp_term_id_options",
-                "mp_term_name_options",
-                phenotype_term_zip_udf,
-            ),
-        )
-        phenotype_stats_df = phenotype_stats_df.drop(
-            "top_level_mp_term_id",
-            "top_level_mp_term_name",
-            "intermediate_mp_term_id",
-            "intermediate_mp_term_name",
-            "mp_term_id_options",
-            "mp_term_name_options",
-            "mp_term_id",
-            "mp_term_name",
-        )
-
-        phenotype_stats_df = phenotype_stats_df.join(gene_df, "marker_accession_id")
-
-        phenotype_stats_map = {
-            "marker_accession_id": "mgiGeneAccessionId",
-            "effect_size": "reportedEffectSize",
-            "p_value": "reportedPValue",
-            "resource_fullname": "resourceFullName",
-            "doc_id": "datasetId",
-        }
-
-        for col_name in phenotype_stats_map.keys():
-            phenotype_stats_df = phenotype_stats_df.withColumnRenamed(
-                col_name, phenotype_stats_map[col_name]
-            )
-
-        for col_name in phenotype_stats_df.columns:
-            phenotype_stats_df = phenotype_stats_df.withColumnRenamed(
-                col_name, to_camel_case(col_name)
-            )
-
-        double_cols = [
-            "reportedEffectSize",
-            "reportedPValue",
-        ]
-
-        for col_name in double_cols:
-            phenotype_stats_df = phenotype_stats_df.withColumn(
-                col_name, col(col_name).astype(DoubleType())
-            )
-
-        int_cols = [
-            "seqRegionStart",
-            "seqRegionEnd",
-        ]
-
-        for col_name in int_cols:
-            phenotype_stats_df = phenotype_stats_df.withColumn(
-                col_name, col(col_name).astype(IntegerType())
-            )
-        phenotype_stats_df = (
-            phenotype_stats_df.withColumn(
-                "potentialPhenotypes",
-                when(
-                    col("significantPhenotype").isNotNull(),
-                    array("significantPhenotype"),
-                ).otherwise(col("potentialPhenotypes")),
-            )
-            .withColumn(
-                "phenotypes",
-                concat(
-                    "potentialPhenotypes",
-                    "intermediatePhenotypes",
-                    "topLevelPhenotypes",
-                ),
-            )
-            .drop(
-                "potentialPhenotypes",
-                "intermediatePhenotypes",
-                "topLevelPhenotypes",
-                "significantPhenotype",
-            )
-            .withColumn("phenotypeId", explode("phenotypes.id"))
-            .drop("phenotypes")
-            .groupBy("phenotypeId")
-            .agg(
-                collect_set(
-                    struct(
-                        "mgiGeneAccessionId",
-                        "markerSymbol",
-                        "reportedPValue",
-                        "reportedEffectSize",
-                        "chrName",
-                        "chrStrand",
-                        "seqRegionStart",
-                        "seqRegionEnd",
-                        "significant",
-                        "resourceFullName",
-                    )
-                ).alias("results")
-            )
-        )
-        phenotype_stats_df.repartition(1000).write.parquet(output_path)
\ No newline at end of file
+"""
+    PySpark task to create the phenotype_stats_service_parquet for the website
+    from the stats_results_parquet and the gene parquet files.
+"""
+
+import logging
+import textwrap
+
+from airflow.sdk import Variable, asset
+
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+statistical_results_raw_data_include_parquet_path_asset = create_input_asset("output/statistical_results_raw_data_include_parquet")
+gene_data_include_parquet_path_asset = create_input_asset("output/gene_data_include_parquet")
+
+phenotype_stats_service_parquet_output_asset = create_output_asset("impc_web_api/phenotype_stats_service_parquet")
+
+@asset.multi(
+    schedule=[statistical_results_raw_data_include_parquet_path_asset,
+        gene_data_include_parquet_path_asset,
+    ],
+    outlets=[phenotype_stats_service_parquet_output_asset],
+    dag_id=f"{dr_tag}_impc_phenotype_statistical_results_mapper",
+    description=textwrap.dedent(
+        """
+        PySpark task to create the phenotype_stats_service_parquet for the website
+        from the stats_results_parquet and the gene parquet files.
+        """
+    ),
+    tags=["impc_web_api"],
+)
+@with_spark_session
+def impc_phenotype_statistical_results_mapper():
+
+    from impc_etl.utils.impc_web_api import to_camel_case, phenotype_term_zip_udf
+
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import (
+        col,
+        explode,
+        zip_with,
+        struct,
+        when,
+        lit,
+        array,
+        concat,
+        collect_set,
+
+    )
+    from pyspark.sql.types import DoubleType, IntegerType
+
+    spark = SparkSession.builder.getOrCreate()
+
+
+    stats_results_df = spark.read.parquet(statistical_results_raw_data_include_parquet_path_asset.uri)
+    gene_df = spark.read.parquet(gene_data_include_parquet_path_asset.uri)
+
+    gene_df = gene_df.select(
+        col("mgi_accession_id").alias("marker_accession_id"),
+        "seq_region_start",
+        "seq_region_end",
+        "seq_region_id",
+        "chr_name",
+        "chr_strand",
+        "marker_symbol",
+        "marker_name",
+    ).distinct()
+
+    phenotype_stats_df = stats_results_df.select(
+        "doc_id",
+        "marker_accession_id",
+        "p_value",
+        "effect_size",
+        "significant",
+        "mp_term_id",
+        "mp_term_id_options",
+        "intermediate_mp_term_id",
+        "top_level_mp_term_id",
+        "resource_fullname",
+        "mp_term_name",
+        "intermediate_mp_term_name",
+        "top_level_mp_term_name",
+        "mp_term_name_options",
+    )
+
+    phenotype_stats_df = phenotype_stats_df.withColumn(
+        "significantPhenotype",
+        when(
+            col("mp_term_id").isNotNull(),
+            struct(
+                col("mp_term_id").alias("id"), col("mp_term_name").alias("name")
+            ),
+        ).otherwise(lit(None)),
+    )
+
+    phenotype_stats_df = phenotype_stats_df.withColumn(
+        "intermediatePhenotypes",
+        zip_with(
+            "intermediate_mp_term_id",
+            "intermediate_mp_term_name",
+            phenotype_term_zip_udf,
+        ),
+    )
+
+    phenotype_stats_df = phenotype_stats_df.withColumn(
+        "topLevelPhenotypes",
+        zip_with(
+            "top_level_mp_term_id",
+            "top_level_mp_term_name",
+            phenotype_term_zip_udf,
+        ),
+    )
+
+    phenotype_stats_df = phenotype_stats_df.withColumn(
+        "potentialPhenotypes",
+        zip_with(
+            "mp_term_id_options",
+            "mp_term_name_options",
+            phenotype_term_zip_udf,
+        ),
+    )
+    phenotype_stats_df = phenotype_stats_df.drop(
+        "top_level_mp_term_id",
+        "top_level_mp_term_name",
+        "intermediate_mp_term_id",
+        "intermediate_mp_term_name",
+        "mp_term_id_options",
+        "mp_term_name_options",
+        "mp_term_id",
+        "mp_term_name",
+    )
+
+    phenotype_stats_df = phenotype_stats_df.join(gene_df, "marker_accession_id")
+
+    phenotype_stats_map = {
+        "marker_accession_id": "mgiGeneAccessionId",
+        "effect_size": "reportedEffectSize",
+        "p_value": "reportedPValue",
+        "resource_fullname": "resourceFullName",
+        "doc_id": "datasetId",
+    }
+
+    for col_name in phenotype_stats_map.keys():
+        phenotype_stats_df = phenotype_stats_df.withColumnRenamed(
+            col_name, phenotype_stats_map[col_name]
+        )
+
+    for col_name in phenotype_stats_df.columns:
+        phenotype_stats_df = phenotype_stats_df.withColumnRenamed(
+            col_name, to_camel_case(col_name)
+        )
+
+    double_cols = [
+        "reportedEffectSize",
+        "reportedPValue",
+    ]
+
+    for col_name in double_cols:
+        phenotype_stats_df = phenotype_stats_df.withColumn(
+            col_name, col(col_name).astype(DoubleType())
+        )
+
+    int_cols = [
+        "seqRegionStart",
+        "seqRegionEnd",
+    ]
+
+    for col_name in int_cols:
+        phenotype_stats_df = phenotype_stats_df.withColumn(
+            col_name, col(col_name).astype(IntegerType())
+        )
+    phenotype_stats_df = (
+        phenotype_stats_df.withColumn(
+            "potentialPhenotypes",
+            when(
+                col("significantPhenotype").isNotNull(),
+                array("significantPhenotype"),
+            ).otherwise(col("potentialPhenotypes")),
+        )
+        .withColumn(
+            "phenotypes",
+            concat(
+                "potentialPhenotypes",
+                "intermediatePhenotypes",
+                "topLevelPhenotypes",
+            ),
+        )
+        .drop(
+            "potentialPhenotypes",
+            "intermediatePhenotypes",
+            "topLevelPhenotypes",
+            "significantPhenotype",
+        )
+        .withColumn("phenotypeId", explode("phenotypes.id"))
+        .drop("phenotypes")
+        .groupBy("phenotypeId")
+        .agg(
+            collect_set(
+                struct(
+                    "mgiGeneAccessionId",
+                    "markerSymbol",
+                    "reportedPValue",
+                    "reportedEffectSize",
+                    "chrName",
+                    "chrStrand",
+                    "seqRegionStart",
+                    "seqRegionEnd",
+                    "significant",
+                    "resourceFullName",
+                )
+            ).alias("results")
+        )
+    )
+    output_path = phenotype_stats_service_parquet_output_asset.uri
+    phenotype_stats_df.repartition(1000).write.parquet(output_path, mode="overwrite")
\ No newline at end of file