Skip to content

airbyte-ci: fix linting and missing annotations #33797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
logging.getLogger("httpx").setLevel(logging.WARNING)

# RichHandler does not work great in the CI environment, so we use a StreamHandler instead
logging_handler: Union[RichHandler, logging.StreamHandler] = RichHandler(rich_tracebacks=True) if not "CI" in os.environ else logging.StreamHandler()
logging_handler: Union[RichHandler, logging.StreamHandler] = RichHandler(rich_tracebacks=True) if "CI" not in os.environ else logging.StreamHandler()


logging.basicConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ async def run_connector_build_pipeline(context: ConnectorContext, semaphore: any
if context.is_local and build_result.status is StepStatus.SUCCESS:
load_image_result = await LoadContainerToLocalDockerHost(context, per_platform_built_containers, image_tag).run()
step_results.append(load_image_result)
context.report = ConnectorReport(context, step_results, name="BUILD RESULTS")
return context.report
report = ConnectorReport(context, step_results, name="BUILD RESULTS")
context.report = report
return report
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

from abc import ABC
from typing import List, Optional, Tuple

import docker # type: ignore
from dagger import Container, ExecError, Platform, QueryError
Expand All @@ -21,7 +20,7 @@ class BuildConnectorImagesBase(Step, ABC):
context: ConnectorContext

@property
def title(self):
def title(self) -> str:
return f"Build {self.context.connector.technical_name} docker image for platform(s) {', '.join(self.build_platforms)}"

def __init__(self, context: ConnectorContext) -> None:
Expand Down Expand Up @@ -60,20 +59,20 @@ async def _build_connector(self, platform: Platform, *args, **kwargs) -> Contain
class LoadContainerToLocalDockerHost(Step):
context: ConnectorContext

def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: Optional[str] = "dev") -> None:
def __init__(self, context: ConnectorContext, containers: dict[Platform, Container], image_tag: str = "dev") -> None:
super().__init__(context)
self.image_tag = image_tag
self.containers = containers

def _generate_dev_tag(self, platform: Platform, multi_platforms: bool):
def _generate_dev_tag(self, platform: Platform, multi_platforms: bool) -> str:
"""
When building for multiple platforms, we need to tag the image with the platform name.
There's no way to locally build a multi-arch image, so we need to tag the image with the platform name when the user passed multiple architecture options.
"""
return f"{self.image_tag}-{platform.replace('/', '-')}" if multi_platforms else self.image_tag

@property
def title(self):
def title(self) -> str:
return f"Load {self.image_name}:{self.image_tag} to the local docker host."

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ def __init__(self, context: ConnectorContext, normalization_image: str, build_pl
self.build_platform = build_platform
self.use_dev_normalization = normalization_image.endswith(":dev")
self.normalization_image = normalization_image
self.title = f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"

@property
def title(self) -> str:
return f"Build {self.normalization_image}" if self.use_dev_normalization else f"Pull {self.normalization_image}"

async def _run(self) -> StepResult:
if self.use_dev_normalization:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ async def _build_from_base_image(self, platform: Platform) -> Container:
# copy python dependencies from builder to connector container
customized_base.with_directory("/usr/local", builder.directory("/usr/local"))
.with_workdir(self.PATH_TO_INTEGRATION_CODE)
.with_file(main_file_name, (await self.context.get_connector_dir(include=main_file_name)).file(main_file_name))
.with_file(main_file_name, (await self.context.get_connector_dir(include=[main_file_name])).file(main_file_name))
.with_directory(
connector_snake_case_name,
(await self.context.get_connector_dir(include=connector_snake_case_name)).directory(connector_snake_case_name),
(await self.context.get_connector_dir(include=[connector_snake_case_name])).directory(connector_snake_case_name),
)
.with_env_variable("AIRBYTE_ENTRYPOINT", " ".join(entrypoint))
.with_entrypoint(entrypoint)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import datetime
from copy import deepcopy
from typing import TYPE_CHECKING

import semver
from dagger import Container, Directory
Expand All @@ -13,6 +13,9 @@
from pipelines.helpers.connectors import metadata_change_helpers
from pipelines.models.steps import Step, StepResult, StepStatus

if TYPE_CHECKING:
from anyio import Semaphore


def get_bumped_version(version: str, bump_type: str) -> str:
current_version = semver.VersionInfo.parse(version)
Expand All @@ -38,7 +41,7 @@ def __init__(
new_version: str,
changelog_entry: str,
pull_request_number: str,
):
) -> None:
super().__init__(context)
self.repo_dir = repo_dir
self.new_version = new_version
Expand Down Expand Up @@ -71,14 +74,14 @@ async def _run(self) -> StepResult:
output_artifact=updated_repo_dir,
)

def find_line_index_for_new_entry(self, markdown_text) -> int:
def find_line_index_for_new_entry(self, markdown_text: str) -> int:
lines = markdown_text.splitlines()
for line_index, line in enumerate(lines):
if "version" in line.lower() and "date" in line.lower() and "pull request" in line.lower() and "subject" in line.lower():
return line_index + 2
raise Exception("Could not find the changelog section table in the documentation file.")

