Skip to content

Commit 6408023

Browse files
aaronsteers authored and jbfbell committed
AirbyteLib: Progress Printer (#34588)
1 parent 98d760f commit 6408023

File tree

9 files changed

+640
-8
lines changed

9 files changed

+640
-8
lines changed

airbyte-lib/airbyte_lib/_processors.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
from airbyte_lib import exceptions as exc
3434
from airbyte_lib._util import protocol_util # Internal utility functions
35+
from airbyte_lib.progress import progress
3536

3637

3738
if TYPE_CHECKING:
@@ -40,7 +41,7 @@
4041
from airbyte_lib.config import CacheConfigBase
4142

4243

43-
DEFAULT_BATCH_SIZE = 10000
44+
DEFAULT_BATCH_SIZE = 10_000
4445

4546

4647
class BatchHandle:
@@ -95,7 +96,7 @@ def register_source(
9596
For now, only one source at a time is supported.
9697
If this method is called multiple times, the last call will overwrite the previous one.
9798
98-
TODO: Expand this to handle mutliple sources.
99+
TODO: Expand this to handle multiple sources.
99100
"""
100101
_ = source_name
101102
self.source_catalog = incoming_source_catalog
@@ -157,6 +158,7 @@ def process_airbyte_messages(
157158
if len(stream_batch) >= max_batch_size:
158159
record_batch = pa.Table.from_pylist(stream_batch)
159160
self._process_batch(stream_name, record_batch)
161+
progress.log_batch_written(stream_name, len(stream_batch))
160162
stream_batch.clear()
161163

162164
elif message.type is Type.STATE:
@@ -180,14 +182,16 @@ def process_airbyte_messages(
180182
)
181183

182184
# We are at the end of the stream. Process whatever else is queued.
183-
for stream_name, batch in stream_batches.items():
184-
if batch:
185-
record_batch = pa.Table.from_pylist(batch)
185+
for stream_name, stream_batch in stream_batches.items():
186+
if stream_batch:
187+
record_batch = pa.Table.from_pylist(stream_batch)
186188
self._process_batch(stream_name, record_batch)
189+
progress.log_batch_written(stream_name, len(stream_batch))
187190

188191
# Finalize any pending batches
189192
for stream_name in list(self._pending_batches.keys()):
190193
self._finalize_batches(stream_name)
194+
progress.log_stream_finalized(stream_name)
191195

192196
@final
193197
def _process_batch(
@@ -287,7 +291,10 @@ def _finalizing_batches(
287291
state_messages_to_finalize = self._pending_state_messages[stream_name].copy()
288292
self._pending_batches[stream_name].clear()
289293
self._pending_state_messages[stream_name].clear()
294+
295+
progress.log_batches_finalizing(stream_name, len(batches_to_finalize))
290296
yield batches_to_finalize
297+
progress.log_batches_finalized(stream_name, len(batches_to_finalize))
291298

292299
self._finalized_batches[stream_name].update(batches_to_finalize)
293300
self._finalized_state_messages[stream_name] += state_messages_to_finalize

airbyte-lib/airbyte_lib/progress.py

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
"""A simple progress bar for the command line and IPython notebooks."""
4+
from __future__ import annotations
5+
6+
import datetime
7+
import math
8+
import time
9+
from contextlib import suppress
10+
from typing import cast
11+
12+
from rich.errors import LiveError
13+
from rich.live import Live as RichLive
14+
from rich.markdown import Markdown as RichMarkdown
15+
16+
17+
# Optional IPython support: when IPython can be imported, progress updates are
# rendered through `IPython.display`; otherwise `ipy_display` is None.
# NOTE(review): IS_NOTEBOOK is True whenever IPython is importable, which is not
# necessarily the same as "running inside a notebook" - confirm this is intended.
try:
    IS_NOTEBOOK = True
    from IPython import display as ipy_display

except ImportError:
    ipy_display = None
    IS_NOTEBOOK = False


# Used as the upper bound for the record-count interval between display
# refreshes computed in `ReadProgress.log_records_read`.
MAX_UPDATE_FREQUENCY = 1000
"""The max number of records to read before updating the progress bar."""
28+
29+
30+
def _to_time_str(timestamp: float) -> str:
31+
"""Convert a timestamp float to a local time string.
32+
33+
For now, we'll just use UTC to avoid breaking tests. In the future, we should
34+
return a local time string.
35+
"""
36+
datetime_obj = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
37+
# TODO: Uncomment this line when we can get tests to properly account for local timezones.
38+
# For now, we'll just use UTC to avoid breaking tests.
39+
# datetime_obj = datetime_obj.astimezone()
40+
return datetime_obj.strftime("%H:%M:%S")
41+
42+
43+
def _get_elapsed_time_str(seconds: int) -> str:
44+
"""Return duration as a string.
45+
46+
Seconds are included until 10 minutes is exceeded.
47+
Minutes are always included after 1 minute elapsed.
48+
Hours are always included after 1 hour elapsed.
49+
"""
50+
if seconds <= 60: # noqa: PLR2004 # Magic numbers OK here.
51+
return f"{seconds} seconds"
52+
53+
if seconds < 60 * 10:
54+
minutes = seconds // 60
55+
seconds = seconds % 60
56+
return f"{minutes}min {seconds}s"
57+
58+
if seconds < 60 * 60:
59+
minutes = seconds // 60
60+
seconds = seconds % 60
61+
return f"{minutes}min"
62+
63+
hours = seconds // (60 * 60)
64+
minutes = (seconds % (60 * 60)) // 60
65+
return f"{hours}hr {minutes}min"
66+
67+
68+
class ReadProgress:
69+
"""A simple progress bar for the command line and IPython notebooks."""
70+
71+
def __init__(self) -> None:
72+
"""Initialize the progress tracker."""
73+
# Streams expected (for progress bar)
74+
self.num_streams_expected = 0
75+
76+
# Reads
77+
self.read_start_time = time.time()
78+
self.read_end_time: float | None = None
79+
self.total_records_read = 0
80+
81+
# Writes
82+
self.total_records_written = 0
83+
self.total_batches_written = 0
84+
self.written_stream_names: set[str] = set()
85+
86+
# Finalization
87+
self.finalize_start_time: float | None = None
88+
self.finalize_end_time: float | None = None
89+
self.total_records_finalized = 0
90+
self.total_batches_finalized = 0
91+
self.finalized_stream_names: set[str] = set()
92+
93+
self.last_update_time: float | None = None
94+
95+
self.rich_view: RichLive | None = None
96+
if not IS_NOTEBOOK:
97+
# If we're in a terminal, use a Rich view to display the progress updates.
98+
self.rich_view = RichLive()
99+
try:
100+
self.rich_view.start()
101+
except LiveError:
102+
self.rich_view = None
103+
104+
def __del__(self) -> None:
105+
"""Close the Rich view."""
106+
if self.rich_view:
107+
with suppress(Exception):
108+
self.rich_view.stop()
109+
110+
def log_success(self) -> None:
111+
"""Log success and stop tracking progress."""
112+
if self.finalize_end_time is None:
113+
# If we haven't already finalized, do so now.
114+
115+
self.finalize_end_time = time.time()
116+
117+
self.update_display(force_refresh=True)
118+
if self.rich_view:
119+
with suppress(Exception):
120+
self.rich_view.stop()
121+
122+
def reset(self, num_streams_expected: int) -> None:
123+
"""Reset the progress tracker."""
124+
# Streams expected (for progress bar)
125+
self.num_streams_expected = num_streams_expected
126+
127+
# Reads
128+
self.read_start_time = time.time()
129+
self.read_end_time = None
130+
self.total_records_read = 0
131+
132+
# Writes
133+
self.total_records_written = 0
134+
self.total_batches_written = 0
135+
self.written_stream_names = set()
136+
137+
# Finalization
138+
self.finalize_start_time = None
139+
self.finalize_end_time = None
140+
self.total_records_finalized = 0
141+
self.total_batches_finalized = 0
142+
self.finalized_stream_names = set()
143+
144+
@property
145+
def elapsed_seconds(self) -> int:
146+
"""Return the number of seconds elapsed since the read operation started."""
147+
if self.finalize_end_time:
148+
return int(self.finalize_end_time - self.read_start_time)
149+
150+
return int(time.time() - self.read_start_time)
151+
152+
@property
153+
def elapsed_time_string(self) -> str:
154+
"""Return duration as a string."""
155+
return _get_elapsed_time_str(self.elapsed_seconds)
156+
157+
@property
158+
def elapsed_seconds_since_last_update(self) -> float | None:
159+
"""Return the number of seconds elapsed since the last update."""
160+
if self.last_update_time is None:
161+
return None
162+
163+
return time.time() - self.last_update_time
164+
165+
@property
166+
def elapsed_read_seconds(self) -> int:
167+
"""Return the number of seconds elapsed since the read operation started."""
168+
if self.read_end_time is None:
169+
return int(time.time() - self.read_start_time)
170+
171+
return int(self.read_end_time - self.read_start_time)
172+
173+
@property
174+
def elapsed_read_time_string(self) -> str:
175+
"""Return duration as a string."""
176+
return _get_elapsed_time_str(self.elapsed_read_seconds)
177+
178+
@property
179+
def elapsed_finalization_seconds(self) -> int:
180+
"""Return the number of seconds elapsed since the read operation started."""
181+
if self.finalize_start_time is None:
182+
return 0
183+
if self.finalize_end_time is None:
184+
return int(time.time() - self.finalize_start_time)
185+
return int(self.finalize_end_time - self.finalize_start_time)
186+
187+
@property
188+
def elapsed_finalization_time_str(self) -> str:
189+
"""Return duration as a string."""
190+
return _get_elapsed_time_str(self.elapsed_finalization_seconds)
191+
192+
def log_records_read(self, new_total_count: int) -> None:
193+
"""Load a number of records read."""
194+
self.total_records_read = new_total_count
195+
196+
# This is some math to make updates adaptive to the scale of records read.
197+
# We want to update the display more often when the count is low, and less
198+
# often when the count is high.
199+
updated_period = min(
200+
MAX_UPDATE_FREQUENCY, 10 ** math.floor(math.log10(self.total_records_read) / 4)
201+
)
202+
if self.total_records_read % updated_period != 0:
203+
return
204+
205+
self.update_display()
206+
207+
def log_batch_written(self, stream_name: str, batch_size: int) -> None:
208+
"""Log that a batch has been written.
209+
210+
Args:
211+
stream_name: The name of the stream.
212+
batch_size: The number of records in the batch.
213+
"""
214+
self.total_records_written += batch_size
215+
self.total_batches_written += 1
216+
self.written_stream_names.add(stream_name)
217+
self.update_display()
218+
219+
def log_batches_finalizing(self, stream_name: str, num_batches: int) -> None:
220+
"""Log that batch are ready to be finalized.
221+
222+
In our current implementation, we ignore the stream name and number of batches.
223+
We just use this as a signal that we're finished reading and have begun to
224+
finalize any accumulated batches.
225+
"""
226+
_ = stream_name, num_batches # unused for now
227+
if self.finalize_start_time is None:
228+
self.read_end_time = time.time()
229+
self.finalize_start_time = self.read_end_time
230+
231+
self.update_display(force_refresh=True)
232+
233+
def log_batches_finalized(self, stream_name: str, num_batches: int) -> None:
234+
"""Log that a batch has been finalized."""
235+
_ = stream_name # unused for now
236+
self.total_batches_finalized += num_batches
237+
self.update_display(force_refresh=True)
238+
239+
def log_stream_finalized(self, stream_name: str) -> None:
240+
"""Log that a stream has been finalized."""
241+
self.finalized_stream_names.add(stream_name)
242+
if len(self.finalized_stream_names) == self.num_streams_expected:
243+
self.log_success()
244+
245+
self.update_display(force_refresh=True)
246+
247+
def update_display(self, *, force_refresh: bool = False) -> None:
248+
"""Update the display."""
249+
# Don't update more than twice per second unless force_refresh is True.
250+
if (
251+
not force_refresh
252+
and self.last_update_time # if not set, then we definitely need to update
253+
and cast(float, self.elapsed_seconds_since_last_update) < 0.5 # noqa: PLR2004
254+
):
255+
return
256+
257+
status_message = self._get_status_message()
258+
259+
if IS_NOTEBOOK:
260+
# We're in a notebook so use the IPython display.
261+
ipy_display.clear_output(wait=True)
262+
ipy_display.display(ipy_display.Markdown(status_message))
263+
264+
elif self.rich_view is not None:
265+
self.rich_view.update(RichMarkdown(status_message))
266+
267+
self.last_update_time = time.time()
268+
269+
def _get_status_message(self) -> str:
270+
"""Compile and return a status message."""
271+
# Format start time as a friendly string in local timezone:
272+
start_time_str = _to_time_str(self.read_start_time)
273+
records_per_second: float = 0.0
274+
if self.elapsed_read_seconds > 0:
275+
records_per_second = round(self.total_records_read / self.elapsed_read_seconds, 1)
276+
status_message = (
277+
f"## Read Progress\n\n"
278+
f"Started reading at {start_time_str}.\n\n"
279+
f"Read **{self.total_records_read:,}** records "
280+
f"over **{self.elapsed_read_time_string}** "
281+
f"({records_per_second:,} records / second).\n\n"
282+
)
283+
if self.total_records_written > 0:
284+
status_message += (
285+
f"Wrote **{self.total_records_written:,}** records "
286+
f"over {self.total_batches_written:,} batches.\n\n"
287+
)
288+
if self.read_end_time is not None:
289+
read_end_time_str = _to_time_str(self.read_end_time)
290+
status_message += f"Finished reading at {read_end_time_str}.\n\n"
291+
if self.finalize_start_time is not None:
292+
finalize_start_time_str = _to_time_str(self.finalize_start_time)
293+
status_message += f"Started finalizing streams at {finalize_start_time_str}.\n\n"
294+
status_message += (
295+
f"Finalized **{self.total_batches_finalized}** batches "
296+
f"over {self.elapsed_finalization_time_str}.\n\n"
297+
)
298+
if self.finalized_stream_names:
299+
status_message += (
300+
f"Completed {len(self.finalized_stream_names)} "
301+
+ (f"out of {self.num_streams_expected} " if self.num_streams_expected else "")
302+
+ "streams:\n\n"
303+
)
304+
for stream_name in self.finalized_stream_names:
305+
status_message += f" - {stream_name}\n"
306+
307+
status_message += "\n\n"
308+
309+
if self.finalize_end_time is not None:
310+
completion_time_str = _to_time_str(self.finalize_end_time)
311+
status_message += (
312+
f"Completed writing at {completion_time_str}. "
313+
f"Total time elapsed: {self.elapsed_time_string}\n\n"
314+
)
315+
status_message += "\n------------------------------------------------\n"
316+
317+
return status_message
318+
319+
320+
# Shared module-level tracker instance; other modules import this object
# directly (`from airbyte_lib.progress import progress`).
progress = ReadProgress()

0 commit comments

Comments
 (0)