Skip to content

AirbyteLib: Progress Printer #34588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 60 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
abbb256
new exception type: AirbyteConnectorNotRegisteredError
aaronsteers Jan 26, 2024
3845f5c
make constructors more resilient
aaronsteers Jan 26, 2024
9fccace
print stderr in exception text, cleanup failed install, remove editab…
aaronsteers Jan 26, 2024
a217a6e
move auto-install out of venv constructor, for easier debugging
aaronsteers Jan 26, 2024
6aa85d6
add test to assert that install failure includes pip log text
aaronsteers Jan 26, 2024
dddbc78
update docs
aaronsteers Jan 26, 2024
b1d966b
auto-format
aaronsteers Jan 26, 2024
f61152a
update docs
aaronsteers Jan 26, 2024
d665088
refactor version handling, control for side effects
aaronsteers Jan 26, 2024
809918b
fix exception handling in _get_installed_version()
aaronsteers Jan 26, 2024
4a41ffb
fix tests
aaronsteers Jan 26, 2024
bab5e06
improve thread safety
aaronsteers Jan 27, 2024
10ce077
handle quoted spaces in pip_url
aaronsteers Jan 27, 2024
063bba3
fix import sorts
aaronsteers Jan 27, 2024
ab75be4
standalone validate_config() method
aaronsteers Jan 27, 2024
8880b0b
add Source.yaml_spec property
aaronsteers Jan 27, 2024
3773149
make _yaml_spec a protected member
aaronsteers Jan 27, 2024
90918c8
fix too-limited json package_data glob
aaronsteers Jan 27, 2024
5f0bcb3
basic progress reporting
aaronsteers Jan 27, 2024
9ed1929
remove raw=True
aaronsteers Jan 27, 2024
ec4d8dd
add progress tracker class
aaronsteers Jan 28, 2024
5d4eb45
update docs
aaronsteers Jan 28, 2024
a164168
bug fixes
aaronsteers Jan 28, 2024
0c62f04
fix separator
aaronsteers Jan 28, 2024
bec8d11
bug fixes
aaronsteers Jan 28, 2024
df24520
bug fix
aaronsteers Jan 28, 2024
6cc9f50
improved logs
aaronsteers Jan 28, 2024
6d6708c
fix progress bugs, add unit tests
aaronsteers Jan 28, 2024
bf11816
add reset() at beginning of sync
aaronsteers Jan 28, 2024
f1505f1
linting fix and fix missing stream count for progress reset
aaronsteers Jan 28, 2024
625afb6
Update airbyte-integrations/connectors/source-github/setup.py
aaronsteers Jan 28, 2024
7b7150b
add faker test script to sample terminal output
aaronsteers Jan 28, 2024
c17b652
fix logs
aaronsteers Jan 28, 2024
617371b
add terminal implementation via rich, add final complete message
aaronsteers Jan 28, 2024
94a90b5
update scale in faker example
aaronsteers Jan 28, 2024
6cc2cfd
poetry add rich
aaronsteers Jan 28, 2024
1007b88
cleanup helper functions
aaronsteers Jan 28, 2024
2a2fdc8
update docs
aaronsteers Jan 28, 2024
c4c6a6b
fix datetime refs
aaronsteers Jan 28, 2024
0b6895b
fix tests, make tests more flexible
aaronsteers Jan 28, 2024
6675233
reorder import
aaronsteers Jan 28, 2024
9197728
fix missing copyright str
aaronsteers Jan 28, 2024
f73f288
docstring
aaronsteers Jan 28, 2024
dd9ac99
update docs
aaronsteers Jan 28, 2024
a2bed01
revert source-github change
aaronsteers Jan 28, 2024
2e49154
updated comment
aaronsteers Jan 28, 2024
0772a68
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 28, 2024
f975282
remove redundant strings
aaronsteers Jan 28, 2024
ace7208
Merge remote-tracking branch 'origin/master' into aj/airbyte-lib/inst…
aaronsteers Jan 28, 2024
8775c1b
update docs (removes empty cloud page)
aaronsteers Jan 28, 2024
34cb485
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 28, 2024
f24226d
remove unused lock
aaronsteers Jan 30, 2024
7370d83
rename AirbyteConnectoNotFoundError to AirbyteConnectorExecutableNotF…
aaronsteers Jan 30, 2024
cfafccc
Merge branch 'master' into aj/airbyte-lib/install-failure-handling
aaronsteers Jan 30, 2024
8446838
allow prereleases in version check
aaronsteers Jan 30, 2024
8b85a67
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 30, 2024
11ebbb8
Merge branch 'master' into aj/airbyte-lib/progress-print
aaronsteers Jan 30, 2024
a2d6a5c
fix missing copyright
aaronsteers Jan 30, 2024
df77431
Merge branch 'master' into aj/airbyte-lib/progress-print
aaronsteers Jan 30, 2024
4d6a5aa
update docs
aaronsteers Jan 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions airbyte-lib/airbyte_lib/_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from airbyte_lib import exceptions as exc
from airbyte_lib._util import protocol_util # Internal utility functions
from airbyte_lib.progress import progress


