diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 5982a465603ade..c3f918db95bf0d 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -362,7 +362,12 @@ def _run_raw_task( events = context["outlet_events"] for obj in ti.task.outlets or []: # Lineage can have other types of objects besides assets - asset_type = type(obj).__name__ + # + # The asset_type here is not Asset.asset_type but the obj type instead. + # Only Asset is expected to be subclassed so + # any subclass of Asset should be set as Asset to be handled correctly. + # TODO: We should rename it later. + asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__ if isinstance(obj, Asset): task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type)) outlet_events.append(attrs.asdict(events[obj])) # type: ignore diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 0e364b700ca4ef..e498b1f46c2a5b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -523,7 +523,12 @@ def _process_outlets(context: Context, outlets: list[AssetProfile]): for obj in outlets or []: # Lineage can have other types of objects besides assets - asset_type = type(obj).__name__ + # + # The asset_type here is not Asset.asset_type but the obj type instead. + # Only Asset is expected to be subclassed so + # any subclass of Asset should be set as Asset to be handled correctly. + # TODO: We should rename it later. + asset_type = "Asset" if isinstance(obj, Asset) else type(obj).__name__ if isinstance(obj, Asset): task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type)) outlet_events.append(attrs.asdict(events[obj])) # type: ignore