def add_changelog_entry(self, og_doc_content) -> str:
def add_changelog_entry(self, og_doc_content: str) -> str:
today = datetime.date.today().strftime("%Y-%m-%d")
lines = og_doc_content.splitlines()
line_index_for_new_entry = self.find_line_index_for_new_entry(og_doc_content)
Expand All @@ -96,7 +99,7 @@ def __init__(
context: ConnectorContext,
repo_dir: Directory,
new_version: str,
):
) -> None:
super().__init__(context)
self.repo_dir = repo_dir
self.new_version = new_version
Expand Down Expand Up @@ -136,7 +139,7 @@ async def _run(self) -> StepResult:

async def run_connector_version_bump_pipeline(
context: ConnectorContext,
semaphore,
semaphore: Semaphore,
bump_type: str,
changelog_entry: str,
pull_request_number: str,
Expand Down Expand Up @@ -174,5 +177,6 @@ async def run_connector_version_bump_pipeline(
steps_results.append(add_changelog_entry_result)
final_repo_dir = add_changelog_entry_result.output_artifact
await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path()))
context.report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS")
return context.report
report = ConnectorReport(context, steps_results, name="CONNECTOR VERSION BUMP RESULTS")
context.report = report
return report
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def get_selected_connectors_with_modified_files(
return selected_connectors_with_modified_files


def validate_environment(is_local: bool):
def validate_environment(is_local: bool) -> None:
"""Check if the required environment variables exist."""
if is_local:
if not Path(".git").is_dir():
Expand Down Expand Up @@ -236,7 +236,7 @@ def should_use_remote_secrets(use_remote_secrets: Optional[bool]) -> bool:
@click_ignore_unused_kwargs
async def connectors(
ctx: click.Context,
):
) -> None:
"""Group all the connectors-ci command."""
validate_environment(ctx.obj["is_local"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datetime import datetime
from types import TracebackType
from typing import Dict, Optional, Sequence
from typing import TYPE_CHECKING

import yaml # type: ignore
from anyio import Path
Expand All @@ -23,6 +23,10 @@
from pipelines.helpers.utils import METADATA_FILE_NAME
from pipelines.models.contexts.pipeline_context import PipelineContext

if TYPE_CHECKING:
from pathlib import Path as NativePath
from typing import Dict, FrozenSet, List, Optional, Sequence


class ConnectorContext(PipelineContext):
"""The connector context is used to store configuration for a specific connector pipeline run."""
Expand Down Expand Up @@ -62,7 +66,7 @@ def __init__(
concurrent_cat: Optional[bool] = False,
run_step_options: RunStepOptions = RunStepOptions(),
targeted_platforms: Sequence[Platform] = BUILD_PLATFORMS,
):
) -> None:
"""Initialize a connector context.

Args:
Expand Down Expand Up @@ -145,31 +149,31 @@ def s3_build_cache_secret_key_secret(self) -> Optional[Secret]:
return None

@property
def modified_files(self):
def modified_files(self) -> FrozenSet[NativePath]:
return self.connector.modified_files

@property
def secrets_dir(self) -> Optional[Directory]: # noqa D102
def secrets_dir(self) -> Optional[Directory]:
return self._secrets_dir

@secrets_dir.setter
def secrets_dir(self, secrets_dir: Directory): # noqa D102
def secrets_dir(self, secrets_dir: Directory) -> None:
self._secrets_dir = secrets_dir

@property
def updated_secrets_dir(self) -> Optional[Directory]: # noqa D102
def updated_secrets_dir(self) -> Optional[Directory]:
return self._updated_secrets_dir

@updated_secrets_dir.setter
def updated_secrets_dir(self, updated_secrets_dir: Directory): # noqa D102
def updated_secrets_dir(self, updated_secrets_dir: Directory) -> None:
self._updated_secrets_dir = updated_secrets_dir

@property
def connector_acceptance_test_source_dir(self) -> Directory: # noqa D102
def connector_acceptance_test_source_dir(self) -> Directory:
return self.get_repo_dir("airbyte-integrations/bases/connector-acceptance-test")

@property
def should_save_updated_secrets(self) -> bool: # noqa D102
def should_save_updated_secrets(self) -> bool:
return self.use_remote_secrets and self.updated_secrets_dir is not None

@property
Expand Down Expand Up @@ -208,12 +212,12 @@ def docker_hub_password_secret(self) -> Optional[Secret]:
return None
return self.dagger_client.set_secret("docker_hub_password", self.docker_hub_password)

async def get_connector_secrets(self):
async def get_connector_secrets(self) -> Dict[str, Secret]:
if self._connector_secrets is None:
self._connector_secrets = await secrets.get_connector_secrets(self)
return self._connector_secrets

async def get_connector_dir(self, exclude=None, include=None) -> Directory:
async def get_connector_dir(self, exclude: Optional[List[str]] = None, include: Optional[List[str]] = None) -> Directory:
"""Get the connector under test source code directory.

Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@click.pass_context
async def list_connectors(
ctx: click.Context,
):
) -> bool:
selected_connectors = sorted(ctx.obj["selected_connectors_with_modified_files"], key=lambda x: x.technical_name)
table = Table(title=f"{len(selected_connectors)} selected connectors")
table.add_column("Modified")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import textwrap
from copy import deepcopy
from typing import Optional
from typing import TYPE_CHECKING

