Skip to content

🐙 octavia-cli: apply connections #10881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions octavia-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We welcome community contributions!

| Date | Milestone |
|------------|-------------------------------------|
| 2022-03-04 | Implement `octavia apply` for connections|
| 2022-03-02 | Implement `octavia apply` (sources and destination only)|
| 2022-02-06 | Implement `octavia generate` (sources and destination only)|
| 2022-01-25 | Implement `octavia init` + some context checks|
Expand Down
2 changes: 1 addition & 1 deletion octavia-cli/octavia_cli/apply/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_resources_to_apply(
List[BaseResource]: Resources sorted according to their apply priority.
"""
all_resources = [resource_factory(api_client, workspace_id, path) for path in configuration_files]
return sorted(all_resources, key=lambda resource: resource.apply_priority)
return sorted(all_resources, key=lambda resource: resource.APPLY_PRIORITY)


def apply_single_resource(resource: BaseResource, force: bool) -> None:
Expand Down
163 changes: 138 additions & 25 deletions octavia-cli/octavia_cli/apply/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@
#

import abc
import copy
import os
import time
from pathlib import Path
from typing import Any, Callable, Optional, Union

import airbyte_api_client
import yaml
from airbyte_api_client.api import destination_api, source_api
from airbyte_api_client.api import connection_api, destination_api, source_api
from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog
from airbyte_api_client.model.connection_create import ConnectionCreate
from airbyte_api_client.model.connection_read import ConnectionRead
from airbyte_api_client.model.connection_read_list import ConnectionReadList
from airbyte_api_client.model.connection_search import ConnectionSearch
from airbyte_api_client.model.connection_status import ConnectionStatus
from airbyte_api_client.model.connection_update import ConnectionUpdate
from airbyte_api_client.model.destination_create import DestinationCreate
from airbyte_api_client.model.destination_read import DestinationRead
from airbyte_api_client.model.destination_read_list import DestinationReadList
Expand Down Expand Up @@ -107,12 +114,7 @@ def from_file(cls, file_path: str) -> "ResourceState":


class BaseResource(abc.ABC):
@property
@abc.abstractmethod
def apply_priority(
self,
): # pragma: no cover
pass
APPLY_PRIORITY = 0 # Priority of the resource during the apply. 0 means the resource is top priority.

@property
@abc.abstractmethod
Expand Down Expand Up @@ -204,6 +206,14 @@ def __init__(
def remote_resource(self):
return self._get_remote_resource()

def _get_comparable_configuration(
self,
) -> dict: # pragma: no cover
if not self.was_created:
raise NonExistingResourceError("Can't find a comparable configuration as the remote resource does not exists.")
else:
return copy.deepcopy(self.remote_resource)

@property
def was_created(self):
return True if self.remote_resource else False
Expand All @@ -224,13 +234,13 @@ def __getattr__(self, name: str) -> Any:
return self.local_configuration.get(name)
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.")

def _search(self) -> Union[SourceReadList, DestinationReadList]:
def _search(self, check_return_type=True) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]:
"""Run search of a resources on the remote Airbyte instance.

Returns:
Union[SourceReadList, DestinationReadList]: Search results
Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results
"""
return self._search_fn(self.api_instance, self.search_payload)
return self._search_fn(self.api_instance, self.search_payload, _check_return_type=check_return_type)

def _get_state_from_file(self) -> Optional[ResourceState]:
"""Retrieve a state object from a local YAML file if it exists.
Expand All @@ -242,14 +252,14 @@ def _get_state_from_file(self) -> Optional[ResourceState]:
if expected_state_path.is_file():
return ResourceState.from_file(expected_state_path)

def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead]]:
def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead, ConnectionRead]]:
"""Find the remote resource on the Airbyte instance associated with the current resource.

Raises:
DuplicateResourceError: raised if the search results return multiple resources.

Returns:
Optional[Union[SourceRead, DestinationRead]]: The remote resource found.
Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: The remote resource found.
"""
search_results = self._search().get(f"{self.resource_type}s", [])
if len(search_results) > 1:
Expand All @@ -271,28 +281,33 @@ def get_diff_with_remote_resource(self) -> str:
if not self.was_created:
raise NonExistingResourceError("Cannot compute diff with a non existing remote resource.")
current_config = self.configuration
remote_config = self.remote_resource.connection_configuration
remote_config = self._get_comparable_configuration()
diff = compute_diff(remote_config, current_config)
return diff.pretty()

