Skip to content

Regression testing version comparison #35534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

from abc import ABC
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import docker # type: ignore
from dagger import Container, ExecError, Platform, QueryError
Expand All @@ -26,28 +26,40 @@ class BuildConnectorImagesBase(Step, ABC):
def title(self) -> str:
return f"Build {self.context.connector.technical_name} docker image for platform(s) {', '.join(self.build_platforms)}"

def __init__(self, context: ConnectorContext) -> None:
def __init__(self, context: ConnectorContext, docker_image_name: Optional[str] = None) -> None:
self.build_platforms = context.targeted_platforms
self.docker_image_name = docker_image_name
super().__init__(context)

async def _run(self, *args: Any) -> StepResult:
build_results_per_platform = {}
for platform in self.build_platforms:
try:
connector = await self._build_connector(platform, *args)
if self.docker_image_name:
self.logger.info(f"Pulling connector {self.docker_image_name}")
build_results_per_platform = {
platform: self.dagger_client.container(platform=platform).from_(self.docker_image_name)
for platform in self.build_platforms
}
success_message = (
f"The {self.docker_image_name} images "
f"were successfully pulled for platform(s) {', '.join(self.build_platforms)}"
)
else:
build_results_per_platform = {}
for platform in self.build_platforms:
try:
await connector.with_exec(["spec"])
except ExecError as e:
return StepResult(
step=self, status=StepStatus.FAILURE, stderr=str(e), stdout=f"Failed to run the spec command on the connector container for platform {platform}."
)
build_results_per_platform[platform] = connector
except QueryError as e:
return StepResult(step=self, status=StepStatus.FAILURE, stderr=f"Failed to build connector image for platform {platform}: {e}")
success_message = (
f"The {self.context.connector.technical_name} docker image "
f"was successfully built for platform(s) {', '.join(self.build_platforms)}"
)
connector = await self._build_connector(platform, *args)
try:
await connector.with_exec(["spec"])
except ExecError as e:
return StepResult(
step=self, status=StepStatus.FAILURE, stderr=str(e), stdout=f"Failed to run the spec command on the connector container for platform {platform}."
)
build_results_per_platform[platform] = connector
except QueryError as e:
return StepResult(step=self, status=StepStatus.FAILURE, stderr=f"Failed to build connector image for platform {platform}: {e}")
success_message = (
f"The {self.context.connector.technical_name} docker image "
f"was successfully built for platform(s) {', '.join(self.build_platforms)}"
)
return StepResult(step=self, status=StepStatus.SUCCESS, stdout=success_message, output_artifact=build_results_per_platform)