if TYPE_CHECKING:
Expand All @@ -40,7 +41,7 @@
from airbyte_lib.config import CacheConfigBase


DEFAULT_BATCH_SIZE = 10000
DEFAULT_BATCH_SIZE = 10_000


class BatchHandle:
Expand Down Expand Up @@ -95,7 +96,7 @@ def register_source(
For now, only one source at a time is supported.
If this method is called multiple times, the last call will overwrite the previous one.

TODO: Expand this to handle mutliple sources.
TODO: Expand this to handle multiple sources.
"""
_ = source_name
self.source_catalog = incoming_source_catalog
Expand Down Expand Up @@ -157,6 +158,7 @@ def process_airbyte_messages(
if len(stream_batch) >= max_batch_size:
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))
stream_batch.clear()

elif message.type is Type.STATE:
Expand All @@ -180,14 +182,16 @@ def process_airbyte_messages(
)

# We are at the end of the stream. Process whatever else is queued.
for stream_name, batch in stream_batches.items():
if batch:
record_batch = pa.Table.from_pylist(batch)
for stream_name, stream_batch in stream_batches.items():
if stream_batch:
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))

# Finalize any pending batches
for stream_name in list(self._pending_batches.keys()):
self._finalize_batches(stream_name)
progress.log_stream_finalized(stream_name)

@final
def _process_batch(
Expand Down Expand Up @@ -287,7 +291,10 @@ def _finalizing_batches(
state_messages_to_finalize = self._pending_state_messages[stream_name].copy()
self._pending_batches[stream_name].clear()
self._pending_state_messages[stream_name].clear()

progress.log_batches_finalizing(stream_name, len(batches_to_finalize))
yield batches_to_finalize
progress.log_batches_finalized(stream_name, len(batches_to_finalize))

self._finalized_batches[stream_name].update(batches_to_finalize)
self._finalized_state_messages[stream_name] += state_messages_to_finalize
Expand Down
320 changes: 320 additions & 0 deletions airbyte-lib/airbyte_lib/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""A simple progress bar for the command line and IPython notebooks."""
from __future__ import annotations

import datetime
import math
import time
from contextlib import suppress
from typing import cast

from rich.errors import LiveError
from rich.live import Live as RichLive
from rich.markdown import Markdown as RichMarkdown


# Detect whether we are running inside an IPython/Jupyter notebook.
# If IPython is importable we assume a notebook context; otherwise we
# fall back to terminal rendering (via Rich) and leave ipy_display unset.
try:
    from IPython import display as ipy_display

    IS_NOTEBOOK = True

except ImportError:
    ipy_display = None
    IS_NOTEBOOK = False


# Throttle ceiling for count-driven display refreshes; see
# ReadProgress.log_records_read, which adapts the refresh period to the
# scale of the record count but never exceeds this interval.
MAX_UPDATE_FREQUENCY = 1000
"""The max number of records to read before updating the progress bar."""


def _to_time_str(timestamp: float) -> str:
"""Convert a timestamp float to a local time string.

For now, we'll just use UTC to avoid breaking tests. In the future, we should
return a local time string.
"""
datetime_obj = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
# TODO: Uncomment this line when we can get tests to properly account for local timezones.
# For now, we'll just use UTC to avoid breaking tests.
# datetime_obj = datetime_obj.astimezone()
return datetime_obj.strftime("%H:%M:%S")


def _get_elapsed_time_str(seconds: int) -> str:
"""Return duration as a string.

Seconds are included until 10 minutes is exceeded.
Minutes are always included after 1 minute elapsed.
Hours are always included after 1 hour elapsed.
"""
if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
return f"{seconds} seconds"

if seconds < 60 * 10:
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes}min {seconds}s"

if seconds < 60 * 60:
minutes = seconds // 60
seconds = seconds % 60
return f"{minutes}min"