from base_images import version_registry # type: ignore
from connector_ops.utils import ConnectorLanguage # type: ignore
Expand All @@ -17,6 +17,11 @@
from pipelines.helpers.connectors import metadata_change_helpers
from pipelines.models.steps import Step, StepResult, StepStatus

if TYPE_CHECKING:
from typing import Optional

from anyio import Semaphore


class UpgradeBaseImageMetadata(Step):
context: ConnectorContext
Expand All @@ -28,7 +33,7 @@ def __init__(
context: ConnectorContext,
repo_dir: Directory,
set_if_not_exists: bool = True,
):
) -> None:
super().__init__(context)
self.repo_dir = repo_dir
self.set_if_not_exists = set_if_not_exists
Expand Down Expand Up @@ -99,12 +104,12 @@ def __init__(
self,
context: ConnectorContext,
file_to_delete: str,
):
) -> None:
super().__init__(context)
self.file_to_delete = file_to_delete

@property
def title(self):
def title(self) -> str:
return f"Delete {self.file_to_delete}"

async def _run(self) -> StepResult:
Expand Down Expand Up @@ -161,7 +166,7 @@ async def _run(self) -> StepResult:
output_artifact=updated_repo_dir,
)

def add_build_instructions(self, og_doc_content) -> str:
def add_build_instructions(self, og_doc_content: str) -> str:

build_instructions_template = Template(
textwrap.dedent(
Expand Down Expand Up @@ -258,7 +263,7 @@ async def post_connector_install(connector_container: Container) -> Container:
return new_doc


async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, semaphore, set_if_not_exists: bool) -> Report:
async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, semaphore: Semaphore, set_if_not_exists: bool) -> Report:
"""Run a pipeline to upgrade for a single connector to use our base image."""
async with semaphore:
steps_results = []
Expand All @@ -273,11 +278,14 @@ async def run_connector_base_image_upgrade_pipeline(context: ConnectorContext, s
steps_results.append(update_base_image_in_metadata_result)
final_repo_dir = update_base_image_in_metadata_result.output_artifact
await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path()))
context.report = ConnectorReport(context, steps_results, name="BASE IMAGE UPGRADE RESULTS")
return context.report
report = ConnectorReport(context, steps_results, name="BASE IMAGE UPGRADE RESULTS")
context.report = report
return report


async def run_connector_migration_to_base_image_pipeline(context: ConnectorContext, semaphore, pull_request_number: str):
async def run_connector_migration_to_base_image_pipeline(
context: ConnectorContext, semaphore: Semaphore, pull_request_number: str
) -> Report:
async with semaphore:
steps_results = []
async with context:
Expand Down Expand Up @@ -344,6 +352,6 @@ async def run_connector_migration_to_base_image_pipeline(context: ConnectorConte
# EXPORT MODIFIED FILES BACK TO HOST
final_repo_dir = add_build_instructions_to_doc_results.output_artifact
await og_repo_dir.diff(final_repo_dir).export(str(git.get_git_repo_path()))

context.report = ConnectorReport(context, steps_results, name="MIGRATE TO BASE IMAGE RESULTS")
return context.report
report = ConnectorReport(context, steps_results, name="MIGRATE TO BASE IMAGE RESULTS")
context.report = report
return report
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ async def context_to_step_result(context: PipelineContext) -> StepResult:
# TODO (ben): Refactor run_connectors_pipelines to wrap the whole pipeline in a dagger pipeline once Steps are refactored
async def run_report_complete_pipeline(
dagger_client: dagger.Client, contexts: List[ConnectorContext] | List[PublishConnectorContext] | List[PipelineContext]
):
) -> None:
"""Create and Save a report representing the run of the encompassing pipeline.

This is to denote when the pipeline is complete, useful for long running pipelines like nightlies.
"""

if not contexts:
return []
return

# Repurpose the first context to be the pipeline upload context to preserve timestamps
first_connector_context = contexts[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import List

import asyncclick as click
from pipelines import main_logger
Expand Down Expand Up @@ -68,7 +67,7 @@ async def publish(
metadata_service_gcs_credentials: str,
slack_webhook: str,
slack_channel: str,
):
) -> bool:
ctx.obj["spec_cache_gcs_credentials"] = spec_cache_gcs_credentials
ctx.obj["spec_cache_bucket_name"] = spec_cache_bucket_name
ctx.obj["metadata_service_bucket_name"] = metadata_service_bucket_name
Expand Down
Loading