Skip to content

connectors-ci: better modified connectors detection logic #28855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
aabbf8c
connectors-ci: improve modified connectors detection
alafanechere Jul 29, 2023
9e6004b
Merge branch 'master' into augustin/connectors-ci/better-modified-con…
alafanechere Jul 31, 2023
d58269d
update changelog
alafanechere Jul 31, 2023
3ba05c9
Merge branch 'master' into augustin/connectors-ci/better-modified-con…
alafanechere Aug 1, 2023
98b7c1e
rename get_all_released_connectors to get_all_connectors_in_repo
alafanechere Aug 1, 2023
0e15170
bump connector ops version
alafanechere Aug 1, 2023
a2166ff
selected connector intersection instead of union
alafanechere Aug 1, 2023
0b05dc1
move selection logic to a dedicated function
alafanechere Aug 1, 2023
4675b13
wip testing
alafanechere Aug 1, 2023
b37862e
remove ctx logging
alafanechere Aug 1, 2023
bc6a175
ref: centralize selection logic even more
alafanechere Aug 2, 2023
c39be98
more tests
alafanechere Aug 2, 2023
110a7a2
better repo filtering on tests
alafanechere Aug 2, 2023
d3b8110
add --metadata-only-changes on publish workflow
alafanechere Aug 2, 2023
96e6eaf
Merge branch 'master' into augustin/connectors-ci/better-modified-con…
alafanechere Aug 2, 2023
3627b04
Merge branch 'master' into augustin/connectors-ci/better-modified-con…
alafanechere Aug 2, 2023
0de7a48
DEMO - to revert
alafanechere Aug 2, 2023
1da9809
update CLI doc
alafanechere Aug 2, 2023
c181cf6
fix should_run
alafanechere Aug 2, 2023
351725f
DEMO - to revert
alafanechere Aug 2, 2023
a2fe34b
test
alafanechere Aug 2, 2023
cc07429
Revert "DEMO - to revert"
alafanechere Aug 2, 2023
e9d21b5
Revert "DEMO - to revert"
alafanechere Aug 2, 2023
26a68e5
perform connector name validation at click option parsing
alafanechere Aug 3, 2023
a6bfcdb
Merge branch 'master' into augustin/connectors-ci/better-modified-con…
alafanechere Aug 3, 2023
22ef805
fix connector ops version bumping
alafanechere Aug 3, 2023
be62801
fix connector ops version bumping
alafanechere Aug 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airbyte-ci/connectors/connector_ops/connector_ops/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def __repr__(self) -> str:

