Skip to content

🐙 octavia-cli: implement init command #9665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions octavia-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We welcome community contributions!

| Date | Milestone |
|------------|-------------------------------------|
| 2022-01-25 | Implement `octavia init` + some context checks|
| 2022-01-19 | Implement `octavia list workspace sources`, `octavia list workspace destinations`, `octavia list workspace connections`|
| 2022-01-17 | Implement `octavia list connectors source` and `octavia list connectors destinations`|
| 2022-01-17 | Generate an API Python client from our Open API spec |
Expand Down
76 changes: 76 additions & 0 deletions octavia-cli/octavia_cli/check_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os

import airbyte_api_client
import click
from airbyte_api_client.api import health_api, workspace_api
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody
from urllib3.exceptions import MaxRetryError

from .init.commands import DIRECTORIES_TO_CREATE as REQUIRED_PROJECT_DIRECTORIES


class UnhealthyApiError(click.ClickException):
pass


class UnreachableAirbyteInstanceError(click.ClickException):
pass


class WorkspaceIdError(click.ClickException):
pass


def check_api_health(api_client: airbyte_api_client.ApiClient) -> None:
"""Check if the Airbyte API is network reachable and healthy.

Args:
api_client (airbyte_api_client.ApiClient): Airbyte API client.

Raises:
click.ClickException: Raised if the Airbyte api server is unavailable according to the API response.
click.ClickException: Raised if the Airbyte URL is not reachable.
"""
api_instance = health_api.HealthApi(api_client)
try:
api_response = api_instance.get_health_check()
if not api_response.available:
raise UnhealthyApiError("Your Airbyte instance is not ready to receive requests.")
except (airbyte_api_client.ApiException, MaxRetryError):
raise UnreachableAirbyteInstanceError(
"Could not reach your Airbyte instance, make sure the instance is up and running an network reachable."
)


def check_workspace_exists(api_client: airbyte_api_client.ApiClient, workspace_id: str) -> None:
"""Check if the provided workspace id corresponds to an existing workspace on the Airbyte instance.

Args:
api_client (airbyte_api_client.ApiClient): Airbyte API client.
workspace_id (str): Id of the workspace whose existence we are trying to verify.

Raises:
click.ClickException: Raised if the workspace does not exist on the Airbyte instance.
"""
api_instance = workspace_api.WorkspaceApi(api_client)
try:
api_instance.get_workspace(WorkspaceIdRequestBody(workspace_id=workspace_id), _check_return_type=False)
except airbyte_api_client.ApiException:
raise WorkspaceIdError("The workspace you are trying to use does not exist in your Airbyte instance")


def check_is_initialized(project_directory: str = ".") -> bool:
"""Check if required project directories exist to consider the project as initialized.

Args:
project_directory (str, optional): Where the project should be initialized. Defaults to ".".

Returns:
bool: [description]
"""
sub_directories = [f.name for f in os.scandir(project_directory) if f.is_dir()]
return set(REQUIRED_PROJECT_DIRECTORIES).issubset(sub_directories)
53 changes: 35 additions & 18 deletions octavia-cli/octavia_cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,58 @@
import click
from airbyte_api_client.api import workspace_api

from .check_context import check_api_health, check_is_initialized, check_workspace_exists
from .init import commands as init_commands
from .list import commands as list_commands

AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list]
AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list, init_commands.init]


@click.group()
@click.option("--airbyte-url", envvar="AIRBYTE_URL", default="http://localhost:8000", help="The URL of your Airbyte instance.")
@click.option(
"--workspace-id",
envvar="AIRBYTE_WORKSPACE_ID",
default=None,
help="The id of the workspace on which you want octavia-cli to work. Defaults to the first one found on your Airbyte instance.",
)
@click.pass_context
def octavia(ctx: click.Context, airbyte_url: str) -> None:
def octavia(ctx: click.Context, airbyte_url: str, workspace_id: str) -> None:
ctx.ensure_object(dict)
ctx.obj["API_CLIENT"] = get_api_client(airbyte_url)
ctx.obj["WORKSPACE_ID"] = get_workspace_id(ctx.obj["API_CLIENT"], workspace_id)
ctx.obj["PROJECT_IS_INITIALIZED"] = check_is_initialized()
click.echo(
click.style(
f"🐙 - Octavia is targetting your Airbyte instance running at {airbyte_url} on workspace {ctx.obj['WORKSPACE_ID']}.", fg="green"
)
)
if not ctx.obj["PROJECT_IS_INITIALIZED"]:
click.echo(click.style("🐙 - Project is not yet initialized.", fg="red", bold=True))


