
Error ingesting metadata from Airflow #17821

Open
edallastella opened this issue Sep 12, 2024 · 0 comments
Labels: bug (Something isn't working), Ingestion, validation


Affected module
Impacts the ingestion framework.

Describe the bug
When I try to ingest metadata from Airflow, I get a validation error from Pydantic. The ingestion DAG is marked as successful, but no metadata is ingested.

To Reproduce

  1. Create an ingestion DAG for Airflow
  2. Run it
  3. The error appears in the ingestion logs
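For reference, the ingestion DAG in step 1 is generated from a workflow config along these lines (a sketch based on the OpenMetadata 1.5.x docs; service names, hosts, and the token are placeholders, and field names may vary by version):

```yaml
source:
  type: airflow
  serviceName: NonProd_Airflow_Ingestion   # placeholder service name
  serviceConnection:
    config:
      type: Airflow
      hostPort: http://airflow-host:8080   # placeholder
      connection:
        type: Backend                      # read DAGs from Airflow's metadata DB
  sourceConfig:
    config:
      type: PipelineMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://openmetadata-host:8585/api   # placeholder
    authProvider: openmetadata
    securityConfig:
      jwtToken: "<jwt-token>"
```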

Expected behavior
Airflow's metadata should be ingested successfully, as it was in previous versions.

Version:

  • OS: Kubernetes on ARM64
  • Python version: 3.10
  • OpenMetadata version: 1.5.1
  • OpenMetadata Ingestion package version: OM's ingestion image v1.5.1

Additional context
Logs from the OM server pod:
10.49.67.62 - - [09/Sep/2024:18:28:10 +0000] "GET /api/v1/services/pipelineServices/name/NonProd_Airflow_Ingestion HTTP/1.1" 200 657 "-" "python-requests/2.31.0" 39
10.49.67.17 - - [09/Sep/2024:18:28:11 +0000] "POST /api/v1/services/ingestionPipelines/trigger/6fe7a223-e2b5-453d-aa03-3ac274cc9436 HTTP/1.1" 200 326 "https://omd.nonprod.eaap.private.wiley.host/service/pipelineServices/NonProd_Airflow_Ingestion/ingestions" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0" 989
10.49.65.28 - - [09/Sep/2024:18:28:24 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 1
10.49.66.225 - - [09/Sep/2024:18:28:24 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 0
10.49.67.17 - - [09/Sep/2024:18:28:24 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:25 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:25 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:25 +0000] "GET /api/v1/services/databaseServices/name/Morris_Snowflake HTTP/1.1" 200 2179 "-" "python-requests/2.31.0" 52
10.49.67.62 - - [09/Sep/2024:18:28:27 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 2
10.49.67.62 - - [09/Sep/2024:18:28:27 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:27 +0000] "GET /api/v1/services/databaseServices/name/Morris_Snowflake HTTP/1.1" 200 2179 "-" "python-requests/2.31.0" 46
10.49.67.62 - - [09/Sep/2024:18:28:28 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:28 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:30 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 2
10.49.67.62 - - [09/Sep/2024:18:28:30 +0000] "GET /api/v1/system/version HTTP/1.1" 200 99 "-" "python-requests/2.31.0" 1
10.49.67.62 - - [09/Sep/2024:18:28:30 +0000] "GET /api/v1/services/databaseServices/name/Morris_Snowflake HTTP/1.1" 200 2179 "-" "python-requests/2.31.0" 44
ERROR [2024-09-09 18:28:36,482] [DefaultQuartzScheduler_Worker-10] o.o.s.a.b.c.AbstractEventConsumer - Error in executing the Job : Invalid field name owner
ERROR [2024-09-09 18:28:36,492] [DefaultQuartzScheduler_Worker-4] o.o.s.a.b.c.AbstractEventConsumer - Error in executing the Job : Invalid field name owner
169.254.7.127 - - [09/Sep/2024:18:28:37 +0000] "GET / HTTP/1.1" 200 2064 "-" "kube-probe/1.29+" 1
169.254.7.127 - - [09/Sep/2024:18:28:37 +0000] "GET /healthcheck HTTP/1.1" 200 263 "-" "kube-probe/1.29+" 2
10.49.65.28 - - [09/Sep/2024:18:28:39 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 1
10.49.66.225 - - [09/Sep/2024:18:28:39 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 1
10.49.67.17 - - [09/Sep/2024:18:28:39 +0000] "GET / HTTP/1.1" 200 591 "-" "ELB-HealthChecker/2.0" 1

Logs from the ingestion pod:
/home/airflow/.local/lib/python3.10/site-packages/flask_limiter/extension.py:337 UserWarning: Using the in-memory storage for tracking rate limits as no storage was explicitly specified. This is not recommended for production use. See: https://flask-limiter.readthedocs.io#configuring-a-storage-backend for documentation about configuring the storage backend.
[2024-09-09T18:27:11.408+0000] {dagbag.py:545} INFO - Filling up the DagBag from /mnt/efs/airflow-dags/f7b8ae63-124c-47d8-8c61-b020216b3725.py
[2024-09-09T18:27:11.607+0000] {server_mixin.py:74} INFO - OpenMetadata client running with Server version [1.5.1] and Client version [1.5.1.0]
[2024-09-09T18:27:11.655+0000] {workflow_factory.py:85} INFO - Registered the dag: f7b8ae63-124c-47d8-8c61-b020216b3725
[2024-09-09T18:27:11.753+0000] {task_command.py:426} INFO - Running <TaskInstance: f7b8ae63-124c-47d8-8c61-b020216b3725.ingestion_task manual__2024-09-09T18:26:54+00:00 [queued]> on host f7b8ae63-124c-47d8-8c61-b020216b3725-ingestion-task-1ndhold5
[2024-09-09T18:27:12.140+0000] {local_task_job_runner.py:120} INFO - ::group::Pre task execution logs
[2024-09-09T18:27:12.191+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: f7b8ae63-124c-47d8-8c61-b020216b3725.ingestion_task manual__2024-09-09T18:26:54+00:00 [queued]>
[2024-09-09T18:27:12.202+0000] {taskinstance.py:2076} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: f7b8ae63-124c-47d8-8c61-b020216b3725.ingestion_task manual__2024-09-09T18:26:54+00:00 [queued]>
[2024-09-09T18:27:12.202+0000] {taskinstance.py:2306} INFO - Starting attempt 1 of 1
[2024-09-09T18:27:12.221+0000] {taskinstance.py:2330} INFO - Executing <Task(CustomPythonOperator): ingestion_task> on 2024-09-09 18:26:54+00:00
[2024-09-09T18:27:12.231+0000] {standard_task_runner.py:63} INFO - Started process 84 to run task
[2024-09-09T18:27:12.234+0000] {standard_task_runner.py:90} INFO - Running: ['airflow', 'tasks', 'run', 'f7b8ae63-124c-47d8-8c61-b020216b3725', 'ingestion_task', 'manual__2024-09-09T18:26:54+00:00', '--job-id', '4064', '--raw', '--subdir', 'DAGS_FOLDER/f7b8ae63-124c-47d8-8c61-b020216b3725.py', '--cfg-path', '/tmp/tmp9qs_lypo']
[2024-09-09T18:27:12.235+0000] {standard_task_runner.py:91} INFO - Job 4064: Subtask ingestion_task
[2024-09-09T18:27:12.580+0000] {task_command.py:426} INFO - Running <TaskInstance: f7b8ae63-124c-47d8-8c61-b020216b3725.ingestion_task manual__2024-09-09T18:26:54+00:00 [running]> on host f7b8ae63-124c-47d8-8c61-b020216b3725-ingestion-task-1ndhold5
[2024-09-09T18:27:13.312+0000] {logging_mixin.py:188} WARNING - /home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/template_rendering.py:46 AirflowProviderDeprecationWarning: This function is deprecated. Please use create_unique_id.
[2024-09-09T18:27:13.313+0000] {logging_mixin.py:188} WARNING - /home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:145 AirflowProviderDeprecationWarning: This function is deprecated. Please use add_unique_suffix.
[2024-09-09T18:27:13.314+0000] {pod_generator.py:557} WARNING - Model file /opt/airflow/pod_templates/pod_template.yaml does not exist
[2024-09-09T18:27:13.400+0000] {taskinstance.py:2648} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='edallastel' AIRFLOW_CTX_DAG_ID='f7b8ae63-124c-47d8-8c61-b020216b3725' AIRFLOW_CTX_TASK_ID='ingestion_task' AIRFLOW_CTX_EXECUTION_DATE='2024-09-09T18:26:54+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-09-09T18:26:54+00:00'
[2024-09-09T18:27:13.401+0000] {taskinstance.py:430} INFO - ::endgroup::
[2024-09-09T18:27:13.448+0000] {server_mixin.py:74} INFO - OpenMetadata client running with Server version [1.5.1] and Client version [1.5.1.0]
[2024-09-09T18:27:13.967+0000] {ingestion_pipeline_mixin.py:52} DEBUG - Created Pipeline Status for pipeline NonProd_Airflow_Ingestion.f7b8ae63-124c-47d8-8c61-b020216b3725: runId='b6483301-8485-4f5e-8e87-99cceaee705a' pipelineState=<PipelineState.running: 'running'> startDate=Timestamp(root=1725906433434) timestamp=Timestamp(root=1725906433434) endDate=None status=None
[2024-09-09T18:27:15.130+0000] {test_connections.py:221} INFO - Test connection results:
[2024-09-09T18:27:15.130+0000] {test_connections.py:222} INFO - failed=[] success=["'CheckAccess': Pass"] warning=[]
[2024-09-09T18:27:15.131+0000] {metadata.py:69} DEBUG - Source type:airflow,<class 'metadata.ingestion.source.pipeline.airflow.metadata.AirflowSource'> configured
[2024-09-09T18:27:15.131+0000] {metadata.py:71} DEBUG - Source type:airflow,<class 'metadata.ingestion.source.pipeline.airflow.metadata.AirflowSource'> prepared
[2024-09-09T18:27:15.676+0000] {metadata.py:80} DEBUG - Sink type:metadata-rest, <class 'metadata.ingestion.sink.metadata_rest.MetadataRestSink'> configured
[2024-09-09T18:27:15.676+0000] {topology_runner.py:166} DEBUG - Processing node producer='get_services' stages=[NodeStage(type_=<class 'metadata.generated.schema.entity.services.pipelineService.PipelineService'>, processor='yield_create_request_pipeline_service', nullable=False, must_return=True, overwrite=False, consumer=None, context='pipeline_service', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=True, use_cache=False)] children=['pipeline'] post_process=['mark_pipelines_as_deleted'] threads=False
[2024-09-09T18:27:15.677+0000] {topology_runner.py:231} DEBUG - Processing stage: type_=<class 'metadata.generated.schema.entity.services.pipelineService.PipelineService'> processor='yield_create_request_pipeline_service' nullable=False must_return=True overwrite=False consumer=None context='pipeline_service' store_all_in_context=False clear_context=False store_fqn=False cache_entities=True use_cache=False
[2024-09-09T18:27:15.888+0000] {topology_runner.py:166} DEBUG - Processing node producer='get_pipeline' stages=[NodeStage(type_=<class 'metadata.ingestion.models.ometa_classification.OMetaTagAndClassification'>, processor='yield_tag', nullable=True, must_return=False, overwrite=True, consumer=None, context='tags', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=False, use_cache=False), NodeStage(type_=<class 'metadata.generated.schema.entity.data.pipeline.Pipeline'>, processor='yield_pipeline', nullable=False, must_return=False, overwrite=True, consumer=['pipeline_service'], context='pipeline', store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=False, use_cache=True), NodeStage(type_=<class 'metadata.ingestion.models.pipeline_status.OMetaPipelineStatus'>, processor='yield_pipeline_status', nullable=True, must_return=False, overwrite=True, consumer=['pipeline_service'], context=None, store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=False, use_cache=False), NodeStage(type_=<class 'metadata.generated.schema.api.lineage.addLineage.AddLineageRequest'>, processor='yield_pipeline_lineage', nullable=True, must_return=False, overwrite=True, consumer=['pipeline_service'], context=None, store_all_in_context=False, clear_context=False, store_fqn=False, cache_entities=False, use_cache=False)] children=None post_process=None threads=False
[2024-09-09T18:27:15.914+0000] {metadata.py:338} DEBUG - Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/ingestion/source/pipeline/airflow/metadata.py", line 324, in get_pipelines_list
    dag = AirflowDagDetails(
  File "/home/airflow/.local/lib/python3.10/site-packages/pydantic/main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 4 validation errors for AirflowDagDetails
tasks.0.task_id
  Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
tasks.1.task_id
  Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
tasks.2.task_id
  Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
tasks.3.task_id
  Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-09T18:27:15.915+0000] {metadata.py:339} WARNING - Error building pydantic model for ('blueprint-test-basic', {'__version': 1, 'dag': {'catchup': False, 'fileloc': '/opt/airflow/dags/repo/src/airflow/dags/dai_gigahorse/pocs/blueprint/basic/basic.py', 'default_ ... (8854 characters truncated) ... mpty', '_is_empty': True, 'start_trigger_args': None}, '__type': 'operator'}], 'dag_dependencies': [], 'params': [], 'has_on_failure_callback': True}}, '/opt/airflow/dags/repo/src/airflow/dags/dai_gigahorse/pocs/blueprint/basic/basic.py') - 4 validation errors for AirflowDagDetails tasks.0.task_id Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict] For further information visit https://errors.pydantic.dev/2.7/v/missing tasks.1.task_id Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict] For further information visit https://errors.pydantic.dev/2.7/v/missing tasks.2.task_id Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict] For further information visit https://errors.pydantic.dev/2.7/v/missing tasks.3.task_id Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict] For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-09T18:27:15.915+0000] {metadata.py:338} DEBUG - Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/ingestion/source/pipeline/airflow/metadata.py", line 324, in get_pipelines_list
    dag = AirflowDagDetails(
  File "/home/airflow/.local/lib/python3.10/site-packages/pydantic/main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for AirflowDagDetails
tasks.0.task_id
  Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-09T18:27:15.915+0000] {metadata.py:339} WARNING - Error building pydantic model for ('alert_dag', {'__version': 1, 'dag': {'schedule_interval': {'__var': 86400.0, '__type': 'timedelta'}, '_task_group': {'_group_id': None, 'prefix_group_id': True, ' ... (1636 characters truncated) ... thon', '_is_empty': False, 'start_trigger_args': None, 'op_args': [], 'op_kwargs': {}}, '__type': 'operator'}], 'dag_dependencies': [], 'params': []}}, '/opt/airflow/dags/repo/src/airflow/dags/dai-gigahorse/pocs/dag-notification.py') - 1 validation error for AirflowDagDetails tasks.0.task_id Field required [type=missing, input_value={'__var': {'is_setup': Fa...}, '__type': 'operator'}, input_type=dict] For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-09T18:27:15.916+0000] {metadata.py:338} DEBUG - Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/metadata/ingestion/source/pipeline/airflow/metadata.py", line 324, in get_pipelines_list
    dag = AirflowDagDetails(
  File "/home/airflow/.local/lib/python3.10/site-packages/pydantic/main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for AirflowDagDetails
tasks.0.task_id
  Field required [type=missing, input_value={'__var': {'_needs_expans...}, '__type': 'operator'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.7/v/missing
[2024-09-09T18:27:15.916+0000] {metadata.py:339} WARNING - Error building pydantic model for ('poc-gusty', {'__version': 1, 'dag': {'schedule_interval': {'__var': 86400.0, '__type': 'timedelta'}, 'timezone': 'UTC', 'dagrun_timeout': 1800.0, 'fileloc': '/opt ... (1241 characters truncated) ... task', '_is_empty': False, 'start_tr...
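From the tracebacks, every failing task arrives as {'__var': {...}, '__type': 'operator'}, while AirflowDagDetails requires fields like task_id at the top level of each task dict. A minimal sketch of the mismatch and one possible unwrapping (this is not OpenMetadata's actual fix; the name unwrap_task is hypothetical):

```python
# Newer Airflow versions wrap each serialized operator in
# {'__var': {...fields...}, '__type': 'operator'}; older versions stored
# the operator fields at the top level, which is what AirflowDagDetails
# expects, hence the "tasks.N.task_id Field required" errors above.
serialized_tasks = [
    {"__type": "operator", "__var": {"task_id": "extract", "is_setup": False}},
    {"__type": "operator", "__var": {"task_id": "load", "is_setup": False}},
]

def unwrap_task(entry: dict) -> dict:
    """Return the operator fields, handling both the wrapped shape
    ({'__var': ..., '__type': 'operator'}) and the older flat shape."""
    if "__var" in entry:
        return entry["__var"]
    return entry

tasks = [unwrap_task(t) for t in serialized_tasks]
print([t["task_id"] for t in tasks])  # ['extract', 'load']
```

With this kind of unwrapping applied before validation, each task dict again exposes task_id and the Pydantic model builds cleanly.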

@ayush-shah ayush-shah added bug Something isn't working Ingestion validation labels Sep 13, 2024
@ayush-shah ayush-shah self-assigned this Sep 13, 2024