Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3d0af68
feat: added impc_web_api utils module
francisco-ebi Sep 18, 2025
e755ed4
WIP: gene disease mapper task migration to Airflow
francisco-ebi Sep 15, 2025
77215fb
feat: remove import from local module
francisco-ebi Sep 15, 2025
ec7b0e4
fix: explicitly cast disease_model_avg_norm and disease_model_max_nor…
francisco-ebi Sep 15, 2025
1804c51
feat: replace repartition with coalesce
francisco-ebi Sep 16, 2025
57095d3
Merge pull request #428 from mpi2/399-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
b478984
feat: remove to_camel_case function from impc_gene_diseases_mapper ta…
francisco-ebi Sep 18, 2025
d9797f2
feat: initial impc_batch_query mapper task
francisco-ebi Sep 17, 2025
a4ee451
feat: use phenotype_term_zip_udf function from utils module
francisco-ebi Sep 18, 2025
9d23723
Merge pull request #422 from mpi2/392-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
53ffc4d
feat: initial impc_gene_search_mapper airflow task
francisco-ebi Sep 17, 2025
7f8dda2
feat: use GENE_SUMMARY_MAPPINGS from utils module
francisco-ebi Sep 18, 2025
b7f28f4
feat: set overwrite mode when saving results
francisco-ebi Sep 18, 2025
cd7cb82
Merge pull request #423 from mpi2/403-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
ae47f38
feat: initial impc_idg_mapper airflow task
francisco-ebi Sep 18, 2025
38664ec
Merge pull request #426 from mpi2/408-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
d274972
feat: initial impc_external_links_mapper airflow task
francisco-ebi Sep 18, 2025
21bf0a4
Merge pull request #429 from mpi2/398-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
08c2e3b
feat: initial impc_gene_histopathology_mapper task
francisco-ebi Sep 19, 2025
220d5dd
Merge pull request #433 from mpi2/400-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 19, 2025
6b32a47
feat: initial impc_gene_images_mapper task
francisco-ebi Sep 19, 2025
ed906be
Merge pull request #434 from mpi2/401-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 19, 2025
3b602b0
feat: initial impc_phenotype_pleiotropy_mapper task
francisco-ebi Sep 19, 2025
774355d
Merge pull request #435 from mpi2/415-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 19, 2025
973728d
feat: initial impc_embryo_landing_mapper task
francisco-ebi Sep 22, 2025
d2edc43
Merge pull request #438 from mpi2/397-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 22, 2025
18b3086
feat: initial impc_gene_stats_results_mapper task
francisco-ebi Sep 23, 2025
32c18ca
fix: stats result input path and dag id
francisco-ebi Sep 23, 2025
cb51222
fix: add missing sql functions
francisco-ebi Sep 23, 2025
a970a38
feat: set overwrite mode when saving results
francisco-ebi Sep 23, 2025
580233e
feat: change repartition(1000) to coalesce(5) to run locally
francisco-ebi Sep 23, 2025
7bc5877
Merge pull request #439 from mpi2/404-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 23, 2025
da444a7
feat: added get_lacz_expression_count function to impc_etl/utils/impc…
francisco-ebi Sep 24, 2025
507b70b
feat: initial impc_gene_summary_mapper task
francisco-ebi Sep 24, 2025
3a81eb3
Merge pull request #440 from mpi2/405-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 24, 2025
93e3581
fix: impc_gene_summary_mapper tags
francisco-ebi Sep 24, 2025
3745870
feat: initial impc_phenotype_search_mapper task
francisco-ebi Sep 24, 2025
6a038a5
Merge pull request #441 from mpi2/416-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 24, 2025
be0574e
feat: initial impc_phenotype_summary_mapper task
francisco-ebi Sep 24, 2025
81bf5a1
feat: remove old imports
francisco-ebi Sep 24, 2025
c3bc18e
feat: import missing helper function
francisco-ebi Sep 24, 2025
f8f45f7
Merge pull request #442 from mpi2/418-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 24, 2025
4567f1e
feat: initial impc_images_mapper task
francisco-ebi Sep 24, 2025
6f6360d
Merge pull request #443 from mpi2/409-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 24, 2025
4bc984f
feat: impc_histopathology_datasets_mapper task
francisco-ebi Sep 24, 2025
1cfac8f
Merge pull request #444 from mpi2/406-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 93 additions & 117 deletions impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,100 @@
from impc_etl.jobs.load.impc_web_api import (
ImpcConfig,
PySparkTask,
SparkContext,
SparkSession,
col,
collect_set,
explode_outer,
luigi,
phenotype_term_zip_udf,
import logging
import textwrap
from airflow.sdk import Variable, asset

from impc_etl.utils.airflow import create_input_asset, create_output_asset
from impc_etl.utils.spark import with_spark_session
from impc_etl.utils.impc_web_api import phenotype_term_zip_udf

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv")
gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json")

batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet")

@asset.multi(
schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset],
outlets=[batch_query_data_parquet_asset],
dag_id=f"{dr_tag}_impc_batch_query_mapper",
description=textwrap.dedent(
"""IMPC Web API batch query mapper DAG."""
),
tags=["impc_web_api", "batch query"],
)


