-
Notifications
You must be signed in to change notification settings - Fork 4.6k
AirbyteLib: Progress Printer #34588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AirbyteLib: Progress Printer #34588
Changes from all commits
abbb256
3845f5c
9fccace
a217a6e
6aa85d6
dddbc78
b1d966b
f61152a
d665088
809918b
4a41ffb
bab5e06
10ce077
063bba3
ab75be4
8880b0b
3773149
90918c8
5f0bcb3
9ed1929
ec4d8dd
5d4eb45
a164168
0c62f04
bec8d11
df24520
6cc9f50
6d6708c
bf11816
f1505f1
625afb6
7b7150b
c17b652
617371b
94a90b5
6cc2cfd
1007b88
2a2fdc8
c4c6a6b
0b6895b
6675233
9197728
f73f288
dd9ac99
a2bed01
2e49154
0772a68
f975282
ace7208
8775c1b
34cb485
f24226d
7370d83
cfafccc
8446838
8b85a67
11ebbb8
a2d6a5c
df77431
4d6a5aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,320 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
"""A simple progress bar for the command line and IPython notebooks.""" | ||
from __future__ import annotations | ||
|
||
import datetime | ||
import math | ||
import time | ||
from contextlib import suppress | ||
from typing import cast | ||
|
||
from rich.errors import LiveError | ||
from rich.live import Live as RichLive | ||
from rich.markdown import Markdown as RichMarkdown | ||
|
||
|
||
try: | ||
IS_NOTEBOOK = True | ||
from IPython import display as ipy_display | ||
|
||
except ImportError: | ||
ipy_display = None | ||
IS_NOTEBOOK = False | ||
|
||
|
||
MAX_UPDATE_FREQUENCY = 1000 | ||
"""The max number of records to read before updating the progress bar.""" | ||
|
||
|
||
def _to_time_str(timestamp: float) -> str: | ||
"""Convert a timestamp float to a local time string. | ||
|
||
For now, we'll just use UTC to avoid breaking tests. In the future, we should | ||
return a local time string. | ||
""" | ||
datetime_obj = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) | ||
# TODO: Uncomment this line when we can get tests to properly account for local timezones. | ||
# For now, we'll just use UTC to avoid breaking tests. | ||
# datetime_obj = datetime_obj.astimezone() | ||
return datetime_obj.strftime("%H:%M:%S") | ||
|
||
|
||
def _get_elapsed_time_str(seconds: int) -> str: | ||
"""Return duration as a string. | ||
|
||
Seconds are included until 10 minutes is exceeded. | ||
Minutes are always included after 1 minute elapsed. | ||
Hours are always included after 1 hour elapsed. | ||
""" | ||
if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here. | ||
return f"{seconds} seconds" | ||
|
||
if seconds < 60 * 10: | ||
minutes = seconds // 60 | ||
seconds = seconds % 60 | ||
return f"{minutes}min {seconds}s" | ||
|
||
if seconds < 60 * 60: | ||
minutes = seconds // 60 | ||
seconds = seconds % 60 | ||
return f"{minutes}min" | ||
|
||
hours = seconds // (60 * 60) | ||
minutes = (seconds % (60 * 60)) // 60 | ||
return f"{hours}hr {minutes}min" | ||
|
||
|
||
class ReadProgress: | ||
"""A simple progress bar for the command line and IPython notebooks.""" | ||
|
||
def __init__(self) -> None: | ||
"""Initialize the progress tracker.""" | ||
# Streams expected (for progress bar) | ||
self.num_streams_expected = 0 | ||
|
||
# Reads | ||
self.read_start_time = time.time() | ||
self.read_end_time: float | None = None | ||
self.total_records_read = 0 | ||
|
||
# Writes | ||
self.total_records_written = 0 | ||
self.total_batches_written = 0 | ||
self.written_stream_names: set[str] = set() | ||
|
||
# Finalization | ||
self.finalize_start_time: float | None = None | ||
self.finalize_end_time: float | None = None | ||
self.total_records_finalized = 0 | ||
self.total_batches_finalized = 0 | ||
self.finalized_stream_names: set[str] = set() | ||
|
||
self.last_update_time: float | None = None | ||
|
||
self.rich_view: RichLive | None = None | ||
if not IS_NOTEBOOK: | ||
# If we're in a terminal, use a Rich view to display the progress updates. | ||
self.rich_view = RichLive() | ||
try: | ||
self.rich_view.start() | ||
except LiveError: | ||
self.rich_view = None | ||
|
||
def __del__(self) -> None: | ||
"""Close the Rich view.""" | ||
if self.rich_view: | ||
with suppress(Exception): | ||
self.rich_view.stop() | ||
|
||
def log_success(self) -> None: | ||
"""Log success and stop tracking progress.""" | ||
if self.finalize_end_time is None: | ||
# If we haven't already finalized, do so now. | ||
|
||
self.finalize_end_time = time.time() | ||
|
||
self.update_display(force_refresh=True) | ||
if self.rich_view: | ||
with suppress(Exception): | ||
self.rich_view.stop() | ||
|
||
def reset(self, num_streams_expected: int) -> None: | ||
"""Reset the progress tracker.""" | ||
# Streams expected (for progress bar) | ||
self.num_streams_expected = num_streams_expected | ||
|
||
# Reads | ||
self.read_start_time = time.time() | ||
self.read_end_time = None | ||
self.total_records_read = 0 | ||
|
||
# Writes | ||
self.total_records_written = 0 | ||
self.total_batches_written = 0 | ||
self.written_stream_names = set() | ||
|
||
# Finalization | ||
self.finalize_start_time = None | ||
self.finalize_end_time = None | ||
self.total_records_finalized = 0 | ||
self.total_batches_finalized = 0 | ||
self.finalized_stream_names = set() | ||
|
||
@property | ||
def elapsed_seconds(self) -> int: | ||
"""Return the number of seconds elapsed since the read operation started.""" | ||
if self.finalize_end_time: | ||
return int(self.finalize_end_time - self.read_start_time) | ||
|
||
return int(time.time() - self.read_start_time) | ||
|
||
@property | ||
def elapsed_time_string(self) -> str: | ||
"""Return duration as a string.""" | ||
return _get_elapsed_time_str(self.elapsed_seconds) | ||
|
||
@property | ||
def elapsed_seconds_since_last_update(self) -> float | None: | ||
"""Return the number of seconds elapsed since the last update.""" | ||
if self.last_update_time is None: | ||
return None | ||
|
||
return time.time() - self.last_update_time | ||
|
||
@property | ||
def elapsed_read_seconds(self) -> int: | ||
"""Return the number of seconds elapsed since the read operation started.""" | ||
if self.read_end_time is None: | ||
return int(time.time() - self.read_start_time) | ||
|
||
return int(self.read_end_time - self.read_start_time) | ||
|
||
@property | ||
def elapsed_read_time_string(self) -> str: | ||
"""Return duration as a string.""" | ||
return _get_elapsed_time_str(self.elapsed_read_seconds) | ||
|
||
@property | ||
def elapsed_finalization_seconds(self) -> int: | ||
"""Return the number of seconds elapsed since the read operation started.""" | ||
if self.finalize_start_time is None: | ||
return 0 | ||
if self.finalize_end_time is None: | ||
return int(time.time() - self.finalize_start_time) | ||
return int(self.finalize_end_time - self.finalize_start_time) | ||
|
||
@property | ||
def elapsed_finalization_time_str(self) -> str: | ||
"""Return duration as a string.""" | ||
return _get_elapsed_time_str(self.elapsed_finalization_seconds) | ||
|
||
def log_records_read(self, new_total_count: int) -> None: | ||
"""Load a number of records read.""" | ||
self.total_records_read = new_total_count | ||
|
||
# This is some math to make updates adaptive to the scale of records read. | ||
# We want to update the display more often when the count is low, and less | ||
# often when the count is high. | ||
updated_period = min( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO it would make sense to throttle these updates based on the elapsed time. The current approach works well in a lot of scenarios, but gets weird for edge cases like:
By just updating once a second or so, it's lively without any risk of overloading the terminal There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is good feedback. It's not obvious here, but there's also a time-based throttle inside update_display(). In this layer I'm trying to keep as fast as possible performance, so just the math-based check. Probably we could add a time-elapsed check, but this codepath may be called literally thousands of times per second, so I'm wary to change this further right now. Happy to keep iterating after merging though. |
||
MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(self.total_records_read) / 4) | ||
) | ||
if self.total_records_read % updated_period != 0: | ||
return | ||
|
||
self.update_display() | ||
|
||
def log_batch_written(self, stream_name: str, batch_size: int) -> None: | ||
"""Log that a batch has been written. | ||
|
||
Args: | ||
stream_name: The name of the stream. | ||
batch_size: The number of records in the batch. | ||
""" | ||
self.total_records_written += batch_size | ||
self.total_batches_written += 1 | ||
self.written_stream_names.add(stream_name) | ||
self.update_display() | ||
|
||
def log_batches_finalizing(self, stream_name: str, num_batches: int) -> None: | ||
"""Log that batch are ready to be finalized. | ||
|
||
In our current implementation, we ignore the stream name and number of batches. | ||
We just use this as a signal that we're finished reading and have begun to | ||
finalize any accumulated batches. | ||
""" | ||
_ = stream_name, num_batches # unused for now | ||
if self.finalize_start_time is None: | ||
self.read_end_time = time.time() | ||
self.finalize_start_time = self.read_end_time | ||
|
||
self.update_display(force_refresh=True) | ||
|
||
def log_batches_finalized(self, stream_name: str, num_batches: int) -> None: | ||
"""Log that a batch has been finalized.""" | ||
_ = stream_name # unused for now | ||
self.total_batches_finalized += num_batches | ||
self.update_display(force_refresh=True) | ||
|
||
def log_stream_finalized(self, stream_name: str) -> None: | ||
"""Log that a stream has been finalized.""" | ||
self.finalized_stream_names.add(stream_name) | ||
if len(self.finalized_stream_names) == self.num_streams_expected: | ||
self.log_success() | ||
|
||
self.update_display(force_refresh=True) | ||
|
||
def update_display(self, *, force_refresh: bool = False) -> None: | ||
"""Update the display.""" | ||
# Don't update more than twice per second unless force_refresh is True. | ||
if ( | ||
not force_refresh | ||
and self.last_update_time # if not set, then we definitely need to update | ||
and cast(float, self.elapsed_seconds_since_last_update) < 0.5 # noqa: PLR2004 | ||
): | ||
return | ||
|
||
status_message = self._get_status_message() | ||
|
||
if IS_NOTEBOOK: | ||
# We're in a notebook so use the IPython display. | ||
ipy_display.clear_output(wait=True) | ||
ipy_display.display(ipy_display.Markdown(status_message)) | ||
|
||
elif self.rich_view is not None: | ||
self.rich_view.update(RichMarkdown(status_message)) | ||
|
||
self.last_update_time = time.time() | ||
|
||
def _get_status_message(self) -> str: | ||
"""Compile and return a status message.""" | ||
# Format start time as a friendly string in local timezone: | ||
start_time_str = _to_time_str(self.read_start_time) | ||
records_per_second: float = 0.0 | ||
if self.elapsed_read_seconds > 0: | ||
records_per_second = round(self.total_records_read / self.elapsed_read_seconds, 1) | ||
status_message = ( | ||
f"## Read Progress\n\n" | ||
f"Started reading at {start_time_str}.\n\n" | ||
f"Read **{self.total_records_read:,}** records " | ||
f"over **{self.elapsed_read_time_string}** " | ||
f"({records_per_second:,} records / second).\n\n" | ||
) | ||
if self.total_records_written > 0: | ||
status_message += ( | ||
f"Wrote **{self.total_records_written:,}** records " | ||
f"over {self.total_batches_written:,} batches.\n\n" | ||
) | ||
if self.read_end_time is not None: | ||
read_end_time_str = _to_time_str(self.read_end_time) | ||
status_message += f"Finished reading at {read_end_time_str}.\n\n" | ||
if self.finalize_start_time is not None: | ||
finalize_start_time_str = _to_time_str(self.finalize_start_time) | ||
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n" | ||
status_message += ( | ||
f"Finalized **{self.total_batches_finalized}** batches " | ||
f"over {self.elapsed_finalization_time_str}.\n\n" | ||
) | ||
if self.finalized_stream_names: | ||
status_message += ( | ||
f"Completed {len(self.finalized_stream_names)} " | ||
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "") | ||
+ "streams:\n\n" | ||
) | ||
for stream_name in self.finalized_stream_names: | ||
status_message += f" - {stream_name}\n" | ||
|
||
status_message += "\n\n" | ||
|
||
if self.finalize_end_time is not None: | ||
completion_time_str = _to_time_str(self.finalize_end_time) | ||
status_message += ( | ||
f"Completed writing at {completion_time_str}. " | ||
f"Total time elapsed: {self.elapsed_time_string}\n\n" | ||
) | ||
status_message += "\n------------------------------------------------\n" | ||
|
||
return status_message | ||
|
||
|
||
progress = ReadProgress() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like these small QOL checks, but could we also add an option for this like pinecone does?
https://github.com/pinecone-io/pinecone-python-client/blob/61c19ad5a6a70321db44189dd90e603703670869/pinecone/data/index.py#L154
Some environments don't handle this kind of using the console very well and it's nice to be able to mute it for a clean output in a more "production" setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm definitely on board for this. The refactoring would be significant at this stage though, specifically because the progress messages are currently sent from a few different places in the codebase. While I'm not able to do so in this iteration, I definitely think this is worth doing and will probably get to it down the road.