
Prioritize secrets backend over DB for retrieving connections #47593

Merged
merged 3 commits into apache:main from fix-connections-backend
Mar 11, 2025

Conversation

amoghrajesh
Contributor

#47048 brought in support for retrieving connections from BaseHook using the new task SDK mechanism. In doing so, however, it deprioritised the secrets backend; the underlying reason is that #45435 is not done yet.

This could lead to things breaking for existing setups, so I plan to tweak the ordering temporarily until #45435 is fixed.
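To make the intended ordering concrete, here is a minimal standalone sketch of the lookup priority this change restores: configured secrets backends are consulted first, and only if none of them has the connection do we fall back to the DB/execution-API path. This is illustrative only, not Airflow's actual code; all class and function names here are hypothetical stand-ins.

```python
class LocalBackend:
    """Stands in for a secrets backend such as LocalFilesystemBackend."""

    def __init__(self, connections: dict):
        self._connections = connections

    def get_connection(self, conn_id: str):
        # Return the stored connection, or None if this backend doesn't have it.
        return self._connections.get(conn_id)


def get_connection(conn_id: str, backends: list, db_lookup):
    # Secrets backends take priority over the DB-backed lookup.
    for backend in backends:
        conn = backend.get_connection(conn_id)
        if conn is not None:
            return conn
    # Fall back to the execution API / DB path only when no backend had it
    # (in the real code this path can raise CONNECTION_NOT_FOUND).
    return db_lookup(conn_id)


backend = LocalBackend({"CONN_A": "mysql://host_a"})
print(get_connection("CONN_A", [backend], lambda c: None))  # → mysql://host_a
```

The key design point is that the DB lookup is only reached when every configured backend returns nothing, mirroring the pre-#47048 behaviour.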

Testing

  1. Set up a secrets backend by adding these to init.sh in breeze:
export AIRFLOW__SECRETS__BACKEND="airflow.secrets.local_filesystem.LocalFilesystemBackend"
export AIRFLOW__SECRETS__BACKEND_KWARGS='{"connections_file_path": "/files/conn.json"}'
  2. Have conn.json contain two connections:
{
    "CONN_A": "mysql://host_a",
    "CONN_B": {
        "conn_type": "scheme",
        "host": "host",
        "schema": "schema",
        "login": "Login",
        "password": "None",
        "port": "1234"
    }
}
  3. DAG used for testing:
from __future__ import annotations

from airflow.hooks.base import BaseHook
from airflow.models.baseoperator import BaseOperator
from airflow import DAG

class CustomOperator(BaseOperator):
    def execute(self, context):
        c = BaseHook.get_connection(conn_id="CONN_A")
        print("The connection is", c)


with DAG("example_get_connection", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn")
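
As an aside, the two conn.json entries above use the two supported shapes: a URI string ("mysql://host_a") and a full mapping. The snippet below is a standalone illustration of how a filesystem-style backend might normalize both shapes into one dict form; it mimics what the logs show (the backend injecting `conn_id` into the mapping) but is not Airflow's actual parsing code.

```python
from urllib.parse import urlsplit


def normalize(conn_id: str, entry):
    """Normalize a conn.json entry (URI string or mapping) into a dict."""
    if isinstance(entry, str):
        # URI form: scheme becomes conn_type, netloc becomes host.
        parts = urlsplit(entry)
        return {"conn_id": conn_id, "conn_type": parts.scheme, "host": parts.hostname}
    entry = dict(entry)
    entry["conn_id"] = conn_id  # the backend injects conn_id, as seen in the logs
    return entry


print(normalize("CONN_A", "mysql://host_a"))
# {'conn_id': 'CONN_A', 'conn_type': 'mysql', 'host': 'host_a'}
```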

Scenario 1: Backend configured

(screenshot: task succeeded)

Logs:

[2025-03-11, 05:46:38] DEBUG - Loading plugins source="airflow.plugins_manager"
[2025-03-11, 05:46:38] DEBUG - Loading plugins from directory: /files/plugins source="airflow.plugins_manager"
[2025-03-11, 05:46:38] DEBUG - Note: Loading plugins from examples as well: /files/plugins source="airflow.plugins_manager"
[2025-03-11, 05:46:38] ERROR - Failed to import plugin /files/plugins/my_xcom.py source="airflow.plugins_manager" error_detail=[{"exc_type":"ModuleNotFoundError","exc_value":"No module named 'airflow.sdk.execution_time.xcom'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/airflow/plugins_manager.py","lineno":290,"name":"load_plugins_from_plugin_directory"},{"filename":"<frozen importlib._bootstrap_external>","lineno":850,"name":"exec_module"},{"filename":"<frozen importlib._bootstrap>","lineno":228,"name":"_call_with_frames_removed"},{"filename":"/files/plugins/my_xcom.py","lineno":7,"name":"<module>"}]}]
[2025-03-11, 05:46:39] DEBUG - Loading plugins from entrypoints source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Importing entry_point plugin openlineage source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Importing entry_point plugin hive source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Importing entry_point plugin databricks_workflow source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Importing entry_point plugin edge_executor source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Loading 8 plugin(s) took 760.41 seconds source="airflow.plugins_manager"
[2025-03-11, 05:46:39] DEBUG - Calling 'on_starting' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff9cd4a670>} source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Hook impls: [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Result from 'on_starting': [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] INFO - DAG bundles loaded: dags-folder, example_dags source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-03-11, 05:46:39] INFO - Filling up the DagBag from /files/dags/dags/get_connection_basehook.py source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:46:39] DEBUG - Importing /files/dags/dags/get_connection_basehook.py source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:46:39] DEBUG - Loaded DAG <DAG: example_get_connection> source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:46:39] DEBUG - DAG file parsed file="dags/get_connection_basehook.py" source="task"
[2025-03-11, 05:46:39] DEBUG - Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('019583bd-a075-7e8b-b7c7-c22717860dbe'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:46:37.549632+00:00_u2dV7Og2', try_number=1, map_index=-1, hostname='d6c721d9430c', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 46, 38, 796295, tzinfo=TzInfo(UTC)))} source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] INFO - Task instance is in running state chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - Previous state of the Task instance: queued chan="stdout" source="task"
[2025-03-11, 05:46:39] DEBUG - Result from 'on_task_instance_running': [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] INFO - Current task name:print_conn chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - Dag name:example_get_connection chan="stdout" source="task"
[2025-03-11, 05:46:39] DEBUG - Loading connection source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Parsing file: /files/conn.json source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Parsed file: len(parse_errors)=0, len(secrets)=2 source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Loaded 2 connections source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] INFO - {'CONN_A': 'mysql://host_a', 'CONN_B': {'conn_type': 'scheme', 'host': 'host', 'schema': 'schema', 'login': 'Login', 'password': 'None', 'port': '1234', 'conn_id': 'CONN_B'}} chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - {'CONN_A': CONN_A, 'CONN_B': CONN_B} chan="stdout" source="task"
[2025-03-11, 05:46:39] DEBUG - Loading connection source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Parsing file: /files/conn.json source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Parsed file: len(parse_errors)=0, len(secrets)=2 source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] DEBUG - Loaded 2 connections source="airflow.secrets.local_filesystem"
[2025-03-11, 05:46:39] INFO - {'CONN_A': 'mysql://host_a', 'CONN_B': {'conn_type': 'scheme', 'host': 'host', 'schema': 'schema', 'login': 'Login', 'password': 'None', 'port': '1234', 'conn_id': 'CONN_B'}} chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - {'CONN_A': CONN_A, 'CONN_B': CONN_B} chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - Retrieving connection 'CONN_A' source="airflow.hooks.base"
[2025-03-11, 05:46:39] INFO - The connection is CONN_A chan="stdout" source="task"
[2025-03-11, 05:46:39] DEBUG - Sending request json="{\"state\":\"success\",\"end_date\":\"2025-03-11T05:46:39.691444Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n" source="task"
[2025-03-11, 05:46:39] DEBUG - Running finalizers ti="RuntimeTaskInstance(id=UUID('019583bd-a075-7e8b-b7c7-c22717860dbe'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:46:37.549632+00:00_u2dV7Og2', try_number=1, map_index=-1, hostname='d6c721d9430c', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 46, 38, 796295, tzinfo=TzInfo(UTC)))" source="task"
[2025-03-11, 05:46:39] DEBUG - Calling 'on_task_instance_success' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': RuntimeTaskInstance(id=UUID('019583bd-a075-7e8b-b7c7-c22717860dbe'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:46:37.549632+00:00_u2dV7Og2', try_number=1, map_index=-1, hostname='d6c721d9430c', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 46, 38, 796295, tzinfo=TzInfo(UTC)))} source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Result from 'on_task_instance_success': [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Calling 'before_stopping' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff80179370>} source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Hook impls: [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] DEBUG - Result from 'before_stopping': [] source="airflow.listeners.listener"
[2025-03-11, 05:46:39] INFO - Task instance in success state chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - Previous state of the Task instance: running chan="stdout" source="task"
[2025-03-11, 05:46:39] INFO - Task operator:<Task(CustomOperator): print_conn> chan="stdout" source="task"
[2025-03-11, 05:46:39] WARNING - Airflow core logging is not using a FileTaskHandler, can't upload logs to remote handler="<class 'NoneType'>" source="task"

Scenario 2: Same dag but backend not set up

(screenshot: task failed)

Logs:


[2025-03-11, 05:48:29] DEBUG - Disposing DB connection pool (PID 410) source="airflow.settings"
[2025-03-11, 05:48:29] DEBUG - Loading plugins source="airflow.plugins_manager"
[2025-03-11, 05:48:29] DEBUG - Loading plugins from directory: /files/plugins source="airflow.plugins_manager"
[2025-03-11, 05:48:29] DEBUG - Note: Loading plugins from examples as well: /files/plugins source="airflow.plugins_manager"
[2025-03-11, 05:48:29] ERROR - Failed to import plugin /files/plugins/my_xcom.py source="airflow.plugins_manager" error_detail=[{"exc_type":"ModuleNotFoundError","exc_value":"No module named 'airflow.sdk.execution_time.xcom'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/airflow/plugins_manager.py","lineno":290,"name":"load_plugins_from_plugin_directory"},{"filename":"<frozen importlib._bootstrap_external>","lineno":850,"name":"exec_module"},{"filename":"<frozen importlib._bootstrap>","lineno":228,"name":"_call_with_frames_removed"},{"filename":"/files/plugins/my_xcom.py","lineno":7,"name":"<module>"}]}]
[2025-03-11, 05:48:30] DEBUG - Loading plugins from entrypoints source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Importing entry_point plugin openlineage source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Importing entry_point plugin hive source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Importing entry_point plugin databricks_workflow source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Importing entry_point plugin edge_executor source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Loading 8 plugin(s) took 603.77 seconds source="airflow.plugins_manager"
[2025-03-11, 05:48:30] DEBUG - Calling 'on_starting' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffffa8bef760>} source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Hook impls: [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Result from 'on_starting': [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] INFO - DAG bundles loaded: dags-folder, example_dags source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-03-11, 05:48:30] INFO - Filling up the DagBag from /files/dags/dags/get_connection_basehook.py source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:48:30] DEBUG - Importing /files/dags/dags/get_connection_basehook.py source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:48:30] DEBUG - Loaded DAG <DAG: example_get_connection> source="airflow.models.dagbag.DagBag"
[2025-03-11, 05:48:30] DEBUG - DAG file parsed file="dags/get_connection_basehook.py" source="task"
[2025-03-11, 05:48:30] DEBUG - Calling 'on_task_instance_running' with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': RuntimeTaskInstance(id=UUID('019583bf-5216-74f7-a2f4-3e67f266e837'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:48:28.558793+00:00_ElHp1F3J', try_number=1, map_index=-1, hostname='e6fe831ca8f1', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 48, 29, 750837, tzinfo=TzInfo(UTC)))} source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] INFO - Task instance is in running state chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Previous state of the Task instance: queued chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Current task name:print_conn chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Dag name:example_get_connection chan="stdout" source="task"
[2025-03-11, 05:48:30] DEBUG - Result from 'on_task_instance_running': [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Sending request json="{\"conn_id\":\"CONN_A\",\"type\":\"GetConnection\"}\n" source="task"
[2025-03-11, 05:48:30] ERROR - Task failed with exception source="task" error_detail=[{"exc_type":"AirflowRuntimeError","exc_value":"CONNECTION_NOT_FOUND: {'conn_id': 'CONN_A'}","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":610,"name":"run"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":735,"name":"_execute_task"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":371,"name":"wrapper"},{"filename":"/files/dags/dags/get_connection_basehook.py","lineno":9,"name":"execute"},{"filename":"/opt/airflow/airflow/hooks/base.py","lineno":76,"name":"get_connection"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py","lineno":88,"name":"get"},{"filename":"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py","lineno":89,"name":"_get_connection"}]}]
[2025-03-11, 05:48:30] DEBUG - Sending request json="{\"state\":\"failed\",\"end_date\":\"2025-03-11T05:48:30.406142Z\",\"type\":\"TaskState\"}\n" source="task"
[2025-03-11, 05:48:30] DEBUG - Running finalizers ti="RuntimeTaskInstance(id=UUID('019583bf-5216-74f7-a2f4-3e67f266e837'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:48:28.558793+00:00_ElHp1F3J', try_number=1, map_index=-1, hostname='e6fe831ca8f1', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 48, 29, 750837, tzinfo=TzInfo(UTC)))" source="task"
[2025-03-11, 05:48:30] DEBUG - Calling 'on_task_instance_failed' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': RuntimeTaskInstance(id=UUID('019583bf-5216-74f7-a2f4-3e67f266e837'), task_id='print_conn', dag_id='example_get_connection', run_id='manual__2025-03-11T05:48:28.558793+00:00_ElHp1F3J', try_number=1, map_index=-1, hostname='e6fe831ca8f1', task=<Task(CustomOperator): print_conn>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 11, 5, 48, 29, 750837, tzinfo=TzInfo(UTC))), 'error': AirflowRuntimeError("CONNECTION_NOT_FOUND: {'conn_id': 'CONN_A'}")} source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Hook impls: [<HookImpl plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 'airflow.example_dags.plugins.event_listener' from '/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Result from 'on_task_instance_failed': [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Calling 'before_stopping' with {'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object at 0xffff90ba5f40>} source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Hook impls: [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] DEBUG - Result from 'before_stopping': [] source="airflow.listeners.listener"
[2025-03-11, 05:48:30] INFO - Task instance in failure state chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Task start chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Task:<Task(CustomOperator): print_conn> chan="stdout" source="task"
[2025-03-11, 05:48:30] INFO - Failure caused by CONNECTION_NOT_FOUND: {'conn_id': 'CONN_A'} chan="stdout" source="task"
[2025-03-11, 05:48:30] WARNING - Airflow core logging is not using a FileTaskHandler, can't upload logs to remote handler="<class 'NoneType'>" source="task"


@boring-cyborg boring-cyborg bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Mar 11, 2025
@amoghrajesh
Contributor Author

We probably need to do the same for variables too, but that can be handled separately.

@amoghrajesh amoghrajesh requested a review from XD-DENG as a code owner March 11, 2025 08:05
Member

@ashb ashb left a comment


In terms of a temporary stop-gap solution, I guess this works to unblock things during the betas. Just so everyone knows, we can't release it like this though.

@amoghrajesh amoghrajesh merged commit 5dae3b5 into apache:main Mar 11, 2025
44 checks passed
@amoghrajesh amoghrajesh deleted the fix-connections-backend branch March 11, 2025 15:45
ramitkataria added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Mar 11, 2025
…ricHook`

After apache#47048, the boto fallback strategy started breaking in
`AwsGenericHook` (explained in more detail here:
apache#47048 (comment)).
I realized this was because `BaseHook.get_connection` started raising a
new exception, `AirflowRuntimeError`, where it previously only threw
`AirflowNotFoundException`. This has been temporarily resolved in
apache#47593, but to avoid issues once the change is re-introduced,
`AwsGenericHook` will now also catch `AirflowRuntimeError`.
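
The defensive pattern described in that commit can be sketched as follows: catch both the old and the new exception type around the connection lookup, then fall back (in `AwsGenericHook`'s case, to boto's own credential chain). The exception classes and helper names here are stubs so the snippet is self-contained; this is a sketch of the pattern, not the provider's actual code.

```python
class AirflowNotFoundException(Exception):
    """Stub for the exception BaseHook.get_connection used to raise."""


class AirflowRuntimeError(Exception):
    """Stub for the new exception raised via the task SDK path."""


def get_conn_or_fallback(get_connection, conn_id, fallback):
    # Catch both exception types so the fallback works before and after
    # the re-introduction of the task SDK lookup path.
    try:
        return get_connection(conn_id)
    except (AirflowNotFoundException, AirflowRuntimeError):
        return fallback()


def missing(conn_id):
    raise AirflowRuntimeError(f"CONNECTION_NOT_FOUND: {{'conn_id': {conn_id!r}}}")


print(get_conn_or_fallback(missing, "CONN_A", lambda: "boto-default-chain"))
# → boto-default-chain
```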
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:task-sdk