From c2980939ba7b9b947fd6d870a9e134a8ea2726f0 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 11 Mar 2025 15:55:24 +0800 Subject: [PATCH] test(task-sdk): extend test_run_with_asset_outlets with asset subclasses --- .../execution_time/test_task_runner.py | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 4066bb1eadfd3..b66b36254677d 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -52,7 +52,9 @@ TaskInstance, TerminalTIState, ) -from airflow.sdk.definitions.asset import Asset, AssetAlias +from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset, Model + +# from airflow.sdk.definitions.asset.decorators import AssetDefinition from airflow.sdk.definitions.param import DagParam from airflow.sdk.definitions.variable import Variable from airflow.sdk.execution_time.comms import ( @@ -703,6 +705,43 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch ), id="asset", ), + pytest.param( + [Dataset(name="s3://bucket/my-task", uri="s3://bucket/my-task")], + SucceedTask( + state="success", + end_date=timezone.datetime(2024, 12, 3, 10, 0), + task_outlets=[ + AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", asset_type="Asset") + ], + outlet_events=[ + { + "key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {}, + "asset_alias_events": [], + } + ], + ), + id="dataset", + ), + pytest.param( + [Model(name="s3://bucket/my-task", uri="s3://bucket/my-task")], + SucceedTask( + state="success", + end_date=timezone.datetime(2024, 12, 3, 10, 0), + task_outlets=[ + AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", asset_type="Asset") + ], + outlet_events=[ + { + "key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}, + "extra": {}, + "asset_alias_events": [], + } + ], + ), + id="model", + ), + # TODO: asset name ref and asset uri ref are not checked and should be added later pytest.param( [AssetAlias(name="example-alias", group="asset")], SucceedTask(