Skip to content

🐙 octavia-cli: add generate for connections #10809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
623d6f8
create connection cli command
marcosmarxm Feb 25, 2022
dab0307
add connection.yaml template
marcosmarxm Feb 28, 2022
a85dd65
add template file
marcosmarxm Mar 2, 2022
eb800bf
remove unused vars and imports
marcosmarxm Mar 3, 2022
fae98ea
create group for generate command
marcosmarxm Mar 3, 2022
095984a
merge master
marcosmarxm Mar 7, 2022
d0a547c
correct connection template
marcosmarxm Mar 7, 2022
da88602
implement suggestions
alafanechere Mar 7, 2022
42ba11f
implement suggestions
alafanechere Mar 7, 2022
f05d24d
fix existing tests
alafanechere Mar 7, 2022
b2981d1
test resources
alafanechere Mar 8, 2022
37b28f9
test renderer
alafanechere Mar 8, 2022
65ae1b2
rename renderer module to renderers
alafanechere Mar 8, 2022
07ec301
Merge branch 'master' into marcos/octavia-generate-connection
alafanechere Mar 8, 2022
de5dce9
test generate command
alafanechere Mar 8, 2022
3a17693
fix yaml style
alafanechere Mar 8, 2022
699cb31
fix yaml style
alafanechere Mar 8, 2022
fef3815
revert unrelated changes
alafanechere Mar 8, 2022
910e052
revert unrelated changes
alafanechere Mar 8, 2022
15ea2be
clean
alafanechere Mar 8, 2022
0a2d90d
clean
alafanechere Mar 8, 2022
31e515a
add required resource_name field to template
alafanechere Mar 8, 2022
8c4ceee
undo snakecasing + delete jsonschema
alafanechere Mar 8, 2022
81e04b7
add source_id destination_id to yaml top
alafanechere Mar 8, 2022
7462827
fix
alafanechere Mar 8, 2022
fdd7909
add json schema in connection integration test
alafanechere Mar 9, 2022
be2a910
Merge branch 'master' into marcos/octavia-generate-connection
alafanechere Mar 9, 2022
d184a88
make write_yaml a base method + implement _render
alafanechere Mar 9, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Configuration for connection my_new_connection
definition_type: connection
resource_name: my_new_connection
source_id: my_source_id
destination_id: my_destination_id

# EDIT THE CONFIGURATION BELOW!
configuration:
sourceId: my_source_id # REQUIRED | string
destinationId: my_destination_id # REQUIRED | string
status: active # REQUIRED | string | Allowed values: active, inactive, deprecated
name: my_new_connection # OPTIONAL | string | Optional name of the connection
namespaceDefinition: source # OPTIONAL | string | Allowed values: source, destination, customformat
namespaceFormat: "${SOURCE_NAMESPACE}" # OPTIONAL | string | Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'.
prefix: "" # REQUIRED | Prefix that will be prepended to the name of each stream when it is written to the destination
resourceRequirements: # OPTIONAL | object | Resource requirements to run workers (blank for unbounded allocations)
cpu_limit: "" # OPTIONAL
cpu_request: "" # OPTIONAL
memory_limit: "" # OPTIONAL
memory_request: "" # OPTIONAL
schedule: # OPTIONAL | object
timeUnit: hours # REQUIRED | string | Allowed values: minutes, hours, days, weeks, months
units: 1 # REQUIRED | integer
syncCatalog: # OPTIONAL | object | 🚨 ONLY edit streams.config, streams.stream should not be edited as schema cannot be changed.
streams:
- config:
aliasName: aliasMock
cursorField: []
destinationSyncMode: append
primaryKey: []
selected: true
syncMode: full_refresh
stream:
defaultCursorField:
- foo
jsonSchema:
$schema: http://json-schema.org/draft-07/schema#
properties:
foo:
type: number
name: stream_1
namespace: null
sourceDefinedCursor: null
sourceDefinedPrimaryKey: []
supportedSyncModes:
- full_refresh
- config:
aliasName: aliasMock
cursorField: []
destinationSyncMode: append
primaryKey: []
selected: true
syncMode: full_refresh
stream:
defaultCursorField: []
jsonSchema:
$schema: http://json-schema.org/draft-07/schema#
properties:
bar:
type: number
name: stream_2
namespace: null
sourceDefinedCursor: null
sourceDefinedPrimaryKey: []
supportedSyncModes:
- full_refresh
- incremental
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pytest
import yaml
from octavia_cli.generate.renderer import ConnectionSpecificationRenderer
from octavia_cli.generate.renderers import ConnectionRenderer, ConnectorSpecificationRenderer

