Skip to content

Commit 6b46913

Browse files
Airflow ETL Pipeline
0 parents  commit 6b46913

27 files changed

+2326
-0
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.backup_files
2+
.ipynb_checkpoints
3+
.venv
4+
logs

Extract Transform Load.ipynb

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Importing Dependencies"
8+
]
9+
},
10+
{
11+
"cell_type": "code",
12+
"execution_count": 1,
13+
"metadata": {},
14+
"outputs": [],
15+
"source": [
16+
"from datetime import date, datetime\n",
17+
"import requests\n",
18+
"import pandas as pd\n",
19+
"import os\n",
20+
"import json\n",
21+
"import re\n",
22+
"import requests\n",
23+
"import sqlalchemy\n",
24+
"from sqlalchemy.orm import sessionmaker\n",
25+
"import sqlite3"
26+
]
27+
},
28+
{
29+
"cell_type": "markdown",
30+
"metadata": {},
31+
"source": [
32+
"# Extract"
33+
]
34+
},
35+
{
36+
"cell_type": "code",
37+
"execution_count": 2,
38+
"metadata": {},
39+
"outputs": [],
40+
"source": [
41+
"# Extracting JSON data from public API of New York City website\n",
42+
"def _extract():\n",
43+
" url = \"https://data.cityofnewyork.us/resource/rc75-m7u3.json\"\n",
44+
" result_load = requests.get(url)\n",
45+
" df = pd.DataFrame(json.loads(result_load.content))\n",
46+
" df.to_csv(\"covid_db_original_{}.csv\".format(date.today().strftime(\"%Y%m%d\")))\n",
47+
"_extract()"
48+
]
49+
},
50+
{
51+
"cell_type": "markdown",
52+
"metadata": {},
53+
"source": [
54+
"# Transform"
55+
]
56+
},
57+
{
58+
"cell_type": "code",
59+
"execution_count": 3,
60+
"metadata": {},
61+
"outputs": [],
62+
"source": [
63+
"# df = pd.DataFrame(json.loads(result_load.content))\n",
64+
"def _transform():\n",
65+
" df1 = pd.read_csv(\"covid_db_original_{}.csv\".format(date.today().strftime(\"%Y%m%d\")))\n",
66+
" df1['date'] = df1['date_of_interest'].str.extract('(....-..-..)', expand=True)\n",
67+
" df1.drop(df1.columns.difference(['date','case_count','hospitalized_count','death_count']), axis=1, inplace=True)\n",
68+
" df1 = df1.set_index(\"date\")\n",
69+
" df1.to_csv(\"covid_db_transformed_{}.csv\".format(date.today().strftime(\"%Y%m%d\")))\n",
70+
"_transform()"
71+
]
72+
},
73+
{
74+
"cell_type": "markdown",
75+
"metadata": {},
76+
"source": [
77+
"# Load"
78+
]
79+
},
80+
{
81+
"cell_type": "code",
82+
"execution_count": 6,
83+
"metadata": {},
84+
"outputs": [
85+
{
86+
"name": "stdout",
87+
"output_type": "stream",
88+
"text": [
89+
"Opened database successfully\n",
90+
"Data already exists in the database\n",
91+
"Close database successfully\n"
92+
]
93+
}
94+
],
95+
"source": [
96+
"def _load():\n",
97+
" \n",
98+
" df2 = pd.read_csv(\"covid_db_transformed_{}.csv\".format(date.today().strftime(\"%Y%m%d\")))\n",
99+
" \n",
100+
" DATABASE_LOCATION = \"sqlite:///covid_db_cleaned.sqlite\"\n",
101+
"\n",
102+
" engine = sqlalchemy.create_engine(DATABASE_LOCATION)\n",
103+
" conn = sqlite3.connect('covid_db_cleaned.sqlite')\n",
104+
" cursor = conn.cursor()\n",
105+
"\n",
106+
" sql_query = \"\"\"\n",
107+
" CREATE TABLE IF NOT EXISTS covid_db_cleaned (\n",
108+
" date DATE,\n",
109+
" case_count INT,\n",
110+
" hospitalized_count INT,\n",
111+
" death_count INT,\n",
112+
" PRIMARY KEY (date)\n",
113+
" )\n",
114+
" \"\"\"\n",
115+
"\n",
116+
" cursor.execute(sql_query)\n",
117+
" print(\"Opened database successfully\")\n",
118+
"\n",
119+
" try:\n",
120+
" df2.to_sql(\"covid_db_cleaned\", engine, index=False, if_exists='append',con=conn)\n",
121+
" except:\n",
122+
" print(\"Data already exists in the database\")\n",
123+
"\n",
124+
" conn.close()\n",
125+
" print(\"Close database successfully\")\n",
126+
"_load()"
127+
]
128+
},
129+
{
130+
"cell_type": "code",
131+
"execution_count": null,
132+
"metadata": {},
133+
"outputs": [],
134+
"source": []
135+
}
136+
],
137+
"metadata": {
138+
"kernelspec": {
139+
"display_name": "Python 3",
140+
"language": "python",
141+
"name": "python3"
142+
},
143+
"language_info": {
144+
"codemirror_mode": {
145+
"name": "ipython",
146+
"version": 3
147+
},
148+
"file_extension": ".py",
149+
"mimetype": "text/x-python",
150+
"name": "python",
151+
"nbconvert_exporter": "python",
152+
"pygments_lexer": "ipython3",
153+
"version": "3.8.5"
154+
}
155+
},
156+
"nbformat": 4,
157+
"nbformat_minor": 4
158+
}

