121 changes: 121 additions & 0 deletions impc_etl/jobs/ingest/data_ingestion.py
@@ -16,13 +16,17 @@
import logging
import os
import shutil
from datetime import datetime
import json
import time
import textwrap

import requests
from airflow.hooks.base import BaseHook
from airflow.sdk import Asset, Variable, asset, Param

from impc_etl.utils.airflow import create_input_asset
from impc_etl.utils.spark import with_spark_mongo_session

task_logger = logging.getLogger("airflow.task")

@@ -315,6 +319,123 @@ def copy_archived_ontological_data():
return ontology_file_assets


media_data_asset = create_input_asset("misc/media")

@asset.multi(
schedule=[misc_directory_asset],
outlets=[media_data_asset],
dag_id=f"{dr_tag}_fetch_cached_media_data",
)
@with_spark_mongo_session
def fetch_cached_media_data():
"""
    PySpark task to fetch cached media data from the data release Mongo database.
"""

    from pyspark.sql import SparkSession
    from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

# Step 1: Fetch the media data already downloaded from our local database
media_df = spark.read \
.format("mongodb") \
.option("collection", "media_data") \
.load()

media_df = media_df.drop("_id")
task_logger.info(f" Distinct values in starting media: {media_df.select("checksum").distinct().count()}")
task_logger.info(f"schema: {media_df.printSchema()}")

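    # coalesce(1) writes the existing-media snapshot as a single JSON part file.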
media_df.coalesce(1).write.format("json").save(media_data_asset.uri + "/existing_media", mode="overwrite")

# Step 2: Fetch the media data from DCC for the past year
done = False
offset = 0

    target_date = f"{datetime.today().year - 1}-07-01"
    task_logger.info(f"- Calling media API to retrieve latest set of entries since: {target_date}")

    # Set HTTP proxy so the job has access to the Internet from EBI infrastructure.
http_proxy = "http://hh-wwwcache.ebi.ac.uk:3128"
os.environ["HTTP_PROXY"] = http_proxy
os.environ["HTTPS_PROXY"] = http_proxy
proxy_map = {"http": http_proxy, "https": http_proxy}

# Schema used to filter DCC data
dcc_minimal_schema = StructType([
StructField("id", LongType(), True),
StructField("extension", StringType(), True),
StructField("parameterKey", StringType(), True),
StructField("procedureKey", StringType(), True),
StructField("pipelineKey", StringType(), True),
StructField("checksum", StringType(), True),
StructField("centre", StringType(), True),
StructField("dccUrl", StringType(), True)])


def get_dcc_media(media_api_url, proxies, retries=0):
"""
:param media_api_url:
:param proxies:
:param retries:
:return:
"""
task_logger.info("Retrieving data from :" + media_api_url)

try:
response = requests.get(media_api_url, timeout=(5, 14), proxies=proxies)
try:
json_data = response.json()
except json.decoder.JSONDecodeError:
task_logger.info(f"{media_api_url}")
task_logger.info(" " + response.text)
raise requests.exceptions.RequestException(response=response)
except requests.exceptions.RequestException as e:
if retries < 4:
time.sleep(1)
json_data = get_dcc_media(media_api_url, proxies, retries + 1)
else:
task_logger.info(f"Max retries for {media_api_url}")
raise Exception(f"Max retries for {media_api_url}")
return json_data

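    # Page through the DCC media API in 10,000-record batches; an empty response marks the last page.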
while not done:
        url = f"https://api.mousephenotype.org/media/updatedsince/{target_date}?limit=10000&offset={offset * 10000}"
json_data = get_dcc_media(url, proxy_map, retries=0)
if len(json_data) > 0:

            result_list = [json.dumps(entry) for entry in json_data]
rdd = spark.sparkContext.parallelize(result_list)

# Use the schema to filter for data required
newRows = spark.read.schema(dcc_minimal_schema).json(rdd)

if offset == 0:
newRows.write.format("parquet").save(media_data_asset.uri + "/dcc_media", mode="overwrite")
else:
newRows.write.mode("append").format("parquet").save(media_data_asset.uri + "/dcc_media")

        else:
            done = True
offset += 1

# Step 3: calculate the new DCC entries
dcc_media = spark.read.format("parquet").load(media_data_asset.uri + "/dcc_media")
task_logger.info(f"- dcc_media entries: {dcc_media.count()} ")

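    # Left-anti join on checksum keeps only DCC entries not already present in the cached media collection.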
new_dcc_entries = dcc_media.join(media_df, dcc_media.checksum == media_df.checksum, "leftanti")
task_logger.info(f"- Retrieved {new_dcc_entries.count()} new entries ...")
new_dcc_entries.write.format("json").save(media_data_asset.uri + "/new_dcc_media", mode="overwrite")


threei_xml_dir_asset = create_input_asset("xml/3i")
europhenome_xml_dir_asset = create_input_asset("xml/europhenome")
pwg_xml_dir_asset = create_input_asset("xml/pwg")
73 changes: 0 additions & 73 deletions impc_etl/jobs/parse/dcc_media_metadata_extractor.py

This file was deleted.

121 changes: 121 additions & 0 deletions impc_etl/jobs/parse/dcc_media_metadata_parser.py
@@ -0,0 +1,121 @@
"""
DCC Media Metadata Parser Task.
Input: JSON file(s) containing new DCC Media Metadata with this shape
```
{
"id": 3278832,
"extension": "bmp",
"parameterKey": "CCP_XRY_048_001",
"procedureKey": "CCP_XRY_001",
"pipelineKey": "CCP_001",
"checksum": "ff927f3e84b4b7fd0a625bfb50d58235707110d6f82bb7f08a1c7612a3d19a57",
"centre": "CCPCZ",
"dccUrl": "https://api.mousephenotype.org/tracker/media/getfile/ff927f3e84b4b7fd0a625bfb50d58235707110d6f82bb7f08a1c7612a3d19a57"
}
```
and JSON file(s) containing existing Media Metadata, already downloaded, with this shape
```
{
"centre": "UC Davis",
"checksum": "9830cfdff9449ebbbf14ce7089bdf07f98ca488607d26fcb9211e1e0d868ec07",
"dccUrl": "",
"downloaded": true,
"fileName": "2890042.png",
"parameter": "IMPC_XRY_034_001",
"pipeline": "UCD_001",
"procedure": "IMPC_XRY_001"
}
```
Output: Parquet file containing all the DCC Media Metadata, following the shape of the existing media metadata.
New DCC media metadata is marked as not yet downloaded (downloaded = false).
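For illustration, the new DCC record shown above would therefore be appended as
```
{
    "centre": "CCP-IMG",
    "checksum": "ff927f3e84b4b7fd0a625bfb50d58235707110d6f82bb7f08a1c7612a3d19a57",
    "dccUrl": "https://api.mousephenotype.org/tracker/media/getfile/ff927f3e84b4b7fd0a625bfb50d58235707110d6f82bb7f08a1c7612a3d19a57",
    "downloaded": false,
    "fileName": "3278832.bmp",
    "parameter": "CCP_XRY_048_001",
    "pipeline": "CCP_001",
    "procedure": "CCP_XRY_001"
}
```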
"""
import logging

from airflow.sdk import Variable, asset

from impc_etl.utils.airflow import create_input_asset, create_output_asset
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")
dr_tag = Variable.get("data_release_tag")

new_dcc_media_asset = create_input_asset("misc/media/new_dcc_media")
existing_media_asset = create_input_asset("misc/media/existing_media")
media_output_asset = create_output_asset("media")

SITES = {
'bcm': 'BCM',
'gmc': 'HMGU',
'h': 'MRC Harwell',
'ics': 'ICS',
'j': 'JAX',
'tcp': 'TCP',
'ning': 'NING',
'rbrc': 'RBRC',
'ucd': 'UC Davis',
'wtsi': 'WTSI',
'kmpc': 'KMPC',
'ccpcz': 'CCP-IMG'
}

@asset.multi(
    schedule=[new_dcc_media_asset, existing_media_asset],
outlets=[media_output_asset],
dag_id=f"{dr_tag}_extract_dcc_media_metadata_parser",
)
@with_spark_session
def extract_dcc_media_metadata_parser():
"""
    PySpark task to parse new DCC media metadata and merge it into the existing media metadata.
"""

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

def py_format_centre(centre: str) -> str:
return SITES[centre.lower()]

def py_create_filename(id: int, extension: str) -> str:
return str(id) + "." + extension

format_centre = F.udf(py_format_centre, T.StringType())
create_filename = F.udf(py_create_filename, T.StringType())

spark = SparkSession.builder.getOrCreate()
new_dcc_media = spark.read.json(new_dcc_media_asset.uri)

new_dcc_media = (new_dcc_media
.withColumn("impcCentre", format_centre(F.col("centre")))
.withColumn("fileName", create_filename(F.col("id"), F.col("extension")))
.withColumn("downloaded", F.lit(False))
.withColumnRenamed("parameterKey", "parameter")
.withColumnRenamed("pipelineKey", "pipeline")
.withColumnRenamed("procedureKey", "procedure")
.drop("centre", "id", "extension")
.withColumnRenamed("impcCentre", "centre")
)

# reorder the columns in the dataframe to match the existing media format
new_dcc_media = new_dcc_media.select(
"centre",
"checksum",
"dccUrl",
"downloaded",
"fileName",
"parameter",
"pipeline",
"procedure"
)

task_logger.info(f"- new_dcc_media example row: {new_dcc_media.show(1)}")
task_logger.info(f"- new_dcc_media count: {new_dcc_media.count()}")

existing_media = spark.read.json(existing_media_asset.uri)
task_logger.info(f"- existing_media count: {existing_media.count()}")

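    # union() matches columns by position, which is why new_dcc_media was reordered above to match.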
existing_media = existing_media.union(new_dcc_media)
task_logger.info(f"- After addition existing_media count: {existing_media.count()}")

existing_media.write.format("parquet").save(media_output_asset.uri, mode="overwrite")

18 changes: 17 additions & 1 deletion impc_etl/utils/spark.py
@@ -4,7 +4,7 @@

task_logger = logging.getLogger("airflow.task")

def with_spark_session(_func=None, *, postgres_database=False, include_xml=False, is_small_task=None):
def with_spark_session(_func=None, *, postgres_database=False, include_xml=False, is_small_task=None, mongo_url=""):
def decorator(func):
@wraps(func)
def wrapper():
@@ -32,11 +32,18 @@ def wrapper():
jars_packages = []
if postgres_database:
jars_packages.append("org.postgresql:postgresql:42.7.7")
if mongo_url:
jars_packages.append("org.mongodb.spark:mongo-spark-connector_2.13:10.5.0")
if include_xml:
jars_packages.append("com.databricks:spark-xml_2.12:0.15.0")
if jars_packages:
conf = conf.set("spark.jars.packages", ",".join(jars_packages))

if mongo_url:
conf = conf.set("spark.mongodb.read.connection.uri", mongo_url)
conf = conf.set("spark.mongodb.write.connection.uri", mongo_url)


spark = SparkSession.builder.config(conf=conf).getOrCreate()

spark_logger = logging.getLogger("spark")
@@ -60,3 +67,12 @@ def wrapper():

def with_spark_postgres_session(func):
return with_spark_session(func, postgres_database=True)

def with_spark_mongo_session(func):
conn = BaseHook.get_connection("kompdevrs082")
replica_set = conn.extra_dejson.get("replicaSet")
auth_source = conn.extra_dejson.get("authSource")

connection_url = f"mongodb://{conn.login}:{conn.password}@{conn.host}/{conn.schema}?replicaSet={replica_set}&authSource={auth_source}"
task_logger.info(f"Mongo URL: {connection_url}")
return with_spark_session(func, mongo_url=connection_url)
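A note on the URI built in `with_spark_mongo_session`: if the connection login or password ever contains URL-reserved characters (`@`, `:`, `/`), the interpolated `mongodb://` string becomes malformed. Below is a minimal sketch of percent-encoding the credentials with the standard library; the values are hypothetical stand-ins, not the real Airflow connection fields.

```python
from urllib.parse import quote_plus

# Hypothetical stand-ins for the Airflow connection fields used above.
login, password = "etl_user", "p@ss/word"
host, schema = "kompdevrs082:27017", "impc"
replica_set, auth_source = "rs0", "admin"

# Percent-encode only the credentials; the query parameters are plain identifiers.
connection_url = (
    f"mongodb://{quote_plus(login)}:{quote_plus(password)}@{host}/{schema}"
    f"?replicaSet={replica_set}&authSource={auth_source}"
)
print(connection_url)
# mongodb://etl_user:p%40ss%2Fword@kompdevrs082:27017/impc?replicaSet=rs0&authSource=admin
```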