-
Notifications
You must be signed in to change notification settings - Fork 4.5k
🐙 octavia-cli: apply
connections
#10881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d8da27e
c05c684
120563f
0e67233
71381d4
6ec2dd4
f2f6445
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,15 +3,22 @@ | |||||
# | ||||||
|
||||||
import abc | ||||||
import copy | ||||||
import os | ||||||
import time | ||||||
from pathlib import Path | ||||||
from typing import Any, Callable, Optional, Union | ||||||
|
||||||
import airbyte_api_client | ||||||
import yaml | ||||||
from airbyte_api_client.api import destination_api, source_api | ||||||
from airbyte_api_client.api import connection_api, destination_api, source_api | ||||||
from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog | ||||||
from airbyte_api_client.model.connection_create import ConnectionCreate | ||||||
from airbyte_api_client.model.connection_read import ConnectionRead | ||||||
from airbyte_api_client.model.connection_read_list import ConnectionReadList | ||||||
from airbyte_api_client.model.connection_search import ConnectionSearch | ||||||
from airbyte_api_client.model.connection_status import ConnectionStatus | ||||||
from airbyte_api_client.model.connection_update import ConnectionUpdate | ||||||
from airbyte_api_client.model.destination_create import DestinationCreate | ||||||
from airbyte_api_client.model.destination_read import DestinationRead | ||||||
from airbyte_api_client.model.destination_read_list import DestinationReadList | ||||||
|
@@ -107,12 +114,7 @@ def from_file(cls, file_path: str) -> "ResourceState": | |||||
|
||||||
|
||||||
class BaseResource(abc.ABC): | ||||||
@property | ||||||
@abc.abstractmethod | ||||||
def apply_priority( | ||||||
self, | ||||||
): # pragma: no cover | ||||||
pass | ||||||
APPLY_PRIORITY = 0 # Priority of the resource during the apply. 0 means the resource is top priority. | ||||||
|
||||||
@property | ||||||
@abc.abstractmethod | ||||||
|
@@ -204,6 +206,14 @@ def __init__( | |||||
def remote_resource(self): | ||||||
return self._get_remote_resource() | ||||||
|
||||||
def _get_comparable_configuration( | ||||||
self, | ||||||
) -> dict: # pragma: no cover | ||||||
if not self.was_created: | ||||||
raise NonExistingResourceError("Can't find a comparable configuration as the remote resource does not exists.") | ||||||
else: | ||||||
return copy.deepcopy(self.remote_resource) | ||||||
|
||||||
@property | ||||||
def was_created(self): | ||||||
return True if self.remote_resource else False | ||||||
|
@@ -224,13 +234,13 @@ def __getattr__(self, name: str) -> Any: | |||||
return self.local_configuration.get(name) | ||||||
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.") | ||||||
|
||||||
def _search(self) -> Union[SourceReadList, DestinationReadList]: | ||||||
def _search(self, check_return_type=True) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: | ||||||
"""Run search of a resources on the remote Airbyte instance. | ||||||
|
||||||
Returns: | ||||||
Union[SourceReadList, DestinationReadList]: Search results | ||||||
Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results | ||||||
""" | ||||||
return self._search_fn(self.api_instance, self.search_payload) | ||||||
return self._search_fn(self.api_instance, self.search_payload, _check_return_type=check_return_type) | ||||||
|
||||||
def _get_state_from_file(self) -> Optional[ResourceState]: | ||||||
"""Retrieve a state object from a local YAML file if it exists. | ||||||
|
@@ -242,14 +252,14 @@ def _get_state_from_file(self) -> Optional[ResourceState]: | |||||
if expected_state_path.is_file(): | ||||||
return ResourceState.from_file(expected_state_path) | ||||||
|
||||||
def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead]]: | ||||||
def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: | ||||||
"""Find the remote resource on the Airbyte instance associated with the current resource. | ||||||
|
||||||
Raises: | ||||||
DuplicateResourceError: raised if the search results return multiple resources. | ||||||
|
||||||
Returns: | ||||||
Optional[Union[SourceRead, DestinationRead]]: The remote resource found. | ||||||
Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: The remote resource found. | ||||||
""" | ||||||
search_results = self._search().get(f"{self.resource_type}s", []) | ||||||
if len(search_results) > 1: | ||||||
|
@@ -271,28 +281,33 @@ def get_diff_with_remote_resource(self) -> str: | |||||
if not self.was_created: | ||||||
raise NonExistingResourceError("Cannot compute diff with a non existing remote resource.") | ||||||
current_config = self.configuration | ||||||
remote_config = self.remote_resource.connection_configuration | ||||||
remote_config = self._get_comparable_configuration() | ||||||
diff = compute_diff(remote_config, current_config) | ||||||
return diff.pretty() | ||||||
|
||||||
def _create_or_update( | ||||||
self, operation_fn: Callable, payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate] | ||||||
self, | ||||||
operation_fn: Callable, | ||||||
payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate, ConnectionCreate, ConnectionUpdate], | ||||||
_check_return_type: bool = True, | ||||||
) -> Union[SourceRead, DestinationRead]: | ||||||
"""Wrapper to trigger create or update of remote resource. | ||||||
|
||||||
Args: | ||||||
operation_fn (Callable): The API function to run. | ||||||
payload (Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]): The payload to send to create or update the resource. | ||||||
|
||||||
Kwargs: | ||||||
_check_return_type (boolean): Whether to check the types returned in the API agains airbyte-api-client open api spec. | ||||||
Raises: | ||||||
InvalidConfigurationError: Raised if the create or update payload is invalid. | ||||||
ApiException: Raised in case of other API errors. | ||||||
|
||||||
Returns: | ||||||
Union[SourceRead, DestinationRead]: The created or updated resource. | ||||||
Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. | ||||||
""" | ||||||
try: | ||||||
result = operation_fn(self.api_instance, payload) | ||||||
result = operation_fn(self.api_instance, payload, _check_return_type=_check_return_type) | ||||||
return result, ResourceState.create(self.configuration_path, result[self.resource_id_field]) | ||||||
except airbyte_api_client.ApiException as api_error: | ||||||
if api_error.status == 422: | ||||||
|
@@ -302,19 +317,19 @@ def _create_or_update( | |||||
else: | ||||||
raise api_error | ||||||
|
||||||
def create(self) -> Union[SourceRead, DestinationRead]: | ||||||
def create(self) -> Union[SourceRead, DestinationRead, ConnectionRead]: | ||||||
"""Public function to create the resource on the remote Airbyte instance. | ||||||
|
||||||
Returns: | ||||||
Union[SourceRead, DestinationRead]: The created resource. | ||||||
Union[SourceRead, DestinationRead, ConnectionRead]: The created resource. | ||||||
""" | ||||||
return self._create_or_update(self._create_fn, self.create_payload) | ||||||
|
||||||
def update(self) -> Union[SourceRead, DestinationRead]: | ||||||
def update(self) -> Union[SourceRead, DestinationRead, ConnectionRead]: | ||||||
"""Public function to update the resource on the remote Airbyte instance. | ||||||
|
||||||
Returns: | ||||||
Union[SourceRead, DestinationRead]: The updated resource. | ||||||
Union[SourceRead, DestinationRead, ConnectionRead]: The updated resource. | ||||||
""" | ||||||
return self._create_or_update(self._update_fn, self.update_payload) | ||||||
|
||||||
|
@@ -325,12 +340,11 @@ def resource_id(self) -> Optional[str]: | |||||
Returns: | ||||||
str: Remote resource's UUID | ||||||
""" | ||||||
return self.remote_resource.get(self.resource_id_field) if self.was_created else None | ||||||
return self.state.resource_id if self.state is not None else None | ||||||
|
||||||
|
||||||
class Source(BaseResource): | ||||||
|
||||||
apply_priority = 0 | ||||||
api = source_api.SourceApi | ||||||
create_function_name = "create_source" | ||||||
resource_id_field = "source_id" | ||||||
|
@@ -357,6 +371,15 @@ def update_payload(self): | |||||
name=self.resource_name, | ||||||
) | ||||||
|
||||||
def _get_comparable_configuration(self): | ||||||
"""Get the object to which local configuration will be compared to. | ||||||
|
||||||
Returns: | ||||||
dict: Remote source configuration. | ||||||
""" | ||||||
comparable_configuration = super()._get_comparable_configuration() | ||||||
return comparable_configuration.connection_configuration | ||||||
|
||||||
@property | ||||||
def resource_id_request_body(self) -> SourceIdRequestBody: | ||||||
"""Creates SourceIdRequestBody from resource id. | ||||||
|
@@ -384,7 +407,6 @@ def catalog(self) -> AirbyteCatalog: | |||||
|
||||||
class Destination(BaseResource): | ||||||
|
||||||
apply_priority = 0 | ||||||
api = destination_api.DestinationApi | ||||||
create_function_name = "create_destination" | ||||||
resource_id_field = "destination_id" | ||||||
|
@@ -427,8 +449,97 @@ def update_payload(self) -> DestinationUpdate: | |||||
name=self.resource_name, | ||||||
) | ||||||
|
||||||
def _get_comparable_configuration(self): | ||||||
alafanechere marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
"""Get the object to which local configuration will be compared to. | ||||||
|
||||||
Returns: | ||||||
dict: Remote destination configuration. | ||||||
""" | ||||||
comparable_configuration = super()._get_comparable_configuration() | ||||||
return comparable_configuration.connection_configuration | ||||||
|
||||||
|
||||||
class Connection(BaseResource): | ||||||
APPLY_PRIORITY = 1 # Set to 1 to create connection after source or destination. | ||||||
api = connection_api.ConnectionApi | ||||||
create_function_name = "create_connection" | ||||||
resource_id_field = "connection_id" | ||||||
search_function_name = "search_connections" | ||||||
update_function_name = "update_connection" | ||||||
resource_type = "connection" | ||||||
|
||||||
@property | ||||||
def status(self) -> ConnectionStatus: | ||||||
return ConnectionStatus(self.local_configuration["configuration"]["status"]) | ||||||
|
||||||
@property | ||||||
def create_payload(self) -> ConnectionCreate: | ||||||
"""Defines the payload to create the remote connection. | ||||||
Disable snake case parameter usage with _spec_property_naming=True | ||||||
|
||||||
Returns: | ||||||
ConnectionCreate: The ConnectionCreate model instance | ||||||
""" | ||||||
return ConnectionCreate(**self.configuration, _check_type=False, _spec_property_naming=True) | ||||||
alafanechere marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@property | ||||||
def search_payload(self) -> ConnectionSearch: | ||||||
"""Defines the payload to search the remote connection. Search by connection name if no state found, otherwise search by connection id found in the state. | ||||||
Returns: | ||||||
ConnectionSearch: The ConnectionSearch model instance | ||||||
""" | ||||||
if self.state is None: | ||||||
return ConnectionSearch( | ||||||
source_id=self.source_id, destination_id=self.destination_id, name=self.resource_name, status=self.status | ||||||
) | ||||||
else: | ||||||
return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id) | ||||||
|
||||||
@property | ||||||
def update_payload(self) -> ConnectionUpdate: | ||||||
"""Defines the payload to update a remote connection. | ||||||
|
||||||
Returns: | ||||||
ConnectionUpdate: The DestinationUpdate model instance. | ||||||
""" | ||||||
return ConnectionUpdate( | ||||||
connection_id=self.resource_id, | ||||||
sync_catalog=self.configuration["syncCatalog"], | ||||||
status=self.configuration["status"], | ||||||
namespace_definition=self.configuration["namespaceDefinition"], | ||||||
namespace_format=self.configuration["namespaceFormat"], | ||||||
prefix=self.configuration["prefix"], | ||||||
schedule=self.configuration["schedule"], | ||||||
resource_requirements=self.configuration["resourceRequirements"], | ||||||
_check_type=False, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made a bit of refacto to make it a bit more readable: 6ec2dd4 |
||||||
) | ||||||
|
||||||
def create(self) -> dict: | ||||||
return self._create_or_update( | ||||||
self._create_fn, self.create_payload, _check_return_type=False | ||||||
) # Disable check_return_type as the returned payload does not match the open api spec. | ||||||
|
||||||
def update(self) -> dict: | ||||||
return self._create_or_update( | ||||||
self._update_fn, self.update_payload, _check_return_type=False | ||||||
) # Disable check_return_type as the returned payload does not match the open api spec. | ||||||
|
||||||
def _search(self, check_return_type=True) -> dict: | ||||||
return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) | ||||||
|
||||||
def _get_comparable_configuration(self) -> dict: | ||||||
"""Get the object to which local configuration will be compared to. | ||||||
|
||||||
Returns: | ||||||
dict: Remote connection configuration. | ||||||
""" | ||||||
comparable_configuration = super()._get_comparable_configuration() | ||||||
comparable_configuration.pop("connectionId") | ||||||
comparable_configuration.pop("operationIds") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Today octavia-cli can't work with normalization right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly, I remove the |
||||||
return comparable_configuration | ||||||
|
||||||
|
||||||
def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination]: | ||||||
def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination, Connection]: | ||||||
"""Create resource object according to the definition type field in their YAML configuration. | ||||||
|
||||||
Args: | ||||||
|
@@ -440,13 +551,15 @@ def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configu | |||||
NotImplementedError: Raised if the definition type found in the YAML is not a supported resource. | ||||||
|
||||||
Returns: | ||||||
Union[Source, Destination]: The resource object created from the YAML config. | ||||||
Union[Source, Destination, Connection]: The resource object created from the YAML config. | ||||||
""" | ||||||
with open(configuration_path, "r") as f: | ||||||
local_configuration = yaml.safe_load(f) | ||||||
if local_configuration["definition_type"] == "source": | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
wdyt creating a enum class for definitions? I think there is other places to be used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a future improvement we could make for this file but also in |
||||||
return Source(api_client, workspace_id, local_configuration, configuration_path) | ||||||
if local_configuration["definition_type"] == "destination": | ||||||
return Destination(api_client, workspace_id, local_configuration, configuration_path) | ||||||
if local_configuration["definition_type"] == "connection": | ||||||
return Connection(api_client, workspace_id, local_configuration, configuration_path) | ||||||
else: | ||||||
raise NotImplementedError(f"Resource {local_configuration['definition_type']} was not yet implemented") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# | ||
# Copyright (c) 2021 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import yaml | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can't be done in BaseResource and only be overwritten by Connection object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I duplicated this on purpose as this is really specific to source and destination and not something common to the other potential resources. I was thinking of adding a parent class for
Source
andDestination
(SourceOrDestination(BaseResource)
🤔 ) but thought it was too much for this PR.