From 58cb42a8865cef7e1c5bc575629ec3d95b465e9d Mon Sep 17 00:00:00 2001 From: saywurdson <34385409+saywurdson@users.noreply.github.com> Date: Fri, 25 Apr 2025 15:36:34 -0400 Subject: [PATCH 1/2] create airflow dag for openFDA pregnancy category and dbt model for clinical product to pregnancy category --- airflow/dags/openFDA/dag.py | 20 ++ airflow/dags/openFDA/dag_tasks.py | 171 ++++++++++++++++++ .../marts/products/_products__models.yml | 28 +++ .../marts/products/_products__sources.yml | 20 ++ .../products_to_pregnancy_category.sql | 9 + 5 files changed, 248 insertions(+) create mode 100644 airflow/dags/openFDA/dag.py create mode 100644 airflow/dags/openFDA/dag_tasks.py create mode 100644 dbt/sagerx/models/marts/products/_products__sources.yml create mode 100644 dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql diff --git a/airflow/dags/openFDA/dag.py b/airflow/dags/openFDA/dag.py new file mode 100644 index 0000000..b516f4b --- /dev/null +++ b/airflow/dags/openFDA/dag.py @@ -0,0 +1,20 @@ +import pendulum +from airflow.decorators import dag +from openFDA.dag_tasks import extract, load + +dag_id = "openfda_pregnancy_category" + +@dag( + dag_id=dag_id, + schedule_interval="0 3 15 * *", + start_date=pendulum.today('UTC').add(days=-1), + catchup=False +) +def openfda(): + extract_task = extract(dag_id) + load_task = load(extract_task) + + extract_task >> load_task + +# Instantiate the DAG +dag = openfda() \ No newline at end of file diff --git a/airflow/dags/openFDA/dag_tasks.py b/airflow/dags/openFDA/dag_tasks.py new file mode 100644 index 0000000..b68b25d --- /dev/null +++ b/airflow/dags/openFDA/dag_tasks.py @@ -0,0 +1,171 @@ +from airflow.decorators import task +import pandas as pd +import requests +import re +import logging +from sagerx import load_df_to_pg, create_path, write_json_file, read_json_file +from common_dag_tasks import get_data_folder + +def format_ndc_to_11_digits(ndc): + # Remove any dashes if present and split into segments + if '-' in ndc: + segments = ndc.split('-') + else: + # Check if ndc that are 10 digits long + if len(ndc) != 10: + return ndc + + # Get the format (4-4-2, 5-3-2, or 5-4-1) + if re.match(r'^\d{4}\d{4}\d{2}$', ndc): # 4-4-2 format + segments = [ndc[:4], ndc[4:8], ndc[8:]] + elif re.match(r'^\d{5}\d{3}\d{2}$', ndc): # 5-3-2 format + segments = [ndc[:5], ndc[5:8], ndc[8:]] + elif re.match(r'^\d{5}\d{4}\d{1}$', ndc): # 5-4-1 format + segments = [ndc[:5], ndc[5:9], ndc[9:]] + else: + return ndc + + if len(segments) != 3: + return ndc # If not in the expected format, return as is + + # Add zero to the appropriate ndc segment + if len(segments[0]) == 4: # 4-x-x → 5-x-x + segments[0] = '0' + segments[0] + elif len(segments[1]) == 3: # x-3-x → x-4-x + segments[1] = '0' + segments[1] + elif len(segments[2]) == 1: # x-x-1 → x-x-2 + segments[2] = '0' + segments[2] + + # Return as formatted 11-digit NDC without dashes + return ''.join(segments) + +@task +def extract(dag_id: str) -> str: + """ + Extracts drug pregnancy category data from the openFDA API + """ + logging.info("Starting data retrieval for FDA drug pregnancy categories...") + + API_BASE = "https://api.fda.gov/drug/label.json" + + # Search for drug labels with teratogenic effects information, this is where the pregnancy category is + search_param = "_exists_:teratogenic_effects" + + params = { + "search": search_param, + "limit": 1000 # Maximum allowed per request + } + + all_data = [] + total_processed = 0 + + try: + # Get total number of records + initial_resp = requests.get(API_BASE, params=params) + initial_resp.raise_for_status() + initial_data = initial_resp.json() + + # Get total count of matching records + total_records = initial_data.get("meta", {}).get("results", {}).get("total", 0) + logging.info(f"Found {total_records} total records with teratogenic effects. Processing...") + + # Process records in batches of 1000 (API limit) + skip = 0 + while total_processed < total_records: + # Update params with current skip value + params["skip"] = skip + + resp = requests.get(API_BASE, params=params) + resp.raise_for_status() + batch_data = resp.json() + records = batch_data.get("results", []) + + if not records: + break + + # Process each record in the batch + for rec in records: + openfda = rec.get("openfda", {}) + + # Skip records without package_ndc + if "package_ndc" not in openfda or not openfda["package_ndc"]: + continue + + # Get rxcui if available. rxcui is not always available + rxcui = None + if "rxcui" in openfda and openfda["rxcui"]: + rxcui = openfda["rxcui"][0] + + # Define regex pattern to extract pregnancy category + pattern = r"(?:Pregnancy\s+)?Category\s+([ABCDX])\b" + + # Store just the pregnancy category letter + pregnancy_category = None + field_content = "" + if "teratogenic_effects" in rec: + field_content = rec["teratogenic_effects"] + if isinstance(field_content, list): + field_content = " ".join(field_content) + + match = re.search(pattern, field_content, re.IGNORECASE) + if match: + pregnancy_category = match.group(1) + + # Skip records without a valid pregnancy category + if not pregnancy_category: + continue + + # Get package_ndc values and create a row for each package_ndc + for package_ndc in openfda["package_ndc"]: + # Format the NDC + formatted_ndc = format_ndc_to_11_digits(package_ndc) + + all_data.append({ + "ndc": formatted_ndc, + "rxcui": rxcui, + "pregnancy_category": pregnancy_category + }) + + # Update counters for next batch + total_processed += len(records) + skip += len(records) + logging.info(f"Processed {total_processed} of {total_records} records...") + + # Save results to JSON file + data_folder = get_data_folder(dag_id) + file_path = create_path(data_folder) / 'pregnancy_categories.json' + file_path_str = file_path.resolve().as_posix() + + write_json_file(file_path_str, all_data) + + logging.info(f"Extraction Completed! Data saved to file: {file_path_str}") + return file_path_str + + except requests.exceptions.HTTPError as err: + logging.error(f"HTTP Error: {err}") + # Save any data collected so far + if all_data: + data_folder = get_data_folder(dag_id) + file_path = create_path(data_folder) / 'pregnancy_categories_partial.json' + file_path_str = file_path.resolve().as_posix() + write_json_file(file_path_str, all_data) + return file_path_str + raise + +@task +def load(file_path_str: str): + """ + Loads the pregnancy category data into the database. + """ + logging.info(f"Loading pregnancy category data from {file_path_str}") + + data = read_json_file(file_path_str) + + df = pd.DataFrame(data) + + logging.info(f"Dataframe created with {len(df)} rows") + + load_df_to_pg(df, "sagerx_lake", "openfda_pregnancy_categories", "replace", index=False) + + logging.info("Data successfully loaded into database") + return "Data load complete" \ No newline at end of file diff --git a/dbt/sagerx/models/marts/products/_products__models.yml b/dbt/sagerx/models/marts/products/_products__models.yml index 66b02e0..4202e57 100644 --- a/dbt/sagerx/models/marts/products/_products__models.yml +++ b/dbt/sagerx/models/marts/products/_products__models.yml @@ -97,3 +97,31 @@ models: - name: inactive_ingredient_tty - name: active - name: prescribable + + - name: products_to_pregnancy_category + description: | + Product information including name, RXCUI, and pregnancy category. + + Data generally comes from RxNorm and OpenFDA. + columns: + - name: clinical_product_rxcui + description: > + Product-level RxNorm RXCUI. + data_tests: + - unique + - not_null + - name: clinical_product_name + description: > + Product-level RxNorm RXCUI name. + data_tests: + - unique + - not_null + - name: pregnancy_category + description: | + Pregnancy category as defined by the FDA. + + A = No risk + B = No risk in humans + C = Risk cannot be ruled out + D = Positive evidence of risk + X = Contraindicated in pregnancy diff --git a/dbt/sagerx/models/marts/products/_products__sources.yml b/dbt/sagerx/models/marts/products/_products__sources.yml new file mode 100644 index 0000000..e77ea07 --- /dev/null +++ b/dbt/sagerx/models/marts/products/_products__sources.yml @@ -0,0 +1,20 @@ +version: 2 + +sources: + - name: openfda + schema: sagerx_lake + tables: + - name: openfda_pregnancy_categories + description: | + OpenFDA Pregnancy Categories + + https://open.fda.gov/apis/drug/pregnancy-categories/ + + The FDA has assigned pregnancy categories to drugs based on + their potential risks to a fetus. The categories are: + + - A: Controlled studies show no risk + - B: No evidence of risk in humans + - C: Risk cannot be ruled out + - D: Positive evidence of risk + - X: Contraindicated in pregnancy \ No newline at end of file diff --git a/dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql b/dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql new file mode 100644 index 0000000..bc258bf --- /dev/null +++ b/dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql @@ -0,0 +1,9 @@ +-- products_to_pregnancy_category.sql + +select distinct + c.clinical_product_rxcui, + c.clinical_product_name, + o.pregnancy_category +from {{ source('openfda', 'openfda_pregnancy_categories') }} o +join {{ ref('int_rxnorm_clinical_products_to_ndcs') }} c +on c.ndc = o.ndc \ No newline at end of file From f571c00bf5d3b12f9bfa4a4c0ea4da96d1a65f77 Mon Sep 17 00:00:00 2001 From: Joey LeGrand Date: Sat, 10 May 2025 02:20:35 +0000 Subject: [PATCH 2/2] Change openfda to open_fda --- airflow/dags/{openFDA => open_fda}/dag.py | 8 ++++---- airflow/dags/{openFDA => open_fda}/dag_tasks.py | 12 ++++++------ .../models/marts/products/_products__sources.yml | 6 +++--- ...gory.sql => products_to_pregnancy_categories.sql} | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) rename airflow/dags/{openFDA => open_fda}/dag.py (72%) rename airflow/dags/{openFDA => open_fda}/dag_tasks.py (93%) rename dbt/sagerx/models/marts/products/{products_to_pregnancy_category.sql => products_to_pregnancy_categories.sql} (61%) diff --git a/airflow/dags/openFDA/dag.py b/airflow/dags/open_fda/dag.py similarity index 72% rename from airflow/dags/openFDA/dag.py rename to airflow/dags/open_fda/dag.py index b516f4b..7045f68 100644 --- a/airflow/dags/openFDA/dag.py +++ b/airflow/dags/open_fda/dag.py @@ -1,8 +1,8 @@ import pendulum from airflow.decorators import dag -from openFDA.dag_tasks import extract, load +from open_fda.dag_tasks import extract, load -dag_id = "openfda_pregnancy_category" +dag_id = "open_fda_pregnancy_category" @dag( dag_id=dag_id, @@ -10,11 +10,11 @@ start_date=pendulum.today('UTC').add(days=-1), catchup=False ) -def openfda(): +def open_fda(): extract_task = extract(dag_id) load_task = load(extract_task) extract_task >> load_task # Instantiate the DAG -dag = openfda() \ No newline at end of file +dag = open_fda() \ No newline at end of file diff --git a/airflow/dags/openFDA/dag_tasks.py b/airflow/dags/open_fda/dag_tasks.py similarity index 93% rename from airflow/dags/openFDA/dag_tasks.py rename to airflow/dags/open_fda/dag_tasks.py index b68b25d..38d4c54 100644 --- a/airflow/dags/openFDA/dag_tasks.py +++ b/airflow/dags/open_fda/dag_tasks.py @@ -85,16 +85,16 @@ def extract(dag_id: str) -> str: # Process each record in the batch for rec in records: - openfda = rec.get("openfda", {}) + open_fda = rec.get("openfda", {}) # Skip records without package_ndc - if "package_ndc" not in openfda or not openfda["package_ndc"]: + if "package_ndc" not in open_fda or not open_fda["package_ndc"]: continue # Get rxcui if available. rxcui is not always available rxcui = None - if "rxcui" in openfda and openfda["rxcui"]: - rxcui = openfda["rxcui"][0] + if "rxcui" in open_fda and open_fda["rxcui"]: + rxcui = open_fda["rxcui"][0] # Define regex pattern to extract pregnancy category pattern = r"(?:Pregnancy\s+)?Category\s+([ABCDX])\b" @@ -116,7 +116,7 @@ def extract(dag_id: str) -> str: continue # Get package_ndc values and create a row for each package_ndc - for package_ndc in openfda["package_ndc"]: + for package_ndc in open_fda["package_ndc"]: # Format the NDC formatted_ndc = format_ndc_to_11_digits(package_ndc) @@ -165,7 +165,7 @@ def load(file_path_str: str): logging.info(f"Dataframe created with {len(df)} rows") - load_df_to_pg(df, "sagerx_lake", "openfda_pregnancy_categories", "replace", index=False) + load_df_to_pg(df, "sagerx_lake", "open_fda_pregnancy_categories", "replace", index=False) logging.info("Data successfully loaded into database") return "Data load complete" \ No newline at end of file diff --git a/dbt/sagerx/models/marts/products/_products__sources.yml b/dbt/sagerx/models/marts/products/_products__sources.yml index e77ea07..f36ff9b 100644 --- a/dbt/sagerx/models/marts/products/_products__sources.yml +++ b/dbt/sagerx/models/marts/products/_products__sources.yml @@ -1,10 +1,10 @@ version: 2 sources: - - name: openfda + - name: open_fda schema: sagerx_lake tables: - - name: openfda_pregnancy_categories + - name: open_fda_pregnancy_categories description: | OpenFDA Pregnancy Categories @@ -17,4 +17,4 @@ sources: - B: No evidence of risk in humans - C: Risk cannot be ruled out - D: Positive evidence of risk - - X: Contraindicated in pregnancy \ No newline at end of file + - X: Contraindicated in pregnancy diff --git a/dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql b/dbt/sagerx/models/marts/products/products_to_pregnancy_categories.sql similarity index 61% rename from dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql rename to dbt/sagerx/models/marts/products/products_to_pregnancy_categories.sql index bc258bf..110df46 100644 --- a/dbt/sagerx/models/marts/products/products_to_pregnancy_category.sql +++ b/dbt/sagerx/models/marts/products/products_to_pregnancy_categories.sql @@ -1,9 +1,9 @@ --- products_to_pregnancy_category.sql +-- products_to_pregnancy_categories.sql select distinct c.clinical_product_rxcui, c.clinical_product_name, o.pregnancy_category -from {{ source('openfda', 'openfda_pregnancy_categories') }} o +from {{ source('open_fda', 'open_fda_pregnancy_categories') }} o join {{ ref('int_rxnorm_clinical_products_to_ndcs') }} c on c.ndc = o.ndc \ No newline at end of file