apache_airflow_1.9.x_starter_template.py

############################################################
# Author Krisjan Oldekamp / Stacktonic.com #
# Email [email protected] #
############################################################
from airflow import models
# Airflow operators https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/operators/index.html
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# Contributed Airflow operators https://airflow.apache.org/docs/apache-airflow/1.10.14/_api/airflow/contrib/operators/index.html
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
# Other common imports
import datetime
import os
import json

#################################################
# DAG Configuration
#################################################
DAG_NAME = "example_dag" # DAG name (proposed format: lowercase with underscores)
DAG_START_DATE = datetime.datetime(2021, 2, 12) # Start date (when the "catchup" parameter is enabled, you can perform a backfill from this date)
DAG_SCHEDULE_INTERVAL = "@daily" # Cron expression or preset -> see https://airflow.apache.org/scheduler.html#dag-runs
DAG_CATCHUP = False # When set to True, the DAG will backfill runs from DAG_START_DATE up to the current date

#################################################
# Default DAG arguments
#################################################
default_dag_args = {
    "owner": "airflow",
    "start_date": DAG_START_DATE,
    "depends_on_past": False,
    "email": models.Variable.get("email_monitoring"), # Make sure you create this Airflow Variable
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 1, # Number of retries
    "retry_delay": datetime.timedelta(minutes=60) # Delay between retries
    # Note: "max_active_runs" is a DAG-level setting, not a task default,
    # so it is passed to models.DAG below instead of being set here
}

#################################################
# Custom DAG Configuration
#################################################
TASKS = [{
    "account": "123"
}, {
    "account": "456"
}]

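# Instead of hardcoding, the task list could also come from an Airflow
# Variable holding JSON (a sketch; assumes you created a Variable named
# "example_dag_tasks", e.g. '[{"account": "123"}, {"account": "456"}]'):
#
# TASKS = json.loads(models.Variable.get("example_dag_tasks"))
# # or, equivalently:
# TASKS = models.Variable.get("example_dag_tasks", deserialize_json=True)
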
#################################################
# Custom Python functions
#################################################
# Custom Python function, to be executed by PythonOperator (see https://cloud.google.com/composer/docs/how-to/using/writing-dags#pythonoperator)
def custom_python_function_whatever(ds, **kwargs):
    param_execution_date = kwargs["execution_date"]
    param_something = kwargs["something_else"]
    print("Custom Python function, execution date " + param_execution_date + ", do whatever you want: " + param_something)

#################################################
# Operator / repeatable functions
#################################################
# Repeatable / dynamic task
def dynamic_task(i):
    account = TASKS[i]["account"]
    return DummyOperator(
        task_id="dynamic_task_" + account,
        dag=dag)

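# An alternative sketch of the same pattern with a BashOperator, putting
# the account id into the command (the task id and command here are
# illustrative, not part of the original template):
def dynamic_bash_task(i):
    account = TASKS[i]["account"]
    return BashOperator(
        task_id="dynamic_bash_task_" + account,
        bash_command="echo 'processing account " + account + "'",
        dag=dag)
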
#################################################
# Main DAG
#################################################
# Create DAG
with models.DAG(
        DAG_NAME,
        schedule_interval=DAG_SCHEDULE_INTERVAL,
        catchup=DAG_CATCHUP,
        max_active_runs=1, # Max. number of active DAG runs
        default_args=default_dag_args) as dag:
    # Start
    start = DummyOperator(
        task_id="start")

    # Bash Operator
    print_start = BashOperator(
        task_id="print_start",
        bash_command="echo 'hello'")

    # Custom Python function
    custom_python_function = PythonOperator(
        task_id="custom_python_function",
        provide_context=True, # Required in Airflow 1.x to pass the context to the callable
        op_kwargs={
            "execution_date": "{{ ds }}", # Macro for the execution date, see other macros https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
            "something_else": "hello!"
        },
        python_callable=custom_python_function_whatever)

    # Complete
    complete = DummyOperator(
        task_id="complete")

    # Set order of execution (see also https://airflow.apache.org/concepts.html#bitshift-composition)
    start >> print_start >> custom_python_function

    # Add dynamic / repeating tasks (one of many ways to do this)
    for i, _ in enumerate(TASKS):
        custom_python_function >> dynamic_task(i) >> complete
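
# To try a single task locally with the Airflow 1.x CLI, e.g.:
# airflow test example_dag print_start 2021-02-12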