Add deferrable mode for S3KeySensor #31018

Merged: 37 commits, Jun 8, 2023
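This PR lets S3KeySensor run in deferrable mode: instead of holding a worker slot while it pokes, the sensor defers to the triggerer, which waits for the key asynchronously. Below is a minimal usage sketch, assuming the deferrable flag this PR adds; the DAG, bucket, and key names are illustrative.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(dag_id="example_s3_key_deferrable", start_date=datetime(2023, 6, 1), schedule=None):
    # While deferred, the poll runs in the triggerer and no worker slot is held.
    wait_for_key = S3KeySensor(
        task_id="wait_for_key",
        bucket_name="my-bucket",  # illustrative
        bucket_key="data/input.csv",  # illustrative
        deferrable=True,
    )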
Commits (37)
3ded5c9 Make S3KeySensor deferrable (sunank200, May 2, 2023)
3d1c188 Remove aiobotocore from dependency (sunank200, May 2, 2023)
85e6edb Add the S3 trigger path (sunank200, May 8, 2023)
95e74c6 Add the tests (sunank200, May 8, 2023)
f613dd0 Merge branch 'main' into s3keysensor (sunank200, May 8, 2023)
602d9d6 remove unnecessary tests (sunank200, May 9, 2023)
ae83da3 Fix the docs (sunank200, May 9, 2023)
6e1f0a0 update docs and example DAG (sunank200, May 10, 2023)
11ce1b8 Merge branch 'apache:main' into s3keysensor (sunank200, May 10, 2023)
41b0c22 Merge branch 'apache:main' into s3keysensor (sunank200, May 29, 2023)
e5f515f Merge branch 'main' into s3keysensor (sunank200, May 29, 2023)
571ffa6 Remove S3HookAsync and use S3Hook with async_conn (sunank200, May 29, 2023)
0785452 Add tests for hooks and triggers (sunank200, May 29, 2023)
38583a7 remove aiohttp (sunank200, May 29, 2023)
c4be0da remove test for Python 3.7 in Airflow (sunank200, May 29, 2023)
e1bbaec Add more test to system tests (sunank200, May 30, 2023)
debf4a7 Merge branch 'main' into s3keysensor (sunank200, May 30, 2023)
3701f61 Merge branch 'apache:main' into s3keysensor (sunank200, May 30, 2023)
b75c3b9 Merge branch 'main' into s3keysensor (sunank200, May 30, 2023)
d7b3188 Merge branch 'apache:main' into s3keysensor (sunank200, Jun 5, 2023)
d36487d add check_fn fix (sunank200, Jun 7, 2023)
c3e8d8e Merge branch 'main' into s3keysensor (sunank200, Jun 7, 2023)
6be1156 Add tests for chech_fn (sunank200, Jun 7, 2023)
fa49d9c Remove s3 key unchanged code (sunank200, Jun 7, 2023)
08ee769 Add the should_check_fn in serializer (sunank200, Jun 7, 2023)
df55976 Update triggers integration-name in providers.yaml (sunank200, Jun 7, 2023)
3ac1de4 Refactor integration name from Amazon S3 to Amazon Simple Storage Ser… (sunank200, Jun 7, 2023)
545a26b add type checking (sunank200, Jun 7, 2023)
8d18db9 Add . for static checksin doc-strings (sunank200, Jun 7, 2023)
9f62328 Merge remote-tracking branch 'upstream/main' into s3keysensor (sunank200, Jun 7, 2023)
f598da1 Merge branch 'main' into s3keysensor (sunank200, Jun 7, 2023)
01f64fe Merge branch 's3keysensor' of https://github.com/sunank200/airflow in… (sunank200, Jun 7, 2023)
66732fd Merge branch 'main' into s3keysensor (sunank200, Jun 7, 2023)
110a8f9 change doc string (sunank200, Jun 7, 2023)
83c3bed Merge branch 's3keysensor' of https://github.com/sunank200/airflow in… (sunank200, Jun 7, 2023)
66e7e4e Merge branch 'main' into s3keysensor (sunank200, Jun 7, 2023)
1b315a1 Merge branch 'main' into s3keysensor (sunank200, Jun 8, 2023)
airflow/providers/amazon/aws/hooks/s3.py (0 additions, 114 deletions)

@@ -23,7 +23,6 @@
import gzip as gz
import io
import logging
import os
import re
import shutil
import warnings
@@ -612,119 +611,6 @@ async def _list_keys_async(

        return keys

    async def is_keys_unchanged_async(
        self,
        client: AioBaseClient,
        bucket_name: str,
        prefix: str,
        inactivity_period: float = 60 * 60,
        min_objects: int = 1,
        previous_objects: set[str] | None = None,
        inactivity_seconds: int = 0,
        allow_delete: bool = True,
        last_activity_time: datetime | None = None,
    ) -> dict[str, Any]:
        """
        Check whether new objects have been uploaded and the inactivity_period
        has passed, and update the state of the sensor accordingly.

        :param client: aiobotocore client
        :param bucket_name: the name of the bucket
        :param prefix: a key prefix
        :param inactivity_period: the total seconds of inactivity to designate
            keys unchanged. Note that this mechanism is not real time and
            this operator may not return until a poke_interval after this period
            has passed with no additional objects sensed.
        :param min_objects: the minimum number of objects needed for the keys-unchanged
            sensor to be considered valid.
        :param previous_objects: the set of object ids found during the last poke.
        :param inactivity_seconds: number of inactive seconds
        :param allow_delete: whether this sensor should consider objects deleted
            between pokes to be valid behavior. If true, a warning message is logged
            when this happens; if false, an error is raised.
        :param last_activity_time: last activity datetime
        :return: dictionary with status and message
        """
        if previous_objects is None:
            previous_objects = set()
        list_keys = await self._list_keys_async(client=client, bucket_name=bucket_name, prefix=prefix)
        current_objects = set(list_keys)
        current_num_objects = len(current_objects)
        if current_num_objects > len(previous_objects):
            # When new objects arrived, reset the inactivity_seconds
            # and update previous_objects for the next poke.
            self.log.info(
                "New objects found at %s, resetting last_activity_time.",
                os.path.join(bucket_name, prefix),
            )
            self.log.debug("New objects: %s", current_objects - previous_objects)
            last_activity_time = datetime.now()
            inactivity_seconds = 0
            previous_objects = current_objects
            return {
                "status": "pending",
                "previous_objects": previous_objects,
                "last_activity_time": last_activity_time,
                "inactivity_seconds": inactivity_seconds,
            }

        if len(previous_objects) - len(current_objects):
            # During the last poke interval objects were deleted.
            if allow_delete:
                deleted_objects = previous_objects - current_objects
                previous_objects = current_objects
                last_activity_time = datetime.now()
                self.log.info(
                    "Objects were deleted during the last poke interval. Updating the "
                    "file counter and resetting last_activity_time:\n%s",
                    deleted_objects,
                )
                return {
                    "status": "pending",
                    "previous_objects": previous_objects,
                    "last_activity_time": last_activity_time,
                    "inactivity_seconds": inactivity_seconds,
                }

            return {
                "status": "error",
                "message": f" {os.path.join(bucket_name, prefix)} between pokes.",
            }

        if last_activity_time:
            inactivity_seconds = int((datetime.now() - last_activity_time).total_seconds())
        else:
            # Handles the first poke where last inactivity time is None.
            last_activity_time = datetime.now()
            inactivity_seconds = 0

        if inactivity_seconds >= inactivity_period:
            path = os.path.join(bucket_name, prefix)

            if current_num_objects >= min_objects:
                success_message = (
                    "SUCCESS: Sensor found %s objects at %s. "
                    "Waited at least %s seconds, with no new objects uploaded."
                )
                self.log.info(success_message, current_num_objects, path, inactivity_period)
                return {
                    "status": "success",
                    "message": success_message % (current_num_objects, path, inactivity_period),
                }

            self.log.error("FAILURE: Inactivity Period passed, not enough objects found in %s", path)
            return {
                "status": "error",
                "message": f"FAILURE: Inactivity Period passed, not enough objects found in {path}",
            }
        return {
            "status": "pending",
            "previous_objects": previous_objects,
            "last_activity_time": last_activity_time,
            "inactivity_seconds": inactivity_seconds,
        }

    def _list_key_object_filter(
        self, keys: list, from_datetime: datetime | None = None, to_datetime: datetime | None = None
    ) -> list:
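The deleted method returns a small state dictionary ("pending", "success", or "error") instead of raising, so a caller can persist the bookkeeping between polls. Below is a minimal sketch of a trigger-style loop driving a poll function with this return contract; the function and its names are illustrative, not the provider's actual trigger.

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def poll_until_terminal(
    poll: Callable[..., Awaitable[dict[str, Any]]],
    poke_interval: float = 30.0,
    **state: Any,
) -> dict[str, Any]:
    # Re-invoke the poll with carried-over bookkeeping until it is terminal.
    while True:
        result = await poll(**state)
        if result["status"] in ("success", "error"):
            return result  # hand the terminal payload back to the deferred sensor
        # "pending": carry the updated state into the next poke.
        state["previous_objects"] = result["previous_objects"]
        state["last_activity_time"] = result["last_activity_time"]
        state["inactivity_seconds"] = result["inactivity_seconds"]
        await asyncio.sleep(poke_interval)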
tests/providers/amazon/aws/hooks/test_s3.py (6 additions, 125 deletions)

@@ -231,6 +231,12 @@ def test_list_keys(self, s3_bucket):
bucket.put_object(Key="bxb", Body=b"axb")
bucket.put_object(Key="dir/b", Body=b"b")

from_datetime = datetime(1992, 3, 8, 18, 52, 51)
to_datetime = datetime(1993, 3, 14, 21, 52, 42)

def dummy_object_filter(keys, from_datetime=None, to_datetime=None):
return []

assert [] == hook.list_keys(s3_bucket, prefix="non-existent/")
assert ["a", "ba", "bxa", "bxb", "dir/b"] == hook.list_keys(s3_bucket)
assert ["a", "ba", "bxa", "bxb"] == hook.list_keys(s3_bucket, delimiter="/")
@@ -551,131 +557,6 @@ async def test_s3_key_hook_list_keys_async(self, mock_client):
        response = await s3_hook_async._list_keys_async(mock_client, "test_bucket", "test*")
        assert response == ["test_key", "test_key2"]

    @pytest.mark.asyncio
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
    async def test_s3_key_hook_is_keys_unchanged_false_async(self, mock_list_keys, mock_client):
        """Test that is_keys_unchanged_async reports a pending status while keys are still changing."""
        mock_list_keys.return_value = ["test"]

        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
        response = await s3_hook_async.is_keys_unchanged_async(
            client=mock_client.return_value,
            bucket_name="test_bucket",
            prefix="test",
            inactivity_period=1,
            min_objects=1,
            previous_objects=set(),
            inactivity_seconds=0,
            allow_delete=True,
            last_activity_time=None,
        )

        assert response.get("status") == "pending"

        # test for the case when current_objects < previous_objects
        mock_list_keys.return_value = []

        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
        response = await s3_hook_async.is_keys_unchanged_async(
            client=mock_client.return_value,
            bucket_name="test_bucket",
            prefix="test",
            inactivity_period=1,
            min_objects=1,
            previous_objects=set("test"),
            inactivity_seconds=0,
            allow_delete=True,
            last_activity_time=None,
        )

        assert response.get("status") == "pending"

    @pytest.mark.asyncio
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
    async def test_s3_key_hook_is_keys_unchanged_exception_async(self, mock_list_keys, mock_client):
        """Test that is_keys_unchanged_async reports an error when objects are deleted and allow_delete is False."""
        mock_list_keys.return_value = []

        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")

        response = await s3_hook_async.is_keys_unchanged_async(
            client=mock_client.return_value,
            bucket_name="test_bucket",
            prefix="test",
            inactivity_period=1,
            min_objects=1,
            previous_objects=set("test"),
            inactivity_seconds=0,
            allow_delete=False,
            last_activity_time=None,
        )

        assert response == {"message": " test_bucket/test between pokes.", "status": "error"}

    @pytest.mark.asyncio
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
    async def test_s3_key_hook_is_keys_unchanged_pending_async(self, mock_list_keys, mock_client):
        """Test that is_keys_unchanged_async reports a pending status before the inactivity period elapses."""
        mock_list_keys.return_value = []

        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")

        response = await s3_hook_async.is_keys_unchanged_async(
            client=mock_client.return_value,
            bucket_name="test_bucket",
            prefix="test",
            inactivity_period=1,
            min_objects=0,
            previous_objects=set(),
            inactivity_seconds=0,
            allow_delete=False,
            last_activity_time=None,
        )

        assert response.get("status") == "pending"

    @pytest.mark.asyncio
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
    async def test_s3_key_hook_is_keys_unchanged_inactivity_error_async(self, mock_list_keys, mock_client):
        """Test that is_keys_unchanged_async reports an error when the inactivity period passes without enough objects."""
        mock_list_keys.return_value = []

        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")

        response = await s3_hook_async.is_keys_unchanged_async(
            client=mock_client.return_value,
            bucket_name="test_bucket",
            prefix="test",
            inactivity_period=0,
            min_objects=5,
            previous_objects=set(),
            inactivity_seconds=5,
            allow_delete=False,
            last_activity_time=None,
        )

        assert response == {
            "status": "error",
            "message": "FAILURE: Inactivity Period passed, not enough objects found in test_bucket/test",
        }

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        "test_first_prefix, test_second_prefix",
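The deleted tests above all follow the same pattern for exercising async hook methods: mark the test with pytest.mark.asyncio and patch the network-facing coroutine so no AWS call is made. Here is a generic sketch of that pattern using unittest.mock.AsyncMock; the test name and payload are illustrative.

from unittest import mock

import pytest


@pytest.mark.asyncio
async def test_async_poll_pattern():
    # AsyncMock lets a plain pytest-asyncio test await the patched coroutine.
    fake_poll = mock.AsyncMock(return_value={"status": "success"})
    result = await fake_poll(bucket_name="test_bucket", prefix="test")
    assert result["status"] == "success"
    fake_poll.assert_awaited_once_with(bucket_name="test_bucket", prefix="test")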