From d8da27edd03564f364490517b7c878208727e7e6 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Sat, 5 Mar 2022 23:40:34 +0100 Subject: [PATCH 1/6] add connection resource --- octavia-cli/octavia_cli/apply/resources.py | 108 +++++++++++++++--- .../unit_tests/test_apply/test_resources.py | 97 +++++++++++++++- 2 files changed, 191 insertions(+), 14 deletions(-) diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index 4efe338b535ef..dac445b68845f 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -10,7 +10,13 @@ import airbyte_api_client import yaml -from airbyte_api_client.api import destination_api, source_api +from airbyte_api_client.api import connection_api, destination_api, source_api +from airbyte_api_client.model.connection_create import ConnectionCreate +from airbyte_api_client.model.connection_read import ConnectionRead +from airbyte_api_client.model.connection_read_list import ConnectionReadList +from airbyte_api_client.model.connection_search import ConnectionSearch +from airbyte_api_client.model.connection_status import ConnectionStatus +from airbyte_api_client.model.connection_update import ConnectionUpdate from airbyte_api_client.model.destination_create import DestinationCreate from airbyte_api_client.model.destination_read import DestinationRead from airbyte_api_client.model.destination_read_list import DestinationReadList @@ -222,11 +228,11 @@ def __getattr__(self, name: str) -> Any: return self.local_configuration.get(name) raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.") - def _search(self) -> Union[SourceReadList, DestinationReadList]: + def _search(self) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: """Run search of a resources on the remote Airbyte instance. Returns: - Union[SourceReadList, DestinationReadList]: Search results + Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results """ return self._search_fn(self.api_instance, self.search_payload) @@ -240,14 +246,14 @@ def _get_state_from_file(self) -> Optional[ResourceState]: if expected_state_path.is_file(): return ResourceState.from_file(expected_state_path) - def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead]]: + def _get_remote_resource(self) -> Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: """Find the remote resource on the Airbyte instance associated with the current resource. Raises: DuplicateResourceError: raised if the search results return multiple resources. Returns: - Optional[Union[SourceRead, DestinationRead]]: The remote resource found. + Optional[Union[SourceRead, DestinationRead, ConnectionRead]]: The remote resource found. """ search_results = self._search().get(f"{self.resource_type}s", []) if len(search_results) > 1: @@ -274,7 +280,9 @@ def get_diff_with_remote_resource(self) -> str: return diff.pretty() def _create_or_update( - self, operation_fn: Callable, payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate] + self, + operation_fn: Callable, + payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate, ConnectionCreate, ConnectionUpdate], ) -> Union[SourceRead, DestinationRead]: """Wrapper to trigger create or update of remote resource. @@ -287,7 +295,7 @@ def _create_or_update( ApiException: Raised in case of other API errors. Returns: - Union[SourceRead, DestinationRead]: The created or updated resource. + Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. """ try: result = operation_fn(self.api_instance, payload) @@ -300,19 +308,19 @@ def _create_or_update( else: raise api_error - def create(self) -> Union[SourceRead, DestinationRead]: + def create(self) -> Union[SourceRead, DestinationRead, ConnectionRead]: """Public function to create the resource on the remote Airbyte instance. Returns: - Union[SourceRead, DestinationRead]: The created resource. + Union[SourceRead, DestinationRead, ConnectionRead]: The created resource. """ return self._create_or_update(self._create_fn, self.create_payload) - def update(self) -> Union[SourceRead, DestinationRead]: + def update(self) -> Union[SourceRead, DestinationRead, ConnectionRead]: """Public function to update the resource on the remote Airbyte instance. Returns: - Union[SourceRead, DestinationRead]: The updated resource. + Union[SourceRead, DestinationRead, ConnectionRead]: The updated resource. """ return self._create_or_update(self._update_fn, self.update_payload) @@ -402,7 +410,79 @@ def update_payload(self) -> DestinationUpdate: ) -def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination]: +class Connection(BaseResource): + + apply_priority = 1 + api = connection_api.ConnectionApi + create_function_name = "create_connection" + resource_id_field = "connection_id" + search_function_name = "search_connections" + update_function_name = "update_connection" + resource_type = "connection" + + @property + def status(self) -> ConnectionStatus: + return ConnectionStatus(self.local_configuration["status"]) + + @property + def create_payload(self) -> ConnectionCreate: + """Defines the payload to create the remote connection. + + Returns: + ConnectionCreate: The ConnectionCreate model instance + """ + return ConnectionCreate( + self.source_id, + self.destination_id, + self.status, + name=self.resource_name, + namespace_definition=self.namespace_definition, + namespace_format=self.namespace_format, + prefix=self.prefix, + operations_ids=self.operation_ids, + sync_catalog=self.sync_catalog, + schedule=self.schedule, + resource_requirements=self.resource_requirements, + _check_type=False, + ) + + @property + def search_payload(self) -> ConnectionSearch: + """Defines the payload to search the remote connection. Search by connection name if no state found, otherwise search by connection id found in the state. + Returns: + ConnectionSearch: The ConnectionSearch model instance + """ + if self.state is None: + return ConnectionSearch( + source_id=self.source_id, destination_id=self.destination_id, name=self.resource_name, status=self.status + ) + else: + return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id) + + @property + def update_payload(self) -> ConnectionUpdate: + """Defines the payload to update a remote connection. + + Returns: + ConnectionUpdate: The DestinationUpdate model instance. + """ + return ConnectionUpdate( + self.resource_id, + self.sync_catalog, + self.status, + namespace_definition=self.namespace_definition, + namespace_format=self.namespace_format, + prefix=self.prefix, + source_id=self.source_id, + destination_id=self.destination_id, + operations_ids=self.operation_ids, + schedule=self.schedule, + resource_requirements=self.resource_requirements, + _check_type=False, + ) + + +def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination, Connection]: """Create resource object according to the definition type field in their YAML configuration. Args: @@ -414,7 +494,7 @@ def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configu NotImplementedError: Raised if the definition type found in the YAML is not a supported resource. Returns: - Union[Source, Destination]: The resource object created from the YAML config. + Union[Source, Destination, Connection]: The resource object created from the YAML config. """ with open(configuration_path, "r") as f: local_configuration = yaml.load(f, yaml.FullLoader) @@ -422,5 +502,7 @@ def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configu return Source(api_client, workspace_id, local_configuration, configuration_path) if local_configuration["definition_type"] == "destination": return Destination(api_client, workspace_id, local_configuration, configuration_path) + if local_configuration["definition_type"] == "connection": + return Connection(api_client, workspace_id, local_configuration, configuration_path) else: raise NotImplementedError(f"Resource {local_configuration['definition_type']} was not yet implemented") diff --git a/octavia-cli/unit_tests/test_apply/test_resources.py b/octavia-cli/unit_tests/test_apply/test_resources.py index f4702715c9976..fbc40840e13cc 100644 --- a/octavia-cli/unit_tests/test_apply/test_resources.py +++ b/octavia-cli/unit_tests/test_apply/test_resources.py @@ -299,12 +299,107 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): ) +class TestConnection: + @pytest.mark.parametrize( + "state", + [None, resources.ResourceState("config_path", "resource_id", 123, "abc")], + ) + def test_init(self, mocker, mock_api_client, state): + connection_configuration = { + "resource_name": "my_connection", + "namespace_definition": "customformat", + "namespace_format": "foo", + "prefix": "foo", + "source_id": "foo", + "destination_id": "foo", + "operation_ids": ["foo"], + "sync_catalog": { + "streams": [ + { + "stream": { + "name": "name_example", + "json_schema": {}, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": True, + "default_cursor_field": ["default_cursor_field"], + "source_defined_primary_key": [["string_example"]], + "namespace": "namespace_example", + }, + "config": { + "sync_mode": "incremental", + "cursor_field": ["cursor_field_example"], + "destination_sync_mode": "append_dedup", + "primary_key": [["string_example"]], + "alias_name": "alias_name_example", + "selected": True, + }, + } + ] + }, + "schedule": {"units": 1, "time_units": "days"}, + "status": "active", + "resource_requirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"}, + } + assert resources.Connection.__base__ == resources.BaseResource + mocker.patch.object(resources.Connection, "resource_id", "foo") + connection = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") + mocker.patch.object(connection, "state", state) + assert connection.api == resources.connection_api.ConnectionApi + assert connection.create_function_name == "create_connection" + assert connection.resource_id_field == "connection_id" + assert connection.search_function_name == "search_connections" + assert connection.update_function_name == "update_connection" + assert connection.resource_type == "connection" + assert connection.apply_priority == 1 + + assert connection.create_payload == resources.ConnectionCreate( + connection.source_id, + connection.destination_id, + connection.status, + name=connection.resource_name, + namespace_definition=connection.namespace_definition, + namespace_format=connection.namespace_format, + prefix=connection.prefix, + operations_ids=connection.operation_ids, + sync_catalog=connection.sync_catalog, + schedule=connection.schedule, + resource_requirements=connection.resource_requirements, + _check_type=False, + ) + assert connection.update_payload == resources.ConnectionUpdate( + connection.resource_id, + connection.sync_catalog, + connection.status, + namespace_definition=connection.namespace_definition, + namespace_format=connection.namespace_format, + prefix=connection.prefix, + source_id=connection.source_id, + destination_id=connection.destination_id, + operations_ids=connection.operation_ids, + schedule=connection.schedule, + resource_requirements=connection.resource_requirements, + _check_type=False, + ) + if state is None: + assert connection.search_payload == resources.ConnectionSearch( + source_id=connection.source_id, + destination_id=connection.destination_id, + name=connection.resource_name, + status=connection.status, + ) + else: + assert connection.search_payload == resources.ConnectionSearch( + connection_id=connection.state.resource_id, source_id=connection.source_id, destination_id=connection.destination_id + ) + + @pytest.mark.parametrize( "local_configuration,resource_to_mock,expected_error", [ ({"definition_type": "source"}, "Source", None), ({"definition_type": "destination"}, "Destination", None), - ({"definition_type": "connection"}, None, NotImplementedError), + ({"definition_type": "connection"}, "Connection", None), + ({"definition_type": "not_existing"}, None, NotImplementedError), ], ) def test_factory(mocker, mock_api_client, local_configuration, resource_to_mock, expected_error): From c05c6844208fa6bd5690153b58220540d5d0cb3a Mon Sep 17 00:00:00 2001 From: alafanechere Date: Sat, 5 Mar 2022 23:49:38 +0100 Subject: [PATCH 2/6] update readme --- octavia-cli/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/octavia-cli/README.md b/octavia-cli/README.md index 26b2242a834b8..d8e7a8812a2b4 100644 --- a/octavia-cli/README.md +++ b/octavia-cli/README.md @@ -38,6 +38,7 @@ We welcome community contributions! | Date | Milestone | |------------|-------------------------------------| +| 2022-03-04 | Implement `octavia apply` for connections| | 2022-03-02 | Implement `octavia apply` (sources and destination only)| | 2022-02-06 | Implement `octavia generate` (sources and destination only)| | 2022-01-25 | Implement `octavia init` + some context checks| From 120563f875cbe6f0f62b0df9561e5e43d93525a3 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 8 Mar 2022 14:45:31 +0100 Subject: [PATCH 3/6] wip --- octavia-cli/octavia_cli/apply/resources.py | 25 +++++++++------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index dac445b68845f..fffe390aefd3b 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -422,7 +422,7 @@ class Connection(BaseResource): @property def status(self) -> ConnectionStatus: - return ConnectionStatus(self.local_configuration["status"]) + return ConnectionStatus(self.local_configuration["configuration"]["status"]) @property def create_payload(self) -> ConnectionCreate: @@ -431,20 +431,7 @@ def create_payload(self) -> ConnectionCreate: Returns: ConnectionCreate: The ConnectionCreate model instance """ - return ConnectionCreate( - self.source_id, - self.destination_id, - self.status, - name=self.resource_name, - namespace_definition=self.namespace_definition, - namespace_format=self.namespace_format, - prefix=self.prefix, - operations_ids=self.operation_ids, - sync_catalog=self.sync_catalog, - schedule=self.schedule, - resource_requirements=self.resource_requirements, - _check_type=False, - ) + return ConnectionCreate(**self.configuration, _check_type=False) @property def search_payload(self) -> ConnectionSearch: @@ -459,6 +446,14 @@ def search_payload(self) -> ConnectionSearch: else: return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id) + def _search(self) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: + """Run search of a resources on the remote Airbyte instance. + + Returns: + Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results + """ + return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) + @property def update_payload(self) -> ConnectionUpdate: """Defines the payload to update a remote connection. From 0e67233f82da74545ea35a6958ceaccfa18a4d5c Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 8 Mar 2022 19:59:40 +0100 Subject: [PATCH 4/6] test --- octavia-cli/octavia_cli/apply/commands.py | 2 +- octavia-cli/octavia_cli/apply/resources.py | 90 +++++---- .../unit_tests/test_apply/test_commands.py | 4 +- .../unit_tests/test_apply/test_resources.py | 171 +++++++++++------- 4 files changed, 164 insertions(+), 103 deletions(-) diff --git a/octavia-cli/octavia_cli/apply/commands.py b/octavia-cli/octavia_cli/apply/commands.py index 49a175a8ac554..76c5dc82dcea5 100644 --- a/octavia-cli/octavia_cli/apply/commands.py +++ b/octavia-cli/octavia_cli/apply/commands.py @@ -42,7 +42,7 @@ def get_resources_to_apply( List[BaseResource]: Resources sorted according to their apply priority. """ all_resources = [resource_factory(api_client, workspace_id, path) for path in configuration_files] - return sorted(all_resources, key=lambda resource: resource.apply_priority) + return sorted(all_resources, key=lambda resource: resource.APPLY_PRIORITY) def apply_single_resource(resource: BaseResource, force: bool) -> None: diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index fffe390aefd3b..dead5794f7b13 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -3,6 +3,7 @@ # import abc +import copy import os import time from pathlib import Path @@ -111,12 +112,8 @@ def from_file(cls, file_path: str) -> "ResourceState": class BaseResource(abc.ABC): - @property - @abc.abstractmethod - def apply_priority( - self, - ): # pragma: no cover - pass + CHECK_RETURN_TYPE = True + APPLY_PRIORITY = 0 @property @abc.abstractmethod @@ -208,6 +205,14 @@ def __init__( def remote_resource(self): return self._get_remote_resource() + def _get_comparable_configuration( + self, + ): # pragma: no cover + if not self.was_created: + raise NonExistingResourceError("Can't find a comparable configuration as the remote resource does not exists.") + else: + return copy.deepcopy(self.remote_resource) + @property def was_created(self): return True if self.remote_resource else False @@ -234,7 +239,7 @@ def _search(self) -> Union[SourceReadList, DestinationReadList, ConnectionReadLi Returns: Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results """ - return self._search_fn(self.api_instance, self.search_payload) + return self._search_fn(self.api_instance, self.search_payload, _check_return_type=self.CHECK_RETURN_TYPE) def _get_state_from_file(self) -> Optional[ResourceState]: """Retrieve a state object from a local YAML file if it exists. @@ -275,7 +280,7 @@ def get_diff_with_remote_resource(self) -> str: if not self.was_created: raise NonExistingResourceError("Cannot compute diff with a non existing remote resource.") current_config = self.configuration - remote_config = self.remote_resource.connection_configuration + remote_config = self._get_comparable_configuration() diff = compute_diff(remote_config, current_config) return diff.pretty() @@ -298,7 +303,7 @@ def _create_or_update( Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. """ try: - result = operation_fn(self.api_instance, payload) + result = operation_fn(self.api_instance, payload, _check_return_type=self.CHECK_RETURN_TYPE) return result, ResourceState.create(self.configuration_path, result[self.resource_id_field]) except airbyte_api_client.ApiException as api_error: if api_error.status == 422: @@ -331,12 +336,11 @@ def resource_id(self) -> Optional[str]: Returns: str: Remote resource's UUID """ - return self.remote_resource.get(self.resource_id_field) if self.was_created else None + return self.state.resource_id if self.state is not None else None class Source(BaseResource): - apply_priority = 0 api = source_api.SourceApi create_function_name = "create_source" resource_id_field = "source_id" @@ -363,10 +367,18 @@ def update_payload(self): name=self.resource_name, ) + def _get_comparable_configuration(self): + """Get the object to which local configuration will be compared to. + + Returns: + dict: Remote source configuration. + """ + comparable_configuration = super()._get_comparable_configuration() + return comparable_configuration.connection_configuration + class Destination(BaseResource): - apply_priority = 0 api = destination_api.DestinationApi create_function_name = "create_destination" resource_id_field = "destination_id" @@ -409,10 +421,19 @@ def update_payload(self) -> DestinationUpdate: name=self.resource_name, ) + def _get_comparable_configuration(self): + """Get the object to which local configuration will be compared to. -class Connection(BaseResource): + Returns: + dict: Remote destination configuration. + """ + comparable_configuration = super()._get_comparable_configuration() + return comparable_configuration.connection_configuration - apply_priority = 1 + +class Connection(BaseResource): + CHECK_RETURN_TYPE = False + APPLY_PRIORITY = 1 api = connection_api.ConnectionApi create_function_name = "create_connection" resource_id_field = "connection_id" @@ -427,11 +448,12 @@ def status(self) -> ConnectionStatus: @property def create_payload(self) -> ConnectionCreate: """Defines the payload to create the remote connection. + Disable snake case parameter usage with _spec_property_naming=True Returns: ConnectionCreate: The ConnectionCreate model instance """ - return ConnectionCreate(**self.configuration, _check_type=False) + return ConnectionCreate(**self.configuration, _check_type=False, _spec_property_naming=True) @property def search_payload(self) -> ConnectionSearch: @@ -446,14 +468,6 @@ def search_payload(self) -> ConnectionSearch: else: return ConnectionSearch(connection_id=self.state.resource_id, source_id=self.source_id, destination_id=self.destination_id) - def _search(self) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: - """Run search of a resources on the remote Airbyte instance. - - Returns: - Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results - """ - return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) - @property def update_payload(self) -> ConnectionUpdate: """Defines the payload to update a remote connection. @@ -462,20 +476,28 @@ def update_payload(self) -> ConnectionUpdate: ConnectionUpdate: The DestinationUpdate model instance. """ return ConnectionUpdate( - self.resource_id, - self.sync_catalog, - self.status, - namespace_definition=self.namespace_definition, - namespace_format=self.namespace_format, - prefix=self.prefix, - source_id=self.source_id, - destination_id=self.destination_id, - operations_ids=self.operation_ids, - schedule=self.schedule, - resource_requirements=self.resource_requirements, + connection_id=self.resource_id, + sync_catalog=self.configuration["syncCatalog"], + status=self.configuration["status"], + namespace_definition=self.configuration["namespaceDefinition"], + namespace_format=self.configuration["namespaceFormat"], + prefix=self.configuration["prefix"], + schedule=self.configuration["schedule"], + resource_requirements=self.configuration["resourceRequirements"], _check_type=False, ) + def _get_comparable_configuration(self): + """Get the object to which local configuration will be compared to. + + Returns: + dict: Remote connection configuration. + """ + comparable_configuration = super()._get_comparable_configuration() + comparable_configuration.pop("connectionId") + comparable_configuration.pop("operationIds") + return comparable_configuration + def factory(api_client: airbyte_api_client.ApiClient, workspace_id: str, configuration_path: str) -> Union[Source, Destination, Connection]: """Create resource object according to the definition type field in their YAML configuration. diff --git a/octavia-cli/unit_tests/test_apply/test_commands.py b/octavia-cli/unit_tests/test_apply/test_commands.py index 434be4b7cad7a..81708777d79fb 100644 --- a/octavia-cli/unit_tests/test_apply/test_commands.py +++ b/octavia-cli/unit_tests/test_apply/test_commands.py @@ -61,11 +61,11 @@ def test_apply_with_custom_configuration_file_force(mocker, context_object): def test_get_resource_to_apply(mocker, mock_api_client): local_files_priorities = [("foo", 2), ("bar", 1)] mock_resource_factory = mocker.Mock() - mock_resource_factory.side_effect = [mocker.Mock(apply_priority=priority) for _, priority in local_files_priorities] + mock_resource_factory.side_effect = [mocker.Mock(APPLY_PRIORITY=priority) for _, priority in local_files_priorities] mocker.patch.object(commands, "resource_factory", mock_resource_factory) resources_to_apply = commands.get_resources_to_apply([f[0] for f in local_files_priorities], mock_api_client, "workspace_id") - assert resources_to_apply == sorted(resources_to_apply, key=lambda r: r.apply_priority) + assert resources_to_apply == sorted(resources_to_apply, key=lambda r: r.APPLY_PRIORITY) assert commands.resource_factory.call_count == len(local_files_priorities) commands.resource_factory.assert_has_calls([mocker.call(mock_api_client, "workspace_id", path) for path, _ in local_files_priorities]) diff --git a/octavia-cli/unit_tests/test_apply/test_resources.py b/octavia-cli/unit_tests/test_apply/test_resources.py index fbc40840e13cc..fe803db63c5da 100644 --- a/octavia-cli/unit_tests/test_apply/test_resources.py +++ b/octavia-cli/unit_tests/test_apply/test_resources.py @@ -95,6 +95,8 @@ def test_init_no_remote_resource(self, mocker, patch_base_class, mock_api_client mocker.patch.object(resources.BaseResource, "_get_remote_resource", mocker.Mock(return_value=False)) mocker.patch.object(resources, "compute_checksum") resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource.CHECK_RETURN_TYPE is True + assert resource.APPLY_PRIORITY == 0 assert resource.workspace_id == "workspace_id" assert resource.local_configuration == local_configuration assert resource.configuration_path == "bar.yaml" @@ -116,7 +118,7 @@ def test_init_with_remote_resource_not_changed(self, mocker, patch_base_class, m resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") assert resource.was_created is True assert resource.local_file_changed is False - assert resource.resource_id == "my_resource_id" + assert resource.resource_id == resource.state.resource_id def test_init_with_remote_resource_changed(self, mocker, patch_base_class, mock_api_client, local_configuration): mocker.patch.object( @@ -129,7 +131,7 @@ def test_init_with_remote_resource_changed(self, mocker, patch_base_class, mock_ resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") assert resource.was_created is True assert resource.local_file_changed is True - assert resource.resource_id == "my_resource_id" + assert resource.resource_id == resource.state.resource_id @pytest.fixture def resource(self, patch_base_class, mock_api_client, local_configuration): @@ -143,7 +145,9 @@ def test_get_attr(self, resource, local_configuration): def test_search(self, resource): search_results = resource._search() assert search_results == resource._search_fn.return_value - resource._search_fn.assert_called_with(resource.api_instance, resource.search_payload) + resource._search_fn.assert_called_with( + resource.api_instance, resource.search_payload, _check_return_type=resource.CHECK_RETURN_TYPE + ) @pytest.mark.parametrize( "search_results,expected_error,expected_output", @@ -187,14 +191,14 @@ def test_get_state_from_file(self, mocker, resource, state_path_is_file): "was_created", [True, False], ) - def test_get_diff_with_remote_resource(self, mocker, resource, was_created): - - mock_remote_resource = mocker.Mock(return_value=None if not was_created else mocker.Mock()) - mocker.patch.object(resource, "_get_remote_resource", mock_remote_resource) + def test_get_diff_with_remote_resource(self, patch_base_class, mocker, mock_api_client, local_configuration, was_created): + mocker.patch.object(resources.BaseResource, "_get_comparable_configuration") + mocker.patch.object(resources.BaseResource, "was_created", was_created) + resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") mocker.patch.object(resources, "compute_diff") if was_created: diff = resource.get_diff_with_remote_resource() - resources.compute_diff.assert_called_with(resource.remote_resource.connection_configuration, resource.configuration) + resources.compute_diff.assert_called_with(resource._get_comparable_configuration.return_value, resource.configuration) assert diff == resources.compute_diff.return_value.pretty.return_value else: with pytest.raises(resources.NonExistingResourceError): @@ -230,6 +234,20 @@ def test_update(self, mocker, resource): assert resource.update() == resource._create_or_update.return_value resource._create_or_update.assert_called_with(resource._update_fn, resource.update_payload) + @pytest.mark.parametrize( + "was_created", + [True, False], + ) + def test_get_comparable_configuration(self, patch_base_class, mocker, mock_api_client, local_configuration, was_created): + mocker.patch.object(resources.BaseResource, "was_created", was_created) + mocker.patch.object(resources.BaseResource, "remote_resource") + resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + if not was_created: + with pytest.raises(resources.NonExistingResourceError): + resource._get_comparable_configuration() + else: + assert resource._get_comparable_configuration() == resource.remote_resource + class TestSource: @pytest.mark.parametrize( @@ -247,7 +265,7 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): assert source.search_function_name == "search_sources" assert source.update_function_name == "update_source" assert source.resource_type == "source" - assert source.apply_priority == 0 + assert source.APPLY_PRIORITY == 0 assert source.create_payload == resources.SourceCreate( source.definition_id, source.configuration, source.workspace_id, source.resource_name ) @@ -263,6 +281,15 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): source_definition_id=source.definition_id, workspace_id=source.workspace_id, source_id=source.state.resource_id ) + def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration): + mock_base_comparable_configuration = mocker.Mock() + mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration) + mocker.patch.object(resources.Source, "was_created", True) + mocker.patch.object(resources.Source, "remote_resource") + + resource = resources.Source(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource._get_comparable_configuration() == mock_base_comparable_configuration.return_value.connection_configuration + class TestDestination: @pytest.mark.parametrize( @@ -280,7 +307,7 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): assert destination.search_function_name == "search_destinations" assert destination.update_function_name == "update_destination" assert destination.resource_type == "destination" - assert destination.apply_priority == 0 + assert destination.APPLY_PRIORITY == 0 assert destination.create_payload == resources.DestinationCreate( destination.workspace_id, destination.resource_name, destination.definition_id, destination.configuration ) @@ -298,6 +325,15 @@ def test_init(self, mocker, mock_api_client, local_configuration, state): destination_id=destination.state.resource_id, ) + def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration): + mock_base_comparable_configuration = mocker.Mock() + mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration) + mocker.patch.object(resources.Destination, "was_created", True) + mocker.patch.object(resources.Destination, "remote_resource") + + resource = resources.Destination(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource._get_comparable_configuration() == mock_base_comparable_configuration.return_value.connection_configuration + class TestConnection: @pytest.mark.parametrize( @@ -306,39 +342,43 @@ class TestConnection: ) def test_init(self, mocker, mock_api_client, state): connection_configuration = { + "definition_type": "connection", "resource_name": "my_connection", - "namespace_definition": "customformat", - "namespace_format": "foo", - "prefix": "foo", - "source_id": "foo", - "destination_id": "foo", - "operation_ids": ["foo"], - "sync_catalog": { - "streams": [ - { - "stream": { - "name": "name_example", - "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": True, - "default_cursor_field": ["default_cursor_field"], - "source_defined_primary_key": [["string_example"]], - "namespace": "namespace_example", - }, - "config": { - "sync_mode": "incremental", - "cursor_field": ["cursor_field_example"], - "destination_sync_mode": "append_dedup", - "primary_key": [["string_example"]], - "alias_name": "alias_name_example", - "selected": True, - }, - } - ] + "source_id": "my_source", + "destination_id": "my_destination", + "configuration": { + "sourceId": "my_source", + "destinationId": "my_destination", + "namespaceDefinition": "customformat", + "namespaceFormat": "foo", + "prefix": "foo", + "syncCatalog": { + "streams": [ + { + "stream": { + "name": "name_example", + "jsonSchema": {}, + "supportedSyncModes": ["incremental"], + "sourceDefinedCursor": True, + "defaultCursorField": ["default_cursor_field"], + "sourceDefinedPrimary_key": [["string_example"]], + "namespace": "namespace_example", + }, + "config": { + "syncMode": "incremental", + "cursorField": ["cursor_field_example"], + "destinationSyncMode": "append_dedup", + "primaryKey": [["string_example"]], + "aliasName": "alias_name_example", + "selected": True, + }, + } + ] + }, + "schedule": {"units": 1, "time_units": "days"}, + "status": "active", + "resourceRequirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"}, }, - "schedule": {"units": 1, "time_units": "days"}, - "status": "active", - "resource_requirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"}, } assert resources.Connection.__base__ == resources.BaseResource mocker.patch.object(resources.Connection, "resource_id", "foo") @@ -350,34 +390,20 @@ def test_init(self, mocker, mock_api_client, state): assert connection.search_function_name == "search_connections" assert connection.update_function_name == "update_connection" assert connection.resource_type == "connection" - assert connection.apply_priority == 1 + assert connection.APPLY_PRIORITY == 1 assert connection.create_payload == resources.ConnectionCreate( - connection.source_id, - connection.destination_id, - connection.status, - name=connection.resource_name, - namespace_definition=connection.namespace_definition, - namespace_format=connection.namespace_format, - prefix=connection.prefix, - operations_ids=connection.operation_ids, - sync_catalog=connection.sync_catalog, - schedule=connection.schedule, - resource_requirements=connection.resource_requirements, - _check_type=False, + **connection.configuration, _check_type=False, _spec_property_naming=True ) assert connection.update_payload == resources.ConnectionUpdate( - connection.resource_id, - connection.sync_catalog, - connection.status, - namespace_definition=connection.namespace_definition, - namespace_format=connection.namespace_format, - prefix=connection.prefix, - source_id=connection.source_id, - destination_id=connection.destination_id, - operations_ids=connection.operation_ids, - schedule=connection.schedule, - resource_requirements=connection.resource_requirements, + connection_id=connection.resource_id, + sync_catalog=connection.configuration["syncCatalog"], + status=connection.configuration["status"], + namespace_definition=connection.configuration["namespaceDefinition"], + namespace_format=connection.configuration["namespaceFormat"], + prefix=connection.configuration["prefix"], + schedule=connection.configuration["schedule"], + resource_requirements=connection.configuration["resourceRequirements"], _check_type=False, ) if state is None: @@ -385,13 +411,26 @@ def test_init(self, mocker, mock_api_client, state): source_id=connection.source_id, destination_id=connection.destination_id, name=connection.resource_name, - status=connection.status, + status=resources.ConnectionStatus( + connection_configuration["configuration"]["status"], + ), ) else: assert connection.search_payload == resources.ConnectionSearch( connection_id=connection.state.resource_id, source_id=connection.source_id, destination_id=connection.destination_id ) + def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration): + mock_base_comparable_configuration = mocker.Mock( + return_value={"foo": "bar", "connectionId": "should be popped", "operationIds": "should be popped"} + ) + mocker.patch.object(resources.BaseResource, "_get_comparable_configuration", mock_base_comparable_configuration) + mocker.patch.object(resources.Connection, "was_created", True) + mocker.patch.object(resources.Connection, "remote_resource") + + resource = resources.Connection(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + assert resource._get_comparable_configuration() == {"foo": "bar"} + @pytest.mark.parametrize( "local_configuration,resource_to_mock,expected_error", From 6ec2dd402a020a45ab8b6539ff3d5bea05b8806b Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 9 Mar 2022 15:06:34 +0100 Subject: [PATCH 5/6] more readbable implementation for _check_return_type --- octavia-cli/octavia_cli/apply/resources.py | 38 +++++++++++++--- .../unit_tests/test_apply/test_resources.py | 45 ++++++++++++++----- 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index 308b8e053edee..84fd3860c0853 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -114,8 +114,7 @@ def from_file(cls, file_path: str) -> "ResourceState": class BaseResource(abc.ABC): - CHECK_RETURN_TYPE = True - APPLY_PRIORITY = 0 + APPLY_PRIORITY = 0 # Priority of the resource during the apply. 0 means the resource is top priority. @property @abc.abstractmethod @@ -235,13 +234,13 @@ def __getattr__(self, name: str) -> Any: return self.local_configuration.get(name) raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.") - def _search(self) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: + def _search(self, check_return_type=True) -> Union[SourceReadList, DestinationReadList, ConnectionReadList]: """Run search of a resources on the remote Airbyte instance. Returns: Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results """ - return self._search_fn(self.api_instance, self.search_payload, _check_return_type=self.CHECK_RETURN_TYPE) + return self._search_fn(self.api_instance, self.search_payload, _check_return_type=check_return_type) def _get_state_from_file(self) -> Optional[ResourceState]: """Retrieve a state object from a local YAML file if it exists. @@ -290,6 +289,7 @@ def _create_or_update( self, operation_fn: Callable, payload: Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate, ConnectionCreate, ConnectionUpdate], + _check_return_type: bool = True, ) -> Union[SourceRead, DestinationRead]: """Wrapper to trigger create or update of remote resource. @@ -297,6 +297,8 @@ def _create_or_update( operation_fn (Callable): The API function to run. payload (Union[SourceCreate, SourceUpdate, DestinationCreate, DestinationUpdate]): The payload to send to create or update the resource. + Kwargs: + _check_return_type (boolean): Whether to check the types returned in the API agains airbyte-api-client open api spec. Raises: InvalidConfigurationError: Raised if the create or update payload is invalid. ApiException: Raised in case of other API errors. @@ -305,7 +307,7 @@ def _create_or_update( Union[SourceRead, DestinationRead, ConnectionRead]: The created or updated resource. """ try: - result = operation_fn(self.api_instance, payload, _check_return_type=self.CHECK_RETURN_TYPE) + result = operation_fn(self.api_instance, payload, _check_return_type=_check_return_type) return result, ResourceState.create(self.configuration_path, result[self.resource_id_field]) except airbyte_api_client.ApiException as api_error: if api_error.status == 422: @@ -458,8 +460,7 @@ def _get_comparable_configuration(self): class Connection(BaseResource): - CHECK_RETURN_TYPE = False - APPLY_PRIORITY = 1 + APPLY_PRIORITY = 1 # Set to 1 to create connection after source or destination. api = connection_api.ConnectionApi create_function_name = "create_connection" resource_id_field = "connection_id" @@ -513,6 +514,29 @@ def update_payload(self) -> ConnectionUpdate: _check_type=False, ) + def create(self) -> dict: + return self._create_or_update( + self._create_fn, self.create_payload, _check_return_type=False + ) # Disable check_return_type as the returned payload does not match the open api spec. + + def update(self) -> dict: + """Public function to update the resource on the remote Airbyte instance. + + Returns: + Union[SourceRead, DestinationRead, ConnectionRead]: The updated resource. + """ + return self._create_or_update( + self._update_fn, self.update_payload, _check_return_type=False + ) # Disable check_return_type as the returned payload does not match the open api spec. + + def _search(self, check_return_type=True) -> dict: + """Run search of a resources on the remote Airbyte instance. + + Returns: + Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results + """ + return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) + def _get_comparable_configuration(self): """Get the object to which local configuration will be compared to. diff --git a/octavia-cli/unit_tests/test_apply/test_resources.py b/octavia-cli/unit_tests/test_apply/test_resources.py index bc052e3e779a9..ea1dcadb11167 100644 --- a/octavia-cli/unit_tests/test_apply/test_resources.py +++ b/octavia-cli/unit_tests/test_apply/test_resources.py @@ -95,7 +95,6 @@ def test_init_no_remote_resource(self, mocker, patch_base_class, mock_api_client mocker.patch.object(resources.BaseResource, "_get_remote_resource", mocker.Mock(return_value=False)) mocker.patch.object(resources, "compute_checksum") resource = resources.BaseResource(mock_api_client, "workspace_id", local_configuration, "bar.yaml") - assert resource.CHECK_RETURN_TYPE is True assert resource.APPLY_PRIORITY == 0 assert resource.workspace_id == "workspace_id" assert resource.local_configuration == local_configuration @@ -145,9 +144,7 @@ def test_get_attr(self, resource, local_configuration): def test_search(self, resource): search_results = resource._search() assert search_results == resource._search_fn.return_value - resource._search_fn.assert_called_with( - resource.api_instance, resource.search_payload, _check_return_type=resource.CHECK_RETURN_TYPE - ) + resource._search_fn.assert_called_with(resource.api_instance, resource.search_payload, _check_return_type=True) @pytest.mark.parametrize( "search_results,expected_error,expected_output", @@ -363,12 +360,9 @@ def test_get_comparable_configuration(self, mocker, mock_api_client, local_confi class TestConnection: - @pytest.mark.parametrize( - "state", - [None, resources.ResourceState("config_path", "resource_id", 123, "abc")], - ) - def test_init(self, mocker, mock_api_client, state): - connection_configuration = { + @pytest.fixture + def connection_configuration(self): + return { "definition_type": "connection", "resource_name": "my_connection", "source_id": "my_source", @@ -407,6 +401,12 @@ def test_init(self, mocker, mock_api_client, state): "resourceRequirements": {"cpu_request": "foo", "cpu_limit": "foo", "memory_request": "foo", "memory_limit": "foo"}, }, } + + @pytest.mark.parametrize( + "state", + [None, resources.ResourceState("config_path", "resource_id", 123, "abc")], + ) + def test_init(self, mocker, mock_api_client, state, connection_configuration): assert resources.Connection.__base__ == resources.BaseResource mocker.patch.object(resources.Connection, "resource_id", "foo") connection = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") @@ -447,7 +447,7 @@ def test_init(self, mocker, mock_api_client, state): connection_id=connection.state.resource_id, source_id=connection.source_id, destination_id=connection.destination_id ) - def test_get_comparable_configuration(self, mocker, mock_api_client, local_configuration): + def test_get_comparable_configuration(self, mocker, mock_api_client, connection_configuration): mock_base_comparable_configuration = mocker.Mock( return_value={"foo": "bar", "connectionId": "should be popped", "operationIds": "should be popped"} ) @@ -455,9 +455,30 @@ def test_get_comparable_configuration(self, mocker, mock_api_client, local_confi mocker.patch.object(resources.Connection, "was_created", True) mocker.patch.object(resources.Connection, "remote_resource") - resource = resources.Connection(mock_api_client, "workspace_id", local_configuration, "bar.yaml") + resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") assert resource._get_comparable_configuration() == {"foo": "bar"} + def test__search(self, mocker, mock_api_client, connection_configuration): + resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") + mocker.patch.object(resource, "_search_fn") + search_results = resource._search() + assert search_results == resource._search_fn.return_value + resource._search_fn.assert_called_with(resource.api_instance, resource.search_payload, _check_return_type=False) + + def test_create(self, mocker, mock_api_client, connection_configuration): + mocker.patch.object(resources.Connection, "_create_or_update") + resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") + create_result = resource.create() + assert create_result == resource._create_or_update.return_value + resource._create_or_update.assert_called_with(resource._create_fn, resource.create_payload, _check_return_type=False) + + def test_update(self, mocker, mock_api_client, connection_configuration): + mocker.patch.object(resources.Connection, "_create_or_update") + resource = resources.Connection(mock_api_client, "workspace_id", connection_configuration, "bar.yaml") + update_result = resource.update() + assert update_result == resource._create_or_update.return_value + resource._create_or_update.assert_called_with(resource._update_fn, resource.update_payload, _check_return_type=False) + @pytest.mark.parametrize( "local_configuration,resource_to_mock,expected_error", From f2f64452a9a1bc99c25d5aa5c5a67f17186b0a6a Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 9 Mar 2022 15:12:56 +0100 Subject: [PATCH 6/6] more readbable implementation for _check_return_type --- octavia-cli/octavia_cli/apply/resources.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/octavia-cli/octavia_cli/apply/resources.py b/octavia-cli/octavia_cli/apply/resources.py index 84fd3860c0853..196317adb6388 100644 --- a/octavia-cli/octavia_cli/apply/resources.py +++ b/octavia-cli/octavia_cli/apply/resources.py @@ -208,7 +208,7 @@ def remote_resource(self): def _get_comparable_configuration( self, - ): # pragma: no cover + ) -> dict: # pragma: no cover if not self.was_created: raise NonExistingResourceError("Can't find a comparable configuration as the remote resource does not exists.") else: @@ -520,24 +520,14 @@ def create(self) -> dict: ) # Disable check_return_type as the returned payload does not match the open api spec. def update(self) -> dict: - """Public function to update the resource on the remote Airbyte instance. - - Returns: - Union[SourceRead, DestinationRead, ConnectionRead]: The updated resource. - """ return self._create_or_update( self._update_fn, self.update_payload, _check_return_type=False ) # Disable check_return_type as the returned payload does not match the open api spec. def _search(self, check_return_type=True) -> dict: - """Run search of a resources on the remote Airbyte instance. - - Returns: - Union[SourceReadList, DestinationReadList, ConnectionReadList]: Search results - """ return self._search_fn(self.api_instance, self.search_payload, _check_return_type=False) - def _get_comparable_configuration(self): + def _get_comparable_configuration(self) -> dict: """Get the object to which local configuration will be compared to. Returns: