Skip to content

Commit 22c80b7

Browse files
committed
implement apply
1 parent f71754d commit 22c80b7

File tree

9 files changed

+451
-14
lines changed

9 files changed

+451
-14
lines changed

octavia-cli/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ RUN apt-get upgrade \
66
WORKDIR /home/octavia-cli
77
COPY . ./
88

9-
RUN pip install --no-cache-dir .
9+
RUN pip install -e .\[dev\]
1010

1111
RUN useradd --create-home --shell /bin/bash octavia-cli
1212
USER octavia-cli
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
#
2+
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import abc
6+
import os
7+
import time
8+
from pathlib import Path
9+
from typing import Any, Callable
10+
11+
import airbyte_api_client
12+
import yaml
13+
from airbyte_api_client.api import destination_api, source_api
14+
from airbyte_api_client.model.destination_create import DestinationCreate
15+
from airbyte_api_client.model.destination_search import DestinationSearch
16+
from airbyte_api_client.model.destination_update import DestinationUpdate
17+
from airbyte_api_client.model.source_create import SourceCreate
18+
from airbyte_api_client.model.source_search import SourceSearch
19+
from airbyte_api_client.model.source_update import SourceUpdate
20+
from click import ClickException
21+
22+
from .diff_helpers import compute_checksum, compute_diff
23+
24+
25+
class DuplicateRessourceError(ClickException):
26+
pass
27+
28+
29+
class InvalidConfigurationError(ClickException):
30+
pass
31+
32+
33+
class ResourceState:
34+
def __init__(self, configuration_path, resource_id, generation_timestamp, configuration_checksum):
35+
self.configuration_path = configuration_path
36+
self.resource_id = resource_id
37+
self.generation_timestamp = generation_timestamp
38+
self.configuration_checksum = configuration_checksum
39+
self.path = os.path.join(os.path.dirname(self.configuration_path), "state.yaml")
40+
41+
def _save(self):
42+
content = {
43+
"configuration_path": self.configuration_path,
44+
"resource_id": self.resource_id,
45+
"generation_timestamp": self.generation_timestamp,
46+
"configuration_checksum": self.configuration_checksum,
47+
}
48+
with open(self.path, "w") as state_file:
49+
yaml.dump(content, state_file)
50+
51+
@classmethod
52+
def create(cls, configuration_path, resource_id):
53+
generation_timestamp = int(time.time())
54+
configuration_checksum = compute_checksum(configuration_path)
55+
state = ResourceState(configuration_path, resource_id, generation_timestamp, configuration_checksum)
56+
state._save()
57+
return state
58+
59+
@classmethod
60+
def from_file(cls, file_path):
61+
with open(file_path, "r") as f:
62+
raw_state = yaml.load(f, yaml.FullLoader)
63+
return ResourceState(
64+
raw_state["configuration_path"],
65+
raw_state["resource_id"],
66+
raw_state["generation_timestamp"],
67+
raw_state["configuration_checksum"],
68+
)
69+
70+
71+
class BaseAirbyteResource(abc.ABC):
72+
@property
73+
@abc.abstractmethod
74+
def api(
75+
self,
76+
): # pragma: no cover
77+
pass
78+
79+
@property
80+
@abc.abstractmethod
81+
def create_function_name(
82+
self,
83+
): # pragma: no cover
84+
pass
85+
86+
@property
87+
@abc.abstractmethod
88+
def update_function_name(
89+
self,
90+
): # pragma: no cover
91+
pass
92+
93+
@property
94+
@abc.abstractmethod
95+
def search_function_name(
96+
self,
97+
): # pragma: no cover
98+
pass
99+
100+
@property
101+
@abc.abstractmethod
102+
def create_payload(
103+
self,
104+
): # pragma: no cover
105+
pass
106+
107+
@property
108+
@abc.abstractmethod
109+
def search_payload(
110+
self,
111+
): # pragma: no cover
112+
pass
113+
114+
@property
115+
@abc.abstractmethod
116+
def update_payload(
117+
self,
118+
): # pragma: no cover
119+
pass
120+
121+
@property
122+
@abc.abstractmethod
123+
def resource_id_field(
124+
self,
125+
): # pragma: no cover
126+
pass
127+
128+
@property
129+
@abc.abstractmethod
130+
def resource_type(
131+
self,
132+
): # pragma: no cover
133+
pass
134+
135+
@property
136+
def _create_fn(self) -> Callable:
137+
return getattr(self.api, self.create_function_name)
138+
139+
@property
140+
def _update_fn(self) -> Callable:
141+
return getattr(self.api, self.update_function_name)
142+
143+
@property
144+
def _search_fn(self) -> Callable:
145+
return getattr(self.api, self.search_function_name)
146+
147+
def __init__(self, api_client: airbyte_api_client.ApiClient, workspace_id, local_configuration: dict, configuration_path: str) -> None:
148+
self.workspace_id = workspace_id
149+
self.local_configuration = local_configuration
150+
self.configuration_path = configuration_path
151+
self.api_instance = self.api(api_client)
152+
self.state = self.get_state()
153+
self.remote_resource = self.get_remote_resource()
154+
self.was_created = True if self.remote_resource else False
155+
self.local_file_changed = (
156+
True if self.state is None else compute_checksum(self.configuration_path) != self.state.configuration_checksum
157+
)
158+
159+
def get_state(self):
160+
expected_state_path = Path(os.path.join(os.path.dirname(self.configuration_path), "state.yaml"))
161+
if expected_state_path.is_file():
162+
return ResourceState.from_file(expected_state_path)
163+
164+
def get_connection_configuration_diff(self):
165+
current_config = self.configuration
166+
if self.was_created:
167+
remote_config = self.remote_resource.connection_configuration
168+
diff = compute_diff(remote_config, current_config)
169+
return diff.pretty()
170+
171+
def __getattr__(self, name: str) -> Any:
172+
"""Map attribute of the YAML config to the BaseAirbyteResource object.
173+
174+
Args:
175+
name (str): Attribute name
176+
177+
Raises:
178+
AttributeError: Raised if the attributed was not found in the API response payload.
179+
180+
Returns:
181+
[Any]: Attribute value
182+
"""
183+
if name in self.local_configuration:
184+
return self.local_configuration.get(name)
185+
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.")
186+
187+
def _create_or_update(self, operation_fn, payload):
188+
try:
189+
result = operation_fn(self.api_instance, payload)
190+
return result, ResourceState.create(self.configuration_path, result[self.resource_id_field])
191+
except airbyte_api_client.ApiException as e:
192+
if e.status == 422:
193+
# This error is really verbose from the API response, but it embodies all the details about why the config is not valid.
194+
# TODO alafanechere: try to parse it and display it in a more readable way.
195+
raise InvalidConfigurationError(e.body)
196+
else:
197+
raise e
198+
199+
def create(self):
200+
return self._create_or_update(self._create_fn, self.create_payload)
201+
202+
def update(self):
203+
return self._create_or_update(self._update_fn, self.update_payload)
204+
205+
def _search(self):
206+
return self._search_fn(self.api_instance, self.search_payload)
207+
208+
def get_remote_resource(self):
209+
search_results = self._search().get(f"{self.resource_type}s", [])
210+
if len(search_results) > 1:
211+
raise DuplicateRessourceError("Two or more ressource exist with the same name")
212+
if len(search_results) == 1:
213+
return search_results[0]
214+
else:
215+
return None
216+
217+
@property
218+
def resource_id(self):
219+
return self.remote_resource.get(self.resource_id_field)
220+
221+
222+
class Source(BaseAirbyteResource):
223+
224+
api = source_api.SourceApi
225+
create_function_name = "create_source"
226+
resource_id_field = "source_id"
227+
search_function_name = "search_sources"
228+
update_function_name = "update_source"
229+
resource_type = "source"
230+
231+
@property
232+
def create_payload(self):
233+
return SourceCreate(self.definition_id, self.configuration, self.workspace_id, self.resource_name)
234+
235+
@property
236+
def search_payload(self):
237+
if self.state is None:
238+
return SourceSearch(source_definition_id=self.definition_id, workspace_id=self.workspace_id, name=self.resource_name)
239+
else:
240+
return SourceSearch(source_definition_id=self.definition_id, workspace_id=self.workspace_id, source_id=self.state.resource_id)
241+
242+
@property
243+
def update_payload(self):
244+
return SourceUpdate(
245+
source_id=self.resource_id,
246+
connection_configuration=self.configuration,
247+
name=self.resource_name,
248+
)
249+
250+
251+
class Destination(BaseAirbyteResource):
252+
api = destination_api.DestinationApi
253+
create_function_name = "create_destination"
254+
resource_id_field = "destination_id"
255+
search_function_name = "search_destinations"
256+
update_function_name = "update_destination"
257+
resource_type = "destination"
258+
259+
@property
260+
def create_payload(self):
261+
return DestinationCreate(self.workspace_id, self.resource_name, self.definition_id, self.configuration)
262+
263+
@property
264+
def search_payload(self):
265+
if self.state is None:
266+
return DestinationSearch(destination_definition_id=self.definition_id, workspace_id=self.workspace_id, name=self.resource_name)
267+
else:
268+
return DestinationSearch(
269+
destination_definition_id=self.definition_id, workspace_id=self.workspace_id, destination_id=self.state.resource_id
270+
)
271+
272+
@property
273+
def update_payload(self):
274+
return DestinationUpdate(
275+
destination_id=self.resource_id,
276+
connection_configuration=self.configuration,
277+
name=self.resource_name,
278+
)
279+
280+
281+
def factory(api_client, workspace_id, configuration_path):
282+
with open(configuration_path, "r") as f:
283+
local_configuration = yaml.load(f, yaml.FullLoader)
284+
if local_configuration["definition_type"] == "source":
285+
AirbyteResource = Source
286+
if local_configuration["definition_type"] == "destination":
287+
AirbyteResource = Destination
288+
return AirbyteResource(api_client, workspace_id, local_configuration, configuration_path)

0 commit comments

Comments
 (0)