def _create_or_update(
self, operation_fn: Callable, payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]
self,
operation_fn: Callable,
payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate, ConnectionCreate, ConnectionUpdate],
_check_return_type: bool = True,
) -> Union[SourceRead, DestinationRead]:
"""Wrapper to trigger create or update of remote resource.

Args:
operation_fn (Callable): The API function to run.
payload (Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]): The payload to send to create or update the resource.

Kwargs:
_check_return_type (boolean): Whether to check the types returned by the API against the airbyte-api-client OpenAPI spec.
Raises:
InvalidConfigurationError: Raised if the create or update payload is invalid.
ApiException: Raised in case of other API errors.

Returns:
Union[SourceRead, DestinationRead]: The created or updated resource.
Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource.
"""
try:
result = operation_fn(self.api_instance, payload)
result = operation_fn(self.api_instance, payload, _check_return_type=_check_return_type)
return result, ResourceState.create(self.configuration_path, result[self.resource_id_field])
except airbyte_api_client.ApiException as api_error:
if api_error.status == 422:
Expand All @@ -302,19 +317,19 @@ def _create_or_update(
else:
raise api_error

def create(self) -> Union[SourceRead, DestinationRead]:
def create(self) -> Union[SourceRead, DestinationRead, ConnectionRead]:
"""Public function to create the resource on the remote Airbyte instance.

Returns:
Union[SourceRead, DestinationRead]: The created resource.
Union[SourceRead, DestinationRead, ConnectionRead]: The created resource.
"""
return self._create_or_update(self._create_fn, self.create_payload)

def update(self) -> Union[SourceRead, DestinationRead]:
def update(self) -> Union[SourceRead, DestinationRead, ConnectionRead]:
"""Public function to update the resource on the remote Airbyte instance.

Returns:
Union[SourceRead, DestinationRead]: The updated resource.
Union[SourceRead, DestinationRead, ConnectionRead]: The updated resource.
"""
return self._create_or_update(self._update_fn, self.update_payload)

Expand All @@ -325,12 +340,11 @@ def resource_id(self) -> Optional[str]:
Returns:
str: Remote resource's UUID
"""
return self.remote_resource.get(self.resource_id_field) if self.was_created else None
return self.state.resource_id if self.state is not None else None


class Source(BaseResource):

apply_priority = 0
api = source_api.SourceApi
create_function_name = "create_source"
resource_id_field = "source_id"
Expand All @@ -357,6 +371,15 @@ def update_payload(self):
name=self.resource_name,
)

def _get_comparable_configuration(self):
"""Get the object to which local configuration will be compared to.

Returns:
dict: Remote source configuration.
"""
comparable_configuration = super()._get_comparable_configuration()
return comparable_configuration.connection_configuration
Comment on lines +374 to +381
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this be done in BaseResource and only be overridden by the Connection object?

Copy link
Contributor Author

@alafanechere alafanechere Mar 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I duplicated this on purpose as this is really specific to source and destination and not something common to the other potential resources. I was thinking of adding a parent class for Source and Destination (SourceOrDestination(BaseResource) 🤔 ) but thought it was too much for this PR.


@property
def resource_id_request_body(self) -> SourceIdRequestBody:
"""Creates SourceIdRequestBody from resource id.
Expand Down Expand Up @@ -384,7 +407,6 @@ def catalog(self) -> AirbyteCatalog:

class Destination(BaseResource):

apply_priority = 0
api = destination_api.DestinationApi
create_function_name = "create_destination"
resource_id_field = "destination_id"
Expand Down Expand Up @@ -427,8 +449,97 @@ def update_payload(self) -> DestinationUpdate:
name=self.resource_name,
)

def _get_comparable_configuration(self):
"""Get the object to which local configuration will be compared to.

Returns:
dict: Remote destination configuration.
"""
comparable_configuration = super()._get_comparable_configuration()
return comparable_configuration.connection_configuration


class Connection(BaseResource):
APPLY_PRIORITY = 1 # Set to 1 to create connection after source or destination.
api = connection_api.ConnectionApi
create_function_name = "create_connection"
resource_id_field = "connection_id"
search_function_name = "search_connections"
update_function_name = "update_connection"
resource_type = "connection"

@property
def status(self) -> ConnectionStatus:
return ConnectionStatus(self.local_configuration["configuration"]["status"])

@property
def create_payload(self) -> ConnectionCreate:
"""Defines the payload to create the remote connection.
Disable snake case parameter usage with _spec_property_naming=True

Returns:
ConnectionCreate: The ConnectionCreate model instance
"""
return ConnectionCreate(**self.configuration, _check_type=False, _spec_property_naming=True)

@property
def search_payload(self) -> ConnectionSearch:
"""Defines the payload to search the remote connection. Search by connection name if no state found, otherwise search by connection id found in the state.
Returns:
ConnectionSearch: The ConnectionSearch model instance
"""
if self.state is None:
return ConnectionSearch(
source_id=self.source_id, destination_id=self.destination_id, name=self.resource_name, status=self.status
)
else:
return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id)

@property
def update_payload(self) -> ConnectionUpdate:
"""Defines the payload to update a remote connection.

Returns:
ConnectionUpdate: The ConnectionUpdate model instance.
"""
return ConnectionUpdate(
connection_id=self.resource_id,
sync_catalog=self.configuration["syncCatalog"],
status=self.configuration["status"],
namespace_definition=self.configuration["namespaceDefinition"],
namespace_format=self.configuration["namespaceFormat"],
prefix=self.configuration["prefix"],
schedule=self.configuration["schedule"],
resource_requirements=self.configuration["resourceRequirements"],
_check_type=False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_check_type=False,
_check_type=self.CHECK_RETURN_TYPE,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_check_type=False: We disable type validation at model instantiation.
_check_return_type=False: We disable type validation of the API response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a bit of refactoring to make it more readable: 6ec2dd4

)

def create(self) -> dict:
return self._create_or_update(
self._create_fn, self.create_payload, _check_return_type=False
) # Disable check_return_type as the returned payload does not match the open api spec.

def update(self) -> dict:
return self._create_or_update(
self._update_fn, self.update_payload, _check_return_type=False
) # Disable check_return_type as the returned payload does not match the open api spec.

def _search(self, check_return_type=True) -> dict:
return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False)

def _get_comparable_configuration(self) -> dict:
"""Get the object to which local configuration will be compared to.

Returns:
dict: Remote connection configuration.
"""
comparable_configuration = super()._get_comparable_configuration()
comparable_configuration.pop("connectionId")
comparable_configuration.pop("operationIds")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today octavia-cli can't work with normalization, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, I remove the operationIds in case the user changes the transformation in the UI, so we exclude operationIds from the diff computation.

return comparable_configuration


def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination]:
def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination, Connection]:
"""Create resource object according to the definition type field in their YAML configuration.

Args:
Expand All @@ -440,13 +551,15 @@ def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configu
NotImplementedError: Raised if the definition type found in the YAML is not a supported resource.

Returns:
Union[Source, Destination]: The resource object created from the YAML config.
Union[Source, Destination, Connection]: The resource object created from the YAML config.
"""
with open(configuration_path, "r") as f:
local_configuration = yaml.safe_load(f)
if local_configuration["definition_type"] == "source":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if local_configuration["definition_type"] == "source":
if local_configuration["definition_type"] == DEFINITION_TYPE.source:

wdyt about creating an enum class for definitions? I think there are other places where it could be used.
Only a small suggestion, though.

Copy link
Contributor Author

@alafanechere alafanechere Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a future improvement we could make for this file but also in octavia_cli/generate/definitions.py

return Source(api_client, workspace_id, local_configuration, configuration_path)
if local_configuration["definition_type"] == "destination":
return Destination(api_client, workspace_id, local_configuration, configuration_path)
if local_configuration["definition_type"] == "connection":
return Connection(api_client, workspace_id, local_configuration, configuration_path)
else:
raise NotImplementedError(f"Resource {local_configuration['definition_type']} was not yet implemented")
1 change: 1 addition & 0 deletions octavia-cli/octavia_cli/generate/yaml_dumpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import yaml


Expand Down
4 changes: 2 additions & 2 deletions octavia-cli/unit_tests/test_apply/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ def test_apply_with_custom_configuration_file_force(mocker, context_object):
def test_get_resource_to_apply(mocker, mock_api_client):
local_files_priorities = [("foo", 2), ("bar", 1)]
mock_resource_factory = mocker.Mock()
mock_resource_factory.side_effect = [mocker.Mock(apply_priority=priority) for _, priority in local_files_priorities]
mock_resource_factory.side_effect = [mocker.Mock(APPLY_PRIORITY=priority) for _, priority in local_files_priorities]
mocker.patch.object(commands, "resource_factory", mock_resource_factory)

resources_to_apply = commands.get_resources_to_apply([f[0] for f in local_files_priorities], mock_api_client, "workspace_id")
assert resources_to_apply == sorted(resources_to_apply, key=lambda r: r.apply_priority)
assert resources_to_apply == sorted(resources_to_apply, key=lambda r: r.APPLY_PRIORITY)
assert commands.resource_factory.call_count == len(local_files_priorities)
commands.resource_factory.assert_has_calls([mocker.call(mock_api_client, "workspace_id", path) for path, _ in local_files_priorities])

Expand Down
Loading