Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(taskinstnace): set internal asset_type of all Asset subclass to "Asset" #47598

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,12 @@ def _run_raw_task(
events = context["outlet_events"]
for obj in ti.task.outlets or []:
# Lineage can have other types of objects besides assets
asset_type = type(obj).__name__
#
# The asset_type here is not Asset.asset_type but the obj type instead.
# Only Asset is expected to be subclassed so
# any subclass of Asset should be set as Asset to be handled correctly.
# TODO: We should rename it or probably rework the logic later.
asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__
if isinstance(obj, Asset):
task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
outlet_events.append(attrs.asdict(events[obj])) # type: ignore
Expand Down
7 changes: 6 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,12 @@ def _process_outlets(context: Context, outlets: list[AssetProfile]):

for obj in outlets or []:
# Lineage can have other types of objects besides assets
asset_type = type(obj).__name__
#
# The asset_type here is not Asset.asset_type but the obj type instead.
# Only Asset is expected to be subclassed so
# any subclass of Asset should be set as Asset to be handled correctly.
# TODO: We should rename it or probably rework the logic later.
asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__
if isinstance(obj, Asset):
task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
outlet_events.append(attrs.asdict(events[obj])) # type: ignore
Expand Down
41 changes: 40 additions & 1 deletion task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
TaskInstance,
TerminalTIState,
)
from airflow.sdk.definitions.asset import Asset, AssetAlias
from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset, Model

# from airflow.sdk.definitions.asset.decorators import AssetDefinition
from airflow.sdk.definitions.param import DagParam
from airflow.sdk.definitions.variable import Variable
from airflow.sdk.execution_time.comms import (
Expand Down Expand Up @@ -703,6 +705,43 @@ def test_dag_parsing_context(make_ti_context, mock_supervisor_comms, monkeypatch
),
id="asset",
),
pytest.param(
[Dataset(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
SucceedTask(
state="success",
end_date=timezone.datetime(2024, 12, 3, 10, 0),
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", asset_type="Asset")
],
outlet_events=[
{
"key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {},
"asset_alias_events": [],
}
],
),
id="dataset",
),
pytest.param(
[Model(name="s3://bucket/my-task", uri="s3://bucket/my-task")],
SucceedTask(
state="success",
end_date=timezone.datetime(2024, 12, 3, 10, 0),
task_outlets=[
AssetProfile(name="s3://bucket/my-task", uri="s3://bucket/my-task", asset_type="Asset")
],
outlet_events=[
{
"key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
"extra": {},
"asset_alias_events": [],
}
],
),
id="model",
),
# TODO: asset name ref and asset uri ref are not checked and should be added later
pytest.param(
[AssetAlias(name="example-alias", group="asset")],
SucceedTask(
Expand Down
Loading