Description
Hi once again,
One of our customers has a problem with submitting DAGs containing two tasks after branching; we changed probably everything, but nothing really helped.
We're running Airflow 2.9.3 from Stackable 24.11.0, and according to @sbernauer we have the object storage backend configured for XComs and logs (maybe that's important to know). We tried the Celery executor and now the Kubernetes executor, with no difference. We tried gevent, threads, prefork, and solo as AIRFLOW__CELERY__POOL, and we also tried setting AIRFLOW__CORE__MP_START_METHOD to spawn.
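As a sketch, these are the settings we cycled through, written as the standard Airflow environment variables (values were tried one at a time; the pool value shown is just the last one):

AIRFLOW__CELERY__POOL=prefork        # also tried: gevent, threads, solo
AIRFLOW__CORE__MP_START_METHOD=spawn # unset by default (platform default, i.e. fork on Linux)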
The error we get is the following:
[2025-06-20, 11:48:29 UTC] {taskinstance.py:441} ▼ Post task execution logs
[2025-06-20, 11:48:29 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
File "/stackable/app/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
File "/stackable/app/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
File "/stackable/app/lib64/python3.9/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
return func(self, *args, **kwargs)
File "/stackable/app/lib64/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 591, in execute
return self.execute_sync(context)
File "/stackable/app/lib64/python3.9/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 607, in execute_sync
ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
File "/stackable/app/lib64/python3.9/site-packages/airflow/utils/session.py", line 79, in wrapper
return func(*args, session=session, **kwargs)
File "/stackable/app/lib64/python3.9/site-packages/airflow/models/taskinstance.py", line 3197, in xcom_push
XCom.set(
File "/stackable/app/lib64/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
return func(*args, **kwargs)
File "/stackable/app/lib64/python3.9/site-packages/airflow/models/xcom.py", line 246, in set
value = cls.serialize_value(
File "/stackable/app/lib64/python3.9/site-packages/airflow/providers/common/io/xcom/backend.py", line 142, in serialize_value
if not p.exists():
File "/stackable/app/lib64/python3.9/site-packages/upath/core.py", line 711, in exists
return self.fs.exists(self.path)
File "/stackable/app/lib64/python3.9/site-packages/fsspec/asyn.py", line 118, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/stackable/app/lib64/python3.9/site-packages/fsspec/asyn.py", line 328, in loop
raise RuntimeError("This class is not fork-safe")
RuntimeError: This class is not fork-safe
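For what it's worth, here is a minimal sketch outside of Airflow that reproduces the same error (assuming an S3-backed object store via s3fs; the bucket/key are made up). fsspec caches filesystem instances and records the creating PID, so an instance created before a fork blows up when touched in the child:

import multiprocessing

import fsspec


def child():
    # fsspec's instance cache survives the fork, so this returns the
    # filesystem object that was created in the parent process.
    fs = fsspec.filesystem("s3")
    fs.exists("some-bucket/some-key")  # RuntimeError: This class is not fork-safe


if __name__ == "__main__":
    fsspec.filesystem("s3")  # create (and cache) the instance before forking
    p = multiprocessing.get_context("fork").Process(target=child)
    p.start()
    p.join()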
The DAG code has to be "blurred" for typical reasons, but it looks like this (kpo = KubernetesPodOperator):
(
    kpo_a
    >> kpo_b
    >> kpo_c
    >> kpo_d
    >> branch_x
)
branch_x >> [kpo_e, kpo_f]
kpo_e >> kpo_h
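For completeness, a runnable reconstruction of that shape; everything beyond the dependency structure (task ids, image, branch logic) is a placeholder, not our real configuration:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(dag_id="blurred_dag", start_date=datetime(2025, 1, 1), schedule=None):

    def kpo(task_id):
        # All KPOs are configured identically, as in the real DAG.
        return KubernetesPodOperator(
            task_id=task_id,
            name=task_id,
            image="busybox",  # placeholder
            cmds=["sh", "-c", "true"],
        )

    kpo_a, kpo_b, kpo_c, kpo_d = (kpo(t) for t in ["kpo_a", "kpo_b", "kpo_c", "kpo_d"])
    kpo_e, kpo_f, kpo_h = (kpo(t) for t in ["kpo_e", "kpo_f", "kpo_h"])

    branch_x = BranchPythonOperator(
        task_id="branch_x",
        python_callable=lambda: ["kpo_e", "kpo_f"],  # placeholder: follow both branches
    )

    kpo_a >> kpo_b >> kpo_c >> kpo_d >> branch_x
    branch_x >> [kpo_e, kpo_f]
    kpo_e >> kpo_h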
In this DAG, kpo_f is the one that's always failing, although it has no configuration differences; the pod spawns and does its work, but Airflow somehow seems to have a problem with it.
Please let me know what else you need to dig deeper. It's a very ugly problem, I know.