Skip to content

Commit

Permalink
Feat: Improve progress prints (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Jul 18, 2024
1 parent 0616d8a commit e784ed9
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 49 deletions.
73 changes: 45 additions & 28 deletions airbyte/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ def _get_elapsed_time_str(seconds: float) -> str:
Minutes are always included after 1 minute elapsed.
Hours are always included after 1 hour elapsed.
"""
if seconds <= 2: # noqa: PLR2004 # Magic numbers OK here.
# At most 2 seconds elapsed: show hundredths of a second
return f"{seconds:.2f} seconds"

if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
# Less than 1 minute elapsed
return f"{seconds:.0f} seconds"
Expand Down Expand Up @@ -128,6 +132,7 @@ def __init__(
# Reads
self.read_start_time = time.time()
self.read_end_time: float | None = None
self.first_record_received_time: float | None = None
self.total_records_read = 0

# Writes
Expand Down Expand Up @@ -245,6 +250,7 @@ def reset(self, num_streams_expected: int) -> None:

# Reads
self.read_start_time = time.time()
self.first_record_received_time = None
self.read_end_time = None
self.total_records_read = 0

Expand All @@ -263,12 +269,22 @@ def reset(self, num_streams_expected: int) -> None:
self._start_rich_view()

@property
def elapsed_seconds(self) -> int:
def elapsed_seconds(self) -> float:
"""Return the number of seconds elapsed since the read operation started."""
if self.finalize_end_time:
return int(self.finalize_end_time - self.read_start_time)
return self.finalize_end_time - self.read_start_time

return time.time() - self.read_start_time

@property
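def elapsed_read_time(self) -> float:
"""Return the number of seconds spent reading.

The clock starts when the first record is received, falling back to the
read start time if no record has been logged yet. It stops at the start
of finalization, or keeps running against the current time until then.
"""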
if self.finalize_start_time:
return self.finalize_start_time - (
self.first_record_received_time or self.read_start_time
)

return int(time.time() - self.read_start_time)
return time.time() - (self.first_record_received_time or self.read_start_time)

@property
def elapsed_time_string(self) -> str:
Expand Down Expand Up @@ -297,13 +313,13 @@ def elapsed_read_time_string(self) -> str:
return _get_elapsed_time_str(self.elapsed_read_seconds)

@property
def elapsed_finalization_seconds(self) -> int:
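def elapsed_finalization_seconds(self) -> float:
"""Return the number of seconds elapsed since finalization started.

Returns 0 if finalization has not started. While finalization is still
running, the elapsed time is measured against the current time.
"""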
if self.finalize_start_time is None:
return 0
if self.finalize_end_time is None:
return int(time.time() - self.finalize_start_time)
return int(self.finalize_end_time - self.finalize_start_time)
return time.time() - self.finalize_start_time
return self.finalize_end_time - self.finalize_start_time

@property
def elapsed_finalization_time_str(self) -> str:
Expand All @@ -312,6 +328,9 @@ def elapsed_finalization_time_str(self) -> str:

def log_records_read(self, new_total_count: int) -> None:
"""Load a number of records read."""
if self.first_record_received_time is None:
self.first_record_received_time = time.time()

self.total_records_read = new_total_count

# This is some math to make updates adaptive to the scale of records read.
Expand Down Expand Up @@ -399,50 +418,48 @@ def _get_status_message(self) -> str:
# Format start time as a friendly string in local timezone:
start_time_str = _to_time_str(self.read_start_time)
records_per_second: float = 0.0
if self.elapsed_read_seconds > 0:
records_per_second = round(
float(self.total_records_read) / self.elapsed_read_seconds,
ndigits=1,
)
if self.elapsed_read_time > 0:
records_per_second = self.total_records_read / self.elapsed_read_time

status_message = (
f"## Read Progress\n\n"
f"Started reading at {start_time_str}.\n\n"
f"Read **{self.total_records_read:,}** records "
f"### Read Progress\n\n"
f"**Started reading from source at `{start_time_str}`:**\n\n"
f"- Read **{self.total_records_read:,}** records "
f"over **{self.elapsed_read_time_string}** "
f"({records_per_second:,} records / second).\n\n"
f"({records_per_second:,.1f} records / second).\n\n"
)
if self.total_records_written > 0:
status_message += (
f"Wrote **{self.total_records_written:,}** records "
f"over {self.total_batches_written:,} batches.\n\n"
f"- Cached **{self.total_records_written:,}** records "
f"into {self.total_batches_written:,} local cache file(s).\n\n"
)
if self.read_end_time is not None:
read_end_time_str = _to_time_str(self.read_end_time)
status_message += f"Finished reading at {read_end_time_str}.\n\n"
status_message += f"- Finished reading from source at `{read_end_time_str}`.\n\n"
if self.finalize_start_time is not None:
finalize_start_time_str = _to_time_str(self.finalize_start_time)
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
status_message += f"**Started cache processing at `{finalize_start_time_str}`:**\n\n"
status_message += (
f"Finalized **{self.total_batches_finalized}** batches "
f"over {self.elapsed_finalization_time_str}.\n\n"
f"- Processed **{self.total_batches_finalized}** cache "
f"file(s) over **{self.elapsed_finalization_time_str}**.\n\n"
)
if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += f"- Finished cache processing at `{completion_time_str}`.\n\n"

if self.finalized_stream_names:
status_message += (
f"Completed {len(self.finalized_stream_names)} "
f"**Completed processing {len(self.finalized_stream_names)} "
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
+ "streams:\n\n"
+ "streams:**\n\n"
)
for stream_name in self.finalized_stream_names:
status_message += f" - {stream_name}\n"

status_message += "\n\n"

if self.finalize_end_time is not None:
completion_time_str = _to_time_str(self.finalize_end_time)
status_message += (
f"Completed writing at {completion_time_str}. "
f"Total time elapsed: {self.elapsed_time_string}\n\n"
)
status_message += f"**Total time elapsed: {self.elapsed_time_string}**\n\n"
status_message += "\n------------------------------------------------\n"

return status_message
Expand Down
50 changes: 29 additions & 21 deletions tests/unit_tests/test_progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,29 @@ def _assert_lines(expected_lines, actual_lines: list[str] | str):
if isinstance(actual_lines, list):
actual_lines = "\n".join(actual_lines)
for line in expected_lines:
assert line in actual_lines, f"Missing line: {line}"
assert (
line in actual_lines
), f"Missing line:\n{line}\n\nIn lines:\n\n{actual_lines}"


def test_get_status_message_after_finalizing_records():
# Test that we can render the initial status message before starting to read
with freeze_time("2022-01-01 00:00:00"):
progress = ReadProgress()
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after reading some records
with freeze_time("2022-01-01 00:01:00"):
progress.log_records_read(100)
expected_lines = [
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())
Expand All @@ -132,23 +137,26 @@ def test_get_status_message_after_finalizing_records():
progress = ReadProgress()
progress.reset(1)
expected_lines = [
"Started reading at 00:00:00.",
"Read **0** records over **0 seconds** (0.0 records / second).",
"Started reading from source at `00:00:00`",
"Read **0** records over **0.00 seconds** (0.0 records / second).",
]
_assert_lines(expected_lines, progress._get_status_message())

# We need to read one record to start the "time since first record" timer
progress.log_records_read(1)

# Test after writing some records and starting to finalize
with freeze_time("2022-01-02 00:01:00"):
progress.log_records_read(100)
progress.log_batch_written("stream1", 50)
progress.log_batches_finalizing("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -157,12 +165,12 @@ def test_get_status_message_after_finalizing_records():
progress.log_batches_finalized("stream1", 1)
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds**",
]
_assert_lines(expected_lines, progress._get_status_message())

Expand All @@ -172,13 +180,13 @@ def test_get_status_message_after_finalizing_records():
message = progress._get_status_message()
expected_lines = [
"## Read Progress",
"Started reading at 00:00:00.",
"Started reading from source at `00:00:00`",
"Read **100** records over **60 seconds** (1.7 records / second).",
"Wrote **50** records over 1 batches.",
"Finished reading at 00:01:00.",
"Started finalizing streams at 00:01:00.",
"Finalized **1** batches over 60 seconds.",
"Completed 1 out of 1 streams:",
"Cached **50** records into 1 local cache file(s).",
"Finished reading from source at `00:01:00`",
"Started cache processing at `00:01:00`",
"Processed **1** cache file(s) over **60 seconds",
"Completed processing 1 out of 1 streams",
"- stream1",
"Total time elapsed: 2min 0s",
]
Expand Down

0 comments on commit e784ed9

Please sign in to comment.