pytestmark = pytest.mark.integration
SOURCE_SPECS = "../airbyte-config/init/src/main/resources/seed/source_specs.yaml"
Expand All @@ -26,7 +26,7 @@ def get_all_specs_params():

@pytest.mark.parametrize("spec_type, spec", get_all_specs_params())
def test_render_spec(spec_type, spec, octavia_project_directory, mocker):
renderer = ConnectionSpecificationRenderer(
renderer = ConnectorSpecificationRenderer(
resource_name=f"resource-{spec['dockerImage']}",
definition=mocker.Mock(
type=spec_type,
Expand Down Expand Up @@ -66,10 +66,12 @@ def test_render_spec(spec_type, spec, octavia_project_directory, mocker):
("my_s3_destination", "destination", "destination_s3/input_spec.yaml", "destination_s3/expected.yaml"),
],
)
def test_expected_output(resource_name, spec_type, input_spec_path, expected_yaml_path, octavia_project_directory, mocker):
def test_expected_output_connector_specification_renderer(
resource_name, spec_type, input_spec_path, expected_yaml_path, octavia_project_directory, mocker
):
with open(os.path.join(EXPECTED_RENDERED_YAML_PATH, input_spec_path), "r") as f:
input_spec = yaml.safe_load(f)
renderer = ConnectionSpecificationRenderer(
renderer = ConnectorSpecificationRenderer(
resource_name=resource_name,
definition=mocker.Mock(
type=spec_type,
Expand All @@ -83,3 +85,71 @@ def test_expected_output(resource_name, spec_type, input_spec_path, expected_yam
output_path = renderer.write_yaml(octavia_project_directory)
expect_output_path = os.path.join(EXPECTED_RENDERED_YAML_PATH, expected_yaml_path)
assert filecmp.cmp(output_path, expect_output_path)


def test_expected_output_connection_renderer(octavia_project_directory, mocker):
mock_source = mocker.Mock(
resource_id="my_source_id",
catalog={
"streams": [
{
"stream": {
"name": "stream_1",
"jsonSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"foo": {
"type": "number",
}
},
},
"supportedSyncModes": ["full_refresh"],
"sourceDefinedCursor": None,
"defaultCursorField": ["foo"],
"sourceDefinedPrimaryKey": [],
"namespace": None,
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "append",
"primaryKey": [],
"aliasName": "aliasMock",
"selected": True,
},
},
{
"stream": {
"name": "stream_2",
"jsonSchema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"bar": {
"type": "number",
}
},
},
"supportedSyncModes": ["full_refresh", "incremental"],
"sourceDefinedCursor": None,
"defaultCursorField": [],
"sourceDefinedPrimaryKey": [],
"namespace": None,
},
"config": {
"syncMode": "full_refresh",
"cursorField": [],
"destinationSyncMode": "append",
"primaryKey": [],
"aliasName": "aliasMock",
"selected": True,
},
},
]
},
)
mock_destination = mocker.Mock(resource_id="my_destination_id")

renderer = ConnectionRenderer("my_new_connection", mock_source, mock_destination)
output_path = renderer.write_yaml(octavia_project_directory)
expect_output_path = os.path.join(EXPECTED_RENDERED_YAML_PATH, "connection/expected.yaml")
assert filecmp.cmp(output_path, expect_output_path)
26 changes: 26 additions & 0 deletions octavia-cli/octavia_cli/apply/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import airbyte_api_client
import yaml
from airbyte_api_client.api import destination_api, source_api
from airbyte_api_client.model.airbyte_catalog import AirbyteCatalog
from airbyte_api_client.model.destination_create import DestinationCreate
from airbyte_api_client.model.destination_read import DestinationRead
from airbyte_api_client.model.destination_read_list import DestinationReadList
from airbyte_api_client.model.destination_search import DestinationSearch
from airbyte_api_client.model.destination_update import DestinationUpdate
from airbyte_api_client.model.source_create import SourceCreate
from airbyte_api_client.model.source_id_request_body import SourceIdRequestBody
from airbyte_api_client.model.source_read import SourceRead
from airbyte_api_client.model.source_read_list import SourceReadList
from airbyte_api_client.model.source_search import SourceSearch
Expand Down Expand Up @@ -355,6 +357,30 @@ def update_payload(self):
name=self.resource_name,
)

@property
def resource_id_request_body(self) -> SourceIdRequestBody:
"""Creates SourceIdRequestBody from resource id.

Raises:
NonExistingResourceError: raised if the resource id is None.

Returns:
SourceIdRequestBody: The SourceIdRequestBody model instance.
"""
if self.resource_id is None:
raise NonExistingResourceError("The resource id could not be retrieved, the remote resource is not existing.")
return SourceIdRequestBody(source_id=self.resource_id)

@property
def catalog(self) -> AirbyteCatalog:
"""Retrieves the source's Airbyte catalog.

Returns:
AirbyteCatalog: The catalog issued by schema discovery.
"""
schema = self.api_instance.discover_schema_for_source(self.resource_id_request_body, _check_return_type=False)
return schema.catalog


class Destination(BaseResource):

Expand Down
72 changes: 63 additions & 9 deletions octavia-cli/octavia_cli/generate/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,74 @@

import click
import octavia_cli.generate.definitions as definitions
from octavia_cli.apply import resources
from octavia_cli.check_context import requires_init

from .renderer import ConnectionSpecificationRenderer
from .renderers import ConnectionRenderer, ConnectorSpecificationRenderer


@click.command(name="generate", help="Generate a YAML template for a source or a destination.")
@click.argument("definition_type", type=click.Choice(["source", "destination"]))
@click.argument("definition_id", type=click.STRING)
@click.argument("resource_name", type=click.STRING)
@click.group("generate", help="Generate a YAML template for a source, destination or a connection.")
@click.pass_context
@requires_init
def generate(ctx: click.Context, definition_type: str, definition_id: str, resource_name: str):
definition = definitions.factory(definition_type, ctx.obj["API_CLIENT"], definition_id)
renderer = ConnectionSpecificationRenderer(resource_name, definition)
def generate(ctx: click.Context):
pass


def generate_source_or_destination(definition_type, api_client, definition_id, resource_name):
definition = definitions.factory(definition_type, api_client, definition_id)
renderer = ConnectorSpecificationRenderer(resource_name, definition)
output_path = renderer.write_yaml(project_path=".")
message = f"✅ - Created the specification template for {resource_name} in {output_path}."
message = f"✅ - Created the {definition_type} template for {resource_name} in {output_path}."
click.echo(click.style(message, fg="green"))


@generate.command(name="source", help="Create YAML for a source")
@click.argument("definition_id", type=click.STRING)
@click.argument("resource_name", type=click.STRING)
@click.pass_context
def source(ctx: click.Context, definition_id: str, resource_name: str):
generate_source_or_destination("source", ctx.obj["API_CLIENT"], definition_id, resource_name)


@generate.command(name="destination", help="Create YAML for a destination")
@click.argument("definition_id", type=click.STRING)
@click.argument("resource_name", type=click.STRING)
@click.pass_context
def destination(ctx: click.Context, definition_id: str, resource_name: str):
generate_source_or_destination("destination", ctx.obj["API_CLIENT"], definition_id, resource_name)


@generate.command(name="connection", help="Generate a YAML template for a connection.")
@click.argument("connection_name", type=click.STRING)
@click.option(
"--source",
"source_path",
type=click.Path(exists=True, readable=True),
required=True,
help="Path to the YAML fine defining your source configuration.",
)
@click.option(
"--destination",
"destination_path",
type=click.Path(exists=True, readable=True),
required=True,
help="Path to the YAML fine defining your destination configuration.",
)
@click.pass_context
def connection(ctx: click.Context, connection_name: str, source_path: str, destination_path: str):
source = resources.factory(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], source_path)
if not source.was_created:
raise resources.NonExistingResourceError(
f"The source defined at {source_path} does not exists. Please run octavia apply before creating this connection."
)

destination = resources.factory(ctx.obj["API_CLIENT"], ctx.obj["WORKSPACE_ID"], destination_path)
if not destination.was_created:
raise resources.NonExistingResourceError(
f"The destination defined at {destination_path} does not exists. Please run octavia apply before creating this connection."
)

connection_renderer = ConnectionRenderer(connection_name, source, destination)
output_path = connection_renderer.write_yaml(project_path=".")
message = f"✅ - Created the connection template for {connection_name} in {output_path}."
click.echo(click.style(message, fg="green"))
4 changes: 4 additions & 0 deletions octavia-cli/octavia_cli/generate/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def __getattr__(self, name: str) -> Any:
raise AttributeError(f"{self.__class__.__name__}.{name} is invalid.")


class ConnectionDefinition(BaseDefinition):
type = "connection"


class SourceDefinition(BaseDefinition):
api = source_definition_api.SourceDefinitionApi
type = "source"
Expand Down
Loading