Skip to content

AirbyteLib: Progress Printer #34588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 60 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
abbb256
new exception type: AirbyteConnectorNotRegisteredError
aaronsteers Jan 26, 2024
3845f5c
make constructors more resilient
aaronsteers Jan 26, 2024
9fccace
print stderr in exception text, cleanup failed install, remove editab…
aaronsteers Jan 26, 2024
a217a6e
move auto-install out of venv constructor, for easier debugging
aaronsteers Jan 26, 2024
6aa85d6
add test to assert that install failure includes pip log text
aaronsteers Jan 26, 2024
dddbc78
update docs
aaronsteers Jan 26, 2024
b1d966b
auto-format
aaronsteers Jan 26, 2024
f61152a
update docs
aaronsteers Jan 26, 2024
d665088
refactor version handling, control for side effects
aaronsteers Jan 26, 2024
809918b
fix exception handling in _get_installed_version()
aaronsteers Jan 26, 2024
4a41ffb
fix tests
aaronsteers Jan 26, 2024
bab5e06
improve thread safety
aaronsteers Jan 27, 2024
10ce077
handle quoted spaces in pip_url
aaronsteers Jan 27, 2024
063bba3
fix import sorts
aaronsteers Jan 27, 2024
ab75be4
standalone validate_config() method
aaronsteers Jan 27, 2024
8880b0b
add Source.yaml_spec property
aaronsteers Jan 27, 2024
3773149
make _yaml_spec a protected member
aaronsteers Jan 27, 2024
90918c8
fix too-limited json package_data glob
aaronsteers Jan 27, 2024
5f0bcb3
basic progress reporting
aaronsteers Jan 27, 2024
9ed1929
remove raw=True
aaronsteers Jan 27, 2024
ec4d8dd
add progress tracker class
aaronsteers Jan 28, 2024
5d4eb45
update docs
aaronsteers Jan 28, 2024
a164168
bug fixes
aaronsteers Jan 28, 2024
0c62f04
fix separator
aaronsteers Jan 28, 2024
bec8d11
bug fixes
aaronsteers Jan 28, 2024
df24520
bug fix
aaronsteers Jan 28, 2024
6cc9f50
improved logs
aaronsteers Jan 28, 2024
6d6708c
fix progress bugs, add unit tests
aaronsteers Jan 28, 2024
bf11816
add reset() at beginning of sync
aaronsteers Jan 28, 2024
f1505f1
linting fix and fix missing stream count for progress reset
aaronsteers Jan 28, 2024
625afb6
Update airbyte-integrations/connectors/source-github/setup.py
aaronsteers Jan 28, 2024
7b7150b
add faker test script to sample terminal output
aaronsteers Jan 28, 2024
c17b652
fix logs
aaronsteers Jan 28, 2024
617371b
add terminal implementation via rich, add final complete message
aaronsteers Jan 28, 2024
94a90b5
update scale in faker example
aaronsteers Jan 28, 2024
6cc2cfd
poetry add rich
aaronsteers Jan 28, 2024
1007b88
cleanup helper functions
aaronsteers Jan 28, 2024
2a2fdc8
update docs
aaronsteers Jan 28, 2024
c4c6a6b
fix datetime refs
aaronsteers Jan 28, 2024
0b6895b
fix tests, make tests more flexible
aaronsteers Jan 28, 2024
6675233
reorder import
aaronsteers Jan 28, 2024
9197728
fix missing copyright str
aaronsteers Jan 28, 2024
f73f288
docstring
aaronsteers Jan 28, 2024
dd9ac99
update docs
aaronsteers Jan 28, 2024
a2bed01
revert source-github change
aaronsteers Jan 28, 2024
2e49154
updated comment
aaronsteers Jan 28, 2024
0772a68
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 28, 2024
f975282
remove redundant strings
aaronsteers Jan 28, 2024
ace7208
Merge remote-tracking branch 'origin/master' into aj/airbyte-lib/inst…
aaronsteers Jan 28, 2024
8775c1b
update docs (removes empty cloud page)
aaronsteers Jan 28, 2024
34cb485
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 28, 2024
f24226d
remove unused lock
aaronsteers Jan 30, 2024
7370d83
rename AirbyteConnectoNotFoundError to AirbyteConnectorExecutableNotF…
aaronsteers Jan 30, 2024
cfafccc
Merge branch 'master' into aj/airbyte-lib/install-failure-handling
aaronsteers Jan 30, 2024
8446838
allow prereleases in version check
aaronsteers Jan 30, 2024
8b85a67
Merge branch 'aj/airbyte-lib/install-failure-handling' into aj/airbyt…
aaronsteers Jan 30, 2024
11ebbb8
Merge branch 'master' into aj/airbyte-lib/progress-print
aaronsteers Jan 30, 2024
a2d6a5c
fix missing copyright
aaronsteers Jan 30, 2024
df77431
Merge branch 'master' into aj/airbyte-lib/progress-print
aaronsteers Jan 30, 2024
4d6a5aa
update docs
aaronsteers Jan 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 235 additions & 75 deletions airbyte-lib/airbyte_lib/_executor.py

Large diffs are not rendered by default.

76 changes: 51 additions & 25 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Any

from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.exceptions import AirbyteLibInputError
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib import exceptions as exc
from airbyte_lib._executor import PathExecutor, VenvExecutor
from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata
from airbyte_lib.source import Source