@functools.lru_cache(maxsize=2)
def get_local_dependency_paths(self, with_test_dependencies: bool = True) -> Set[Path]:
dependencies_paths = [self.code_directory]
dependencies_paths = []
if self.language == ConnectorLanguage.JAVA:
dependencies_paths += get_all_gradle_dependencies(
self.code_directory / "build.gradle", with_test_dependencies=with_test_dependencies
Expand Down Expand Up @@ -352,8 +352,11 @@ def get_changed_connectors(


def get_all_released_connectors() -> Set:
repo = git.Repo(search_parent_directories=True)
repo_path = repo.working_tree_dir

return {
Connector(Path(metadata_file).parent.name)
for metadata_file in glob("airbyte-integrations/connectors/**/metadata.yaml", recursive=True)
for metadata_file in glob(f"{repo_path}/airbyte-integrations/connectors/**/metadata.yaml", recursive=True)
if SCAFFOLD_CONNECTOR_GLOB not in metadata_file
}
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/connector_ops/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "connector_ops"
version = "0.2.1"
version = "0.2.2"
description = "Packaged maintained by the connector operations team to perform CI for connectors"
authors = ["Airbyte <[email protected]>"]

Expand Down
7 changes: 4 additions & 3 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,10 @@ This command runs tests for the metadata service orchestrator.
`airbyte-ci metadata test orchestrator`

## Changelog
| Version | PR | Description |
| ------- | --- | ------------------------------------------------------------------------------------------ |
| 0.1.0 | | Alpha version not in production yet. All the commands described in this doc are available. |
| Version | PR | Description |
| ------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------ |
| 0.1.1 | [#28855](https://github.com/airbytehq/airbyte/pull/28855) | Improved the modified connector detection for connectors commands. |
| 0.1.0 | | Alpha version not in production yet. All the commands described in this doc are available. |

## More info
This project is owned by the Connectors Operations team.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import List

import click
from github import PullRequest
from pipelines import github, main_logger
from pipelines.bases import CIContext
from pipelines.utils import (
Expand All @@ -17,7 +18,6 @@
get_modified_files_in_commit,
get_modified_files_in_pull_request,
)
from github import PullRequest

from .groups.connectors import connectors
from .groups.metadata import metadata
Expand Down Expand Up @@ -133,6 +133,7 @@ def airbyte_ci(
main_logger.info(f"Pipeline Start Timestamp: {pipeline_start_timestamp}")
main_logger.info(f"Modified Files: {ctx.obj['modified_files']}")


airbyte_ci.add_command(connectors)
airbyte_ci.add_command(metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import os
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
from typing import Set, Tuple, Union

import anyio
import click
from connector_ops.utils import Connector, ConnectorLanguage, console, get_all_released_connectors
from pipelines import main_logger
from pipelines.builds import run_connector_build_pipeline
from pipelines.contexts import ConnectorContext, ContextState, PublishConnectorContext
Expand All @@ -19,8 +20,7 @@
from pipelines.pipelines.connectors import run_connectors_pipelines
from pipelines.publish import reorder_contexts, run_connector_publish_pipeline
from pipelines.tests import run_connector_test_pipeline
from pipelines.utils import DaggerPipelineCommand, get_modified_connectors
from connector_ops.utils import ConnectorLanguage, console, get_all_released_connectors
from pipelines.utils import DaggerPipelineCommand, get_connector_modified_files, get_modified_connectors_and_files
from rich.table import Table
from rich.text import Text

Expand Down Expand Up @@ -87,45 +87,26 @@ def connectors(

ctx.ensure_object(dict)
ctx.obj["use_remote_secrets"] = use_remote_secrets
ctx.obj["connector_names"] = names
ctx.obj["connector_languages"] = languages
ctx.obj["release_states"] = release_stages
ctx.obj["modified"] = modified
Comment on lines -90 to -93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this change we won't be setting this fields - just confirming, do we need or reference this context anywhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think they're used elsewhere. Will confirm by running a pre-release. I'm not against re-adding these if needed but ATM I believe keeping ctx.obj with used fields is better for readability / debugging.

ctx.obj["selected_connectors_names"] = set(names) if names else set()
ctx.obj["selected_connectors_languages"] = set(languages) if languages else set()
ctx.obj["selected_connectors_release_stages"] = set(release_stages) if release_stages else set()
ctx.obj["select_modified_connectors"] = modified
ctx.obj["concurrency"] = concurrency
ctx.obj["execute_timeout"] = execute_timeout

all_connectors = get_all_released_connectors()

# We get the modified connectors and downstream connector deps, and files
modified_connectors_and_files = get_modified_connectors(ctx.obj["modified_files"])

# We select all connectors by default
# and attach modified files to them
selected_connectors_and_files = {connector: modified_connectors_and_files.get(connector, []) for connector in all_connectors}

if modified:
selected_connectors_and_files = modified_connectors_and_files
if names:
selected_connectors_and_files = {
connector: selected_connectors_and_files[connector]
for connector in selected_connectors_and_files
if connector.technical_name in names
}
if languages:
selected_connectors_and_files = {
connector: selected_connectors_and_files[connector]
for connector in selected_connectors_and_files
if connector.language in languages
}
if release_stages:
selected_connectors_and_files = {
connector: selected_connectors_and_files[connector]
for connector in selected_connectors_and_files
if connector.release_stage in release_stages
}

ctx.obj["selected_connectors_and_files"] = selected_connectors_and_files
ctx.obj["selected_connectors_names"] = [c.technical_name for c in selected_connectors_and_files.keys()]
ctx.obj["all_connectors"] = get_all_released_connectors()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the get_all_released_connectors() name is a bit confusing. Reading the name and seeing released I would expect this to only have things that are published, but I think it returns all connectors.

What does released refer to here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Let me find a better name.

ctx.obj["selected_connectors_by_names"] = {Connector(technical_name=name) for name in names}
ctx.obj["selected_connectors_by_languages"] = {connector for connector in ctx.obj["all_connectors"] if connector.language in languages}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should selected_connectors_by_names also be using ctx.obj["all_connectors"] like *_by_languages and *_by_release_stages is doing, rather than building a new Connector with only the technical name?

ctx.obj["selected_connectors_by_release_stages"] = {
connector for connector in ctx.obj["all_connectors"] if connector.release_stage in release_stages
}
ctx.obj["selected_connectors"] = (
ctx.obj["selected_connectors_by_names"]
| ctx.obj["selected_connectors_by_languages"]
| ctx.obj["selected_connectors_by_release_stages"]
)
ctx.obj["selected_connectors_and_files"] = {
connector: get_connector_modified_files(connector, ctx.obj["modified_files"]) for connector in ctx.obj["selected_connectors"]
}


@connectors.command(cls=DaggerPipelineCommand, help="Test all the selected connectors.")
Expand All @@ -142,7 +123,13 @@ def test(
main_logger.info("Skipping connectors tests for draft pull request.")
sys.exit(0)

main_logger.info(f"Will run the test pipeline for the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}")
if ctx.obj["select_modified_connectors"]:
ctx.obj["selected_connectors_and_files"] = {
**ctx.obj["selected_connectors_and_files"],
**get_modified_connectors_and_files(ctx.obj["modified_files"]),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand what's going on here with the **?

I think this means "merge ctx.obj["selected_connectors_and_files"] with get_modified_connectors_and_files(ctx.obj["modified_files"]), but shouldn't select_modified_connectors only have the result of get_modified_connectors_and_files?

It seems like this is adding what you've selected with --name or other args with the modified connectors. But I would expect the --name, --language, etc to filter the modified connectors. At least that was the way it was working before and is how I stopped java connectors from publishing here #28344

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep ** expands a dictionnary, in this context it indeeds merge two dictonaries into one.
And yes you're right it slightly changes the logic. I'll try to revert to the original.
The expected logic is:

  • Combination of same options like --name, --language etc. act as OR
  • Combination of different options are AND --language=python --language=java --release-stage=generally_available should only test Python and java GA connectors.

I'll make sure to add a test for this.

Copy link
Contributor

@pedroslopez pedroslopez Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alafanechere I think we're still or-ing the modified filter with the other filters. Why does --modified not work like the others?

For example --modified --language=java would select only the modified java connectors. Maybe I'm not reading this right...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pedroslopez You're right again 😮‍💨 I originally thought it was OK to or the modified connectors but for the publish use case is was cool to just bypass specific connector language.
I changed the code structure again to centralize the logic even more and added thorough unit tests.

main_logger.info(ctx.obj)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is logging the whole ctx.obj intentional? Asking since it can include secrets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spotting this. A debugging leftover...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we still need to remove this guy! 😅

log_selected_connectors(ctx.obj["selected_connectors_and_files"])
if ctx.obj["selected_connectors_and_files"]:
update_global_commit_status_check_for_tests(ctx.obj, "pending")
else:
Expand Down Expand Up @@ -198,7 +185,14 @@ def send_commit_status_check() -> None:
@connectors.command(cls=DaggerPipelineCommand, help="Build all images for the selected connectors.")
@click.pass_context
def build(ctx: click.Context) -> bool:
main_logger.info(f"Will build the following connectors: {', '.join(ctx.obj['selected_connectors_names'])}.")
"""Runs a build pipeline for the selected connectors."""
if ctx.obj["select_modified_connectors"]:
ctx.obj["selected_connectors_and_files"] = {
**ctx.obj["selected_connectors_and_files"],
**get_modified_connectors_and_files(ctx.obj["modified_files"]),
}

log_selected_connectors(ctx.obj["selected_connectors_and_files"])
connectors_contexts = [
ConnectorContext(
pipeline_name=f"Build connector {connector.technical_name}",
Expand Down Expand Up @@ -305,17 +299,19 @@ def publish(
ctx.obj["spec_cache_bucket_name"] = spec_cache_bucket_name
ctx.obj["metadata_service_bucket_name"] = metadata_service_bucket_name
ctx.obj["metadata_service_gcs_credentials"] = metadata_service_gcs_credentials

if ctx.obj["is_local"]:
click.confirm(
"Publishing from a local environment is not recommend and requires to be logged in Airbyte's DockerHub registry, do you want to continue?",
"Publishing from a local environment is not recommended and requires to be logged in Airbyte's DockerHub registry, do you want to continue?",
abort=True,
)

selected_connectors_and_files = ctx.obj["selected_connectors_and_files"]
selected_connectors_names = ctx.obj["selected_connectors_names"]
if ctx.obj["select_modified_connectors"]:
ctx.obj["selected_connectors_and_files"] = {
**ctx.obj["selected_connectors_and_files"],
**get_modified_connectors_and_files(ctx.obj["modified_files"], metadata_modification_only=True, dependency_scanning=False),
}

main_logger.info(f"Will publish the following connectors: {', '.join(selected_connectors_names)}")
log_selected_connectors(ctx.obj["selected_connectors_and_files"])
publish_connector_contexts = reorder_contexts(
[
PublishConnectorContext(
Expand All @@ -342,7 +338,7 @@ def publish(
ci_gcs_credentials=ctx.obj["ci_gcs_credentials"],
pull_request=ctx.obj.get("pull_request"),
)
for connector, modified_files in selected_connectors_and_files.items()
for connector, modified_files in ctx.obj["selected_connectors_and_files"].items()
]
)

Expand Down Expand Up @@ -394,23 +390,18 @@ def list(
return True


@connectors.command(cls=DaggerPipelineCommand, help="Autoformat connector code.")
@connectors.command(name="format", cls=DaggerPipelineCommand, help="Autoformat connector code.")
@click.pass_context
def format(ctx: click.Context) -> bool:
if ctx.obj["modified"]:
# We only want to format the connector that with modified files on the current branch.
connectors_and_files_to_format = [
(connector, modified_files) for connector, modified_files in ctx.obj["selected_connectors_and_files"].items() if modified_files
]
else:
# We explicitly want to format specific connectors
connectors_and_files_to_format = [
(connector, modified_files) for connector, modified_files in ctx.obj["selected_connectors_and_files"].items()
]
def format_code(ctx: click.Context) -> bool:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small renaming, format is a reserved function name in python.

if ctx.obj["select_modified_connectors"]:
ctx.obj["selected_connectors_and_files"] = {
**ctx.obj["selected_connectors_and_files"],
**get_modified_connectors_and_files(ctx.obj["modified_files"]),
}

if connectors_and_files_to_format:
if ctx.obj["selected_connectors_and_files"]:
main_logger.info(
f"Will format the following connectors: {', '.join([connector.technical_name for connector, _ in connectors_and_files_to_format])}."
f"Will format the following connectors: {', '.join([connector.technical_name for connector, _ in ctx.obj['selected_connectors_and_files'].keys()])}."
)
else:
main_logger.info("No connectors to format.")
Expand All @@ -435,7 +426,7 @@ def format(ctx: click.Context) -> bool:
pull_request=ctx.obj.get("pull_request"),
should_save_report=False,
)
for connector, modified_files in connectors_and_files_to_format
for connector, modified_files in ctx.obj["selected_connectors_and_files"].items()
]

anyio.run(
Expand All @@ -449,3 +440,11 @@ def format(ctx: click.Context) -> bool:
)

return True


def log_selected_connectors(selected_connectors_and_files: dict[Connector, Set[Union[str, Path]]]) -> None:
if selected_connectors_and_files:
selected_connectors_names = [c.technical_name for c in selected_connectors_and_files.keys()]
main_logger.info(f"Will run on the following connectors: {', '.join(selected_connectors_names)}.")
else:
main_logger.info("No connectors to run.")
60 changes: 34 additions & 26 deletions airbyte-ci/connectors/pipelines/pipelines/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,35 @@
import re
import sys
import unicodedata

from glob import glob
from io import TextIOWrapper
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set, Tuple, Union
from io import TextIOWrapper

import anyio
import asyncer
import click
import git
from pipelines import consts, main_logger, sentry_utils
from pipelines.consts import GCS_PUBLIC_DOMAIN
from connector_ops.utils import get_all_released_connectors, get_changed_connectors
from dagger import Client, Config, Connection, Container, DaggerError, ExecError, File, ImageLayerCompression, QueryError, Secret
from google.cloud import storage
from google.oauth2 import service_account
from more_itertools import chunked
from pipelines import consts, main_logger, sentry_utils
from pipelines.consts import GCS_PUBLIC_DOMAIN

if TYPE_CHECKING:
from pipelines.contexts import ConnectorContext
from connector_ops.utils import Connector
from github import PullRequest
from pipelines.contexts import ConnectorContext

DAGGER_CONFIG = Config(log_output=sys.stderr)
AIRBYTE_REPO_URL = "https://github.com/airbytehq/airbyte.git"
METADATA_FILE_NAME = "metadata.yaml"
METADATA_ICON_FILE_NAME = "icon.svg"
DIFF_FILTER = "MADRT" # Modified, Added, Deleted, Renamed, Type changed
IGNORED_FILE_EXTENSIONS = [".md"]
ALL_CONNECTOR_DEPENDENCIES = [(connector, connector.get_local_dependency_paths()) for connector in get_all_released_connectors()]


# This utils will probably be redundant once https://github.com/dagger/dagger/issues/3764 is implemented
Expand Down Expand Up @@ -323,47 +324,54 @@ def _file_path_starts_with(given_file_path: Path, starts_with_path: Path) -> boo
return given_file_path_parts[: len(starts_with_path_parts)] == starts_with_path_parts


def _find_modified_connectors(file: Union[str, Path], all_dependencies: list) -> dict:
"""Find all connectors whose dependencies were modified."""
def _find_modified_connectors(file_path: Union[str, Path], dependency_scanning: bool = True) -> dict:
"""Find all connectors impacted by the file change."""
modified_connectors = {}
for connector, connector_dependencies in all_dependencies:
for connector_dependency in connector_dependencies:
file_path = Path(file)

if _file_path_starts_with(file_path, connector_dependency):
# Add the connector to the modified connectors
modified_connectors.setdefault(connector, [])
connector_directory_path = Path(connector.code_directory)

# If the file is in the connector directory, add it to the modified files
if _file_path_starts_with(file_path, connector_directory_path):
modified_connectors[connector].append(file)
else:
main_logger.info(f"Adding connector '{connector}' due to dependency modification: '{file}'.")
for connector, connector_dependencies in ALL_CONNECTOR_DEPENDENCIES:
if Path(file_path).is_relative_to(Path(connector.code_directory)):
main_logger.info(f"Adding connector '{connector}' due to connector file modification: {file_path}.")
modified_connectors.setdefault(connector, [])
modified_connectors[connector].append(file_path)

if dependency_scanning:
for connector_dependency in connector_dependencies:
if Path(file_path).is_relative_to(Path(connector_dependency)):
# Add the connector to the modified connectors
modified_connectors.setdefault(connector, [])
main_logger.info(f"Adding connector '{connector}' due to dependency modification: '{file_path}'.")

return modified_connectors


def get_modified_connectors(modified_files: Set[Union[str, Path]]) -> dict:
def get_modified_connectors_and_files(
modified_files: Set[Union[str, Path]], metadata_modification_only: bool = False, dependency_scanning: bool = True
) -> dict[Connector, Set[Union[str, Path]]]:
"""Create a mapping of modified connectors (key) and modified files (value).
As we call connector.get_local_dependencies_paths() any modification to a dependency will trigger connector pipeline for all connectors that depend on it.
The get_local_dependencies_paths function currently computes dependencies for Java connectors only.
It's especially useful to trigger tests of strict-encrypt variant when a change is made to the base connector.
Or to tests all jdbc connectors when a change is made to source-jdbc or base-java.
We'll consider extending the dependency resolution to Python connectors once we confirm that it's needed and feasible in term of scale.
"""
all_connector_dependencies = [(connector, connector.get_local_dependency_paths()) for connector in get_all_released_connectors()]

# Ignore files with certain extensions
modified_files = [file for file in modified_files if not _is_ignored_file(file)]

if metadata_modification_only:
modified_files = get_modified_metadata_files(modified_files)
main_logger.info(f"Modified metadata files: {modified_files}")
modified_connectors = {}
for modified_file in modified_files:
modified_connectors.update(_find_modified_connectors(modified_file, all_connector_dependencies))

modified_connectors.update(_find_modified_connectors(modified_file, dependency_scanning))
return modified_connectors


def get_connector_modified_files(connector: Connector, all_modified_files: Set[Union[str, Path]]) -> Set[Union[str, Path]]:
for modified_file in all_modified_files:
modified_file_path = Path(modified_file)
if modified_file_path.is_relative_to(connector.code_directory):
yield modified_file


def get_modified_metadata_files(modified_files: Set[Union[str, Path]]) -> Set[Path]:
return {
Path(str(f))
Expand Down
Loading