Skip to content

Commit 6faa720

Browse files
authored
Remove XCom pickling (#43905)
XCom pickling was disabled by default in Airflow 2.0.0: https://airflow.apache.org/docs/apache-airflow/1.10.15/configurations-ref.html#enable-xcom-pickling
1 parent 49daa6c commit 6faa720

File tree

14 files changed

+31
-219
lines changed

14 files changed

+31
-219
lines changed

airflow/api_connexion/endpoints/xcom_endpoint.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ def get_xcom_entry(
127127
stub.value = XCom.deserialize_value(stub)
128128
item = stub
129129

130-
if stringify or conf.getboolean("core", "enable_xcom_pickling"):
130+
if stringify:
131131
return xcom_schema_string.dump(item)
132132

133133
return xcom_schema_native.dump(item)

airflow/api_fastapi/core_api/routes/public/xcom.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def get_xcom_entry(
8888
xcom_stub.value = XCom.deserialize_value(xcom_stub)
8989
item = xcom_stub
9090

91-
if stringify or conf.getboolean("core", "enable_xcom_pickling"):
91+
if stringify:
9292
return XComResponseString.model_validate(item, from_attributes=True)
9393

9494
return XComResponseNative.model_validate(item, from_attributes=True)

airflow/config_templates/config.yml

-9
Original file line numberDiff line numberDiff line change
@@ -249,15 +249,6 @@ core:
249249
type: string
250250
example: ~
251251
default: "False"
252-
enable_xcom_pickling:
253-
description: |
254-
Whether to enable pickling for xcom (note that this is insecure and allows for
255-
RCE exploits).
256-
version_added: ~
257-
type: string
258-
example: ~
259-
default: "False"
260-
see_also: "https://docs.python.org/3/library/pickle.html#comparison-with-json"
261252
allowed_deserialization_classes:
262253
description: |
263254
What classes can be imported during deserialization. This is a multi line value.

airflow/models/taskinstance.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -3368,9 +3368,7 @@ def xcom_push(
33683368
Make an XCom available for tasks to pull.
33693369
33703370
:param key: Key to store the value under.
3371-
:param value: Value to store. What types are possible depends on whether
3372-
``enable_xcom_pickling`` is true or not. If so, this can be any
3373-
picklable object; only be JSON-serializable may be used otherwise.
3371+
:param value: Value to store. Only JSON-serializable values may be used.
33743372
"""
33753373
XCom.set(
33763374
key=key,

airflow/models/xcom.py

+4-24
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import inspect
2121
import json
2222
import logging
23-
import pickle
2423
from typing import TYPE_CHECKING, Any, Iterable, cast
2524

2625
from sqlalchemy import (
@@ -455,21 +454,8 @@ def serialize_value(
455454
run_id: str | None = None,
456455
map_index: int | None = None,
457456
) -> Any:
458-
"""Serialize XCom value to str or pickled object."""
459-
if conf.getboolean("core", "enable_xcom_pickling"):
460-
return pickle.dumps(value)
461-
try:
462-
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
463-
except (ValueError, TypeError) as ex:
464-
log.error(
465-
"%s."
466-
" If you are using pickle instead of JSON for XCom,"
467-
" then you need to enable pickle support for XCom"
468-
" in your airflow config or make sure to decorate your"
469-
" object with attr.",
470-
ex,
471-
)
472-
raise
457+
"""Serialize XCom value to UTF-8 encoded JSON bytes."""
458+
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
473459

474460
@staticmethod
475461
def _deserialize_value(result: XCom, orm: bool) -> Any:
@@ -479,14 +465,8 @@ def _deserialize_value(result: XCom, orm: bool) -> Any:
479465

480466
if result.value is None:
481467
return None
482-
if conf.getboolean("core", "enable_xcom_pickling"):
483-
try:
484-
return pickle.loads(result.value)
485-
except pickle.UnpicklingError:
486-
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
487-
else:
488-
# Since xcom_pickling is disabled, we should only try to deserialize with JSON
489-
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
468+
469+
return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
490470

491471
@staticmethod
492472
def deserialize_value(result: XCom) -> Any:

newsfragments/aip-72.significant.rst

+10
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,13 @@ As part of this change the following breaking changes have occurred:
1717
- Shipping DAGs via pickle is no longer supported
1818

1919
This was a feature that was not widely used and was a security risk. It has been removed.
20+
21+
- Pickling is no longer supported for XCom serialization.
22+
23+
XCom data will no longer support pickling. This change is intended to improve security and simplify data
24+
handling by supporting JSON-only serialization. DAGs that depend on XCom pickling must update to use JSON-serializable data.
25+
26+
As part of that change, the ``[core] enable_xcom_pickling`` configuration option has been removed.
27+
28+
If you still need to use pickling, you can use a custom XCom backend that stores references in the metadata DB and
29+
keeps the pickled data in separate storage such as S3.

providers/src/airflow/providers/microsoft/azure/operators/adx.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ def execute(self, context: Context) -> KustoResultTable | str:
8585
https://docs.microsoft.com/en-us/azure/kusto/api/rest/response2
8686
"""
8787
response = self.hook.run_query(self.query, self.database, self.options)
88-
if conf.getboolean("core", "enable_xcom_pickling"):
88+
# TODO: Remove this after minimum Airflow version is 3.0
89+
if conf.getboolean("core", "enable_xcom_pickling", fallback=False):
8990
return response.primary_results[0]
9091
else:
9192
return str(response.primary_results[0])

providers/src/airflow/providers/microsoft/winrm/operators/winrm.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ def execute(self, context: Context) -> list | str:
9797

9898
if return_code == 0:
9999
# returning output if do_xcom_push is set
100-
enable_pickling = conf.getboolean("core", "enable_xcom_pickling")
100+
# TODO: Remove this after minimum Airflow version is 3.0
101+
enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False)
101102

102103
if enable_pickling:
103104
return stdout_buffer

providers/src/airflow/providers/ssh/operators/ssh.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ def execute(self, context=None) -> bytes | str:
188188

189189
with self.get_ssh_client() as ssh_client:
190190
result = self.run_ssh_client_command(ssh_client, self.command, context=context)
191-
enable_pickling = conf.getboolean("core", "enable_xcom_pickling")
191+
# TODO: Remove this after minimum Airflow version is 3.0
192+
enable_pickling = conf.getboolean("core", "enable_xcom_pickling", fallback=False)
192193
if not enable_pickling:
193194
result = b64encode(result).decode("utf-8")
194195

providers/tests/sftp/operators/test_sftp.py

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from airflow.utils import timezone
3737
from airflow.utils.timezone import datetime
3838

39+
from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS
3940
from tests_common.test_utils.config import conf_vars
4041

4142
pytestmark = pytest.mark.db_test
@@ -95,6 +96,7 @@ def teardown_method(self):
9596
if os.path.exists(self.test_remote_dir):
9697
os.rmdir(self.test_remote_dir)
9798

99+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
98100
@conf_vars({("core", "enable_xcom_pickling"): "True"})
99101
def test_pickle_file_transfer_put(self, dag_maker):
100102
test_local_file_content = (
@@ -129,6 +131,7 @@ def test_pickle_file_transfer_put(self, dag_maker):
129131
pulled = tis["check_file_task"].xcom_pull(task_ids="check_file_task", key="return_value")
130132
assert pulled.strip() == test_local_file_content
131133

134+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
132135
@conf_vars({("core", "enable_xcom_pickling"): "True"})
133136
def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_of_operator):
134137
test_local_file_content = (
@@ -158,6 +161,7 @@ def test_file_transfer_no_intermediate_dir_error_put(self, create_task_instance_
158161
ti2.run()
159162
assert "No such file" in str(ctx.value)
160163

164+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
161165
@conf_vars({("core", "enable_xcom_pickling"): "True"})
162166
def test_file_transfer_with_intermediate_dir_put(self, dag_maker):
163167
test_local_file_content = (
@@ -232,6 +236,7 @@ def create_remote_file_and_cleanup(self):
232236
yield
233237
os.remove(self.test_remote_filepath)
234238

239+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
235240
@conf_vars({("core", "enable_xcom_pickling"): "True"})
236241
def test_pickle_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup):
237242
with dag_maker(dag_id="unit_tests_sftp_op_pickle_file_transfer_get"):
@@ -275,6 +280,7 @@ def test_json_file_transfer_get(self, dag_maker, create_remote_file_and_cleanup)
275280
content_received = file.read()
276281
assert content_received == self.test_remote_file_content
277282

283+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
278284
@conf_vars({("core", "enable_xcom_pickling"): "True"})
279285
def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup):
280286
with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_no_intermediate_dir_error_get"):
@@ -298,6 +304,7 @@ def test_file_transfer_no_intermediate_dir_error_get(self, dag_maker, create_rem
298304
ti.run()
299305
assert "No such file" in str(ctx.value)
300306

307+
@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Pickle support is removed in Airflow 3")
301308
@conf_vars({("core", "enable_xcom_pickling"): "True"})
302309
def test_file_transfer_with_intermediate_dir_error_get(self, dag_maker, create_remote_file_and_cleanup):
303310
with dag_maker(dag_id="unit_tests_sftp_op_file_transfer_with_intermediate_dir_error_get"):

tests/api_connexion/endpoints/test_xcom_endpoint.py

-30
Original file line numberDiff line numberDiff line change
@@ -158,36 +158,6 @@ def test_should_respond_200_native(self):
158158
"value": {"key": "value"},
159159
}
160160

161-
@conf_vars({("core", "enable_xcom_pickling"): "True"})
162-
def test_should_respond_200_native_for_pickled(self):
163-
dag_id = "test-dag-id"
164-
task_id = "test-task-id"
165-
logical_date = "2005-04-02T00:00:00+00:00"
166-
xcom_key = "test-xcom-key"
167-
logical_date_parsed = timezone.parse(logical_date)
168-
run_id = DagRun.generate_run_id(DagRunType.MANUAL, logical_date_parsed)
169-
value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}
170-
self._create_xcom_entry(
171-
dag_id, run_id, logical_date_parsed, task_id, xcom_key, {"key": value_non_serializable_key}
172-
)
173-
response = self.client.get(
174-
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
175-
environ_overrides={"REMOTE_USER": "test"},
176-
)
177-
assert 200 == response.status_code
178-
179-
current_data = response.json
180-
current_data["timestamp"] = "TIMESTAMP"
181-
assert current_data == {
182-
"dag_id": dag_id,
183-
"logical_date": logical_date,
184-
"key": xcom_key,
185-
"task_id": task_id,
186-
"map_index": -1,
187-
"timestamp": "TIMESTAMP",
188-
"value": f"{{'key': {str(value_non_serializable_key)}}}",
189-
}
190-
191161
def test_should_raise_404_for_non_existent_xcom(self):
192162
dag_id = "test-dag-id"
193163
task_id = "test-task-id"

tests/api_connexion/schemas/test_xcom_schema.py

-51
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,18 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19-
import pickle
20-
2119
import pytest
2220
from sqlalchemy import or_, select
2321

2422
from airflow.api_connexion.schemas.xcom_schema import (
2523
XComCollection,
2624
xcom_collection_item_schema,
2725
xcom_collection_schema,
28-
xcom_schema_string,
2926
)
3027
from airflow.models import DagRun, XCom
3128
from airflow.utils import timezone
3229
from airflow.utils.session import create_session
3330

34-
from tests_common.test_utils.config import conf_vars
35-
3631
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
3732

3833

@@ -184,49 +179,3 @@ def test_serialize(self, create_xcom, session):
184179
"total_entries": 2,
185180
},
186181
)
187-
188-
189-
class TestXComSchema:
190-
default_time = "2016-04-02T21:00:00+00:00"
191-
default_time_parsed = timezone.parse(default_time)
192-
193-
@conf_vars({("core", "enable_xcom_pickling"): "True"})
194-
def test_serialize(self, create_xcom, session):
195-
create_xcom(
196-
dag_id="test_dag",
197-
task_id="test_task_id",
198-
logical_date=self.default_time_parsed,
199-
key="test_key",
200-
value=pickle.dumps(b"test_binary"),
201-
)
202-
xcom_model = session.query(XCom).first()
203-
deserialized_xcom = xcom_schema_string.dump(xcom_model)
204-
assert deserialized_xcom == {
205-
"key": "test_key",
206-
"timestamp": self.default_time,
207-
"logical_date": self.default_time,
208-
"task_id": "test_task_id",
209-
"dag_id": "test_dag",
210-
"value": "test_binary",
211-
"map_index": -1,
212-
}
213-
214-
@conf_vars({("core", "enable_xcom_pickling"): "True"})
215-
def test_deserialize(self):
216-
xcom_dump = {
217-
"key": "test_key",
218-
"timestamp": self.default_time,
219-
"logical_date": self.default_time,
220-
"task_id": "test_task_id",
221-
"dag_id": "test_dag",
222-
"value": b"test_binary",
223-
}
224-
result = xcom_schema_string.load(xcom_dump)
225-
assert result == {
226-
"key": "test_key",
227-
"timestamp": self.default_time_parsed,
228-
"logical_date": self.default_time_parsed,
229-
"task_id": "test_task_id",
230-
"dag_id": "test_dag",
231-
"value": "test_binary",
232-
}

tests/api_fastapi/core_api/routes/public/test_xcom.py

-21
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636

3737
TEST_XCOM_KEY = "test_xcom_key"
3838
TEST_XCOM_VALUE = {"key": "value"}
39-
TEST_XCOM_KEY2 = "test_xcom_key_non_serializable"
40-
TEST_XCOM_VALUE2 = {"key": {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}}
4139
TEST_XCOM_KEY3 = "test_xcom_key_non_existing"
4240

4341
TEST_DAG_ID = "test-dag-id"
@@ -140,25 +138,6 @@ def test_should_respond_200_native(self, test_client):
140138
"value": TEST_XCOM_VALUE,
141139
}
142140

143-
@conf_vars({("core", "enable_xcom_pickling"): "True"})
144-
def test_should_respond_200_pickled(self, test_client):
145-
self.create_xcom(TEST_XCOM_KEY2, TEST_XCOM_VALUE2)
146-
response = test_client.get(
147-
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY2}"
148-
)
149-
assert response.status_code == 200
150-
151-
current_data = response.json()
152-
assert current_data == {
153-
"dag_id": TEST_DAG_ID,
154-
"logical_date": logical_date_parsed.strftime("%Y-%m-%dT%H:%M:%SZ"),
155-
"key": TEST_XCOM_KEY2,
156-
"task_id": TEST_TASK_ID,
157-
"map_index": -1,
158-
"timestamp": current_data["timestamp"],
159-
"value": str(TEST_XCOM_VALUE2),
160-
}
161-
162141
def test_should_raise_404_for_non_existent_xcom(self, test_client):
163142
response = test_client.get(
164143
f"/public/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY3}"

0 commit comments

Comments
 (0)