Skip to content

Commit

Permalink
Update microbatch end_time to the batch_size ceiling (#10883)
Browse files Browse the repository at this point in the history
* Add function to MicrobatchBuilder to get ceiling of timestamp by batch_size

* Update `MicrobatchBuilder.build_end_time` to use `ceiling_timestamp`

* fix TestMicrobatchBuilder.test_build_end_time by specifying a BatchSize + asserting actual is a ceiling timestamp

* Add changie

---------

Co-authored-by: Michelle Ark <[email protected]>
  • Loading branch information
QMalcolm and MichelleArk authored Oct 29, 2024
1 parent 8df5c96 commit dd77210
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20241029-161615.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Take `end_time` for batches to the ceiling to handle edge case where `event_time`
column is a date
time: 2024-10-29T16:16:15.714993-05:00
custom:
Author: QMalcolm MichelleArk
Issue: "10868"
25 changes: 23 additions & 2 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def __init__(

def build_end_time(self):
"""Defaults the end_time to the current time in UTC unless a non `None` event_time_end was provided"""
return self.event_time_end or self.default_end_time
end_time = self.event_time_end or self.default_end_time
return MicrobatchBuilder.ceiling_timestamp(end_time, self.model.config.batch_size)

def build_start_time(self, checkpoint: Optional[datetime]):
"""Create a start time based off the passed in checkpoint.
Expand Down Expand Up @@ -161,7 +162,7 @@ def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) ->
return offset_timestamp

@staticmethod
def truncate_timestamp(timestamp: datetime, batch_size: BatchSize):
def truncate_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
"""Truncates the passed in timestamp based on the batch_size.
2024-09-17 16:06:00 + Batchsize.hour -> 2024-09-17 16:00:00
Expand Down Expand Up @@ -201,3 +202,23 @@ def format_batch_start(
return str(
batch_start.date() if (batch_start and batch_size != BatchSize.hour) else batch_start
)

@staticmethod
def ceiling_timestamp(timestamp: datetime, batch_size: BatchSize) -> datetime:
"""Takes the given timestamp and moves it to the ceiling for the given batch size
Note, if the timestamp is already the batch size ceiling, that is returned
2024-09-17 16:06:00 + BatchSize.hour -> 2024-09-17 17:00:00
2024-09-17 16:00:00 + BatchSize.hour -> 2024-09-17 16:00:00
2024-09-17 16:06:00 + BatchSize.day -> 2024-09-18 00:00:00
2024-09-17 00:00:00 + BatchSize.day -> 2024-09-17 00:00:00
2024-09-17 16:06:00 + BatchSize.month -> 2024-10-01 00:00:00
2024-09-01 00:00:00 + BatchSize.month -> 2024-09-01 00:00:00
2024-09-17 16:06:00 + BatchSize.year -> 2025-01-01 00:00:00
2024-01-01 00:00:00 + BatchSize.year -> 2024-01-01 00:00:00
"""
ceiling = truncated = MicrobatchBuilder.truncate_timestamp(timestamp, batch_size)
if truncated != timestamp:
ceiling = MicrobatchBuilder.offset_timestamp(truncated, batch_size, 1)
return ceiling
56 changes: 54 additions & 2 deletions tests/unit/materializations/incremental/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def microbatch_model(self):
model.config.materialized = "incremental"
model.config.incremental_strategy = "microbatch"
model.config.begin = MODEL_CONFIG_BEGIN
model.config.batch_size = BatchSize.day

return model

Expand All @@ -30,12 +31,12 @@ def microbatch_model(self):
(
False,
None,
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
True,
None,
datetime(2024, 9, 5, 8, 56, 0, 0, pytz.UTC),
datetime(2024, 9, 6, 0, 0, 0, 0, pytz.UTC),
),
(
False,
Expand Down Expand Up @@ -616,3 +617,54 @@ def test_format_batch_start(self, batch_size, batch_start, expected_formatted_ba
MicrobatchBuilder.format_batch_start(batch_start, batch_size)
== expected_formatted_batch_start
)

@pytest.mark.parametrize(
"timestamp,batch_size,expected_datetime",
[
(
datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC),
BatchSize.hour,
datetime(2024, 9, 17, 17, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 17, 16, 0, 0, 0, pytz.UTC),
BatchSize.hour,
datetime(2024, 9, 17, 16, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC),
BatchSize.day,
datetime(2024, 9, 18, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 17, 0, 0, 0, 0, pytz.UTC),
BatchSize.day,
datetime(2024, 9, 17, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC),
BatchSize.month,
datetime(2024, 10, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.month,
datetime(2024, 9, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 9, 17, 16, 6, 0, 0, pytz.UTC),
BatchSize.year,
datetime(2025, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
(
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
BatchSize.year,
datetime(2024, 1, 1, 0, 0, 0, 0, pytz.UTC),
),
],
)
def test_ceiling_timestamp(
self, timestamp: datetime, batch_size: BatchSize, expected_datetime: datetime
) -> None:
ceilinged = MicrobatchBuilder.ceiling_timestamp(timestamp, batch_size)
assert ceilinged == expected_datetime

0 comments on commit dd77210

Please sign in to comment.