diff --git a/airbyte-ci/connectors/pipelines/pipelines/__init__.py b/airbyte-ci/connectors/pipelines/pipelines/__init__.py index d94a8ee9807a6..4b1a6ecc74dd0 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/__init__.py +++ b/airbyte-ci/connectors/pipelines/pipelines/__init__.py @@ -17,7 +17,7 @@ logging.getLogger("httpx").setLevel(logging.WARNING) # RichHandler does not work great in the CI environment, so we use a StreamHandler instead -logging_handler: Union[RichHandler, logging.StreamHandler] = RichHandler(rich_tracebacks=True) if not "CI" in os.environ else logging.StreamHandler() +logging_handler: Union[RichHandler, logging.StreamHandler] = RichHandler(rich_tracebacks=True) if "CI" not in os.environ else logging.StreamHandler() logging.basicConfig( diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py index c075f982c7bd5..7a47568639a6a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/__init__.py @@ -52,5 +52,6 @@ async def run_connector_build_pipeline(context: ConnectorContext, semaphore: any if context.is_local and build_result.status is StepStatus.SUCCESS: load_image_result = await LoadContainerToLocalDockerHost(context, per_platform_built_containers, image_tag).run() step_results.append(load_image_result) - context.report = ConnectorReport(context, step_results, name="BUILD RESULTS") - return context.report + report = ConnectorReport(context, step_results, name="BUILD RESULTS") + context.report = report + return report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py index 4f507690a6a8c..658eee031e5b7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/common.py @@ -4,7 +4,6 @@ from __future__ import annotations from abc import ABC -from typing import List, Optional, Tuple import docker # type: ignore from dagger import Container, ExecError, Platform, QueryError @@ -21,7 +20,7 @@ class BuildConnectorImagesBase(Step, ABC): context: ConnectorContext @property - def title(self): + def title(self) -> str: return f"Build {self.context.connector.technical_name} docker image for platform(s) {', '.join(self.build_platforms)}" def __init__(self, context: ConnectorContext) -> None: @@ -60,12 +59,12 @@ async def _build_connector(self, platform: Platform, *args, **kwargs) -> Contain class LoadContainerToLocalDockerHost(Step): context: ConnectorContext - def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: Optional[str] = "dev") -> None: + def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: str = "dev") -> None: super().__init__(context) self.image_tag = image_tag self.containers = containers - def _generate_dev_tag(self, platform: Platform, multi_platforms: bool): + def _generate_dev_tag(self, platform: Platform, multi_platforms: bool) -> str: """ When building for multiple platforms, we need to tag the image with the platform name. There's no way to locally build a multi-arch image, so we need to tag the image with the platform name when the user passed multiple architecture options. @@ -73,7 +72,7 @@ def _generate_dev_tag(self, platform: Platform, multi_platforms: bool): return f"{self.image_tag}-{platform.replace('/', '-')}" if multi_platforms else self.image_tag @property - def title(self): + def title(self) -> str: return f"Load {self.image_name}:{self.image_tag} to the local docker host." @property diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py index 08d81bb70092b..4774c4fe78f30 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/normalization.py @@ -26,7 +26,10 @@ def __init__(self, context: ConnectorContext, normalization_image: str, build_pl self.build_platform = build_platform self.use_dev_normalization = normalization_image.endswith(":dev") self.normalization_image = normalization_image - self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}" + + @property + def title(self) -> str: + return f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}" async def _run(self) -> StepResult: if self.use_dev_normalization: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/python_connectors.py index 2d125d56b0931..b180b16a115df 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/build_image/steps/python_connectors.py @@ -75,10 +75,10 @@ async def _build_from_base_image(self, platform: Platform) -> Container: # copy python dependencies from builder to connector container customized_base.with_directory("/usr/local", builder.directory("/usr/local")) .with_workdir(self.PATH_TO_INTEGRATION_CODE) - .with_file(main_file_name, (await self.context.get_connector_dir(include=main_file_name)).file(main_file_name)) + .with_file(main_file_name, (await self.context.get_connector_dir(include=[main_file_name])).file(main_file_name)) .with_directory( connector_snake_case_name, - (await self.context.get_connector_dir(include=connector_snake_case_name)).directory(connector_snake_case_name), + (await self.context.get_connector_dir(include=[connector_snake_case_name])).directory(connector_snake_case_name), ) .with_env_variable("AIRBYTE_ENTRYPOINT", " ".join(entrypoint)) .with_entrypoint(entrypoint) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py index a1d72d089903d..004c0328dff08 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/bump_version/pipeline.py @@ -1,9 +1,9 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - import datetime from copy import deepcopy +from typing import TYPE_CHECKING import semver from dagger import Container, Directory @@ -13,6 +13,9 @@ from pipelines.helpers.connectors import metadata_change_helpers from pipelines.models.steps import Step, StepResult, StepStatus +if TYPE_CHECKING: + from anyio import Semaphore + def get_bumped_version(version: str, bump_type: str) -> str: current_version = semver.VersionInfo.parse(version) @@ -38,7 +41,7 @@ def __init__( new_version: str, changelog_entry: str, pull_request_number: str, - ): + ) -> None: super().__init__(context) self.repo_dir = repo_dir self.new_version = new_version @@ -71,14 +74,14 @@ async def _run(self) -> StepResult: output_artifact=updated_repo_dir, ) - def find_line_index_for_new_entry(self, markdown_text) -> int: + def find_line_index_for_new_entry(self, markdown_text: str) -> int: lines = markdown_text.splitlines() for line_index, line in enumerate(lines): if "version" in line.lower() and "date" in line.lower() and "pull request" in line.lower() and "subject" in line.lower(): return line_index + 2 raise Exception("Could not find the changelog section table in the documentation file.") - def add_changelog_entry(self, og_doc_content) -> str: + def add_changelog_entry(self, og_doc_content: str) -> str: today = datetime.date.today().strftime("%Y-%m-%d") lines = og_doc_content.splitlines() line_index_for_new_entry = self.find_line_index_for_new_entry(og_doc_content) @@ -96,7 +99,7 @@ def __init__( context: ConnectorContext, repo_dir: Directory, new_version: str, - ): + ) -> None: super().__init__(context) self.repo_dir = repo_dir self.new_version = new_version @@ -136,7 +139,7 @@ async def _run(self) -> StepResult: async def run_connector_version_bump_pipeline( context: ConnectorContext, - semaphore, + semaphore: Semaphore, bump_type: str, changelog_entry: str, pull_request_number: str, @@ -174,5 +177,6 @@ async def run_connector_version_bump_pipeline( steps_results.append(add_changelog_entry_result) final_repo_dir = add_changelog_entry_result.output_artifact await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path())) - context.report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS") - return context.report + report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS") + context.report = report + return report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py index 03b66e5bc285a..e3f6a5e0922c6 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/commands.py @@ -93,7 +93,7 @@ def get_selected_connectors_with_modified_files( return selected_connectors_with_modified_files -def validate_environment(is_local: bool): +def validate_environment(is_local: bool) -> None: """Check if the required environment variables exist.""" if is_local: if not Path(".git").is_dir(): @@ -236,7 +236,7 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool: @click_ignore_unused_kwargs async def connectors( ctx: click.Context, -): +) -> None: """Group all the connectors-ci command.""" validate_environment(ctx.obj["is_local"]) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py index f27a7b1ef6521..725e32df6a738 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py @@ -6,7 +6,7 @@ from datetime import datetime from types import TracebackType -from typing import Dict, Optional, Sequence +from typing import TYPE_CHECKING import yaml # type: ignore from anyio import Path @@ -23,6 +23,10 @@ from pipelines.helpers.utils import METADATA_FILE_NAME from pipelines.models.contexts.pipeline_context import PipelineContext +if TYPE_CHECKING: + from pathlib import Path as NativePath + from typing import Dict, FrozenSet, List, Optional, Sequence + class ConnectorContext(PipelineContext): """The connector context is used to store configuration for a specific connector pipeline run.""" @@ -62,7 +66,7 @@ def __init__( concurrent_cat: Optional[bool] = False, run_step_options: RunStepOptions = RunStepOptions(), targeted_platforms: Sequence[Platform] = BUILD_PLATFORMS, - ): + ) -> None: """Initialize a connector context. Args: @@ -145,31 +149,31 @@ def s3_build_cache_secret_key_secret(self) -> Optional[Secret]: return None @property - def modified_files(self): + def modified_files(self) -> FrozenSet[NativePath]: return self.connector.modified_files @property - def secrets_dir(self) -> Optional[Directory]: # noqa D102 + def secrets_dir(self) -> Optional[Directory]: return self._secrets_dir @secrets_dir.setter - def secrets_dir(self, secrets_dir: Directory): # noqa D102 + def secrets_dir(self, secrets_dir: Directory) -> None: self._secrets_dir = secrets_dir @property - def updated_secrets_dir(self) -> Optional[Directory]: # noqa D102 + def updated_secrets_dir(self) -> Optional[Directory]: return self._updated_secrets_dir @updated_secrets_dir.setter - def updated_secrets_dir(self, updated_secrets_dir: Directory): # noqa D102 + def updated_secrets_dir(self, updated_secrets_dir: Directory) -> None: self._updated_secrets_dir = updated_secrets_dir @property - def connector_acceptance_test_source_dir(self) -> Directory: # noqa D102 + def connector_acceptance_test_source_dir(self) -> Directory: return self.get_repo_dir("airbyte-integrations/bases/connector-acceptance-test") @property - def should_save_updated_secrets(self) -> bool: # noqa D102 + def should_save_updated_secrets(self) -> bool: return self.use_remote_secrets and self.updated_secrets_dir is not None @property @@ -208,12 +212,12 @@ def docker_hub_password_secret(self) -> Optional[Secret]: return None return self.dagger_client.set_secret("docker_hub_password", self.docker_hub_password) - async def get_connector_secrets(self): + async def get_connector_secrets(self) -> Dict[str, Secret]: if self._connector_secrets is None: self._connector_secrets = await secrets.get_connector_secrets(self) return self._connector_secrets - async def get_connector_dir(self, exclude=None, include=None) -> Directory: + async def get_connector_dir(self, exclude: Optional[List[str]] = None, include: Optional[List[str]] = None) -> Directory: """Get the connector under test source code directory. Args: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/list/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/list/commands.py index 5101d13064ea1..5c5b97ca5621b 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/list/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/list/commands.py @@ -13,7 +13,7 @@ @click.pass_context async def list_connectors( ctx: click.Context, -): +) -> bool: selected_connectors = sorted(ctx.obj["selected_connectors_with_modified_files"], key=lambda x: x.technical_name) table = Table(title=f"{len(selected_connectors)} selected connectors") table.add_column("Modified") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py index 13da0de583c51..57253209f1cfc 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/migrate_to_base_image/pipeline.py @@ -4,7 +4,7 @@ import textwrap from copy import deepcopy -from typing import Optional +from typing import TYPE_CHECKING from base_images import version_registry # type: ignore from connector_ops.utils import ConnectorLanguage # type: ignore @@ -17,6 +17,11 @@ from pipelines.helpers.connectors import metadata_change_helpers from pipelines.models.steps import Step, StepResult, StepStatus +if TYPE_CHECKING: + from typing import Optional + + from anyio import Semaphore + class UpgradeBaseImageMetadata(Step): context: ConnectorContext @@ -28,7 +33,7 @@ def __init__( context: ConnectorContext, repo_dir: Directory, set_if_not_exists: bool = True, - ): + ) -> None: super().__init__(context) self.repo_dir = repo_dir self.set_if_not_exists = set_if_not_exists @@ -99,12 +104,12 @@ def __init__( self, context: ConnectorContext, file_to_delete: str, - ): + ) -> None: super().__init__(context) self.file_to_delete = file_to_delete @property - def title(self): + def title(self) -> str: return f"Delete {self.file_to_delete}" async def _run(self) -> StepResult: @@ -161,7 +166,7 @@ async def _run(self) -> StepResult: output_artifact=updated_repo_dir, ) - def add_build_instructions(self, og_doc_content) -> str: + def add_build_instructions(self, og_doc_content: str) -> str: build_instructions_template = Template( textwrap.dedent( @@ -258,7 +263,7 @@ async def post_connector_install(connector_container: Container) -> Container: return new_doc -async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, semaphore, set_if_not_exists: bool) -> Report: +async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, semaphore: Semaphore, set_if_not_exists: bool) -> Report: """Run a pipeline to upgrade for a single connector to use our base image.""" async with semaphore: steps_results = [] @@ -273,11 +278,14 @@ async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, s steps_results.append(update_base_image_in_metadata_result) final_repo_dir = update_base_image_in_metadata_result.output_artifact await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path())) - context.report = ConnectorReport(context, steps_results, name="BASE IMAGE UPGRADE RESULTS") - return context.report + report = ConnectorReport(context, steps_results, name="BASE IMAGE UPGRADE RESULTS") + context.report = report + return report -async def run_connector_migration_to_base_image_pipeline(context: ConnectorContext, semaphore, pull_request_number: str): +async def run_connector_migration_to_base_image_pipeline( + context: ConnectorContext, semaphore: Semaphore, pull_request_number: str +) -> Report: async with semaphore: steps_results = [] async with context: @@ -344,6 +352,6 @@ async def run_connector_migration_to_base_image_pipeline(context: ConnectorConte # EXPORT MODIFIED FILES BACK TO HOST final_repo_dir = add_build_instructions_to_doc_results.output_artifact await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path())) - - context.report = ConnectorReport(context, steps_results, name="MIGRATE TO BASE IMAGE RESULTS") - return context.report + report = ConnectorReport(context, steps_results, name="MIGRATE TO BASE IMAGE RESULTS") + context.report = report + return report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py index 6bb350052d44e..ee3b14215e33a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/pipeline.py @@ -52,14 +52,14 @@ async def context_to_step_result(context: PipelineContext) -> StepResult: # TODO (ben): Refactor run_connectors_pipelines to wrap the whole pipeline in a dagger pipeline once Steps are refactored async def run_report_complete_pipeline( dagger_client: dagger.Client, contexts: List[ConnectorContext] | List[PublishConnectorContext] | List[PipelineContext] -): +) -> None: """Create and Save a report representing the run of the encompassing pipeline. This is to denote when the pipeline is complete, useful for long running pipelines like nightlies. """ if not contexts: - return [] + return # Repurpose the first context to be the pipeline upload context to preserve timestamps first_connector_context = contexts[0] diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py index ce8d58f7c2eff..e57b930304b95 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/commands.py @@ -2,7 +2,6 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import List import asyncclick as click from pipelines import main_logger @@ -68,7 +67,7 @@ async def publish( metadata_service_gcs_credentials: str, slack_webhook: str, slack_channel: str, -): +) -> bool: ctx.obj["spec_cache_gcs_credentials"] = spec_cache_gcs_credentials ctx.obj["spec_cache_bucket_name"] = spec_cache_bucket_name ctx.obj["metadata_service_bucket_name"] = metadata_service_bucket_name diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py index 78cf11ced4e30..f1681cda15936 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/context.py @@ -46,7 +46,7 @@ def __init__( s3_build_cache_access_key_id: Optional[str] = None, s3_build_cache_secret_key: Optional[str] = None, use_local_cdk: bool = False, - ): + ) -> None: self.pre_release = pre_release self.spec_cache_bucket_name = spec_cache_bucket_name self.metadata_bucket_name = metadata_bucket_name @@ -90,7 +90,7 @@ def spec_cache_gcs_credentials_secret(self) -> Secret: return self.dagger_client.set_secret("spec_cache_gcs_credentials", self.spec_cache_gcs_credentials) @property - def docker_image_tag(self): + def docker_image_tag(self) -> str: # get the docker image tag from the parent class metadata_tag = super().docker_image_tag if self.pre_release: @@ -99,6 +99,8 @@ def docker_image_tag(self): return metadata_tag def create_slack_message(self) -> str: + assert self.report and self.report.run_duration is not None, "The report must be set to create a slack message." + docker_hub_url = f"https://hub.docker.com/r/{self.connector.metadata['dockerRepository']}/tags" message = f"*Publish <{docker_hub_url}|{self.docker_image}>*\n" if self.is_ci: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py index d038f46ba6cb2..ba61a521ec66c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/publish/pipeline.py @@ -57,7 +57,7 @@ class PushConnectorImageToRegistry(Step): title = "Push connector image to registry" @property - def latest_docker_image_name(self): + def latest_docker_image_name(self) -> str: return f"{self.context.docker_repository}:latest" async def _run(self, built_containers_per_platform: List[Container], attempts: int = 3) -> StepResult: @@ -148,15 +148,15 @@ class UploadSpecToCache(Step): cloud_spec_file_name = "spec.cloud.json" @property - def spec_key_prefix(self): + def spec_key_prefix(self) -> str: return "specs/" + self.context.docker_image.replace(":", "/") @property - def cloud_spec_key(self): + def cloud_spec_key(self) -> str: return f"{self.spec_key_prefix}/{self.cloud_spec_file_name}" @property - def oss_spec_key(self): + def oss_spec_key(self) -> str: return f"{self.spec_key_prefix}/{self.default_spec_file_name}" def _parse_spec_output(self, spec_output: str) -> str: @@ -182,7 +182,7 @@ async def _get_connector_spec(self, connector: Container, deployment_mode: str) spec_output = await connector.with_env_variable("DEPLOYMENT_MODE", deployment_mode).with_exec(["spec"]).stdout() return self._parse_spec_output(spec_output) - async def _get_spec_as_file(self, spec: str, name="spec_to_cache.json") -> File: + async def _get_spec_as_file(self, spec: str, name: str = "spec_to_cache.json") -> File: return (await self.context.get_connector_dir()).with_new_file(name, contents=spec).file(name) async def _run(self, built_connector: Container) -> StepResult: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py index ddd7c8aa15b50..b8265c4385a19 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/reports.py @@ -35,19 +35,19 @@ class ConnectorReport(Report): pipeline_context: ConnectorContext @property - def report_output_prefix(self) -> str: # noqa D102 + def report_output_prefix(self) -> str: return f"{self.pipeline_context.report_output_prefix}/{self.pipeline_context.connector.technical_name}/{self.pipeline_context.connector.version}" @property - def html_report_file_name(self) -> str: # noqa D102 + def html_report_file_name(self) -> str: return self.filename + ".html" @property - def html_report_remote_storage_key(self) -> str: # noqa D102 + def html_report_remote_storage_key(self) -> str: return f"{self.report_output_prefix}/{self.html_report_file_name}" @property - def html_report_url(self) -> str: # noqa D102 + def html_report_url(self) -> str: return f"{GCS_PUBLIC_DOMAIN}/{self.pipeline_context.ci_report_bucket}/{self.html_report_remote_storage_key}" def to_json(self) -> str: @@ -129,7 +129,7 @@ async def save(self) -> None: self.pipeline_context.logger.info(f"HTML report uploaded to {self.html_report_url}") await super().save() - def print(self): + def print(self) -> None: """Print the test report to the console in a nice way.""" connector_name = self.pipeline_context.connector.technical_name main_panel_title = Text(f"{connector_name.upper()} - {self.name}") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py index 577b104f392d0..82b9b0099efc4 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py @@ -39,7 +39,7 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: return [] -async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore): +async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyio.Semaphore) -> ConnectorReport: """ Compute the steps to run for a connector test pipeline. """ @@ -64,6 +64,7 @@ async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyi ) results = list(result_dict.values()) - context.report = ConnectorReport(context, steps_results=results, name="TEST RESULTS") + report = ConnectorReport(context, steps_results=results, name="TEST RESULTS") + context.report = report - return context.report + return report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py index cd28a74183b6a..d8f72a7cfef4c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py @@ -36,7 +36,7 @@ def should_run(self) -> bool: return True @property - def github_master_metadata_url(self): + def github_master_metadata_url(self) -> str: return f"{self.GITHUB_URL_PREFIX_FOR_CONNECTORS}/{self.context.connector.technical_name}/{METADATA_FILE_NAME}" @cached_property @@ -325,7 +325,7 @@ async def _run(self, *args, **kwargs) -> StepResult: StepStatus.FAILURE, stdout=f"Connector is certified but does not use our base image. {migration_hint}", ) - has_dockerfile = "Dockerfile" in await (await self.context.get_connector_dir(include="Dockerfile")).entries() + has_dockerfile = "Dockerfile" in await (await self.context.get_connector_dir(include=["Dockerfile"])).entries() if has_dockerfile: return StepResult( self, diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index f305d62233005..1dad9e215c7a7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -4,10 +4,10 @@ """This module groups steps made to run tests for a specific Java connector given a test context.""" -from typing import Callable, List, Optional +from typing import TYPE_CHECKING import anyio -from dagger import Directory, File, QueryError +from dagger import File, QueryError from pipelines.airbyte_ci.connectors.build_image.steps.java_connectors import ( BuildConnectorDistributionTar, BuildConnectorImages, @@ -24,6 +24,9 @@ from pipelines.helpers.utils import export_container_to_tarball from pipelines.models.steps import StepResult, StepStatus +if TYPE_CHECKING: + from typing import Callable, Dict, List, Optional + class IntegrationTests(GradleTask): """A step to run integrations tests for Java connectors using the integrationTestJava Gradle task.""" @@ -33,13 +36,13 @@ class IntegrationTests(GradleTask): mount_connector_secrets = True bind_to_docker_host = True - async def _load_normalization_image(self, normalization_tar_file: File): + async def _load_normalization_image(self, normalization_tar_file: File) -> None: normalization_image_tag = f"{self.context.connector.normalization_repository}:dev" self.context.logger.info("Load the normalization image to the docker host.") await docker.load_image_to_docker_host(self.context, normalization_tar_file, normalization_image_tag) self.context.logger.info("Successfully loaded the normalization image to the docker host.") - async def _load_connector_image(self, connector_tar_file: File): + async def _load_connector_image(self, connector_tar_file: File) -> None: connector_image_tag = f"airbyte/{self.context.connector.technical_name}:dev" self.context.logger.info("Load the connector image to the docker host") await docker.load_image_to_docker_host(self.context, connector_tar_file, connector_image_tag) @@ -70,7 +73,7 @@ def _create_integration_step_args_factory(context: ConnectorContext) -> Callable Create a function that can process the args for the integration step. """ - async def _create_integration_step_args(results: RESULTS_DICT): + async def _create_integration_step_args(results: RESULTS_DICT) -> Dict[str, Optional[File]]: connector_container = results["build"].output_artifact[LOCAL_BUILD_PLATFORM] connector_image_tar_file, _ = await export_container_to_tarball(context, connector_container, LOCAL_BUILD_PLATFORM) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 59235d01f6dfe..65117b6d804b9 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -5,7 +5,7 @@ """This module groups steps made to run tests for a specific Python connector given a test context.""" from abc import ABC, abstractmethod -from typing import Callable, List, Sequence, Tuple +from typing import List, Sequence, Tuple import pipelines.dagger.actions.python.common import pipelines.dagger.actions.system.docker diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py index b1d3c1e953cc6..ad1fd2e0e80a9 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/commands.py @@ -10,7 +10,7 @@ from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand -def latest_cdk_version(): +def latest_cdk_version() -> str: """ Get the latest version of airbyte-cdk from pypi """ diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py index 21bd02b46fbd7..0ee1f67430a73 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/upgrade_cdk/pipeline.py @@ -4,12 +4,16 @@ import os import re +from typing import TYPE_CHECKING from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.reports import ConnectorReport, Report from pipelines.helpers import git from pipelines.models.steps import Step, StepResult, StepStatus +if TYPE_CHECKING: + from anyio import Semaphore + class SetCDKVersion(Step): context: ConnectorContext @@ -19,14 +23,14 @@ def __init__( self, context: ConnectorContext, new_version: str, - ): + ) -> None: super().__init__(context) self.new_version = new_version async def _run(self) -> StepResult: context = self.context og_connector_dir = await context.get_connector_dir() - if not "setup.py" in await og_connector_dir.entries(): + if "setup.py" not in await og_connector_dir.entries(): return self.skip("Connector does not have a setup.py file.") setup_py = og_connector_dir.file("setup.py") setup_py_content = await setup_py.contents() @@ -64,7 +68,7 @@ def update_cdk_version(self, og_setup_py_content: str) -> str: async def run_connector_cdk_upgrade_pipeline( context: ConnectorContext, - semaphore, + semaphore: Semaphore, target_version: str, ) -> Report: """Run a pipeline to upgrade the CDK version for a single connector. @@ -84,5 +88,6 @@ async def run_connector_cdk_upgrade_pipeline( ) set_cdk_version_result = await set_cdk_version.run() steps_results.append(set_cdk_version_result) - context.report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION CDK UPGRADE RESULTS") - return context.report + report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION CDK UPGRADE RESULTS") + context.report = report + return report diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/commands.py index 2e5147eabac0e..59e84e10c4aa5 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/commands.py @@ -28,7 +28,7 @@ @click_merge_args_into_context_obj @pass_pipeline_context @click_ignore_unused_kwargs -async def format_code(pipeline_context: ClickPipelineContext): +async def format_code(pipeline_context: ClickPipelineContext) -> None: pass @@ -36,7 +36,7 @@ async def format_code(pipeline_context: ClickPipelineContext): help="Run code format checks and fail if any checks fail.", chain=True, ) -async def check(): +async def check() -> None: pass @@ -44,7 +44,7 @@ async def check(): help="Run code format checks and fix any failures.", chain=True, ) -async def fix(): +async def fix() -> None: pass @@ -74,7 +74,7 @@ async def fix(): @check.command(name="all", help="Run all format checks and fail if any checks fail.") @click.pass_context -async def all_checks(ctx: click.Context): +async def all_checks(ctx: click.Context) -> None: """ Run all format checks and fail if any checks fail. """ @@ -97,7 +97,7 @@ async def all_checks(ctx: click.Context): @fix.command(name="all", help="Fix all format failures. Exits with status 1 if any file was modified.") @click.pass_context -async def all_fix(ctx: click.Context): +async def all_fix(ctx: click.Context) -> None: """Run code format checks and fix any failures.""" logger = logging.getLogger(fix.commands["all"].name) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py index be7790781c4f4..62c59a69a5b6f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/format/format_command.py @@ -72,7 +72,7 @@ def get_help_message(self) -> str: message = f"{message}." return message - def get_dir_to_format(self, dagger_client) -> dagger.Directory: + def get_dir_to_format(self, dagger_client: dagger.Client) -> dagger.Directory: """Get a directory with all the source code to format according to the file_filter. We mount the files to format in a git container and remove all gitignored files. It ensures we're not formatting files that are gitignored. diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/commands.py index 1a5e6269453a4..24b196fc8a3fb 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/commands.py @@ -11,13 +11,13 @@ @click.group(help="Commands related to the metadata service.") @click.pass_context -def metadata(ctx: click.Context): +def metadata(ctx: click.Context) -> None: pass @metadata.group(help="Commands related to deploying components of the metadata service.") @click.pass_context -def deploy(ctx: click.Context): +def deploy(ctx: click.Context) -> None: pass diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py index b22b77d8b05b0..60f3d222dd16c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py @@ -22,7 +22,7 @@ class MetadataValidation(SimpleDockerStep): - def __init__(self, context: ConnectorContext): + def __init__(self, context: ConnectorContext) -> None: super().__init__( title=f"Validate metadata for {context.connector.technical_name}", context=context, @@ -64,7 +64,7 @@ def __init__( docker_hub_password_secret: dagger.Secret, pre_release: bool = False, pre_release_tag: Optional[str] = None, - ): + ) -> None: title = f"Upload metadata for {context.connector.technical_name} v{context.connector.version}" command_to_run = [ "metadata_service", @@ -131,7 +131,7 @@ async def _run(self) -> StepResult: container_to_run = ( python_with_dependencies.with_mounted_directory("/src", parent_dir) .with_secret_variable("DAGSTER_CLOUD_API_TOKEN", dagster_cloud_api_token_secret) - .with_workdir(f"/src/orchestrator") + .with_workdir("/src/orchestrator") .with_exec(["/bin/sh", "-c", "poetry2setup >> setup.py"]) .with_exec(self.deploy_dagster_command) ) @@ -139,7 +139,7 @@ async def _run(self) -> StepResult: class TestOrchestrator(PoetryRunStep): - def __init__(self, context: PipelineContext): + def __init__(self, context: PipelineContext) -> None: super().__init__( context=context, title="Test Metadata Orchestrator", diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py index 5157dd83dddf9..71c692c37fae7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/docker.py @@ -22,7 +22,7 @@ def __init__( env_variables: dict[str, str] = {}, working_directory: str = "/", command: Optional[List[str]] = None, - ): + ) -> None: """A simple step that runs a given command in a container. Args: @@ -35,7 +35,7 @@ def __init__( working_directory (str, optional): working directory to run the command in. Defaults to "/". command (Optional[List[str]], optional): The default command to run. Defaults to None. """ - self.title = title + self._title = title super().__init__(context) self.paths_to_mount = paths_to_mount @@ -45,6 +45,10 @@ def __init__( self.env_variables = env_variables self.command = command + @property + def title(self) -> str: + return self._title + def _mount_paths(self, container: dagger.Container) -> dagger.Container: for path_to_mount in self.paths_to_mount: if path_to_mount.optional and not path_to_mount.get_path().exists(): @@ -89,7 +93,7 @@ async def init_container(self) -> dagger.Container: return container - async def _run(self, command=None) -> StepResult: + async def _run(self, command: Optional[List[str]] = None) -> StepResult: command_to_run = command or self.command if not command_to_run: raise ValueError(f"No command given to the {self.title} step") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/poetry.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/poetry.py index e798e3b4f594b..43cb05ead0745 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/poetry.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/poetry.py @@ -10,7 +10,7 @@ class PoetryRunStep(Step): - def __init__(self, context: PipelineContext, title: str, parent_dir_path: str, module_path: str, poetry_run_args: List[str]): + def __init__(self, context: PipelineContext, title: str, parent_dir_path: str, module_path: str, poetry_run_args: List[str]) -> None: """A simple step that runs a given command inside a poetry project. Args: @@ -20,7 +20,7 @@ def __init__(self, context: PipelineContext, title: str, parent_dir_path: str, m module_path (str): The path to the poetry project poetry_run_args (List[str]): The arguments to pass to the poetry run command """ - self.title = title + self._title = title super().__init__(context) parent_dir = self.context.get_repo_dir(parent_dir_path) @@ -28,6 +28,10 @@ def __init__(self, context: PipelineContext, title: str, parent_dir_path: str, m self.poetry_run_args = poetry_run_args self.poetry_run_container = with_poetry_module(self.context, parent_dir, module_path).with_entrypoint(["poetry", "run"]) + @property + def title(self) -> str: + return self._title + async def _run(self) -> StepResult: poetry_run_exec = self.poetry_run_container.with_exec(self.poetry_run_args) return await self.get_step_result(poetry_run_exec) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py index c3d002a017c78..0ea67650eb192 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/test/commands.py @@ -46,7 +46,7 @@ async def run_poetry_command(container: dagger.Container, command: str) -> Tuple @click_merge_args_into_context_obj @pass_pipeline_context @click_ignore_unused_kwargs -async def test(pipeline_context: ClickPipelineContext): +async def test(pipeline_context: ClickPipelineContext) -> None: """Runs the tests for the given airbyte-ci package Args: diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/update/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/update/commands.py index 78c5e22e7db4f..c633f59db1d22 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/update/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/update/commands.py @@ -12,7 +12,7 @@ @click.command() @click.option("--version", default="latest", type=str, help="The version to update to.") -async def update(version: str): +async def update(version: str) -> None: """Updates airbyte-ci to the latest version.""" is_dev = is_dev_command() if is_dev: diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py index 45770fbaddf24..fcfee0c1fb078 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py @@ -4,6 +4,8 @@ """This module is the CLI entrypoint to the airbyte-ci commands.""" +from __future__ import annotations + import logging import multiprocessing import os @@ -79,7 +81,7 @@ def set_working_directory_to_root() -> None: os.chdir(working_dir) -def log_git_info(ctx: click.Context): +def log_git_info(ctx: click.Context) -> None: main_logger.info("Running airbyte-ci in CI mode.") main_logger.info(f"CI Context: {ctx.obj['ci_context']}") main_logger.info(f"CI Report Bucket Name: {ctx.obj['ci_report_bucket_name']}") @@ -110,7 +112,7 @@ def _get_pull_request(ctx: click.Context) -> Optional[PullRequest.PullRequest]: return github.get_pull_request(pull_request_number, ci_github_access_token) -def check_local_docker_configuration(): +def check_local_docker_configuration() -> None: try: docker_client = docker.from_env() except Exception as e: @@ -139,7 +141,7 @@ def is_dagger_run_enabled_by_default() -> bool: return False -def check_dagger_wrap(): +def check_dagger_wrap() -> bool: """ Check if the command is already wrapped by dagger run. This is useful to avoid infinite recursion when calling dagger run from dagger run. diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/auto_update.py b/airbyte-ci/connectors/pipelines/pipelines/cli/auto_update.py index c9bfdd9a93284..e1ac37ee68d94 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/auto_update.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/auto_update.py @@ -3,11 +3,13 @@ # # HELPERS +from __future__ import annotations import importlib import logging import os import sys +from typing import TYPE_CHECKING import asyncclick as click import requests # type: ignore @@ -16,6 +18,9 @@ from pipelines.consts import LOCAL_PIPELINE_PACKAGE_PATH from pipelines.external_scripts.airbyte_ci_install import RELEASE_URL, get_airbyte_os_name +if TYPE_CHECKING: + from typing import Callable + __installed_version__ = importlib.metadata.version("pipelines") PROD_COMMAND = "airbyte-ci" @@ -23,7 +28,7 @@ AUTO_UPDATE_AGREE_KEY = "yes_auto_update" -def pre_confirm_auto_update_flag(f): +def pre_confirm_auto_update_flag(f: Callable) -> Callable: """Decorator to add a --yes-auto-update flag to a command.""" return click.option( "--yes-auto-update", AUTO_UPDATE_AGREE_KEY, is_flag=True, default=False, help="Skip prompts and automatically upgrade pipelines" @@ -70,7 +75,7 @@ def is_dev_command() -> bool: def check_for_upgrade( require_update: bool = True, enable_auto_update: bool = True, -): +) -> None: """Check if the installed version of pipelines is up to date.""" current_command = " ".join(sys.argv) latest_version = _get_latest_version() diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/click_decorators.py b/airbyte-ci/connectors/pipelines/pipelines/cli/click_decorators.py index 3bab454b741b5..2c8a5e9f1232c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/click_decorators.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/click_decorators.py @@ -34,7 +34,7 @@ def click_ignore_unused_kwargs(f: Callable) -> Callable: return f @functools.wraps(f) - def inner(*args, **kwargs): + def inner(*args, **kwargs) -> Callable: filtered_kwargs = {key: value for key, value in kwargs.items() if _is_kwarg_of(key, f)} return f(*args, **filtered_kwargs) @@ -46,7 +46,7 @@ def click_merge_args_into_context_obj(f: Callable) -> Callable: Decorator to pass click context and args to children commands. """ - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> Callable: ctx = click.get_current_context() ctx.ensure_object(dict) click_obj = ctx.obj @@ -69,8 +69,8 @@ def click_append_to_context_object(key: str, value: Callable | Any) -> Callable: Decorator to append a value to the click context object. """ - def decorator(f): - async def wrapper(*args, **kwargs): + def decorator(f: Callable) -> Callable: + async def wrapper(*args, **kwargs) -> Any: ctx = click.get_current_context() ctx.ensure_object(dict) diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/confirm_prompt.py b/airbyte-ci/connectors/pipelines/pipelines/cli/confirm_prompt.py index 79f7785ef48fd..0a6f2a8efc63f 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/confirm_prompt.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/confirm_prompt.py @@ -2,14 +2,19 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from typing import Optional +from __future__ import annotations + +from typing import TYPE_CHECKING import asyncclick as click +if TYPE_CHECKING: + from typing import Callable + PRE_CONFIRM_ALL_KEY = "yes" -def pre_confirm_all_flag(f): +def pre_confirm_all_flag(f: Callable) -> Callable: """Decorator to add a --yes flag to a command.""" return click.option("-y", "--yes", PRE_CONFIRM_ALL_KEY, is_flag=True, default=False, help="Skip prompts and use default values")(f) diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py index 2fed8450ebd6a..7999ce43aaabd 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_pipeline_command.py @@ -6,7 +6,6 @@ from __future__ import annotations import sys -from glob import glob from pathlib import Path from typing import Any diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_run.py b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_run.py index f80109d8a4b7e..affc5c3c68a6e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_run.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/dagger_run.py @@ -95,14 +95,14 @@ def check_dagger_cli_install() -> str: return dagger_path -def mark_dagger_wrap(): +def mark_dagger_wrap() -> None: """ Mark that the dagger wrap has been applied. """ os.environ[DAGGER_WRAP_ENV_VAR_NAME] = "true" -def call_current_command_with_dagger_run(): +def call_current_command_with_dagger_run() -> None: mark_dagger_wrap() if (os.environ.get("AIRBYTE_ROLE") == "airbyter") or (os.environ.get("CI") == "True"): os.environ[DAGGER_CLOUD_TOKEN_ENV_VAR_NAME_VALUE[0]] = DAGGER_CLOUD_TOKEN_ENV_VAR_NAME_VALUE[1] diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/lazy_group.py b/airbyte-ci/connectors/pipelines/pipelines/cli/lazy_group.py index bc794c4a52d79..7300feae19f14 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/lazy_group.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/lazy_group.py @@ -13,7 +13,7 @@ class LazyGroup(click.Group): A click Group that can lazily load subcommands. """ - def __init__(self, *args, lazy_subcommands: Optional[Dict[str, str]] = None, **kwargs): + def __init__(self, *args, lazy_subcommands: Optional[Dict[str, str]] = None, **kwargs) -> None: super().__init__(*args, **kwargs) # lazy_subcommands is a map of the form: # diff --git a/airbyte-ci/connectors/pipelines/pipelines/cli/telemetry.py b/airbyte-ci/connectors/pipelines/pipelines/cli/telemetry.py index cfcaefc44bdca..407011b4e7808 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/cli/telemetry.py +++ b/airbyte-ci/connectors/pipelines/pipelines/cli/telemetry.py @@ -1,27 +1,34 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from __future__ import annotations import getpass import hashlib import os import platform import sys +from typing import TYPE_CHECKING import segment.analytics as analytics # type: ignore from asyncclick import get_current_context +if TYPE_CHECKING: + from typing import Callable + + from asyncclick import Command + analytics.write_key = "G6G7whgro81g9xM00kN2buclGKvcOjFd" analytics.send = True analytics.debug = False -def _is_airbyte_user(): +def _is_airbyte_user() -> bool: """Returns True if the user is airbyter, False otherwise.""" return os.getenv("AIRBYTE_ROLE") == "airbyter" -def _get_anonymous_system_id(): +def _get_anonymous_system_id() -> str: """Returns a unique anonymous hashid of the current system info.""" # Collect machine-specific information machine_info = platform.node() @@ -35,12 +42,12 @@ def _get_anonymous_system_id(): return unique_id -def click_track_command(f): +def click_track_command(f: Callable) -> Callable: """ Decorator to track CLI commands with segment.io """ - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> Command: ctx = get_current_context() top_level_command = ctx.command_path full_cmd = " ".join(sys.argv) diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py index ce777b1dc86d4..7ebfc217557ed 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py +++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/secrets.py @@ -6,15 +6,17 @@ from __future__ import annotations import datetime -from typing import TYPE_CHECKING, Callable, Dict +from typing import TYPE_CHECKING from anyio import Path from dagger import Secret from pipelines.helpers.utils import get_file_contents, get_secret_host_variable if TYPE_CHECKING: + from typing import Callable, Dict + from dagger import Container - from pipelines.airbyte_ci.connectors.context import ConnectorContext, PipelineContext + from pipelines.airbyte_ci.connectors.context import ConnectorContext async def get_secrets_to_mask(ci_credentials_with_downloaded_secrets: Container) -> list[str]: @@ -32,7 +34,7 @@ async def get_secrets_to_mask(ci_credentials_with_downloaded_secrets: Container) return secrets_to_mask -async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> dict[str, Secret]: +async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Dict[str, Secret]: """Use the ci-credentials tool to download the secrets stored for a specific connector to a Directory. Args: @@ -68,7 +70,7 @@ async def download(context: ConnectorContext, gcp_gsm_env_variable_name: str = " return connector_secrets -async def upload(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS"): +async def upload(context: ConnectorContext, gcp_gsm_env_variable_name: str = "GCP_GSM_CREDENTIALS") -> Container: """Use the ci-credentials tool to upload the secrets stored in the context's updated_secrets-dir. Args: diff --git a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py index d637f548b80f5..9544d79a145e2 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py +++ b/airbyte-ci/connectors/pipelines/pipelines/dagger/actions/system/docker.py @@ -20,7 +20,6 @@ TAILSCALE_AUTH_KEY, ) from pipelines.helpers.utils import sh_dash_c -from pipelines.models.contexts.pipeline_context import PipelineContext def get_base_dockerd_container(dagger_client: Client) -> Container: @@ -190,7 +189,7 @@ def with_docker_cli(context: ConnectorContext) -> Container: return with_bound_docker_host(context, docker_cli) -async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, image_tag: str): +async def load_image_to_docker_host(context: ConnectorContext, tar_file: File, image_tag: str) -> str: """Load a docker image tar archive to the docker host. Args: diff --git a/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_dev_install.py b/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_dev_install.py index dbb9b31d51d70..f938896d95a46 100755 --- a/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_dev_install.py +++ b/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_dev_install.py @@ -8,7 +8,7 @@ import sys -def check_command_exists(command, not_found_message): +def check_command_exists(command: str, not_found_message: str) -> None: """ Check if a command exists in the system path. """ @@ -19,7 +19,7 @@ def check_command_exists(command, not_found_message): sys.exit(1) -def main(): +def main() -> None: # Check if Python 3.10 is on the path check_command_exists( "python3.10", diff --git a/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_install.py b/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_install.py index 2dab769f8b334..d1267c9840e95 100755 --- a/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_install.py +++ b/airbyte-ci/connectors/pipelines/pipelines/external_scripts/airbyte_ci_install.py @@ -4,19 +4,25 @@ # Meaning, no external dependencies are allowed as we don't want users to have to run anything # other than this script to install the tool. +from __future__ import annotations + import os import shutil import ssl import sys import tempfile import urllib.request +from typing import TYPE_CHECKING # !IMPORTANT! This constant is inline here instead of being imported from pipelines/consts.py # because we don't want to introduce any dependencies on other files in the repository. RELEASE_URL = os.getenv("RELEASE_URL", "https://connectors.airbyte.com/files/airbyte-ci/releases") +if TYPE_CHECKING: + from typing import Optional + -def _get_custom_certificate_path(): +def _get_custom_certificate_path() -> Optional[str]: """ Returns the path to the custom certificate file if certifi is installed, otherwise None. @@ -41,10 +47,10 @@ def _get_custom_certificate_path(): return certifi.where() except ImportError: - return + return None -def get_ssl_context(): +def get_ssl_context() -> ssl.SSLContext: """ Returns an ssl.SSLContext object with the custom certificate file if certifi is installed, otherwise returns the default ssl.SSLContext object. @@ -56,22 +62,22 @@ def get_ssl_context(): return ssl.create_default_context(cafile=certifi_path) -def get_airbyte_os_name(): +def get_airbyte_os_name() -> Optional[str]: """ Returns 'ubuntu' if the system is Linux or 'macos' if the system is macOS. """ OS = os.uname().sysname if OS == "Linux": - print(f"Linux based system detected.") + print("Linux based system detected.") return "ubuntu" elif OS == "Darwin": - print(f"macOS based system detected.") + print("macOS based system detected.") return "macos" else: return None -def main(version="latest"): +def main(version: str = "latest") -> None: # Determine the operating system os_name = get_airbyte_os_name() if os_name is None: diff --git a/airbyte-ci/connectors/pipelines/pipelines/hacks.py b/airbyte-ci/connectors/pipelines/pipelines/hacks.py index 4a65628eaea41..e11cc5664b234 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/hacks.py +++ b/airbyte-ci/connectors/pipelines/pipelines/hacks.py @@ -47,7 +47,7 @@ async def cache_latest_cdk(context: ConnectorContext) -> None: ) -def never_fail_exec(command: List[str]) -> Callable: +def never_fail_exec(command: List[str]) -> Callable[[Container], Container]: """ Wrap a command execution with some bash sugar to always exit with a 0 exit code but write the actual exit code to a file. @@ -66,7 +66,7 @@ def never_fail_exec(command: List[str]) -> Callable: Callable: _description_ """ - def never_fail_exec_inner(container: Container): + def never_fail_exec_inner(container: Container) -> Container: return container.with_exec(["sh", "-c", f"{' '.join(command)}; echo $? > /exit_code"], skip_entrypoint=True) return never_fail_exec_inner diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/cli.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/cli.py index 711743ee8b5d2..4f601b7e83dc7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/cli.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/cli.py @@ -50,7 +50,9 @@ class LogOptions: help_message: Optional[str] = None -def log_command_results(ctx: click.Context, command_results: List[CommandResult], logger: Logger, options: LogOptions = LogOptions()): +def log_command_results( + ctx: click.Context, command_results: List[CommandResult], logger: Logger, options: LogOptions = LogOptions() +) -> None: """ Log the output of the subcommands run by `run_all_subcommands`. """ diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py index 701356abfdf0d..71fbce43b44bc 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/gcs.py @@ -4,7 +4,7 @@ import json from pathlib import Path -from typing import Optional, Tuple +from typing import Tuple from google.cloud import storage # type: ignore from google.oauth2 import service_account # type: ignore diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py index e4092e8d7f61b..fee2f7a0708f0 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/git.py @@ -3,10 +3,7 @@ # import functools -import os -import re -from pathlib import Path -from typing import List, Set +from typing import Set import git from dagger import Connection diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py index 3f3bfe78dda03..867c0fa896b7c 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/github.py @@ -38,10 +38,10 @@ def update_commit_status_check( target_url: str, description: str, context: str, - is_optional=False, - should_send=True, + is_optional: bool = False, + should_send: bool = True, logger: Optional[Logger] = None, -): +) -> None: """Call the GitHub API to create commit status check. Args: @@ -98,7 +98,7 @@ def get_pull_request(pull_request_number: int, github_access_token: str) -> Pull return airbyte_repo.get_pull(pull_request_number) -def update_global_commit_status_check_for_tests(click_context: dict, github_state: str, logger: Optional[Logger] = None): +def update_global_commit_status_check_for_tests(click_context: dict, github_state: str, logger: Optional[Logger] = None) -> None: update_commit_status_check( click_context["git_revision"], diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py index 441a7f492a7d0..b1588a3f61994 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py @@ -13,14 +13,14 @@ import anyio import asyncer from pipelines import main_logger -from pipelines.models.steps import StepResult, StepStatus - -RESULTS_DICT = Dict[str, StepResult] -ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]] +from pipelines.models.steps import StepStatus if TYPE_CHECKING: from pipelines.models.steps import Step, StepResult +RESULTS_DICT = Dict[str, StepResult] +ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]] + @dataclass class RunStepOptions: @@ -135,7 +135,7 @@ def _get_next_step_group(steps: STEP_TREE) -> Tuple[STEP_TREE, STEP_TREE]: return steps, [] -def _log_step_tree(step_tree: STEP_TREE, options: RunStepOptions, depth: int = 0): +def _log_step_tree(step_tree: STEP_TREE, options: RunStepOptions, depth: int = 0) -> None: """ Log the step tree to the console. diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/sentry_utils.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/sentry_utils.py index 49ec111aa08ca..c6b739afa9b4e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/sentry_utils.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/sentry_utils.py @@ -1,15 +1,23 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from __future__ import annotations import importlib.metadata import os +from typing import TYPE_CHECKING import sentry_sdk from connector_ops.utils import Connector # type: ignore +if TYPE_CHECKING: + from typing import Any, Callable, Dict, Optional -def initialize(): + from asyncclick import Command, Context + from pipelines.models.steps import Step + + +def initialize() -> None: if "SENTRY_DSN" in os.environ: sentry_sdk.init( dsn=os.environ.get("SENTRY_DSN"), @@ -18,7 +26,7 @@ def initialize(): ) -def before_send(event, hint): +def before_send(event: Dict[str, Any], hint: Dict[str, Any]) -> Optional[Dict[str, Any]]: # Ignore logged errors that do not contain an exception if "log_record" in hint and "exc_info" not in hint: return None @@ -26,8 +34,8 @@ def before_send(event, hint): return event -def with_step_context(func): - def wrapper(self, *args, **kwargs): +def with_step_context(func: Callable) -> Callable: + def wrapper(self: Step, *args, **kwargs) -> Step: with sentry_sdk.configure_scope() as scope: step_name = self.__class__.__name__ scope.set_tag("pipeline_step", step_name) @@ -61,8 +69,8 @@ def wrapper(self, *args, **kwargs): return wrapper -def with_command_context(func): - def wrapper(self, ctx, *args, **kwargs): +def with_command_context(func: Callable) -> Callable: + def wrapper(self: Command, ctx: Context, *args, **kwargs) -> Command: with sentry_sdk.configure_scope() as scope: scope.set_tag("pipeline_command", self.name) scope.set_context( diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py index 842301a314dad..aa83ceef43b82 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/utils.py @@ -13,7 +13,7 @@ import unicodedata from io import TextIOWrapper from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING import anyio import asyncclick as click @@ -22,6 +22,8 @@ from more_itertools import chunked if TYPE_CHECKING: + from typing import Any, Callable, Generator, List, Optional, Set, Tuple + from pipelines.airbyte_ci.connectors.context import ConnectorContext DAGGER_CONFIG = Config(log_output=sys.stderr) @@ -52,7 +54,7 @@ async def check_path_in_workdir(container: Container, path: str) -> bool: return False -def secret_host_variable(client: Client, name: str, default: str = ""): +def secret_host_variable(client: Client, name: str, default: str = "") -> Callable[[Container], Container]: """Add a host environment variable as a secret in a container. Example: @@ -68,7 +70,7 @@ def secret_host_variable(client: Client, name: str, default: str = ""): Callable[[Container], Container]: A function that can be used in a `Container.with_()` method. """ - def _secret_host_variable(container: Container): + def _secret_host_variable(container: Container) -> Container: return container.with_secret_variable(name, get_secret_host_variable(client, name, default)) return _secret_host_variable @@ -109,7 +111,7 @@ async def get_file_contents(container: Container, path: str) -> Optional[str]: @contextlib.contextmanager -def catch_exec_error_group(): +def catch_exec_error_group() -> Generator: try: yield except anyio.ExceptionGroup as eg: @@ -197,7 +199,7 @@ def get_current_epoch_time() -> int: # noqa D103 return round(datetime.datetime.utcnow().timestamp()) -def slugify(value: Any, allow_unicode: bool = False): +def slugify(value: object, allow_unicode: bool = False) -> str: """ Taken from https://github.com/django/django/blob/master/django/utils/text.py. @@ -257,7 +259,7 @@ def create_and_open_file(file_path: Path) -> TextIOWrapper: return file_path.open("w") -async def execute_concurrently(steps: List[Callable], concurrency=5): +async def execute_concurrently(steps: List[Callable], concurrency: int = 5) -> List[Any]: tasks = [] # Asyncer does not have builtin semaphore, so control concurrency via chunks of steps # Anyio has semaphores but does not have the soonify method which allow access to results via the value task attribute. @@ -322,7 +324,7 @@ def transform_strs_to_paths(str_paths: Set[str]) -> List[Path]: return sorted([Path(str_path) for str_path in str_paths]) -def fail_if_missing_docker_hub_creds(ctx: click.Context): +def fail_if_missing_docker_hub_creds(ctx: click.Context) -> None: if ctx.obj["docker_hub_username"] is None or ctx.obj["docker_hub_password"] is None: raise click.UsageError( "You need to be logged to DockerHub registry to run this command. Please set DOCKER_HUB_USERNAME and DOCKER_HUB_PASSWORD environment variables." diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/click_pipeline_context.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/click_pipeline_context.py index 68ece18574e03..d4c80a5fcd229 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/click_pipeline_context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/click_pipeline_context.py @@ -5,7 +5,7 @@ import io import sys import tempfile -from typing import Any, Callable, Optional, TextIO, Tuple, Union +from typing import Any, Callable, Dict, Optional, TextIO, Tuple import anyio import dagger @@ -32,7 +32,7 @@ class ClickPipelineContext(BaseModel, Singleton): _og_click_context: Context = PrivateAttr(default=None) @property - def params(self): + def params(self) -> Dict[str, Any]: """ Returns a combination of the click context object and the click context params. @@ -57,7 +57,7 @@ def params(self): class Config: arbitrary_types_allowed = True - def __init__(self, **data: dict[str, Any]): + def __init__(self, **data: dict[str, Any]) -> None: """ Initialize the ClickPipelineContext instance. @@ -113,7 +113,7 @@ def _create_dagger_client_log_file(self) -> Tuple[TextIO, str]: """ Create the dagger client log file. """ - dagger_logs_file_descriptor, dagger_logs_temp_file_path = tempfile.mkstemp(dir="/tmp", prefix=f"dagger_client_", suffix=".log") + dagger_logs_file_descriptor, dagger_logs_temp_file_path = tempfile.mkstemp(dir="/tmp", prefix="dagger_client_", suffix=".log") main_logger.info(f"Dagger client logs stored in {dagger_logs_temp_file_path}") return io.TextIOWrapper(io.FileIO(dagger_logs_file_descriptor, "w+")), dagger_logs_temp_file_path diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py index f9a664dd43909..fed31ec245cb7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py @@ -4,15 +4,17 @@ """Module declaring context related classes.""" +from __future__ import annotations + import logging import os from datetime import datetime from glob import glob from types import TracebackType -from typing import List, Optional +from typing import TYPE_CHECKING from asyncer import asyncify -from dagger import Client, Container, Directory, File, Secret +from dagger import Client, Container, Directory, File, GitRepository, Secret from github import PullRequest from pipelines.consts import CIContext, ContextState from pipelines.helpers.gcs import sanitize_gcs_credentials @@ -22,12 +24,17 @@ from pipelines.helpers.utils import AIRBYTE_REPO_URL from pipelines.models.reports import Report +if TYPE_CHECKING: + from typing import List, Optional + + from pipelines.airbyte_ci.connectors.reports import ConnectorReport + class PipelineContext: """The pipeline context is used to store configuration for a specific pipeline run.""" _dagger_client: Optional[Client] - _report: Optional[Report] + _report: Optional[Report | ConnectorReport] dockerd_service: Optional[Container] started_at: Optional[datetime] stopped_at: Optional[datetime] @@ -73,7 +80,7 @@ def __init__( ci_github_access_token: Optional[str] = None, run_step_options: RunStepOptions = RunStepOptions(), enable_report_auto_open: bool = True, - ): + ) -> None: """Initialize a pipeline context. Args: @@ -122,33 +129,33 @@ def __init__( update_commit_status_check(**self.github_commit_status) @property - def dagger_client(self) -> Client: # noqa D102 + def dagger_client(self) -> Client: assert self._dagger_client is not None, "The dagger client was not set on this PipelineContext" return self._dagger_client @dagger_client.setter - def dagger_client(self, dagger_client: Client): # noqa D102 + def dagger_client(self, dagger_client: Client) -> None: self._dagger_client = dagger_client @property - def is_ci(self): # noqa D102 + def is_ci(self) -> bool: return self.is_local is False @property - def is_pr(self): # noqa D102 + def is_pr(self) -> bool: return self.ci_context == CIContext.PULL_REQUEST @property - def repo(self): # noqa D102 + def repo(self) -> GitRepository: return self.dagger_client.git(AIRBYTE_REPO_URL, keep_git_dir=True) @property - def report(self) -> Report: # noqa D102 + def report(self) -> Report | ConnectorReport: assert self._report is not None, "The report was not set on this PipelineContext." return self._report @report.setter - def report(self, report: Report): # noqa D102 + def report(self, report: Report | ConnectorReport) -> None: self._report = report @property @@ -233,7 +240,7 @@ def get_repo_dir(self, subdir: str = ".", exclude: Optional[List[str]] = None, i def create_slack_message(self) -> str: raise NotImplementedError() - async def __aenter__(self): + async def __aenter__(self) -> PipelineContext: """Perform setup operation for the PipelineContext. Updates the current commit status on Github. diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/reports.py b/airbyte-ci/connectors/pipelines/pipelines/models/reports.py index 9879a1554f7e9..206e1c44e9e11 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/reports.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/reports.py @@ -41,47 +41,47 @@ class Report: filename: str = "output" @property - def report_output_prefix(self) -> str: # noqa D102 + def report_output_prefix(self) -> str: return self.pipeline_context.report_output_prefix @property - def json_report_file_name(self) -> str: # noqa D102 + def json_report_file_name(self) -> str: return self.filename + ".json" @property - def json_report_remote_storage_key(self) -> str: # noqa D102 + def json_report_remote_storage_key(self) -> str: return f"{self.report_output_prefix}/{self.json_report_file_name}" @property - def failed_steps(self) -> List[StepResult]: # noqa D102 + def failed_steps(self) -> List[StepResult]: return [step_result for step_result in self.steps_results if step_result.status is StepStatus.FAILURE] @property - def successful_steps(self) -> List[StepResult]: # noqa D102 + def successful_steps(self) -> List[StepResult]: return [step_result for step_result in self.steps_results if step_result.status is StepStatus.SUCCESS] @property - def skipped_steps(self) -> List[StepResult]: # noqa D102 + def skipped_steps(self) -> List[StepResult]: return [step_result for step_result in self.steps_results if step_result.status is StepStatus.SKIPPED] @property - def success(self) -> bool: # noqa D102 + def success(self) -> bool: return len(self.failed_steps) == 0 and (len(self.skipped_steps) > 0 or len(self.successful_steps) > 0) @property - def run_duration(self) -> timedelta: # noqa D102 + def run_duration(self) -> timedelta: assert self.pipeline_context.started_at is not None, "The pipeline started_at timestamp must be set to save reports." assert self.pipeline_context.stopped_at is not None, "The pipeline stopped_at timestamp must be set to save reports." return self.pipeline_context.stopped_at - self.pipeline_context.started_at @property - def lead_duration(self) -> timedelta: # noqa D102 + def lead_duration(self) -> timedelta: assert self.pipeline_context.started_at is not None, "The pipeline started_at timestamp must be set to save reports." assert self.pipeline_context.stopped_at is not None, "The pipeline stopped_at timestamp must be set to save reports." return self.pipeline_context.stopped_at - self.pipeline_context.created_at @property - def remote_storage_enabled(self) -> bool: # noqa D102 + def remote_storage_enabled(self) -> bool: return self.pipeline_context.is_ci async def save_local(self, filename: str, content: str) -> Path: @@ -151,7 +151,7 @@ def to_json(self) -> str: } ) - def print(self): + def print(self) -> None: """Print the test report to the console in a nice way.""" pipeline_name = self.pipeline_context.pipeline_name main_panel_title = Text(f"{pipeline_name.upper()} - {self.name}") diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py index 13a3033d30622..f98aae106042e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py @@ -10,7 +10,7 @@ from datetime import datetime, timedelta from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING, Any, ClassVar, Final, Optional, Union +from typing import TYPE_CHECKING import anyio import asyncer @@ -21,6 +21,7 @@ from pipelines.helpers.utils import format_duration, get_exec_result if TYPE_CHECKING: + from typing import Any, ClassVar, Optional, Union from pipelines.airbyte_ci.format.format_command import FormatCommand from pipelines.models.contexts.pipeline_context import PipelineContext @@ -34,11 +35,11 @@ class MountPath: path: Union[Path, str] optional: bool = False - def _cast_fields(self): + def _cast_fields(self) -> None: self.path = Path(self.path) self.optional = bool(self.optional) - def _check_exists(self): + def _check_exists(self) -> None: if not self.get_path().exists(): message = f"{self.path} does not exist." if self.optional: @@ -49,11 +50,11 @@ def _check_exists(self): def get_path(self) -> Path: return Path(self.path) - def __post_init__(self): + def __post_init__(self) -> None: self._cast_fields() self._check_exists() - def __str__(self): + def __str__(self) -> str: return str(self.path) @property @@ -79,7 +80,7 @@ def __repr__(self) -> str: # noqa D105 def __str__(self) -> str: # noqa D105 return f"{self.step.title}: {self.status.value}\n\nSTDOUT:\n{self.stdout}\n\nSTDERR:\n{self.stderr}" - def __post_init__(self): + def __post_init__(self) -> None: if self.stderr: super().__setattr__("stderr", self.redact_secrets_from_string(self.stderr)) if self.stdout: @@ -146,7 +147,6 @@ def __str__(self) -> str: # noqa D105 class Step(ABC): """An abstract class to declare and run pipeline step.""" - title: str max_retries: ClassVar[int] = 0 max_dagger_error_retries: ClassVar[int] = 3 should_log: ClassVar[bool] = True @@ -164,6 +164,11 @@ def __init__(self, context: PipelineContext) -> None: # noqa D107 self.started_at: Optional[datetime] = None self.stopped_at: Optional[datetime] = None + @property + def title(self) -> str: + """The title of the step.""" + raise NotImplementedError("Steps must define a 'title' attribute.") + @property def run_duration(self) -> timedelta: if self.started_at and self.stopped_at: @@ -242,7 +247,7 @@ def should_retry(self, step_result: StepResult) -> bool: max_retries = self.max_dagger_error_retries if step_result.exc_info else self.max_retries return self.retry_count < max_retries and max_retries > 0 - async def retry(self, step_result, *args, **kwargs) -> StepResult: + async def retry(self, step_result: StepResult, *args, **kwargs) -> StepResult: self.retry_count += 1 self.logger.warn( f"Failed with error: {step_result.stderr}.\nRetry #{self.retry_count} in {self.retry_delay.total_seconds()} seconds..."