Skip to content

Commit 3a005d7

Browse files
aaronsteers authored and jbfbell committed
AirbyteLib: Installation improvements and improved error handling (#34572)
1 parent 372edd2 commit 3a005d7

File tree

10 files changed

+609
-189
lines changed

10 files changed

+609
-189
lines changed

airbyte-lib/airbyte_lib/_executor.py

Lines changed: 235 additions & 75 deletions
Large diffs are not rendered by default.

airbyte-lib/airbyte_lib/_factories/connector_factories.py

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
22
from __future__ import annotations
33

4+
import shutil
5+
from pathlib import Path
46
from typing import Any
57

6-
from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
7-
from airbyte_lib.exceptions import AirbyteLibInputError
8-
from airbyte_lib.registry import get_connector_metadata
8+
from airbyte_lib import exceptions as exc
9+
from airbyte_lib._executor import PathExecutor, VenvExecutor
10+
from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata
911
from airbyte_lib.source import Source
1012

1113

@@ -15,7 +17,7 @@ def get_connector(
1517
pip_url: str | None = None,
1618
config: dict[str, Any] | None = None,
1719
*,
18-
use_local_install: bool = False,
20+
local_executable: Path | str | None = None,
1921
install_if_missing: bool = True,
2022
) -> Source:
2123
"""Get a connector by name and version.
@@ -29,34 +31,58 @@ def get_connector(
2931
connector name.
3032
config: connector config - if not provided, you need to set it later via the set_config
3133
method.
32-
use_local_install: whether to use a virtual environment to run the connector. If True, the
33-
connector is expected to be available on the path (e.g. installed via pip). If False,
34-
the connector will be installed automatically in a virtual environment.
35-
install_if_missing: whether to install the connector if it is not available locally. This
36-
parameter is ignored if use_local_install is True.
34+
local_executable: If set, the connector will be assumed to already be installed and will be
35+
executed using this path or executable name. Otherwise, the connector will be installed
36+
automatically in a virtual environment.
37+
install_if_missing: Whether to install the connector if it is not available locally. This
38+
parameter is ignored when local_executable is set.
3739
"""
38-
metadata = get_connector_metadata(name)
39-
if use_local_install:
40+
if local_executable:
4041
if pip_url:
41-
raise AirbyteLibInputError(
42-
message="Param 'pip_url' is not supported when 'use_local_install' is True."
42+
raise exc.AirbyteLibInputError(
43+
message="Param 'pip_url' is not supported when 'local_executable' is set."
4344
)
4445
if version:
45-
raise AirbyteLibInputError(
46-
message="Param 'version' is not supported when 'use_local_install' is True."
46+
raise exc.AirbyteLibInputError(
47+
message="Param 'version' is not supported when 'local_executable' is set."
4748
)
48-
executor: Executor = PathExecutor(
49-
metadata=metadata,
50-
target_version=version,
51-
)
5249

53-
else:
54-
executor = VenvExecutor(
55-
metadata=metadata,
56-
target_version=version,
57-
install_if_missing=install_if_missing,
58-
pip_url=pip_url,
50+
if isinstance(local_executable, str):
51+
if "/" in local_executable or "\\" in local_executable:
52+
# Assume this is a path
53+
local_executable = Path(local_executable).absolute()
54+
else:
55+
which_executable = shutil.which(local_executable)
56+
if which_executable is None:
57+
raise FileNotFoundError(local_executable)
58+
local_executable = Path(which_executable).absolute()
59+
60+
return Source(
61+
name=name,
62+
config=config,
63+
executor=PathExecutor(
64+
name=name,
65+
path=local_executable,
66+
),
5967
)
68+
69+
metadata: ConnectorMetadata | None = None
70+
try:
71+
metadata = get_connector_metadata(name)
72+
except exc.AirbyteConnectorNotRegisteredError:
73+
if not pip_url:
74+
# We don't have a pip url or registry entry, so we can't install the connector
75+
raise
76+
77+
executor = VenvExecutor(
78+
name=name,
79+
metadata=metadata,
80+
target_version=version,
81+
pip_url=pip_url,
82+
)
83+
if install_if_missing:
84+
executor.ensure_installation()
85+
6086
return Source(
6187
executor=executor,
6288
name=name,

airbyte-lib/airbyte_lib/exceptions.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,14 @@ class AirbyteConnectorRegistryError(AirbyteError):
174174
"""Error when accessing the connector registry."""
175175

176176

177+
@dataclass
178+
class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
179+
"""Connector not found in registry."""
180+
181+
connector_name: str | None = None
182+
guidance = "Please double check the connector name."
183+
184+
177185
# Connector Errors
178186

179187

@@ -184,8 +192,8 @@ class AirbyteConnectorError(AirbyteError):
184192
connector_name: str | None = None
185193

186194

187-
class AirbyteConnectorNotFoundError(AirbyteConnectorError):
188-
"""Connector not found."""
195+
class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
196+
"""Connector executable not found."""
189197

190198

191199
class AirbyteConnectorInstallationError(AirbyteConnectorError):

airbyte-lib/airbyte_lib/registry.py

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import json
55
import os
6+
from copy import copy
67
from dataclasses import dataclass
78
from pathlib import Path
89

@@ -12,47 +13,81 @@
1213
from airbyte_lib.version import get_version
1314

1415

16+
__cache: dict[str, ConnectorMetadata] | None = None
17+
18+
19+
REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY"
20+
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
21+
22+
1523
@dataclass
1624
class ConnectorMetadata:
1725
name: str
1826
latest_available_version: str
1927

2028

21-
_cache: dict[str, ConnectorMetadata] | None = None
29+
def _get_registry_url() -> str:
30+
if REGISTRY_ENV_VAR in os.environ:
31+
return str(os.environ.get(REGISTRY_ENV_VAR))
2232

23-
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"
33+
return REGISTRY_URL
2434

2535

26-
def _update_cache() -> None:
27-
global _cache
28-
if os.environ.get("AIRBYTE_LOCAL_REGISTRY"):
29-
with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f:
30-
data = json.load(f)
31-
else:
36+
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
37+
"""Return the registry cache."""
38+
global __cache
39+
if __cache and not force_refresh:
40+
return __cache
41+
42+
registry_url = _get_registry_url()
43+
if registry_url.startswith("http"):
3244
response = requests.get(
33-
REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
45+
registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
3446
)
3547
response.raise_for_status()
3648
data = response.json()
37-
_cache = {}
49+
else:
50+
# Assume local file
51+
with Path(registry_url).open() as f:
52+
data = json.load(f)
53+
54+
new_cache: dict[str, ConnectorMetadata] = {}
55+
3856
for connector in data["sources"]:
3957
name = connector["dockerRepository"].replace("airbyte/", "")
40-
_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
58+
new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
59+
60+
if len(new_cache) == 0:
61+
raise exc.AirbyteLibInternalError(
62+
message="Connector registry is empty.",
63+
context={
64+
"registry_url": _get_registry_url(),
65+
},
66+
)
67+
68+
__cache = new_cache
69+
return __cache
4170

4271

4372
def get_connector_metadata(name: str) -> ConnectorMetadata:
4473
"""Check the cache for the connector.
4574
4675
If the cache is empty, it is populated by calling _get_registry_cache.
4776
"""
48-
if not _cache:
49-
_update_cache()
50-
if not _cache or name not in _cache:
51-
raise exc.AirbyteLibInputError(
52-
message="Connector name not found in registry.",
53-
guidance="Please double check the connector name.",
77+
cache = copy(_get_registry_cache())
78+
if not cache:
79+
raise exc.AirbyteLibInternalError(
80+
message="Connector registry could not be loaded.",
81+
context={
82+
"registry_url": _get_registry_url(),
83+
},
84+
)
85+
if name not in cache:
86+
raise exc.AirbyteConnectorNotRegisteredError(
87+
connector_name=name,
5488
context={
55-
"connector_name": name,
89+
"registry_url": _get_registry_url(),
90+
"available_connectors": sorted(cache.keys()),
5691
},
5792
)
58-
return _cache[name]
93+
return cache[name]

airbyte-lib/airbyte_lib/source.py

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing import TYPE_CHECKING, Any
88

99
import jsonschema
10+
import yaml
1011

1112
from airbyte_protocol.models import (
1213
AirbyteCatalog,
@@ -68,7 +69,13 @@ def __init__(
6869
name: str,
6970
config: dict[str, Any] | None = None,
7071
streams: list[str] | None = None,
72+
*,
73+
validate: bool = False,
7174
) -> None:
75+
"""Initialize the source.
76+
77+
If config is provided, it will be validated against the spec if validate is True.
78+
"""
7279
self._processed_records = 0
7380
self.executor = executor
7481
self.name = name
@@ -79,7 +86,7 @@ def __init__(
7986
self._spec: ConnectorSpecification | None = None
8087
self._selected_stream_names: list[str] | None = None
8188
if config is not None:
82-
self.set_config(config)
89+
self.set_config(config, validate=validate)
8390
if streams is not None:
8491
self.set_streams(streams)
8592

@@ -102,8 +109,22 @@ def set_streams(self, streams: list[str]) -> None:
102109
)
103110
self._selected_stream_names = streams
104111

105-
def set_config(self, config: dict[str, Any]) -> None:
106-
self._validate_config(config)
112+
def set_config(
113+
self,
114+
config: dict[str, Any],
115+
*,
116+
validate: bool = False,
117+
) -> None:
118+
"""Set the config for the connector.
119+
120+
If validate is True, raise an exception if the config fails validation.
121+
122+
If validate is False, validation will be deferred until check() or validate_config()
123+
is called.
124+
"""
125+
if validate:
126+
self.validate_config(config)
127+
107128
self._config_dict = config
108129

109130
@property
@@ -131,9 +152,13 @@ def _discover(self) -> AirbyteCatalog:
131152
log_text=self._last_log_messages,
132153
)
133154

134-
def _validate_config(self, config: dict[str, Any]) -> None:
135-
"""Validate the config against the spec."""
155+
def validate_config(self, config: dict[str, Any] | None = None) -> None:
156+
"""Validate the config against the spec.
157+
158+
If config is not provided, the already-set config will be validated.
159+
"""
136160
spec = self._get_spec(force_refresh=False)
161+
config = self._config if config is None else config
137162
jsonschema.validate(config, spec.connectionSpecification)
138163

139164
def get_available_streams(self) -> list[str]:
@@ -161,6 +186,21 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification:
161186
log_text=self._last_log_messages,
162187
)
163188

189+
@property
190+
def _yaml_spec(self) -> str:
191+
"""Get the spec as a yaml string.
192+
193+
For now, the primary use case is for writing and debugging a valid config for a source.
194+
195+
This is private for now because we probably want better polish before exposing this
196+
as a stable interface. This will also get easier when we have docs links with this info
197+
for each connector.
198+
"""
199+
spec_obj: ConnectorSpecification = self._get_spec()
200+
spec_dict = spec_obj.dict(exclude_unset=True)
201+
# convert to a yaml string
202+
return yaml.dump(spec_dict)
203+
164204
@property
165205
def discovered_catalog(self) -> AirbyteCatalog:
166206
"""Get the raw catalog for the given streams.
@@ -248,17 +288,23 @@ def check(self) -> None:
248288
* Make sure the subprocess is killed when the function returns.
249289
"""
250290
with as_temp_files([self._config]) as [config_file]:
251-
for msg in self._execute(["check", "--config", config_file]):
252-
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
253-
if msg.connectionStatus.status != Status.FAILED:
254-
return # Success!
255-
256-
raise exc.AirbyteConnectorCheckFailedError(
257-
context={
258-
"message": msg.connectionStatus.message,
259-
}
260-
)
261-
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
291+
try:
292+
for msg in self._execute(["check", "--config", config_file]):
293+
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
294+
if msg.connectionStatus.status != Status.FAILED:
295+
return # Success!
296+
297+
raise exc.AirbyteConnectorCheckFailedError(
298+
context={
299+
"message": msg.connectionStatus.message,
300+
}
301+
)
302+
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages)
303+
except exc.AirbyteConnectorReadError as ex:
304+
raise exc.AirbyteConnectorCheckFailedError(
305+
message="The connector failed to check the connection.",
306+
log_text=ex.log_text,
307+
) from ex
262308

263309
def install(self) -> None:
264310
"""Install the connector if it is not yet installed."""
@@ -338,7 +384,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]:
338384
* Read the output of the subprocess line by line and parse each line into an AirbyteMessage object.
339385
Drop if not valid.
340386
"""
341-
self.executor.ensure_installation()
387+
# Fail early if the connector is not installed.
388+
self.executor.ensure_installation(auto_fix=False)
342389

343390
try:
344391
self._last_log_messages = []

0 commit comments

Comments
 (0)