Skip to content

airbyte-ci: introduce ConnectorTestContext #38628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ on:
default: "--pre-release"
airbyte_ci_binary_url:
description: "URL to the airbyte-ci binary to use for the action. If not provided, the action will use the latest release of airbyte-ci."
# Pinning to 4.15.0 as 4.15.1 has a bug:
# https://github.com/airbytehq/airbyte/actions/runs/9211856191/job/25342184646
default: "https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/4.15.0/airbyte-ci"
default: "https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci"
jobs:
publish_connectors:
name: Publish connectors
Expand Down
3 changes: 2 additions & 1 deletion airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,8 @@ E.G.: running Poe tasks on the modified internal packages of the current branch:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 4.15.1 | [#38615](https://github.com/airbytehq/airbyte/pull/38615) | Do not eagerly fetch connector secrets. |
| 4.15.2 | [#38628](https://github.com/airbytehq/airbyte/pull/38628) | Introduce ConnectorTestContext to avoid trying fetching connector secret in the PublishContext. |
| 4.15.1 | [#38615](https://github.com/airbytehq/airbyte/pull/38615) | Do not eagerly fetch connector secrets. |
| 4.15.0 | [#38322](https://github.com/airbytehq/airbyte/pull/38322) | Introduce a SecretStore abstraction to fetch connector secrets from metadata files. |
| 4.14.1 | [#38582](https://github.com/airbytehq/airbyte/pull/38582) | Fixed bugs in `up_to_date` flags, `pull_request` version change logic. |
| 4.14.0 | [#38281](https://github.com/airbytehq/airbyte/pull/38281) | Conditionally run test suites according to `connectorTestSuitesOptions` in metadata files. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from __future__ import annotations

from copy import deepcopy
from datetime import datetime
from pathlib import Path
from types import TracebackType
Expand All @@ -16,7 +15,6 @@
from asyncer import asyncify
from dagger import Directory, Platform
from github import PullRequest
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.consts import BUILD_PLATFORMS
from pipelines.dagger.actions import secrets
Expand All @@ -26,19 +24,11 @@
from pipelines.helpers.slack import send_message_to_webhook
from pipelines.helpers.utils import METADATA_FILE_NAME
from pipelines.models.contexts.pipeline_context import PipelineContext
from pipelines.models.secrets import LocalDirectorySecretStore, Secret, SecretNotFoundError, SecretStore
from pydash import find # type: ignore
from pipelines.models.secrets import LocalDirectorySecretStore, Secret, SecretStore

if TYPE_CHECKING:
from logging import Logger
from pathlib import Path as NativePath
from typing import Dict, FrozenSet, List, Optional, Sequence
# These test suite names are declared in metadata.yaml files
TEST_SUITE_NAME_TO_STEP_ID = {
"unitTests": CONNECTOR_TEST_STEP_ID.UNIT,
"integrationTests": CONNECTOR_TEST_STEP_ID.INTEGRATION,
"acceptanceTests": CONNECTOR_TEST_STEP_ID.ACCEPTANCE,
}


class ConnectorContext(PipelineContext):
Expand Down Expand Up @@ -147,11 +137,10 @@ def __init__(
ci_gcp_credentials=ci_gcp_credentials,
ci_git_user=ci_git_user,
ci_github_access_token=ci_github_access_token,
run_step_options=self._skip_metadata_disabled_test_suites(run_step_options),
run_step_options=run_step_options,
enable_report_auto_open=enable_report_auto_open,
secret_stores=secret_stores,
)
self.step_id_to_secrets_mapping = self._get_step_id_to_secret_mapping()

@property
def modified_files(self) -> FrozenSet[NativePath]:
Expand Down Expand Up @@ -233,124 +222,6 @@ async def get_connector_dir(self, exclude: Optional[List[str]] = None, include:
vanilla_connector_dir = self.get_repo_dir(str(self.connector.code_directory), exclude=exclude, include=include)
return await vanilla_connector_dir.with_timestamps(1)

@staticmethod
def _handle_missing_secret_store(
secret_info: Dict[str, str | Dict[str, str]], raise_on_missing: bool, logger: Optional[Logger] = None
) -> None:
assert isinstance(secret_info["secretStore"], dict), "The secretStore field must be a dict"
message = f"Secret {secret_info['name']} can't be retrieved as {secret_info['secretStore']['alias']} is not available"
if raise_on_missing:
raise SecretNotFoundError(message)
if logger is not None:
logger.warn(message)

@staticmethod
def _process_secret(
secret_info: Dict[str, str | Dict[str, str]],
secret_stores: Dict[str, SecretStore],
raise_on_missing: bool,
logger: Optional[Logger] = None,
) -> Optional[Secret]:
assert isinstance(secret_info["secretStore"], dict), "The secretStore field must be a dict"
secret_store_alias = secret_info["secretStore"]["alias"]
if secret_store_alias not in secret_stores:
ConnectorContext._handle_missing_secret_store(secret_info, raise_on_missing, logger)
return None
else:
# All these asserts and casting are there to make MyPy happy
# The dict structure being nested MyPy can't figure if the values are str or dict
assert isinstance(secret_info["name"], str), "The secret name field must be a string"
if file_name := secret_info.get("fileName"):
assert isinstance(secret_info["fileName"], str), "The secret fileName must be a string"
file_name = str(secret_info["fileName"])
else:
file_name = None
return Secret(secret_info["name"], secret_stores[secret_store_alias], file_name=file_name)

@staticmethod
def get_secrets_from_connector_test_suites_option(
connector_test_suites_options: List[Dict[str, str | Dict[str, List[Dict[str, str | Dict[str, str]]]]]],
suite_name: str,
secret_stores: Dict[str, SecretStore],
raise_on_missing_secret_store: bool = True,
logger: Logger | None = None,
) -> List[Secret]:
"""Get secrets declared in metadata connectorTestSuitesOptions for a test suite name.
It will use the secret store alias declared in connectorTestSuitesOptions.
If the secret store is not available a warning or and error could be raised according to the raise_on_missing_secret_store parameter value.
We usually want to raise an error when running in CI context and log a warning when running locally, as locally we can fallback on local secrets.

Args:
connector_test_suites_options (List[Dict[str, str | Dict]]): The connector under test test suite options
suite_name (str): The test suite name
secret_stores (Dict[str, SecretStore]): The available secrets stores
raise_on_missing_secret_store (bool, optional): Raise an error if the secret store declared in the connectorTestSuitesOptions is not available. Defaults to True.
logger (Logger | None, optional): Logger to log a warning if the secret store declared in the connectorTestSuitesOptions is not available. Defaults to None.

Raises:
SecretNotFoundError: Raised if the secret store declared in the connectorTestSuitesOptions is not available and raise_on_missing_secret_store is truthy.

Returns:
List[Secret]: List of secrets declared in the connectorTestSuitesOptions for a test suite name.
"""
secrets: List[Secret] = []
enabled_test_suite = find(connector_test_suites_options, lambda x: x["suite"] == suite_name)

if enabled_test_suite and "testSecrets" in enabled_test_suite:
for secret_info in enabled_test_suite["testSecrets"]:
if secret := ConnectorContext._process_secret(secret_info, secret_stores, raise_on_missing_secret_store, logger):
secrets.append(secret)

return secrets

def get_connector_secrets_for_test_suite(
self, test_suite_name: str, connector_test_suites_options: List, local_secrets: List[Secret]
) -> List[Secret]:
"""Get secrets to use for a test suite.
Always merge secrets declared in metadata's connectorTestSuiteOptions with secrets declared locally.

Args:
test_suite_name (str): Name of the test suite to get secrets for
context (ConnectorContext): The current connector context
connector_test_suites_options (Dict): The current connector test suite options (from metadata)
local_secrets (List[Secret]): The local connector secrets.

Returns:
List[Secret]: Secrets to use to run the passed test suite name.
"""
return (
self.get_secrets_from_connector_test_suites_option(
connector_test_suites_options,
test_suite_name,
self.secret_stores,
raise_on_missing_secret_store=self.is_ci,
logger=self.logger,
)
+ local_secrets
)

def _get_step_id_to_secret_mapping(self) -> Dict[CONNECTOR_TEST_STEP_ID, List[Secret]]:
step_id_to_secrets: Dict[CONNECTOR_TEST_STEP_ID, List[Secret]] = {
CONNECTOR_TEST_STEP_ID.UNIT: [],
CONNECTOR_TEST_STEP_ID.INTEGRATION: [],
CONNECTOR_TEST_STEP_ID.ACCEPTANCE: [],
}
local_secrets = self.local_secret_store.get_all_secrets() if self.local_secret_store else []
connector_test_suites_options = self.metadata.get("connectorTestSuitesOptions", [])

keep_steps = set(self.run_step_options.keep_steps or [])
skip_steps = set(self.run_step_options.skip_steps or [])

for test_suite_name, step_id in TEST_SUITE_NAME_TO_STEP_ID.items():
if step_id in keep_steps or (not keep_steps and step_id not in skip_steps):
step_id_to_secrets[step_id] = self.get_connector_secrets_for_test_suite(
test_suite_name, connector_test_suites_options, local_secrets
)
return step_id_to_secrets

def get_secrets_for_step_id(self, step_id: CONNECTOR_TEST_STEP_ID) -> List[Secret]:
return self.step_id_to_secrets_mapping.get(step_id, [])

async def __aexit__(
self, exception_type: Optional[type[BaseException]], exception_value: Optional[BaseException], traceback: Optional[TracebackType]
) -> bool:
Expand Down Expand Up @@ -395,30 +266,3 @@ async def __aexit__(

def create_slack_message(self) -> str:
raise NotImplementedError

def _get_step_id_to_skip_according_to_metadata(self) -> List[CONNECTOR_TEST_STEP_ID]:
"""The connector metadata have a connectorTestSuitesOptions field.
It allows connector developers to declare the test suites that are enabled for a connector.
This function retrieved enabled test suites according to this field value and returns the test suites steps that are skipped (because they're not declared in this field.)
The skippable test suites steps are declared in TEST_SUITE_NAME_TO_STEP_ID.

Returns:
List[CONNECTOR_TEST_STEP_ID]: List of step ids that should be skipped according to connector metadata.
"""
enabled_test_suites = [option["suite"] for option in self.metadata.get("connectorTestSuitesOptions", [])]
return [step_id for test_suite_name, step_id in TEST_SUITE_NAME_TO_STEP_ID.items() if test_suite_name not in enabled_test_suites]

def _skip_metadata_disabled_test_suites(self, run_step_options: RunStepOptions) -> RunStepOptions:
"""Updated the original run_step_options to skip the disabled test suites according to connector metadata.

Args:
run_step_options (RunStepOptions): Original run step options.

Returns:
RunStepOptions: Updated run step options.
"""
run_step_options = deepcopy(run_step_options)
# If any `skip_steps` are present, we will run everything except the skipped steps, instead of just `keep_steps`.
if not run_step_options.keep_steps:
run_step_options.skip_steps += self._get_step_id_to_skip_according_to_metadata()
return run_step_options
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@

import sys
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, List, Optional

import anyio
import dagger
from connector_ops.utils import ConnectorLanguage # type: ignore
from dagger import Config
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.publish.context import PublishConnectorContext
from pipelines.airbyte_ci.connectors.test.context import ConnectorTestContext
from pipelines.airbyte_ci.steps.no_op import NoOpStep
from pipelines.consts import ContextState
from pipelines.dagger.actions.system import docker
Expand Down Expand Up @@ -51,7 +52,8 @@ async def context_to_step_result(context: PipelineContext) -> StepResult:
# HACK: This is to avoid wrapping the whole pipeline in a dagger pipeline to avoid instability just prior to launch
# TODO (ben): Refactor run_connectors_pipelines to wrap the whole pipeline in a dagger pipeline once Steps are refactored
async def run_report_complete_pipeline(
dagger_client: dagger.Client, contexts: List[ConnectorContext] | List[PublishConnectorContext] | List[PipelineContext]
dagger_client: dagger.Client,
contexts: List[ConnectorContext] | List[PublishConnectorContext] | List[PipelineContext] | List[ConnectorTestContext],
) -> None:
"""Create and Save a report representing the run of the encompassing pipeline.

Expand Down Expand Up @@ -81,14 +83,14 @@ async def run_report_complete_pipeline(


async def run_connectors_pipelines(
contexts: Union[List[ConnectorContext], List[PublishConnectorContext]],
contexts: List[ConnectorContext] | List[PublishConnectorContext] | List[ConnectorTestContext],
connector_pipeline: Callable,
pipeline_name: str,
concurrency: int,
dagger_logs_path: Optional[Path],
execute_timeout: Optional[int],
*args: Any,
) -> List[ConnectorContext] | List[PublishConnectorContext]:
) -> List[ConnectorContext] | List[PublishConnectorContext] | List[ConnectorTestContext]:
"""Run a connector pipeline for all the connector contexts."""

default_connectors_semaphore = anyio.Semaphore(concurrency)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import asyncclick as click
from pipelines import main_logger
from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.pipeline import run_connectors_pipelines
from pipelines.airbyte_ci.connectors.test.context import ConnectorTestContext
from pipelines.airbyte_ci.connectors.test.pipeline import run_connector_test_pipeline
from pipelines.airbyte_ci.connectors.test.steps.common import RegressionTests
from pipelines.cli.click_decorators import click_ci_requirements_option
Expand Down Expand Up @@ -134,7 +134,7 @@ async def test(
)

connectors_tests_contexts = [
ConnectorContext(
ConnectorTestContext(
pipeline_name=f"{global_status_check_context} on {connector.technical_name}",
connector=connector,
is_local=ctx.obj["is_local"],
Expand Down
Loading
Loading