Add deferrable mode for S3KeySensor #31018

Merged: 37 commits, merged Jun 8, 2023 (into apache:main from sunank200:s3keysensor)

Changes shown below are from the first 12 of 37 commits.

Commits:
3ded5c9
Make S3KeySensor deferrable
sunank200 May 2, 2023
3d1c188
Remove aiobotocore from dependency
sunank200 May 2, 2023
85e6edb
Add the S3 trigger path
sunank200 May 8, 2023
95e74c6
Add the tests
sunank200 May 8, 2023
f613dd0
Merge branch 'main' into s3keysensor
sunank200 May 8, 2023
602d9d6
remove unnecessary tests
sunank200 May 9, 2023
ae83da3
Fix the docs
sunank200 May 9, 2023
6e1f0a0
update docs and example DAG
sunank200 May 10, 2023
11ce1b8
Merge branch 'apache:main' into s3keysensor
sunank200 May 10, 2023
41b0c22
Merge branch 'apache:main' into s3keysensor
sunank200 May 29, 2023
e5f515f
Merge branch 'main' into s3keysensor
sunank200 May 29, 2023
571ffa6
Remove S3HookAsync and use S3Hook with async_conn
sunank200 May 29, 2023
0785452
Add tests for hooks and triggers
sunank200 May 29, 2023
38583a7
remove aiohttp
sunank200 May 29, 2023
c4be0da
remove test for Python 3.7 in Airflow
sunank200 May 29, 2023
e1bbaec
Add more test to system tests
sunank200 May 30, 2023
debf4a7
Merge branch 'main' into s3keysensor
sunank200 May 30, 2023
3701f61
Merge branch 'apache:main' into s3keysensor
sunank200 May 30, 2023
b75c3b9
Merge branch 'main' into s3keysensor
sunank200 May 30, 2023
d7b3188
Merge branch 'apache:main' into s3keysensor
sunank200 Jun 5, 2023
d36487d
add check_fn fix
sunank200 Jun 7, 2023
c3e8d8e
Merge branch 'main' into s3keysensor
sunank200 Jun 7, 2023
6be1156
Add tests for check_fn
sunank200 Jun 7, 2023
fa49d9c
Remove s3 key unchanged code
sunank200 Jun 7, 2023
08ee769
Add the should_check_fn in serializer
sunank200 Jun 7, 2023
df55976
Update triggers integration-name in providers.yaml
sunank200 Jun 7, 2023
3ac1de4
Refactor integration name from Amazon S3 to Amazon Simple Storage Ser…
sunank200 Jun 7, 2023
545a26b
add type checking
sunank200 Jun 7, 2023
8d18db9
Add . for static checks in doc-strings
sunank200 Jun 7, 2023
9f62328
Merge remote-tracking branch 'upstream/main' into s3keysensor
sunank200 Jun 7, 2023
f598da1
Merge branch 'main' into s3keysensor
sunank200 Jun 7, 2023
01f64fe
Merge branch 's3keysensor' of https://github.com/sunank200/airflow in…
sunank200 Jun 7, 2023
66732fd
Merge branch 'main' into s3keysensor
sunank200 Jun 7, 2023
110a8f9
change doc string
sunank200 Jun 7, 2023
83c3bed
Merge branch 's3keysensor' of https://github.com/sunank200/airflow in…
sunank200 Jun 7, 2023
66e7e4e
Merge branch 'main' into s3keysensor
sunank200 Jun 7, 2023
1b315a1
Merge branch 'main' into s3keysensor
sunank200 Jun 8, 2023
364 changes: 363 additions & 1 deletion airflow/providers/amazon/aws/hooks/s3.py

Large diffs are not rendered by default.

51 changes: 46 additions & 5 deletions airflow/providers/amazon/aws/sensors/s3.py
@@ -20,9 +20,9 @@
import fnmatch
import os
import re
from datetime import datetime
from datetime import datetime, timedelta
from functools import cached_property
from typing import TYPE_CHECKING, Callable, Sequence
from typing import TYPE_CHECKING, Any, Callable, List, Sequence, cast

from deprecated import deprecated

@@ -31,6 +31,7 @@

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.triggers.s3 import S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator, poke_mode_only


@@ -48,7 +49,7 @@ class S3KeySensor(BaseSensorOperator):
or relative path from root level. When it's specified as a full s3://
url, please leave bucket_name as `None`
:param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
is not provided as a full s3:// url. When specified, all the keys passed to ``bucket_key``
is not provided as a full ``s3://`` url. When specified, all the keys passed to ``bucket_key``
refer to this bucket
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
@@ -61,8 +62,9 @@
def check_fn(files: List) -> bool:
return any(f.get('Size', 0) > 1048576 for f in files)
:param aws_conn_id: a reference to the s3 connection
:param verify: Whether or not to verify SSL certificates for S3 connection.
By default SSL certificates are verified.
:param deferrable: Run operator in the deferrable mode
:param verify: Whether to verify SSL certificates for S3 connection.
By default, SSL certificates are verified.
You can provide the following values:

- ``False``: do not validate SSL certificates. SSL will still be used
@@ -84,6 +86,7 @@ def __init__(
check_fn: Callable[..., bool] | None = None,
aws_conn_id: str = "aws_default",
verify: str | bool | None = None,
deferrable: bool = False,
**kwargs,
):
super().__init__(**kwargs)
@@ -93,6 +96,7 @@ def __init__(
self.check_fn = check_fn
self.aws_conn_id = aws_conn_id
self.verify = verify
self.deferrable = deferrable

def _check_key(self, key):
bucket_name, key = S3Hook.get_s3_bucket_key(self.bucket_name, key, "bucket_name", "bucket_key")
@@ -131,6 +135,43 @@ def poke(self, context: Context):
else:
return all(self._check_key(key) for key in self.bucket_key)

def execute(self, context: Context) -> None:
"""
Defers to the Trigger class to poll for the state of the job run until
it reaches a failure or success state.
"""
if not self.deferrable:
super().execute(context)
else:
if not self.poke(context=context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=S3KeyTrigger(
bucket_name=cast(str, self.bucket_name),
bucket_key=self.bucket_key,
wildcard_match=self.wildcard_match,
check_fn=self.check_fn,
aws_conn_id=self.aws_conn_id,
verify=self.verify,
poke_interval=self.poke_interval,
),
method_name="execute_complete",
)

def execute_complete(self, context: Context, event: dict[str, Any]) -> bool | None:
"""
Callback for when the trigger fires; returns immediately.
Relies on the trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event["status"] == "error":
raise AirflowException(event["message"])
elif event["status"] == "success" and "s3_objects" in event:
files = cast(List[str], event["s3_objects"])
if self.check_fn:
return self.check_fn(files)
return None

@deprecated(reason="use `hook` property instead.")
def get_hook(self) -> S3Hook:
"""Create and return an S3Hook"""
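As a quick illustration of the new parameter, here is a minimal sketch of a deferrable sensor task; the DAG id, bucket, and key pattern are hypothetical and not part of this PR:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(dag_id="example_s3_key_deferrable", start_date=datetime(2023, 6, 1), schedule=None) as dag:
    # With deferrable=True the sensor pokes once, then defers to S3KeyTrigger,
    # freeing the worker slot while the triggerer polls asynchronously.
    wait_for_key = S3KeySensor(
        task_id="wait_for_key",
        bucket_name="my-bucket",
        bucket_key="incoming/*.csv",
        wildcard_match=True,
        deferrable=True,
        timeout=60 * 60,  # passed to self.defer() as the deferral timeout
    )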
100 changes: 100 additions & 0 deletions airflow/providers/amazon/aws/triggers/s3.py
@@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from functools import cached_property
from typing import Any, AsyncIterator, Callable

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.triggers.base import BaseTrigger, TriggerEvent


class S3KeyTrigger(BaseTrigger):
"""
S3KeyTrigger is fired as a deferred class with params to run the task in the trigger worker.

:param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
is not provided as a full s3:// url.
:param bucket_key: The key being waited on. Supports full s3:// style url
or relative path from root level. When it's specified as a full s3://
url, please leave bucket_name as `None`.
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
:param aws_conn_id: reference to the s3 connection
:param hook_params: optional params for the hook
:param check_fn: Function that receives the list of the S3 objects,
and returns a boolean
"""

def __init__(
self,
bucket_name: str,
bucket_key: str | list[str],
wildcard_match: bool = False,
check_fn: Callable[..., bool] | None = None,
aws_conn_id: str = "aws_default",
poke_interval: float = 5.0,
**hook_params: Any,
):
super().__init__()
self.bucket_name = bucket_name
self.bucket_key = bucket_key
self.wildcard_match = wildcard_match
self.check_fn = check_fn
self.aws_conn_id = aws_conn_id
self.hook_params = hook_params
self.poke_interval = poke_interval

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize S3KeyTrigger arguments and classpath."""
return (
"airflow.providers.amazon.aws.triggers.s3.S3KeyTrigger",
{
"bucket_name": self.bucket_name,
"bucket_key": self.bucket_key,
"wildcard_match": self.wildcard_match,
"check_fn": self.check_fn,
"aws_conn_id": self.aws_conn_id,
"hook_params": self.hook_params,
"poke_interval": self.poke_interval,
},
)

@cached_property
def hook(self) -> S3Hook:
return S3Hook(aws_conn_id=self.aws_conn_id, verify=self.hook_params.get("verify"))

async def run(self) -> AsyncIterator[TriggerEvent]:
"""Make an asynchronous connection using S3HookAsync."""
try:
async with self.hook.async_conn as client:
while True:
if await self.hook.check_key_async(
client, self.bucket_name, self.bucket_key, self.wildcard_match
):
if self.check_fn is None:
yield TriggerEvent({"status": "success"})
else:
s3_objects = await self.hook.get_files_async(
client, self.bucket_name, self.bucket_key, self.wildcard_match
)
yield TriggerEvent({"status": "success", "s3_objects": s3_objects})
await asyncio.sleep(self.poke_interval)

except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
3 changes: 2 additions & 1 deletion airflow/providers/amazon/provider.yaml
@@ -73,9 +73,10 @@ dependencies:
- mypy-boto3-rds>=1.24.0
- mypy-boto3-redshift-data>=1.24.0
- mypy-boto3-appflow>=1.24.0
- aiohttp
- asgiref
- mypy-boto3-s3>=1.24.0


integrations:
- integration-name: Amazon Athena
external-doc-url: https://aws.amazon.com/athena/
20 changes: 20 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/s3/s3.rst
@@ -248,6 +248,26 @@ multiple files can match one key. The list of matched S3 object attributes conta
:start-after: [START howto_sensor_s3_key_function]
:end-before: [END howto_sensor_s3_key_function]

You can also run this operator in deferrable mode by setting the parameter ``deferrable`` to True.
This leads to more efficient utilization of Airflow workers, as polling for the job status happens
asynchronously on the triggerer. Note that this requires the triggerer to be available in your Airflow deployment.

To check one file:

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_single_key_deferrable]
:end-before: [END howto_sensor_s3_key_single_key_deferrable]

To check multiple files:

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_s3_key_multiple_keys_deferrable]
:end-before: [END howto_sensor_s3_key_multiple_keys_deferrable]
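When ``check_fn`` is combined with ``deferrable=True``, the trigger returns the matched objects in the event payload and the sensor applies the function in ``execute_complete``. A minimal sketch, assuming a hypothetical bucket and key pattern:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


def check_fn(files: list) -> bool:
    # Succeed once any matched object is larger than 1 MiB.
    return any(f.get("Size", 0) > 1048576 for f in files)


check_s3_key = S3KeySensor(
    task_id="check_s3_key",
    bucket_name="my-bucket",
    bucket_key="incoming/*.csv",
    wildcard_match=True,
    check_fn=check_fn,
    deferrable=True,
)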

.. _howto/sensor:S3KeysUnchangedSensor:

Wait on Amazon S3 prefix changes
2 changes: 2 additions & 0 deletions generated/provider_dependencies.json
@@ -19,9 +19,11 @@
},
"amazon": {
"deps": [
"aiohttp",
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
"asgiref",
"asgiref",
"boto3>=1.24.0",
"jsonpath_ng>=1.5.3",
"mypy-boto3-appflow>=1.24.0",
37 changes: 37 additions & 0 deletions tests/providers/amazon/aws/compat.py
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

__all__ = ["async_mock", "AsyncMock"]

import sys

if sys.version_info < (3, 8):
# For compatibility with Python 3.7
from asynctest import mock as async_mock

# ``asynctest.mock.CoroutineMock``, which provides the compatibility shim, does not work well with
# autospec=True; as a result, "TypeError: object MagicMock can't be used in 'await' expression"
# could be raised. The best solution in this case is to provide the actual awaitable object as the spec:
# >>> from tests.providers.amazon.aws.compat import AsyncMock
# >>> from foo.bar import SpamEgg
# >>> mock_something = AsyncMock(SpamEgg)
from asynctest.mock import CoroutineMock as AsyncMock
else:
from unittest import mock as async_mock
from unittest.mock import AsyncMock
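Usage is then uniform across interpreter lines; a small sketch for illustration:

import asyncio

from tests.providers.amazon.aws.compat import AsyncMock

# AsyncMock resolves to asynctest's CoroutineMock on Python 3.7 and to
# unittest.mock.AsyncMock on 3.8+, so the mock is awaitable either way.
mock_check = AsyncMock(return_value=True)
assert asyncio.run(mock_check()) is True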
17 changes: 5 additions & 12 deletions tests/providers/amazon/aws/hooks/test_s3.py
@@ -33,8 +33,11 @@

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook, provide_bucket_name, unify_bucket_name_and_key
from airflow.utils.timezone import datetime
from airflow.providers.amazon.aws.hooks.s3 import (
S3Hook,
provide_bucket_name,
unify_bucket_name_and_key,
)


@pytest.fixture
@@ -223,21 +226,11 @@ def test_list_keys(self, s3_bucket):
bucket.put_object(Key="a", Body=b"a")
bucket.put_object(Key="dir/b", Body=b"b")

from_datetime = datetime(1992, 3, 8, 18, 52, 51)
to_datetime = datetime(1993, 3, 14, 21, 52, 42)

def dummy_object_filter(keys, from_datetime=None, to_datetime=None):
return []

assert [] == hook.list_keys(s3_bucket, prefix="non-existent/")
assert ["a", "dir/b"] == hook.list_keys(s3_bucket)
assert ["a"] == hook.list_keys(s3_bucket, delimiter="/")
assert ["dir/b"] == hook.list_keys(s3_bucket, prefix="dir/")
assert ["dir/b"] == hook.list_keys(s3_bucket, start_after_key="a")
assert [] == hook.list_keys(s3_bucket, from_datetime=from_datetime, to_datetime=to_datetime)
assert [] == hook.list_keys(
s3_bucket, from_datetime=from_datetime, to_datetime=to_datetime, object_filter=dummy_object_filter
)

def test_list_keys_paged(self, s3_bucket):
hook = S3Hook()