Skip to content

Commit 8f49e19

Browse files
🐙 octavia-cli: add generate for connections (#10809)
* create connection cli command * add connection.yaml template * add template file * remove unused vars and imports * create group for generate command * correct connection template * implement suggestions * implement suggestions * fix existing tests * test resources * test renderer * rename renderer module to renderers * test generate command * fix yaml style * fix yaml style * revert unrelated changes * revert unrelated changes * clean * clean * add required resource_name field to template * undo snakecasing + delete jsonschema * add source_id destination_id to yaml top * fix * add json schema in connection integration test * make write_yaml a base method + implement _render Co-authored-by: alafanechere <[email protected]>
1 parent 124a3c6 commit 8f49e19

File tree

14 files changed

+833
-262
lines changed

14 files changed

+833
-262
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Configuration for connection my_new_connection
2+
definition_type: connection
3+
resource_name: my_new_connection
4+
source_id: my_source_id
5+
destination_id: my_destination_id
6+
7+
# EDIT THE CONFIGURATION BELOW!
8+
configuration:
9+
sourceId: my_source_id # REQUIRED | string
10+
destinationId: my_destination_id # REQUIRED | string
11+
status: active # REQUIRED | string | Allowed values: active, inactive, deprecated
12+
name: my_new_connection # OPTIONAL | string | Optional name of the connection
13+
namespaceDefinition: source # OPTIONAL | string | Allowed values: source, destination, customformat
14+
namespaceFormat: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'.
15+
prefix: "" # REQUIRED | Prefix that will be prepended to the name of each stream when it is written to the destination
16+
resourceRequirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations)
17+
cpu_limit: "" # OPTIONAL
18+
cpu_request: "" # OPTIONAL
19+
memory_limit: "" # OPTIONAL
20+
memory_request: "" # OPTIONAL
21+
schedule: # OPTIONAL | object
22+
timeUnit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months
23+
units: 1 # REQUIRED | integer
24+
syncCatalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed.
25+
streams:
26+
- config:
27+
aliasName: aliasMock
28+
cursorField: []
29+
destinationSyncMode: append
30+
primaryKey: []
31+
selected: true
32+
syncMode: full_refresh
33+
stream:
34+
defaultCursorField:
35+
- foo
36+
jsonSchema:
37+
$schema: http://json-schema.org/draft-07/schema#
38+
properties:
39+
foo:
40+
type: number
41+
name: stream_1
42+
namespace: null
43+
sourceDefinedCursor: null
44+
sourceDefinedPrimaryKey: []
45+
supportedSyncModes:
46+
- full_refresh
47+
- config:
48+
aliasName: aliasMock
49+
cursorField: []
50+
destinationSyncMode: append
51+
primaryKey: []
52+
selected: true
53+
syncMode: full_refresh
54+
stream:
55+
defaultCursorField: []
56+
jsonSchema:
57+
$schema: http://json-schema.org/draft-07/schema#
58+
properties:
59+
bar:
60+
type: number
61+
name: stream_2
62+
namespace: null
63+
sourceDefinedCursor: null
64+
sourceDefinedPrimaryKey: []
65+
supportedSyncModes:
66+
- full_refresh
67+
- incremental

octavia-cli/integration_tests/test_generate/test_renderer.py renamed to octavia-cli/integration_tests/test_generate/test_renderers.py

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import pytest
99
import yaml
10-
from octavia_cli.generate.renderer import ConnectionSpecificationRenderer
10+
from octavia_cli.generate.renderers import ConnectionRenderer, ConnectorSpecificationRenderer
1111

1212
pytestmark = pytest.mark.integration
1313
SOURCE_SPECS = "../airbyte-config/init/src/main/resources/seed/source_specs.yaml"
@@ -26,7 +26,7 @@ def get_all_specs_params():
2626

