get_dagrun() starts giving an error in 2.0 when used inside a cluster policy #19744
Replies: 6 comments
-
A workaround for this, for now, is to pass a session to get_dagrun() explicitly:

```python
from airflow.settings import Session

session = Session()

def task_instance_mutation_hook(task_instance):
    dag_run = task_instance.get_dagrun(session=session)
    conf = dag_run.conf
```
-
@kaxil
-
Try the following, which delays the import and creates the session inside the mutation hook:

```python
def task_instance_mutation_hook(task_instance):
    from airflow.settings import Session

    session = Session()
    dag_run = task_instance.get_dagrun(session=session)
    conf = dag_run.conf
```
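For context (not stated explicitly in the thread): cluster policies such as task_instance_mutation_hook live in airflow_local_settings.py, which Airflow imports during settings initialization, so deferring the import and the Session creation into the hook body avoids opening a database session eagerly at import time.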
-
@sudarshan2906 Did it work?
-
@kaxil Sorry for the late reply.
-
@sudarshan2906 Does the code still work for you in Airflow 2.10.1? I tried the same code in my environment, but it failed.
-
Apache Airflow version: 2.0
OS (from /etc/os-release):
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
What happened:
I am using a task_instance_mutation_hook cluster policy, and inside it I call the get_dagrun() method on the task_instance object to get the DagRun for that task instance. This worked fine in 1.10.14, but in 2.0 it started raising the following error, which prevents the scheduler from starting.
I use the DagRun object to read the conf passed to the dag run, and I set some properties of the task_instance according to it.
How to reproduce it:
Example of the cluster policy used:
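The policy snippet itself was not preserved in this export; a minimal sketch consistent with the description above would look like this (the "queue" conf key and the attribute being set are hypothetical illustrations, not from the report):

```python
# airflow_local_settings.py -- a sketch only; the actual policy from the
# report was not preserved. The "queue" key is a hypothetical example of
# setting a task_instance property from the dag-run conf.
def task_instance_mutation_hook(task_instance):
    # Worked in 1.10.14; in 2.0 this call started raising an error
    # during scheduling.
    dag_run = task_instance.get_dagrun()
    conf = dag_run.conf or {}
    # Set task_instance properties according to the dag-run conf.
    if "queue" in conf:
        task_instance.queue = conf["queue"]
```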