hours = seconds // (60 * 60)
minutes = (seconds % (60 * 60)) // 60
return f"{hours}hr {minutes}min"


class ReadProgress:
    """A simple progress bar for the command line and IPython notebooks.

    Tracks three phases of a sync — reading records, writing batches, and
    finalizing streams — and renders a Markdown status panel either through
    a Rich ``Live`` view (terminal) or IPython display (notebook).
    """

    def __init__(self) -> None:
        """Initialize the progress tracker."""
        # Streams expected (for progress bar)
        self.num_streams_expected = 0

        # Reads
        self.read_start_time = time.time()
        self.read_end_time: float | None = None
        self.total_records_read = 0

        # Writes
        self.total_records_written = 0
        self.total_batches_written = 0
        self.written_stream_names: set[str] = set()

        # Finalization
        self.finalize_start_time: float | None = None
        self.finalize_end_time: float | None = None
        self.total_records_finalized = 0
        self.total_batches_finalized = 0
        self.finalized_stream_names: set[str] = set()

        self.last_update_time: float | None = None

        self.rich_view: RichLive | None = None
        if not IS_NOTEBOOK:
            # If we're in a terminal, use a Rich view to display the progress updates.
            self.rich_view = RichLive()
            try:
                self.rich_view.start()
            except LiveError:
                # Another Live view is already active; fall back to no terminal
                # rendering rather than crashing.
                self.rich_view = None

    def __del__(self) -> None:
        """Close the Rich view."""
        if self.rich_view:
            with suppress(Exception):
                self.rich_view.stop()

    def log_success(self) -> None:
        """Log success and stop tracking progress."""
        if self.finalize_end_time is None:
            # If we haven't already finalized, do so now.
            self.finalize_end_time = time.time()

        self.update_display(force_refresh=True)
        if self.rich_view:
            with suppress(Exception):
                self.rich_view.stop()

    def reset(self, num_streams_expected: int) -> None:
        """Reset the progress tracker.

        Args:
            num_streams_expected: The number of streams expected in this sync;
                used to detect when all streams have been finalized.
        """
        # Streams expected (for progress bar)
        self.num_streams_expected = num_streams_expected

        # Reads
        self.read_start_time = time.time()
        self.read_end_time = None
        self.total_records_read = 0

        # Writes
        self.total_records_written = 0
        self.total_batches_written = 0
        self.written_stream_names = set()

        # Finalization
        self.finalize_start_time = None
        self.finalize_end_time = None
        self.total_records_finalized = 0
        self.total_batches_finalized = 0
        self.finalized_stream_names = set()

    @property
    def elapsed_seconds(self) -> int:
        """Return the number of seconds elapsed since the read operation started."""
        if self.finalize_end_time:
            return int(self.finalize_end_time - self.read_start_time)

        return int(time.time() - self.read_start_time)

    @property
    def elapsed_time_string(self) -> str:
        """Return duration as a string."""
        return _get_elapsed_time_str(self.elapsed_seconds)

    @property
    def elapsed_seconds_since_last_update(self) -> float | None:
        """Return the number of seconds elapsed since the last update."""
        if self.last_update_time is None:
            return None

        return time.time() - self.last_update_time

    @property
    def elapsed_read_seconds(self) -> int:
        """Return the number of seconds elapsed while reading (so far, or total)."""
        if self.read_end_time is None:
            return int(time.time() - self.read_start_time)

        return int(self.read_end_time - self.read_start_time)

    @property
    def elapsed_read_time_string(self) -> str:
        """Return duration as a string."""
        return _get_elapsed_time_str(self.elapsed_read_seconds)

    @property
    def elapsed_finalization_seconds(self) -> int:
        """Return the number of seconds elapsed since finalization started."""
        if self.finalize_start_time is None:
            return 0
        if self.finalize_end_time is None:
            return int(time.time() - self.finalize_start_time)
        return int(self.finalize_end_time - self.finalize_start_time)

    @property
    def elapsed_finalization_time_str(self) -> str:
        """Return duration as a string."""
        return _get_elapsed_time_str(self.elapsed_finalization_seconds)

    def log_records_read(self, new_total_count: int) -> None:
        """Log the new total number of records read.

        Args:
            new_total_count: The cumulative number of records read so far.
        """
        self.total_records_read = new_total_count

        # This is some math to make updates adaptive to the scale of records read.
        # We want to update the display more often when the count is low, and less
        # often when the count is high.
        updated_period = min(
            MAX_UPDATE_FREQUENCY,
            # Guard with max(..., 1): math.log10(0) raises ValueError.
            10 ** math.floor(math.log10(max(self.total_records_read, 1)) / 4),
        )
        if self.total_records_read % updated_period != 0:
            return

        self.update_display()

    def log_batch_written(self, stream_name: str, batch_size: int) -> None:
        """Log that a batch has been written.

        Args:
            stream_name: The name of the stream.
            batch_size: The number of records in the batch.
        """
        self.total_records_written += batch_size
        self.total_batches_written += 1
        self.written_stream_names.add(stream_name)
        self.update_display()

    def log_batches_finalizing(self, stream_name: str, num_batches: int) -> None:
        """Log that batches are ready to be finalized.

        In our current implementation, we ignore the stream name and number of batches.
        We just use this as a signal that we're finished reading and have begun to
        finalize any accumulated batches.
        """
        _ = stream_name, num_batches  # unused for now
        if self.finalize_start_time is None:
            self.read_end_time = time.time()
            self.finalize_start_time = self.read_end_time

        self.update_display(force_refresh=True)

    def log_batches_finalized(self, stream_name: str, num_batches: int) -> None:
        """Log that a batch has been finalized."""
        _ = stream_name  # unused for now
        self.total_batches_finalized += num_batches
        self.update_display(force_refresh=True)

    def log_stream_finalized(self, stream_name: str) -> None:
        """Log that a stream has been finalized."""
        self.finalized_stream_names.add(stream_name)
        if len(self.finalized_stream_names) == self.num_streams_expected:
            # All expected streams are done; declare success and stop rendering.
            self.log_success()

        self.update_display(force_refresh=True)

    def update_display(self, *, force_refresh: bool = False) -> None:
        """Update the display."""
        # Don't update more than twice per second unless force_refresh is True.
        if (
            not force_refresh
            and self.last_update_time  # if not set, then we definitely need to update
            and cast(float, self.elapsed_seconds_since_last_update) < 0.5  # noqa: PLR2004
        ):
            return

        status_message = self._get_status_message()

        if IS_NOTEBOOK:
            # We're in a notebook so use the IPython display.
            ipy_display.clear_output(wait=True)
            ipy_display.display(ipy_display.Markdown(status_message))

        elif self.rich_view is not None:
            self.rich_view.update(RichMarkdown(status_message))

        self.last_update_time = time.time()

    def _get_status_message(self) -> str:
        """Compile and return a status message."""
        # Format start time as a friendly string in local timezone:
        start_time_str = _to_time_str(self.read_start_time)
        records_per_second: float = 0.0
        if self.elapsed_read_seconds > 0:
            records_per_second = round(self.total_records_read / self.elapsed_read_seconds, 1)
        status_message = (
            f"## Read Progress\n\n"
            f"Started reading at {start_time_str}.\n\n"
            f"Read **{self.total_records_read:,}** records "
            f"over **{self.elapsed_read_time_string}** "
            f"({records_per_second:,} records / second).\n\n"
        )
        if self.total_records_written > 0:
            status_message += (
                f"Wrote **{self.total_records_written:,}** records "
                f"over {self.total_batches_written:,} batches.\n\n"
            )
        if self.read_end_time is not None:
            read_end_time_str = _to_time_str(self.read_end_time)
            status_message += f"Finished reading at {read_end_time_str}.\n\n"
        if self.finalize_start_time is not None:
            finalize_start_time_str = _to_time_str(self.finalize_start_time)
            status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
            status_message += (
                f"Finalized **{self.total_batches_finalized}** batches "
                f"over {self.elapsed_finalization_time_str}.\n\n"
            )
        if self.finalized_stream_names:
            status_message += (
                f"Completed {len(self.finalized_stream_names)} "
                + (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
                + "streams:\n\n"
            )
            for stream_name in self.finalized_stream_names:
                status_message += f"  - {stream_name}\n"

            status_message += "\n\n"

        if self.finalize_end_time is not None:
            completion_time_str = _to_time_str(self.finalize_end_time)
            status_message += (
                f"Completed writing at {completion_time_str}. "
                f"Total time elapsed: {self.elapsed_time_string}\n\n"
            )
        status_message += "\n------------------------------------------------\n"

        return status_message


# Module-level singleton shared across the process; callers import it as
# `from airbyte_lib.progress import progress` and log events against it.
progress = ReadProgress()
Loading