2727
@pytest.mark.parametrize("spec_type, spec", get_all_specs_params())
2828
def test_render_spec(spec_type, spec, octavia_project_directory, mocker):
29-
renderer = ConnectionSpecificationRenderer(
29+
renderer = ConnectorSpecificationRenderer(
3030
resource_name=f"resource-{spec['dockerImage']}",
3131
definition=mocker.Mock(
3232
type=spec_type,
@@ -66,10 +66,12 @@ def test_render_spec(spec_type, spec, octavia_project_directory, mocker):
6666
("my_s3_destination", "destination", "destination_s3/input_spec.yaml", "destination_s3/expected.yaml"),
6767
],
6868
)
69-
def test_expected_output(resource_name, spec_type, input_spec_path, expected_yaml_path, octavia_project_directory, mocker):
69+
def test_expected_output_connector_specification_renderer(
70+
resource_name, spec_type, input_spec_path, expected_yaml_path, octavia_project_directory, mocker
71+
):
7072
with open(os.path.join(EXPECTED_RENDERED_YAML_PATH, input_spec_path), "r") as f:
7173
input_spec = yaml.safe_load(f)
72-
renderer = ConnectionSpecificationRenderer(
74+
renderer = ConnectorSpecificationRenderer(
7375
resource_name=resource_name,
7476
definition=mocker.Mock(
7577
type=spec_type,
@@ -83,3 +85,71 @@ def test_expected_output(resource_name, spec_type, input_spec_path, expected_yam
8385
output_path = renderer.write_yaml(octavia_project_directory)
8486
expect_output_path = os.path.join(EXPECTED_RENDERED_YAML_PATH, expected_yaml_path)
8587
assert filecmp.cmp(output_path, expect_output_path)
88+
89+
90+
def test_expected_output_connection_renderer(octavia_project_directory, mocker):
91+
mock_source = mocker.Mock(
92+
resource_id="my_source_id",
93+
catalog={
94+
"streams": [
95+
{
96+
"stream": {
97+
"name": "stream_1",
98+
"jsonSchema": {
99+
"$schema": "http://json-schema.org/draft-07/schema#",
100+
"properties": {
101+
"foo": {
102+
"type": "number",
103+
}
104+
},
105+
},
106+
"supportedSyncModes": ["full_refresh"],
107+
"sourceDefinedCursor": None,
108+
"defaultCursorField": ["foo"],
109+
"sourceDefinedPrimaryKey": [],
110+
"namespace": None,
111+
},
112+
"config": {
113+
"syncMode": "full_refresh",
114+
"cursorField": [],
115+
"destinationSyncMode": "append",
116+
"primaryKey": [],
117+
"aliasName": "aliasMock",
118+
"selected": True,
119+
},
120+
},
121+
{
122+
"stream": {
123+
"name": "stream_2",
124+
"jsonSchema": {
125+
"$schema": "http://json-schema.org/draft-07/schema#",
126+
"properties": {
127+
"bar": {
128+
"type": "number",
129+
}
130+
},
131+
},
132+
"supportedSyncModes": ["full_refresh", "incremental"],
133+
"sourceDefinedCursor": None,
134+
"defaultCursorField": [],
135+
"sourceDefinedPrimaryKey": [],
136+
"namespace": None,
137+
},
138+
"config": {
139+
"syncMode": "full_refresh",
140+
"cursorField": [],
141+
"destinationSyncMode": "append",
142+
"primaryKey": [],
143+
"aliasName": "aliasMock",
144+
"selected": True,
145+
},
146+
},
147+
]
148+
},
149+
)
150+
mock_destination = mocker.Mock(resource_id="my_destination_id")
151+
152+
renderer = ConnectionRenderer("my_new_connection", mock_source, mock_destination)
153+
output_path = renderer.write_yaml(octavia_project_directory)
154+
expect_output_path = os.path.join(EXPECTED_RENDERED_YAML_PATH, "connection/expected.yaml")
155+
assert filecmp.cmp(output_path, expect_output_path)

octavia-cli/octavia_cli/apply/resources.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
import airbyte_api_client
1212
import yaml
1313
from airbyte_api_client.api import destination_api, source_api
14+
from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog
1415
from airbyte_api_client.model.destination_create import DestinationCreate
1516
from airbyte_api_client.model.destination_read import DestinationRead
1617
from airbyte_api_client.model.destination_read_list import DestinationReadList
1718
from airbyte_api_client.model.destination_search import DestinationSearch
1819
from airbyte_api_client.model.destination_update import DestinationUpdate
1920
from airbyte_api_client.model.source_create import SourceCreate
21+
from airbyte_api_client.model.source_id_request_body import SourceIdRequestBody
2022
from airbyte_api_client.model.source_read import SourceRead
2123
from airbyte_api_client.model.source_read_list import SourceReadList
2224
from airbyte_api_client.model.source_search import SourceSearch
@@ -355,6 +357,30 @@ def update_payload(self):
355357
name=self.resource_name,
356358
)
357359

360+
@property
361+
def resource_id_request_body(self) -> SourceIdRequestBody:
362+
"""Creates SourceIdRequestBody from resource id.
363+
364+
Raises:
365+
NonExistingResourceError: raised if the resource id is None.
366+
367+
Returns:
368+
SourceIdRequestBody: The SourceIdRequestBody model instance.
369+
"""
370+
if self.resource_id is None:
371+
raise NonExistingResourceError("The resource id could not be retrieved, the remote resource is not existing.")
372+
return SourceIdRequestBody(source_id=self.resource_id)
373+
374+
@property
375+
def catalog(self) -> AirbyteCatalog:
376+
"""Retrieves the source's Airbyte catalog.
377+
378+
Returns:
379+
AirbyteCatalog: The catalog issued by schema discovery.
380+
"""
381+
schema = self.api_instance.discover_schema_for_source(self.resource_id_request_body, _check_return_type=False)
382+
return schema.catalog
383+
358384

359385
class Destination(BaseResource):
360386

octavia-cli/octavia_cli/generate/commands.py

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,74 @@
44

55
import click
66
import octavia_cli.generate.definitions as definitions
7+
from octavia_cli.apply import resources
78
from octavia_cli.check_context import requires_init
89

9-
from .renderer import ConnectionSpecificationRenderer
10+
from .renderers import ConnectionRenderer, ConnectorSpecificationRenderer
1011

1112

12-
@click.command(name="generate", help="Generate a YAML template for a source or a destination.")
13-
@click.argument("definition_type", type=click.Choice(["source", "destination"]))
14-
@click.argument("definition_id", type=click.STRING)
15-
@click.argument("resource_name", type=click.STRING)
13+
@click.group("generate", help="Generate a YAML template for a source, destination or a connection.")
1614
@click.pass_context
1715
@requires_init
18-
def generate(ctx: click.Context, definition_type: str, definition_id: str, resource_name: str):
19-
definition = definitions.factory(definition_type, ctx.obj["API_CLIENT"], definition_id)
20-
renderer = ConnectionSpecificationRenderer(resource_name, definition)
16+
def generate(ctx: click.Context):
17+
pass
18+
19+
20+
def generate_source_or_destination(definition_type, api_client, definition_id, resource_name):
21+
definition = definitions.factory(definition_type, api_client, definition_id)
22+
renderer = ConnectorSpecificationRenderer(resource_name, definition)
2123
output_path = renderer.write_yaml(project_path=".")
22-
message = f"✅ - Created the specification template for {resource_name} in {output_path}."
24+
message = f"✅ - Created the {definition_type} template for {resource_name} in {output_path}."
25+
click.echo(click.style(message, fg="green"))
26+
27+
28+
@generate.command(name="source", help="Create YAML for a source")
29+
@click.argument("definition_id", type=click.STRING)
30+
@click.argument("resource_name", type=click.STRING)
31+
@click.pass_context
32+
def source(ctx: click.Context, definition_id: str, resource_name: str):
33+
generate_source_or_destination("source", ctx.obj["API_CLIENT"], definition_id, resource_name)
34+
35+
36+
@generate.command(name="destination", help="Create YAML for a destination")
37+
@click.argument("definition_id", type=click.STRING)
38+
@click.argument("resource_name", type=click.STRING)
39+
@click.pass_context
40+
def destination(ctx: click.Context, definition_id: str, resource_name: str):
41+
generate_source_or_destination("destination", ctx.obj["API_CLIENT"], definition_id, resource_name)
42+
43+
44+
@generate.command(name="connection", help="Generate a YAML template for a connection.")
45+
@click.argument("connection_name", type=click.STRING)
46+
@click.option(
47+
"--source",
48+
"source_path",
49+
type=click.Path(exists=True, readable=True),
50+
required=True,
51+
help="Path to the YAML fine defining your source configuration.",
52+
)
53+
@click.option(
54+
"--destination",
55+
"destination_path",
56+
type=click.Path(exists=True, readable=True),
57+
required=True,
58+
help="Path to the YAML fine defining your destination configuration.",
59+
)
60+
@click.pass_context
61+
def connection(ctx: click.Context, connection_name: str, source_path: str, destination_path: str):
62+
source = resources.factory(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], source_path)
63+
if not source.was_created:
64+
raise resources.NonExistingResourceError(
65+
f"The source defined at {source_path} does not exists. Please run octavia apply before creating this connection."
66+
)
67+
68+
destination = resources.factory(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], destination_path)
69+
if not destination.was_created:
70+
raise resources.NonExistingResourceError(
71+
f"The destination defined at {destination_path} does not exists. Please run octavia apply before creating this connection."
72+
)
73+
74+
connection_renderer = ConnectionRenderer(connection_name, source, destination)
75+
output_path = connection_renderer.write_yaml(project_path=".")
76+
message = f"✅ - Created the connection template for {connection_name} in {output_path}."
2377
click.echo(click.style(message, fg="green"))

octavia-cli/octavia_cli/generate/definitions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ def __getattr__(self, name: str) -> Any:
8686
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.")
8787

8888

89+
class ConnectionDefinition(BaseDefinition):
90+
type = "connection"
91+
92+
8993
class SourceDefinition(BaseDefinition):
9094
api = source_definition_api.SourceDefinitionApi
9195
type = "source"

0 commit comments

Comments
 (0)