covid_db_cleaned.sqlite

52 KB
Binary file not shown.

dags/__init__.py

Whitespace-only changes.
295 Bytes
Binary file not shown.
289 Bytes
Binary file not shown.
530 Bytes
Binary file not shown.

dags/__pycache__/dag_1.cpython-37.pyc

538 Bytes
Binary file not shown.

dags/__pycache__/dag_2.cpython-37.pyc

1.22 KB
Binary file not shown.

dags/__pycache__/dag_3.cpython-37.pyc

1.29 KB
Binary file not shown.
730 Bytes
Binary file not shown.
3.55 KB
Binary file not shown.
1.55 KB
Binary file not shown.
287 Bytes
Binary file not shown.
325 Bytes
Binary file not shown.
1.35 KB
Binary file not shown.

dags/__pycache__/load.cpython-37.pyc

1.06 KB
Binary file not shown.
993 Bytes
Binary file not shown.
2.89 KB
Binary file not shown.
517 Bytes
Binary file not shown.
593 Bytes
Binary file not shown.

dags/dag_nyc_covid.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from airflow import DAG
# NOTE(review): airflow.operators.python_operator is the deprecated path; on
# Airflow 2.x this should become `from airflow.operators.python import
# PythonOperator` — confirm the installed Airflow version before switching.
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

from etl_pipe import _extract, _transform, _load

default_args = {
    "owner": "airflow",
    # A *static* start_date: the original used `datetime.today() - timedelta(days=1)`,
    # which is re-evaluated on every scheduler parse and is a documented Airflow
    # anti-pattern (the scheduler never sees a stable schedule anchor).
    "start_date": datetime(2021, 1, 1),
}

# Daily NYC-COVID ETL: extract the raw API snapshot, reduce it to the columns
# of interest, then load it into the local SQLite database.
with DAG(
    "dag_nyc_covid",
    default_args=default_args,
    schedule_interval="0 1 * * *",  # every day at 01:00
    catchup=False,  # with a static start_date, don't backfill historical runs
) as dag:

    # Operators created inside the `with DAG(...)` block are attached to the
    # DAG automatically; the explicit dag=dag of the original was redundant.
    extract_data = PythonOperator(
        task_id="extract_data",
        python_callable=_extract,
    )
    transform_data = PythonOperator(
        task_id="transform_data",
        python_callable=_transform,
    )
    load_data = PythonOperator(
        task_id="load_data",
        python_callable=_load,
    )

    extract_data >> transform_data >> load_data

dags/etl_pipe.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from datetime import date, datetime
2+
import requests
3+
import pandas as pd
4+
import json
5+
import requests
6+
from sqlalchemy import create_engine
7+
8+
# Extract: Extracting JSON data from public API of New York City website and saving to a .csv file
9+
10+
def _extract():
    """Download the NYC COVID daily-counts dataset and snapshot it to CSV.

    Fetches the city's open-data JSON endpoint and writes
    data/covid_db_original_<YYYYMMDD>.csv (stamped with today's date).

    Raises:
        requests.HTTPError: if the API responds with a 4xx/5xx status,
            instead of silently saving an error page as the snapshot.
    """
    import os  # local import: os is not imported at this module's top level

    url = "https://data.cityofnewyork.us/resource/rc75-m7u3.json"
    response = requests.get(url)
    response.raise_for_status()  # fail loudly rather than persist garbage
    df = pd.DataFrame(response.json())

    os.makedirs("data", exist_ok=True)  # first writer: ensure data/ exists
    stamp = date.today().strftime("%Y%m%d")
    # index=False: the RangeIndex carries no information and the transform
    # step drops every column outside its keep-list anyway.
    df.to_csv("data/covid_db_original_{}.csv".format(stamp), index=False)


if __name__ == "__main__":
    # Guarded: the original called _extract() at module level, which re-ran
    # the full download every time the Airflow scheduler imported this file.
    _extract()
16+
17+
# Transform: Transforming the data using pandas
18+
19+
def _transform():
20+
df1 = pd.read_csv("data/covid_db_original_{}.csv".format(date.today().strftime("%Y%m%d")))
21+
df1['date'] = df1['date_of_interest'].str.extract('(....-..-..)', expand=True)
22+
df1.drop(df1.columns.difference(['date','case_count','hospitalized_count','death_count']), axis=1, inplace=True)
23+
df1 = df1.set_index("date")
24+
df1.to_csv("data/covid_db_transformed_{}.csv".format(date.today().strftime("%Y%m%d")))
25+
_transform()
26+
27+
# Load: Ingesting the transformed data into an SQLite Database
28+
29+
def _load():
30+
31+
df2 = pd.read_csv("data/covid_db_transformed_{}.csv".format(date.today().strftime("%Y%m%d")))
32+
DATABASE_LOCATION = "sqlite:///covid_db_cleaned.sqlite"
33+
engine = create_engine(DATABASE_LOCATION, echo=True)
34+
sqlite_connection = engine.connect()
35+
sqlite_table = "covid_data"
36+
df2.to_sql(sqlite_table, sqlite_connection, if_exists='append')
37+
sqlite_connection.close()
38+
39+
_load()

0 commit comments

Comments
 (0)