Expand All @@ -15,7 +17,7 @@ def get_connector(
pip_url: str | None = None,
config: dict[str, Any] | None = None,
*,
use_local_install: bool = False,
local_executable: Path | str | None = None,
install_if_missing: bool = True,
) -> Source:
"""Get a connector by name and version.
Expand All @@ -29,34 +31,58 @@ def get_connector(
connector name.
config: connector config - if not provided, you need to set it later via the set_config
method.
use_local_install: whether to use a virtual environment to run the connector. If True, the
connector is expected to be available on the path (e.g. installed via pip). If False,
the connector will be installed automatically in a virtual environment.
install_if_missing: whether to install the connector if it is not available locally. This
parameter is ignored if use_local_install is True.
local_executable: If set, the connector will be assumed to already be installed and will be
executed using this path or executable name. Otherwise, the connector will be installed
automatically in a virtual environment.
install_if_missing: Whether to install the connector if it is not available locally. This
parameter is ignored when local_executable is set.
"""
metadata = get_connector_metadata(name)
if use_local_install:
if local_executable:
if pip_url:
raise AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'use_local_install' is True."
raise exc.AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'local_executable' is set."
)
if version:
raise AirbyteLibInputError(
message="Param 'version' is not supported when 'use_local_install' is True."
raise exc.AirbyteLibInputError(
message="Param 'version' is not supported when 'local_executable' is set."
)
executor: Executor = PathExecutor(
metadata=metadata,
target_version=version,
)

else:
executor = VenvExecutor(
metadata=metadata,
target_version=version,
install_if_missing=install_if_missing,
pip_url=pip_url,
if isinstance(local_executable, str):
if "/" in local_executable or "\\" in local_executable:
# Assume this is a path
local_executable = Path(local_executable).absolute()
else:
which_executable = shutil.which(local_executable)
if which_executable is None:
raise FileNotFoundError(local_executable)
local_executable = Path(which_executable).absolute()

return Source(
name=name,
config=config,
executor=PathExecutor(
name=name,
path=local_executable,
),
)

metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError:
if not pip_url:
# We don't have a pip url or registry entry, so we can't install the connector
raise

executor = VenvExecutor(
name=name,
metadata=metadata,
target_version=version,
pip_url=pip_url,
)
if install_if_missing:
executor.ensure_installation()

return Source(
executor=executor,
name=name,
Expand Down
17 changes: 12 additions & 5 deletions airbyte-lib/airbyte_lib/_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from airbyte_lib import exceptions as exc
from airbyte_lib._util import protocol_util # Internal utility functions
from airbyte_lib.progress import progress


if TYPE_CHECKING:
Expand All @@ -40,7 +41,7 @@
from airbyte_lib.config import CacheConfigBase


DEFAULT_BATCH_SIZE = 10000
DEFAULT_BATCH_SIZE = 10_000


class BatchHandle:
Expand Down Expand Up @@ -95,7 +96,7 @@ def register_source(
For now, only one source at a time is supported.
If this method is called multiple times, the last call will overwrite the previous one.

TODO: Expand this to handle mutliple sources.
TODO: Expand this to handle multiple sources.
"""
_ = source_name
self.source_catalog = incoming_source_catalog
Expand Down Expand Up @@ -157,6 +158,7 @@ def process_airbyte_messages(
if len(stream_batch) >= max_batch_size:
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))
stream_batch.clear()

elif message.type is Type.STATE:
Expand All @@ -180,14 +182,16 @@ def process_airbyte_messages(
)

# We are at the end of the stream. Process whatever else is queued.
for stream_name, batch in stream_batches.items():
if batch:
record_batch = pa.Table.from_pylist(batch)
for stream_name, stream_batch in stream_batches.items():
if stream_batch:
record_batch = pa.Table.from_pylist(stream_batch)
self._process_batch(stream_name, record_batch)
progress.log_batch_written(stream_name, len(stream_batch))

# Finalize any pending batches
for stream_name in list(self._pending_batches.keys()):
self._finalize_batches(stream_name)
progress.log_stream_finalized(stream_name)

@final
def _process_batch(
Expand Down Expand Up @@ -287,7 +291,10 @@ def _finalizing_batches(
state_messages_to_finalize = self._pending_state_messages[stream_name].copy()
self._pending_batches[stream_name].clear()
self._pending_state_messages[stream_name].clear()

progress.log_batches_finalizing(stream_name, len(batches_to_finalize))
yield batches_to_finalize
progress.log_batches_finalized(stream_name, len(batches_to_finalize))

self._finalized_batches[stream_name].update(batches_to_finalize)
self._finalized_state_messages[stream_name] += state_messages_to_finalize
Expand Down
12 changes: 10 additions & 2 deletions airbyte-lib/airbyte_lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ class AirbyteConnectorRegistryError(AirbyteError):
"""Error when accessing the connector registry."""


@dataclass
class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
"""Connector not found in registry."""

connector_name: str | None = None
guidance = "Please double check the connector name."


# Connector Errors


Expand All @@ -184,8 +192,8 @@ class AirbyteConnectorError(AirbyteError):
connector_name: str | None = None


class AirbyteConnectorNotFoundError(AirbyteConnectorError):
"""Connector not found."""
class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
"""Connector executable not found."""


class AirbyteConnectorInstallationError(AirbyteConnectorError):
Expand Down
Loading