class ImpcBatchQueryMapper(PySparkTask):
"""
PySpark Task class to parse GenTar Product report data.
"""

#: Name of the Spark task
name: str = "ImpcBatchQueryMapper"

ortholog_mapping_report_tsv_path = luigi.Parameter()
mp_hp_matches_csv_path = luigi.Parameter()

#: Path of the output directory where the new parquet file will be generated.
output_path: luigi.Parameter = luigi.Parameter()

def requires(self):
return [ImpcGeneStatsResultsMapper()]

def output(self):
"""
Returns the full parquet path as an output for the Luigi Task
(e.g. impc/dr15.2/parquet/product_report_parquet)
"""
return ImpcConfig().get_target(
f"{self.output_path}/impc_web_api/batch_query_data_parquet"
)

def app_options(self):
"""
Generates the options pass to the PySpark job
"""
return [
self.ortholog_mapping_report_tsv_path,
self.mp_hp_matches_csv_path,
self.input()[0].path,
self.output().path,
]

def main(self, sc: SparkContext, *args):
"""
Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
"""
spark = SparkSession(sc)

# Parsing app options
ortholog_mapping_report_tsv_path = args[0]
mp_hp_matches_csv_path = args[1]
gene_stats_results_json_path = args[2]
output_path = args[3]

ortholog_mapping_df = spark.read.csv(
ortholog_mapping_report_tsv_path, sep="\t", header=True
)
stats_results = spark.read.json(gene_stats_results_json_path)

ortholog_mapping_df = ortholog_mapping_df.select(
col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
col("Human Gene Symbol").alias("humanGeneSymbol"),
col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
).distinct()

stats_results = stats_results.join(
ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
)

mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
mp_matches_df = mp_matches_df.select(
col("curie_x").alias("id"),
col("curie_y").alias("hp_term_id"),
col("label_y").alias("hp_term_name"),
).distinct()

stats_mp_hp_df = stats_results.select(
"statisticalResultId",
"potentialPhenotypes",
"intermediatePhenotypes",
"topLevelPhenotypes",
"significantPhenotype",
@with_spark_session
def impc_batch_query_mapper():
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit

spark = SparkSession.builder.getOrCreate()

ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri
mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri
gene_stats_results_json_path = gene_stats_results_json_path_asset.uri
output_path = batch_query_data_parquet_asset.uri

ortholog_mapping_df = spark.read.csv(
ortholog_mapping_report_tsv_path, sep="\t", header=True
)
stats_results = spark.read.json(gene_stats_results_json_path)

ortholog_mapping_df = ortholog_mapping_df.select(
col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
col("Human Gene Symbol").alias("humanGeneSymbol"),
col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
).distinct()

stats_results = stats_results.join(
ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
)

mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
mp_matches_df = mp_matches_df.select(
col("curie_x").alias("id"),
col("curie_y").alias("hp_term_id"),
col("label_y").alias("hp_term_name"),
).distinct()

stats_mp_hp_df = stats_results.select(
"statisticalResultId",
"potentialPhenotypes",
"intermediatePhenotypes",
"topLevelPhenotypes",
"significantPhenotype",
)
for phenotype_list_col in [
"potentialPhenotypes",
"intermediatePhenotypes",
"topLevelPhenotypes",
]:
stats_mp_hp_df = stats_mp_hp_df.withColumn(
phenotype_list_col[:-1], explode_outer(phenotype_list_col)
)
for phenotype_list_col in [
"potentialPhenotypes",
"intermediatePhenotypes",
"topLevelPhenotypes",
]:
stats_mp_hp_df = stats_mp_hp_df.withColumn(
phenotype_list_col[:-1], explode_outer(phenotype_list_col)
)

stats_mp_hp_df = stats_mp_hp_df.join(
mp_matches_df,
(
stats_mp_hp_df = stats_mp_hp_df.join(
mp_matches_df,
(
(col("significantPhenotype.id") == col("id"))
| (col("potentialPhenotype.id") == col("id"))
| (col("intermediatePhenotype.id") == col("id"))
| (col("topLevelPhenotype.id") == col("id"))
),
how="left_outer",
)
stats_mp_hp_df = stats_mp_hp_df.withColumn(
"humanPhenotype",
phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
)
stats_mp_hp_df = (
stats_mp_hp_df.groupBy("statisticalResultId")
.agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
.select("statisticalResultId", "humanPhenotypes")
.distinct()
)

stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")

stats_results.write.parquet(output_path)
),
how="left_outer",
)
stats_mp_hp_df = stats_mp_hp_df.withColumn(
"humanPhenotype",
phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
)
stats_mp_hp_df = (
stats_mp_hp_df.groupBy("statisticalResultId")
.agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
.select("statisticalResultId", "humanPhenotypes")
.distinct()
)

stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")

stats_results.coalesce(100).write.parquet(output_path, mode="overwrite")
Loading