def get_api_client(airbyte_url):
client_configuration = airbyte_api_client.Configuration(host=f"{airbyte_url}/api")
api_client = airbyte_api_client.ApiClient(client_configuration)
# TODO alafanechere workspace check might deserve its own function
api_instance = workspace_api.WorkspaceApi(api_client)
# open-api-generator consider non-required field as not nullable
# This will break validation of WorkspaceRead object for firstCompletedSync and feedbackDone fields
# This is why we bypass _check_return_type
api_response = api_instance.list_workspaces(_check_return_type=False)
# TODO alafanechere prompt user to chose a workspace if multiple workspaces exist
workspace_id = api_response.workspaces[0]["workspaceId"]
click.echo(f"🐙 - Octavia is targetting your Airbyte instance running at {airbyte_url} on workspace {workspace_id}")
ctx.obj["API_CLIENT"] = api_client
ctx.obj["WORKSPACE_ID"] = workspace_id
check_api_health(api_client)
return api_client


def get_workspace_id(api_client, user_defined_workspace_id):
if user_defined_workspace_id:
check_workspace_exists(api_client, user_defined_workspace_id)
return user_defined_workspace_id
else:
api_instance = workspace_api.WorkspaceApi(api_client)
api_response = api_instance.list_workspaces(_check_return_type=False)
return api_response.workspaces[0]["workspaceId"]


def add_commands_to_octavia():
for command in AVAILABLE_COMMANDS:
octavia.add_command(command)


@octavia.command(help="Scaffolds a local project directories.")
def init():
raise click.ClickException("The init command is not yet implemented.")


@octavia.command(name="import", help="Import an existing resources from the Airbyte instance.")
def _import() -> None:
raise click.ClickException("The import command is not yet implemented.")
Expand Down
3 changes: 3 additions & 0 deletions octavia-cli/octavia_cli/init/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
34 changes: 34 additions & 0 deletions octavia-cli/octavia_cli/init/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
from typing import Iterable, Tuple

import click

DIRECTORIES_TO_CREATE = {"connections", "destinations", "sources"}


def create_directories(directories_to_create: Iterable[str]) -> Tuple[Iterable[str], Iterable[str]]:
created_directories = []
not_created_directories = []
for directory in directories_to_create:
try:
os.mkdir(directory)
created_directories.append(directory)
except FileExistsError:
not_created_directories.append(directory)
return created_directories, not_created_directories


@click.command(help="Initialize required directories for the project.")
def init():
click.echo("🔨 - Initializing the project.")
created_directories, not_created_directories = create_directories(DIRECTORIES_TO_CREATE)
if created_directories:
message = f"✅ - Created the following directories: {', '.join(created_directories)}."
click.echo(click.style(message, fg="green"))
if not_created_directories:
message = f"❓ - Already existing directories: {', '.join(not_created_directories) }."
click.echo(click.style(message, fg="yellow", bold=True))
85 changes: 85 additions & 0 deletions octavia-cli/unit_tests/test_check_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
import shutil
import tempfile
from pathlib import Path

import airbyte_api_client
import pytest
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody
from octavia_cli import check_context
from urllib3.exceptions import MaxRetryError


@pytest.fixture
def mock_api_client(mocker):
return mocker.Mock()


