Notify commit owner via slack message #37803
Merged (1 commit) on May 9, 2024
File: orchestrator Dagster definitions module
@@ -4,7 +4,7 @@
from dagster import Definitions, EnvVar, ScheduleDefinition, load_assets_from_modules
from dagster_slack import SlackResource
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER
from orchestrator.assets import connector_test_report, github, metadata, registry, registry_entry, registry_report, specs_secrets_mask
from orchestrator.assets import connector_test_report, github, metadata, registry, registry_entry, registry_report, specs_secrets_mask, slack
from orchestrator.config import (
CI_MASTER_TEST_OUTPUT_REGEX,
CI_TEST_REPORT_PREFIX,
@@ -41,6 +41,7 @@

ASSETS = load_assets_from_modules(
[
slack,
github,
specs_secrets_mask,
metadata,
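For context, a minimal sketch, assuming wiring not shown in this diff, of how the slack resource these assets depend on is typically supplied to Dagster so the new module's asset can resolve its required "slack" resource key:

from dagster import Definitions, EnvVar, load_assets_from_modules
from dagster_slack import SlackResource
from orchestrator.assets import slack

# Hypothetical wiring: the airbyte_slack_users asset declares
# required_resource_keys={"slack"}, so a resource under the key "slack"
# must be provided when the Definitions object is built.
defs = Definitions(
    assets=load_assets_from_modules([slack]),
    resources={"slack": SlackResource(token=EnvVar("SLACK_TOKEN"))},
)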
File: orchestrator/assets/registry_entry.py
@@ -10,21 +10,22 @@
import orchestrator.hacks as HACKS
import pandas as pd
import sentry_sdk
import yaml
from dagster import AutoMaterializePolicy, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset
from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager
from google.cloud import storage
from metadata_service.constants import ICON_FILE_NAME, METADATA_FILE_NAME
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
from metadata_service.models.generated.ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition
from metadata_service.models.transform import to_json_sanitized_dict
from metadata_service.spec_cache import SpecCache
from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST, VALID_REGISTRIES, get_public_url_for_gcs_file
from orchestrator.logging import sentry
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
from orchestrator.utils.blob_helpers import yaml_blob_to_dict
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.utils.object_helpers import deep_copy_params
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
from pydash.objects import get

PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
@@ -332,25 +333,73 @@ def delete_registry_entry(registry_name, metadata_entry: LatestMetadataEntry, me


@sentry_sdk.trace
def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]:
def safe_parse_metadata_definition(file_name: str, metadata_dict: dict) -> Optional[MetadataDefinition]:
"""
Safely parse the metadata definition from the given metadata entry.
Handles the case where the metadata definition is invalid in old versions of the metadata.
"""
yaml_string = metadata_blob.download_as_string().decode("utf-8")
metadata_dict = yaml.safe_load(yaml_string)

try:
return MetadataDefinition.parse_obj(metadata_dict)

except ValidationError as e:
# only raise the error if "latest" is in the path
if "latest" in metadata_blob.name:
if "latest" in file_name:
raise e
else:
print(f"WARNING: Could not parse metadata definition for {metadata_blob.name}. Error: {e}")
print(f"WARNING: Could not parse metadata definition for {file_name}. Error: {e}")
return None


def safe_get_slack_user_identifier(airbyte_slack_users: pd.DataFrame, metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
"""
Safely get the slack user identifier from the given git info in the metadata file.
"""
if isinstance(metadata_dict, BaseModel):
metadata_dict = to_json_sanitized_dict(metadata_dict)

# if the slack users dataframe is empty or None, return None
if airbyte_slack_users is None or airbyte_slack_users.empty:
return None

commit_author = get(metadata_dict, "data.generated.git.commit_author")
commit_author_email = get(metadata_dict, "data.generated.git.commit_author_email")

# if the commit author email is not present, return the author name or None
if not commit_author_email:
return commit_author

# if the commit author email is present, try to find the user in the slack users dataframe
# if the user is not found, fall back to the author name (which may be None)
slack_user = airbyte_slack_users[airbyte_slack_users["email"] == commit_author_email]
if slack_user.empty:
slack_user = airbyte_slack_users[airbyte_slack_users["real_name"] == commit_author]

if slack_user.empty:
return commit_author

# if the user is found, return the slack real_name and mention, e.g. "John Doe (<@U12345678>)"
slack_id = slack_user["id"].iloc[0]
slack_real_name = slack_user["real_name"].iloc[0]
return f"{slack_real_name} (<@{slack_id}>)"


def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
"""
Safely get the git commit sha from the given git info in the metadata file.
"""
if isinstance(metadata_dict, BaseModel):
metadata_dict = to_json_sanitized_dict(metadata_dict)

# if the git commit sha is not present, return None
commit_sha = get(metadata_dict, "data.generated.git.commit_sha")
if not commit_sha:
return None

# if the git commit sha is present, return the commit sha
return commit_sha


# ASSETS


@@ -362,7 +411,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]:
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
etag = context.partition_key
context.log.info(f"Processing metadata file with etag {etag}")
@@ -373,16 +422,22 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
if not matching_blob:
raise Exception(f"Could not find blob with etag {etag}")

metadata_dict = yaml_blob_to_dict(matching_blob)
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)
commit_sha = safe_get_commit_sha(metadata_dict)

metadata_file_path = matching_blob.name
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.IN_PROGRESS,
f"Found metadata file with path {metadata_file_path} for etag {etag}",
user_identifier=user_identifier,
commit_sha=commit_sha,
Review comment (Contributor) on lines +435 to +436:
The type hints on the log method are not accurate; they should be str | None instead of str for these two parameters.
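A minimal sketch of the annotation fix the reviewer is suggesting (Optional[str] is the pre-Python-3.10 spelling of str | None):

from typing import Optional

class PublishConnectorLifecycle:
    @staticmethod
    def log(
        context,          # OpExecutionContext
        lifecycle_stage,  # PublishConnectorLifecycleStage
        stage_status,     # StageStatus
        message: str,
        commit_sha: Optional[str] = None,       # was: commit_sha: str = None
        user_identifier: Optional[str] = None,  # was: user_identifier: str = None
    ):
        ...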
)

# read the matching_blob into a metadata definition
metadata_def = safe_parse_metadata_definition(matching_blob)
metadata_def = safe_parse_metadata_definition(matching_blob.name, metadata_dict)

dagster_metadata = {
"bucket_name": matching_blob.bucket.name,
@@ -398,6 +453,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.FAILED,
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files",
user_identifier=user_identifier,
commit_sha=commit_sha,
)
return Output(value=None, metadata=dagster_metadata)

@@ -422,6 +479,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.SUCCESS,
f"Successfully parsed metadata definition for {metadata_file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

return Output(value=metadata_entry, metadata=dagster_metadata)
@@ -434,19 +493,26 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
def registry_entry(
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame
) -> Output[Optional[dict]]:
"""
Generate the registry entry files from the given metadata file and persist them to GCS.
"""
if not metadata_entry:
# if the metadata entry is missing or invalid, emit an empty output (value=None)
return Output(metadata={"empty_metadata": True}, value=None)

user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition)
Review comment (Contributor):
(I'm not 💯 sure user_identifier makes its way to the registry; it might just be used for logging, in which case feel free to ignore this comment.)
We'll publish contributors' emails (both Airbyters and external contributors) to our registry. Shall we be concerned about exposing these? Git info is public, but I'm not sure that, as a contributor, I'd expect my email to be publicly available in our online registry.

commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating registry entry for {metadata_entry.file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

spec_cache = SpecCache()
@@ -488,7 +554,9 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}",
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}.\n\n*This new Connector will be available for use in the platform on the next release (1-3 min)*",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

# Log the registry entries that were deleted
@@ -498,6 +566,8 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

return Output(metadata=dagster_metadata, value=persisted_registry_entries)
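To illustrate the two new helpers, a toy example; the name, email, and sha are hypothetical, and the import path assumes the helpers live in orchestrator.assets.registry_entry as the diff above suggests:

import pandas as pd
from orchestrator.assets.registry_entry import safe_get_commit_sha, safe_get_slack_user_identifier

# Hypothetical Slack directory, shaped like the airbyte_slack_users asset output.
airbyte_slack_users = pd.DataFrame(
    [{"id": "U12345678", "real_name": "Jane Doe", "email": "jane@example.com"}]
)

# Hypothetical metadata dict carrying the generated git info.
metadata_dict = {
    "data": {
        "generated": {
            "git": {
                "commit_author": "Jane Doe",
                "commit_author_email": "jane@example.com",
                "commit_sha": "abc1234",
            }
        }
    }
}

# The email matches a Slack user, so the author resolves to a mention string:
safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)  # "Jane Doe (<@U12345678>)"
safe_get_commit_sha(metadata_dict)  # "abc1234"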
File: orchestrator/assets/slack.py (new file)
@@ -0,0 +1,51 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os

import pandas as pd
from dagster import AutoMaterializePolicy, FreshnessPolicy, OpExecutionContext, Output, asset
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe

GROUP_NAME = "slack"

USER_REQUEST_CHUNK_SIZE = 2000
MAX_REQUESTS = 5


@asset(
group_name=GROUP_NAME,
required_resource_keys={"slack"},
auto_materialize_policy=AutoMaterializePolicy.eager(),
freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 12),
)
def airbyte_slack_users(context: OpExecutionContext) -> OutputDataFrame:
"""
Return a list of all users in the Airbyte Slack.
"""
if not os.getenv("SLACK_TOKEN"):
context.log.info("Skipping Slack Users asset as SLACK_TOKEN is not set")
return None

client = context.resources.slack.get_client()
users_response = client.users_list(limit=2000)
metadata = users_response.data["response_metadata"]
users = users_response.data["members"]
requests_count = 1

while metadata["next_cursor"] and requests_count < MAX_REQUESTS:
users_response = client.users_list(limit=2000, cursor=metadata["next_cursor"])
metadata = users_response.data["response_metadata"]
users.extend(users_response.data["members"])
requests_count += 1

# Convert to a dataframe of id, real_name, and email
# Remove any deleted or bot profiles
users_df = pd.DataFrame(users)
users_df = users_df[users_df["deleted"] == False]
users_df = users_df[users_df["is_bot"] == False]
users_df["email"] = users_df["profile"].apply(lambda x: x.get("email", None))
users_df = users_df[["id", "real_name", "email"]]

return output_dataframe(users_df)
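For reference, the cursor-pagination pattern the asset relies on, sketched directly against slack_sdk's WebClient (which is what SlackResource.get_client() returns); the env var name mirrors the asset, everything else is a simplified sketch:

import os

from slack_sdk import WebClient

MAX_REQUESTS = 5  # same cap as the asset

client = WebClient(token=os.environ["SLACK_TOKEN"])

users = []
cursor = None
for _ in range(MAX_REQUESTS):
    response = client.users_list(limit=2000, cursor=cursor)
    users.extend(response["members"])
    cursor = response["response_metadata"].get("next_cursor")
    if not cursor:  # Slack signals the last page with an empty cursor
        break

Note that with a 2,000-user page size and at most five requests, the asset stops collecting after roughly 10,000 users.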
File: orchestrator/config.py
@@ -46,3 +46,6 @@ def get_public_metadata_service_url(file_path: str) -> str:
metadata_bucket = os.getenv("METADATA_BUCKET")
metadata_cdn_url = os.getenv("METADATA_CDN_BASE_URL")
return get_public_url_for_gcs_file(metadata_bucket, file_path, metadata_cdn_url)


REPO_URL = "https://github.com/airbytehq/airbyte/"
File: orchestrator/logging/publish_connector_lifecycle.py
@@ -6,6 +6,7 @@
from enum import Enum

from dagster import OpExecutionContext
from orchestrator.config import REPO_URL
from orchestrator.ops.slack import send_slack_message


@@ -56,19 +57,45 @@ def stage_to_log_level(stage_status: StageStatus) -> str:
else:
return "info"

def _commit_link(commit_sha: str) -> str:
"""Create a markdown link to a commit."""
commit_url = f"{REPO_URL}/commit/{commit_sha}"
return f"\ncommit: <{commit_url}|{commit_sha}>"

def _user_mention(user_identifier: str) -> str:
"""Create a markdown link to a user."""
return f"\nauthor: {user_identifier}"

@staticmethod
def create_log_message(
lifecycle_stage: PublishConnectorLifecycleStage,
stage_status: StageStatus,
message: str,
commit_sha: str = None,
user_identifier: str = None,
) -> str:
emoji = stage_status.to_emoji()
return f"*{emoji} _{lifecycle_stage}_ {stage_status}*: {message}"
final_message = f"*{emoji} _{lifecycle_stage}_ {stage_status}*:\n{message}"

if user_identifier:
final_message += PublishConnectorLifecycle._user_mention(user_identifier)

if commit_sha:
final_message += PublishConnectorLifecycle._commit_link(commit_sha)

return final_message

@staticmethod
def log(context: OpExecutionContext, lifecycle_stage: PublishConnectorLifecycleStage, stage_status: StageStatus, message: str):
def log(
context: OpExecutionContext,
lifecycle_stage: PublishConnectorLifecycleStage,
stage_status: StageStatus,
message: str,
commit_sha: str = None,
user_identifier: str = None,
):
"""Publish a connector notification log to logger and slack (if enabled)."""
message = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message)
message = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message, commit_sha, user_identifier)

level = PublishConnectorLifecycle.stage_to_log_level(stage_status)
log_method = getattr(context.log, level)
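To show how these pieces compose, a hypothetical call; the message text is invented, and the exact emoji and enum string rendering depend on definitions not shown in this diff:

message = PublishConnectorLifecycle.create_log_message(
    PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
    StageStatus.SUCCESS,
    "Successfully generated oss registry entry for source-foo",  # hypothetical message
    commit_sha="abc1234",
    user_identifier="Jane Doe (<@U12345678>)",
)
# Roughly:
#   *<emoji> _<stage>_ <status>*:
#   Successfully generated oss registry entry for source-foo
#   author: Jane Doe (<@U12345678>)
#   commit: <https://github.com/airbytehq/airbyte//commit/abc1234|abc1234>
#
# As written, REPO_URL's trailing slash produces a doubled slash in the
# commit URL; GitHub appears to resolve it anyway.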
File: orchestrator/utils/blob_helpers.py (new file)
@@ -0,0 +1,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import yaml
from google.cloud import storage


def yaml_blob_to_dict(yaml_blob: storage.Blob) -> dict:
"""
Convert the given yaml blob to a dictionary.
"""
yaml_string = yaml_blob.download_as_string().decode("utf-8")
return yaml.safe_load(yaml_string)
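A quick usage sketch with a mocked blob and hypothetical YAML content, matching how the tests below exercise the helper:

from unittest import mock

from google.cloud import storage

mock_blob = mock.Mock(spec=storage.Blob)
mock_blob.download_as_string.return_value = b"data:\n  name: Example\n"

yaml_blob_to_dict(mock_blob)  # {"data": {"name": "Example"}}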

Some generated files are not rendered by default.

File: orchestrator tests for registry entry parsing
@@ -27,6 +27,7 @@
oss_sources_dataframe,
)
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
from orchestrator.utils.blob_helpers import yaml_blob_to_dict
from pydantic import ValidationError

VALID_METADATA_DICT = {
@@ -64,11 +65,13 @@ def test_safe_parse_metadata_definition(blob_name, blob_content, expected_result
mock_blob.name = blob_name
mock_blob.download_as_string.return_value = blob_content.encode("utf-8")

metadata_dict = yaml_blob_to_dict(mock_blob)

if expected_exception:
with pytest.raises(expected_exception):
safe_parse_metadata_definition(mock_blob)
safe_parse_metadata_definition(mock_blob.name, metadata_dict)
else:
result = safe_parse_metadata_definition(mock_blob)
result = safe_parse_metadata_definition(mock_blob.name, metadata_dict)
# assert the parsed result matches the expected definition
assert result == expected_result
