Skip to content

Commit

Permalink
fix(taskinstnace): set the asset_type of all Asset subclass to "Asset"
Browse files Browse the repository at this point in the history
Currently, asset_type is used to decide how we register asset event.
(note that the asset_type here is object type instead of Asset.asset_type)
As Asset might be subclass, all instances of Asset subclasses should have "Asset" as their asset_type
  • Loading branch information
Lee-W committed Mar 11, 2025
1 parent 637525c commit 41d56c8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
7 changes: 6 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,12 @@ def _run_raw_task(
events = context["outlet_events"]
for obj in ti.task.outlets or []:
# Lineage can have other types of objects besides assets
asset_type = type(obj).__name__
#
# The asset_type here is not Asset.asset_type but the obj type instead.
# Only Asset is expected to be subclassed so
# any subclass of Asset should be set as Asset to be handled correctly.
# TODO: We should rename it later.
asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__
if isinstance(obj, Asset):
task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
outlet_events.append(attrs.asdict(events[obj])) # type: ignore
Expand Down
7 changes: 6 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,12 @@ def _process_outlets(context: Context, outlets: list[AssetProfile]):

for obj in outlets or []:
# Lineage can have other types of objects besides assets
asset_type = type(obj).__name__
#
# The asset_type here is not Asset.asset_type but the obj type instead.
# Only Asset is expected to be subclassed so
# any subclass of Asset should be set as Asset to be handled correctly.
# TODO: We should rename it later.
asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__
if isinstance(obj, Asset):
task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
outlet_events.append(attrs.asdict(events[obj])) # type: ignore
Expand Down

0 comments on commit 41d56c8

Please sign in to comment.