def test_api_check_health_available(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
mock_api_response = mocker.Mock(available=True)
check_context.health_api.HealthApi.return_value.get_health_check.return_value = mock_api_response

assert check_context.check_api_health(mock_api_client) is None
check_context.health_api.HealthApi.assert_called_with(mock_api_client)
api_instance = check_context.health_api.HealthApi.return_value
api_instance.get_health_check.assert_called()


def test_api_check_health_unavailable(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
mock_api_response = mocker.Mock(available=False)
check_context.health_api.HealthApi.return_value.get_health_check.return_value = mock_api_response
with pytest.raises(check_context.UnhealthyApiError):
check_context.check_api_health(mock_api_client)


def test_api_check_health_unreachable_api_exception(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
check_context.health_api.HealthApi.return_value.get_health_check.side_effect = airbyte_api_client.ApiException()
with pytest.raises(check_context.UnreachableAirbyteInstanceError):
check_context.check_api_health(mock_api_client)


def test_api_check_health_unreachable_max_retry_error(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
check_context.health_api.HealthApi.return_value.get_health_check.side_effect = MaxRetryError("foo", "bar")
with pytest.raises(check_context.UnreachableAirbyteInstanceError):
check_context.check_api_health(mock_api_client)


def test_check_workspace_exists(mock_api_client, mocker):
mocker.patch.object(check_context, "workspace_api")
mock_api_instance = mocker.Mock()
check_context.workspace_api.WorkspaceApi.return_value = mock_api_instance
assert check_context.check_workspace_exists(mock_api_client, "foo") is None
check_context.workspace_api.WorkspaceApi.assert_called_with(mock_api_client)
mock_api_instance.get_workspace.assert_called_with(WorkspaceIdRequestBody("foo"), _check_return_type=False)


def test_check_workspace_exists_error(mock_api_client, mocker):
mocker.patch.object(check_context, "workspace_api")
check_context.workspace_api.WorkspaceApi.return_value.get_workspace.side_effect = airbyte_api_client.ApiException()
with pytest.raises(check_context.WorkspaceIdError):
check_context.check_workspace_exists(mock_api_client, "foo")


@pytest.fixture
def project_directories():
dirpath = tempfile.mkdtemp()
yield str(Path(dirpath).parent.absolute()), [os.path.basename(dirpath)]
shutil.rmtree(dirpath)


def test_check_is_initialized(mocker, project_directories):
project_directory, sub_directories = project_directories
mocker.patch.object(check_context, "REQUIRED_PROJECT_DIRECTORIES", sub_directories)
assert check_context.check_is_initialized(project_directory)


def test_check_not_initialized():
assert not check_context.check_is_initialized(".")
73 changes: 58 additions & 15 deletions octavia-cli/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest import mock

import click
import pytest
from click.testing import CliRunner
Expand All @@ -17,25 +15,70 @@ def dumb(ctx):


def test_octavia(mocker):
mocker.patch.object(entrypoint, "workspace_api")
mocker.patch.object(entrypoint, "airbyte_api_client")

mocker.patch.object(entrypoint, "click")
mocker.patch.object(entrypoint, "get_api_client")
mocker.patch.object(entrypoint, "get_workspace_id", mocker.Mock(return_value="api-defined-workspace-id"))
mocker.patch.object(entrypoint, "check_is_initialized", mocker.Mock(return_value=True))
context_object = {}
mock_api_instance = entrypoint.workspace_api.WorkspaceApi.return_value
mock_api_instance.list_workspaces.return_value = mock.MagicMock(workspaces=[{"workspaceId": "expected_workspace_id"}])
entrypoint.octavia.add_command(dumb)
runner = CliRunner()
result = runner.invoke(entrypoint.octavia, ["--airbyte-url", "test-airbyte-url", "dumb"], obj=context_object)
entrypoint.get_api_client.assert_called()
entrypoint.get_workspace_id.assert_called_with(entrypoint.get_api_client.return_value, None)
expected_message = "🐙 - Octavia is targetting your Airbyte instance running at test-airbyte-url on workspace api-defined-workspace-id."
entrypoint.click.style.assert_called_with(expected_message, fg="green")
entrypoint.click.echo.assert_called_with(entrypoint.click.style.return_value)
assert context_object == {
"API_CLIENT": entrypoint.get_api_client.return_value,
"WORKSPACE_ID": entrypoint.get_workspace_id.return_value,
"PROJECT_IS_INITIALIZED": entrypoint.check_is_initialized.return_value,
}
assert result.exit_code == 0


def test_octavia_not_initialized(mocker):
mocker.patch.object(entrypoint, "click")
mocker.patch.object(entrypoint, "get_api_client")
mocker.patch.object(entrypoint, "get_workspace_id", mocker.Mock(return_value="api-defined-workspace-id"))
mocker.patch.object(entrypoint, "check_is_initialized", mocker.Mock(return_value=False))
context_object = {}
entrypoint.octavia.add_command(dumb)
runner = CliRunner()
result = runner.invoke(entrypoint.octavia, ["--airbyte-url", "test-airbyte-url", "dumb"], obj=context_object)
entrypoint.airbyte_api_client.Configuration.assert_called_with(host="test-airbyte-url/api")
entrypoint.airbyte_api_client.ApiClient.assert_called_with(entrypoint.airbyte_api_client.Configuration.return_value)
entrypoint.workspace_api.WorkspaceApi.assert_called_with(entrypoint.airbyte_api_client.ApiClient.return_value)
mock_api_instance.list_workspaces.assert_called_once()
assert context_object["API_CLIENT"] == entrypoint.airbyte_api_client.ApiClient.return_value
assert context_object["WORKSPACE_ID"] == "expected_workspace_id"
entrypoint.click.style.assert_called_with("🐙 - Project is not yet initialized.", fg="red", bold=True)
entrypoint.click.echo.assert_called_with(entrypoint.click.style.return_value)
assert result.exit_code == 0


def test_get_api_client(mocker):
mocker.patch.object(entrypoint, "airbyte_api_client")
mocker.patch.object(entrypoint, "check_api_health")
api_client = entrypoint.get_api_client("test-url")
entrypoint.airbyte_api_client.Configuration.assert_called_with(host="test-url/api")
entrypoint.airbyte_api_client.ApiClient.assert_called_with(entrypoint.airbyte_api_client.Configuration.return_value)
entrypoint.check_api_health.assert_called_with(entrypoint.airbyte_api_client.ApiClient.return_value)
assert api_client == entrypoint.airbyte_api_client.ApiClient.return_value


def test_get_workspace_id_user_defined(mocker):
mock_api_client = mocker.Mock()
mocker.patch.object(entrypoint, "check_workspace_exists")
mocker.patch.object(entrypoint, "workspace_api")
assert entrypoint.get_workspace_id(mock_api_client, "user-defined-workspace-id") == "user-defined-workspace-id"
entrypoint.check_workspace_exists.assert_called_with(mock_api_client, "user-defined-workspace-id")


def test_get_workspace_id_api_defined(mocker):
mock_api_client = mocker.Mock()
mocker.patch.object(entrypoint, "check_workspace_exists")
mocker.patch.object(entrypoint, "workspace_api")
mock_api_instance = entrypoint.workspace_api.WorkspaceApi.return_value
mock_api_instance.list_workspaces.return_value = mocker.Mock(workspaces=[{"workspaceId": "api-defined-workspace-id"}])
assert entrypoint.get_workspace_id(mock_api_client, None) == "api-defined-workspace-id"
entrypoint.workspace_api.WorkspaceApi.assert_called_with(mock_api_client)
mock_api_instance.list_workspaces.assert_called_with(_check_return_type=False)


def test_commands_in_octavia_group():
octavia_commands = entrypoint.octavia.commands.values()
for command in entrypoint.AVAILABLE_COMMANDS:
Expand All @@ -44,7 +87,7 @@ def test_commands_in_octavia_group():

@pytest.mark.parametrize(
"command",
[entrypoint.init, entrypoint.apply, entrypoint.create, entrypoint.delete, entrypoint._import],
[entrypoint.apply, entrypoint.create, entrypoint.delete, entrypoint._import],
)
def test_not_implemented_commands(command):
runner = CliRunner()
Expand All @@ -54,4 +97,4 @@ def test_not_implemented_commands(command):


def test_available_commands():
assert entrypoint.AVAILABLE_COMMANDS == [entrypoint.list_commands._list]
assert entrypoint.AVAILABLE_COMMANDS == [entrypoint.list_commands._list, entrypoint.init_commands.init]
3 changes: 3 additions & 0 deletions octavia-cli/unit_tests/test_init/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Loading