async def _build_connector(self, platform: Platform, *args: Any, **kwargs: Any) -> Container:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool:
"migrate_to_base_image": "pipelines.airbyte_ci.connectors.migrate_to_base_image.commands.migrate_to_base_image",
"upgrade_base_image": "pipelines.airbyte_ci.connectors.upgrade_base_image.commands.upgrade_base_image",
"upgrade_cdk": "pipelines.airbyte_ci.connectors.upgrade_cdk.commands.bump_version",
"regression_test": "pipelines.airbyte_ci.connectors.regression_test.commands.regression_test",
},
)
@click.option(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
VERSION_INC_CHECK = "version_inc_check"
TEST_ORCHESTRATOR = "test_orchestrator"
DEPLOY_ORCHESTRATOR = "deploy_orchestrator"
REGRESSION_TEST_BUILD_CONTROL = "regression_test_build_control"
REGRESSION_TEST_BUILD_TARGET = "regression_test_build_target"
REGRESSION_TEST_CONTROL = "regression_test_control"
REGRESSION_TEST_TARGET = "regression_test_target"

def __str__(self) -> str:
return self.value
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from datetime import datetime
from types import TracebackType
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Tuple

import yaml # type: ignore
from anyio import Path
Expand Down Expand Up @@ -68,6 +68,7 @@ def __init__(
concurrent_cat: Optional[bool] = False,
run_step_options: RunStepOptions = RunStepOptions(),
targeted_platforms: Sequence[Platform] = BUILD_PLATFORMS,
versions_to_test: Optional[Tuple[str, str]] = None,
) -> None:
"""Initialize a connector context.

Expand Down Expand Up @@ -95,6 +96,7 @@ def __init__(
s3_build_cache_secret_key (Optional[str], optional): Gradle S3 Build Cache credentials. Defaults to None.
concurrent_cat (bool, optional): Whether to run the CAT tests in parallel. Defaults to False.
targeted_platforms (Optional[Iterable[Platform]], optional): The platforms to build the connector image for. Defaults to BUILD_PLATFORMS.
versions_to_test (Optional[Tuple[str, str]]): The control and target version of the connector to be used in regression tests.
"""

self.pipeline_name = pipeline_name
Expand All @@ -116,6 +118,7 @@ def __init__(
self.concurrent_cat = concurrent_cat
self._connector_secrets: Optional[Dict[str, Secret]] = None
self.targeted_platforms = targeted_platforms
self.versions_to_test = versions_to_test

super().__init__(
pipeline_name=pipeline_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Dict, List

import asyncclick as click
from pipelines import main_logger
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines
from pipelines.airbyte_ci.connectors.regression_test.pipeline import run_connector_regression_test_pipeline
from pipelines.cli.click_decorators import click_ci_requirements_option
from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand
from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState
from pipelines.helpers.execution import argument_parsing
from pipelines.helpers.execution.run_steps import RunStepOptions
from pipelines.helpers.github import update_global_commit_status_check_for_tests
from pipelines.helpers.utils import fail_if_missing_docker_hub_creds
from pipelines.models.steps import STEP_PARAMS


@click.command(
cls=DaggerPipelineCommand,
help="Test all the selected connectors.",
context_settings=dict(
ignore_unknown_options=True,
),
)
@click_ci_requirements_option()
@click.option(
"--control-version",
help=(
"Control version of the connector to be tested. Records will be downloaded from this container and used as expected records for the target version."
),
default="latest",
type=str,
)
@click.option(
"--target-version",
help=("Target version of the connector being tested."),
default="dev",
type=str,
)
@click.option(
"--fail-fast",
help="When enabled, tests will fail fast.",
default=False,
type=bool,
is_flag=True,
)
@click.pass_context
async def regression_test(ctx: click.Context, control_version: str, target_version: str, fail_fast: bool) -> bool:
"""
Runs a regression test pipeline for the selected connectors.
"""
if ctx.obj["is_ci"]:
fail_if_missing_docker_hub_creds(ctx)

if ctx.obj["selected_connectors_with_modified_files"]:
update_global_commit_status_check_for_tests(ctx.obj, "pending")
else:
main_logger.warn("No connector were selected for testing.")
update_global_commit_status_check_for_tests(ctx.obj, "success")
return True

run_step_options = RunStepOptions(fail_fast=fail_fast)
connectors_tests_contexts = []
for connector in ctx.obj["selected_connectors_with_modified_files"]:
if control_version in ("dev", "latest"):
control_version = f"airbyte/{connector.technical_name}:{control_version}"

if target_version in ("dev", "latest"):
target_version = f"airbyte/{connector.technical_name}:{target_version}"

connectors_tests_contexts.append(
ConnectorContext(
pipeline_name=f"Testing connector {connector.technical_name}",
connector=connector,
is_local=ctx.obj["is_local"],
git_branch=ctx.obj["git_branch"],
git_revision=ctx.obj["git_revision"],
ci_report_bucket=ctx.obj["ci_report_bucket_name"],
report_output_prefix=ctx.obj["report_output_prefix"],
use_remote_secrets=ctx.obj["use_remote_secrets"],
gha_workflow_run_url=ctx.obj.get("gha_workflow_run_url"),
dagger_logs_url=ctx.obj.get("dagger_logs_url"),
pipeline_start_timestamp=ctx.obj.get("pipeline_start_timestamp"),
ci_context=ctx.obj.get("ci_context"),
pull_request=ctx.obj.get("pull_request"),
ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
use_local_cdk=ctx.obj.get("use_local_cdk"),
s3_build_cache_access_key_id=ctx.obj.get("s3_build_cache_access_key_id"),
s3_build_cache_secret_key=ctx.obj.get("s3_build_cache_secret_key"),
docker_hub_username=ctx.obj.get("docker_hub_username"),
docker_hub_password=ctx.obj.get("docker_hub_password"),
run_step_options=run_step_options,
targeted_platforms=[LOCAL_BUILD_PLATFORM],
versions_to_test=(control_version, target_version),
)
)

try:
await run_connectors_pipelines(
[connector_context for connector_context in connectors_tests_contexts],
run_connector_regression_test_pipeline,
"Regression Test Pipeline",
ctx.obj["concurrency"],
ctx.obj["dagger_logs_path"],
ctx.obj["execute_timeout"],
)
except Exception as e:
main_logger.error("An error occurred while running the regression test pipeline", exc_info=e)
return False

return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""This module groups factory like functions to dispatch tests steps according to the connector under test language."""

from __future__ import annotations

from typing import TYPE_CHECKING

import anyio
from connector_ops.utils import ConnectorLanguage # type: ignore
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.regression_test.steps.regression_cats import get_test_steps
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.helpers.execution.run_steps import StepToRun, run_steps

if TYPE_CHECKING:

from pipelines.helpers.execution.run_steps import STEP_TREE


async def run_connector_regression_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport:
"""
Compute the steps to run for a connector test pipeline.
"""
steps_to_run: STEP_TREE = get_test_steps(context)

async with semaphore:
async with context:
result_dict = await run_steps(
runnables=steps_to_run,
options=context.run_step_options,
)

results = list(result_dict.values())
report = ConnectorReport(context, steps_results=results, name="TEST RESULTS")
context.report = report

return report
Loading