-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Notify commit owner via slack message #37803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,21 +10,22 @@ | |
import orchestrator.hacks as HACKS | ||
import pandas as pd | ||
import sentry_sdk | ||
import yaml | ||
from dagster import AutoMaterializePolicy, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset | ||
from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager | ||
from google.cloud import storage | ||
from metadata_service.constants import ICON_FILE_NAME, METADATA_FILE_NAME | ||
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition | ||
from metadata_service.models.generated.ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition | ||
from metadata_service.models.transform import to_json_sanitized_dict | ||
from metadata_service.spec_cache import SpecCache | ||
from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST, VALID_REGISTRIES, get_public_url_for_gcs_file | ||
from orchestrator.logging import sentry | ||
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus | ||
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition | ||
from orchestrator.utils.blob_helpers import yaml_blob_to_dict | ||
from orchestrator.utils.dagster_helpers import OutputDataFrame | ||
from orchestrator.utils.object_helpers import deep_copy_params | ||
from pydantic import ValidationError | ||
from pydantic import BaseModel, ValidationError | ||
from pydash.objects import get | ||
|
||
PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition] | ||
|
@@ -332,25 +333,73 @@ def delete_registry_entry(registry_name, metadata_entry: LatestMetadataEntry, me | |
|
||
|
||
@sentry_sdk.trace | ||
def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]: | ||
def safe_parse_metadata_definition(file_name: str, metadata_dict: dict) -> Optional[MetadataDefinition]: | ||
""" | ||
Safely parse the metadata definition from the given metadata entry. | ||
Handles the case where the metadata definition is invalid for in old versions of the metadata. | ||
""" | ||
yaml_string = metadata_blob.download_as_string().decode("utf-8") | ||
metadata_dict = yaml.safe_load(yaml_string) | ||
|
||
try: | ||
return MetadataDefinition.parse_obj(metadata_dict) | ||
|
||
except ValidationError as e: | ||
# only raise the error if "latest" is in the path | ||
if "latest" in metadata_blob.name: | ||
if "latest" in file_name: | ||
raise e | ||
else: | ||
print(f"WARNING: Could not parse metadata definition for {metadata_blob.name}. Error: {e}") | ||
print(f"WARNING: Could not parse metadata definition for {file_name}. Error: {e}") | ||
return None | ||
|
||
|
||
def safe_get_slack_user_identifier(airbyte_slack_users: pd.DataFrame, metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
    """
    Resolve the commit author recorded in a metadata file to a Slack-friendly identifier.

    The author is read from ``data.generated.git.commit_author`` and
    ``data.generated.git.commit_author_email`` and matched against the Slack
    users dataframe, first on ``email`` and then on ``real_name``.

    Returns:
        * ``None`` when no Slack users are available.
        * The raw commit author name (which may itself be ``None``) when no
          email is recorded or no Slack user matches.
        * ``"<real_name> (<@slack_id>)"`` for the first matching Slack user,
          e.g. ``"John Doe (<@U12345678>)"``.
    """
    # Pydantic models are flattened to a plain dict so the path lookups below work on both input kinds.
    git_source = to_json_sanitized_dict(metadata_dict) if isinstance(metadata_dict, BaseModel) else metadata_dict

    # Nothing to match against: bail out before looking at the git info.
    if airbyte_slack_users is None or airbyte_slack_users.empty:
        return None

    author_name = get(git_source, "data.generated.git.commit_author")
    author_email = get(git_source, "data.generated.git.commit_author_email")

    # Without an email the best available identifier is the author name (or None).
    if not author_email:
        return author_name

    # Prefer an email match; fall back to matching the Slack real name.
    matches = airbyte_slack_users[airbyte_slack_users["email"] == author_email]
    if matches.empty:
        matches = airbyte_slack_users[airbyte_slack_users["real_name"] == author_name]

    if matches.empty:
        return author_name

    # Use the first matching row and format the id as a Slack mention.
    slack_id = matches["id"].iloc[0]
    slack_real_name = matches["real_name"].iloc[0]
    return f"{slack_real_name} (<@{slack_id}>)"
|
||
|
||
def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
    """
    Read the git commit sha recorded under ``data.generated.git.commit_sha``.

    Accepts either a plain dict or a pydantic model (which is converted to a
    dict first). Returns ``None`` when the value is missing or otherwise falsy.
    """
    source = to_json_sanitized_dict(metadata_dict) if isinstance(metadata_dict, BaseModel) else metadata_dict

    # Collapse every falsy result (missing key, empty string, ...) to None.
    return get(source, "data.generated.git.commit_sha") or None
|
||
|
||
# ASSETS | ||
|
||
|
||
|
@@ -362,7 +411,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta | |
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST), | ||
) | ||
@sentry.instrument_asset_op | ||
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]: | ||
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]: | ||
"""Parse and compute the LatestMetadataEntry for the given metadata file.""" | ||
etag = context.partition_key | ||
context.log.info(f"Processing metadata file with etag {etag}") | ||
|
@@ -373,16 +422,22 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat | |
if not matching_blob: | ||
raise Exception(f"Could not find blob with etag {etag}") | ||
|
||
metadata_dict = yaml_blob_to_dict(matching_blob) | ||
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict) | ||
commit_sha = safe_get_commit_sha(metadata_dict) | ||
|
||
metadata_file_path = matching_blob.name | ||
PublishConnectorLifecycle.log( | ||
context, | ||
PublishConnectorLifecycleStage.METADATA_VALIDATION, | ||
StageStatus.IN_PROGRESS, | ||
f"Found metadata file with path {metadata_file_path} for etag {etag}", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
|
||
# read the matching_blob into a metadata definition | ||
metadata_def = safe_parse_metadata_definition(matching_blob) | ||
metadata_def = safe_parse_metadata_definition(matching_blob.name, metadata_dict) | ||
|
||
dagster_metadata = { | ||
"bucket_name": matching_blob.bucket.name, | ||
|
@@ -398,6 +453,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat | |
PublishConnectorLifecycleStage.METADATA_VALIDATION, | ||
StageStatus.FAILED, | ||
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
return Output(value=None, metadata=dagster_metadata) | ||
|
||
|
@@ -422,6 +479,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat | |
PublishConnectorLifecycleStage.METADATA_VALIDATION, | ||
StageStatus.SUCCESS, | ||
f"Successfully parsed metadata definition for {metadata_file_path}", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
|
||
return Output(value=metadata_entry, metadata=dagster_metadata) | ||
|
@@ -434,19 +493,26 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat | |
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST), | ||
) | ||
@sentry.instrument_asset_op | ||
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]: | ||
def registry_entry( | ||
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame | ||
) -> Output[Optional[dict]]: | ||
""" | ||
Generate the registry entry files from the given metadata file, and persist it to GCS. | ||
""" | ||
if not metadata_entry: | ||
# if the metadata entry is invalid, return an empty dict | ||
return Output(metadata={"empty_metadata": True}, value=None) | ||
|
||
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (I'm not 💯 sure.) We'll publish contributors' emails (both Airbyters and external contributors) to our registry; should we be concerned about exposing these? Git info is public, but I'm not sure that, as a contributor, I'd expect my email to be publicly available in our online registry. |
||
commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition) | ||
|
||
PublishConnectorLifecycle.log( | ||
context, | ||
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION, | ||
StageStatus.IN_PROGRESS, | ||
f"Generating registry entry for {metadata_entry.file_path}", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
|
||
spec_cache = SpecCache() | ||
|
@@ -488,7 +554,9 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM | |
context, | ||
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION, | ||
StageStatus.SUCCESS, | ||
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}", | ||
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}.\n\n*This new Connector will be available for use in the platform on the next release (1-3 min)*", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
|
||
# Log the registry entries that were deleted | ||
|
@@ -498,6 +566,8 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM | |
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION, | ||
StageStatus.SUCCESS, | ||
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}", | ||
user_identifier=user_identifier, | ||
commit_sha=commit_sha, | ||
) | ||
|
||
return Output(metadata=dagster_metadata, value=persisted_registry_entries) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
import os | ||
|
||
import pandas as pd | ||
from dagster import AutoMaterializePolicy, FreshnessPolicy, OpExecutionContext, Output, asset | ||
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe | ||
|
||
GROUP_NAME = "slack" | ||
|
||
USER_REQUEST_CHUNK_SIZE = 2000 | ||
MAX_REQUESTS = 5 | ||
|
||
|
||
@asset(
    group_name=GROUP_NAME,
    required_resource_keys={"slack"},
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 12),
)
def airbyte_slack_users(context: OpExecutionContext) -> OutputDataFrame:
    """
    Fetch the members of the Airbyte Slack workspace as a dataframe.

    Pages through the Slack ``users_list`` endpoint, requesting
    ``USER_REQUEST_CHUNK_SIZE`` users per call and issuing at most
    ``MAX_REQUESTS`` calls. Deleted users and bots are dropped.

    Returns:
        ``None`` when the ``SLACK_TOKEN`` environment variable is not set;
        otherwise the result of ``output_dataframe`` applied to a dataframe
        with the columns ``id``, ``real_name`` and ``email``.
    """
    if not os.getenv("SLACK_TOKEN"):
        context.log.info("Skipping Slack Users asset as SLACK_TOKEN is not set")
        return None

    client = context.resources.slack.get_client()

    users = []
    next_cursor = None
    for _ in range(MAX_REQUESTS):
        # Only send a cursor once Slack has handed one back, so the first request is a plain listing.
        page_kwargs = {"cursor": next_cursor} if next_cursor else {}
        users_response = client.users_list(limit=USER_REQUEST_CHUNK_SIZE, **page_kwargs)
        users.extend(users_response.data["members"])

        next_cursor = users_response.data.get("response_metadata", {}).get("next_cursor")
        if not next_cursor:
            break
    else:
        # The request budget ran out while Slack still had more pages: make the truncation visible.
        context.log.warning(
            f"Stopped fetching Slack users after {MAX_REQUESTS} requests; the user list may be incomplete"
        )

    output_columns = ["id", "real_name", "email"]

    # An empty member list would produce a frame without the columns filtered on below.
    if not users:
        return output_dataframe(pd.DataFrame(columns=output_columns))

    # Convert to a dataframe of id, real_name, and email.
    # Remove any deleted or bot profiles. `== False` (rather than `~`) is deliberate:
    # it also drops rows where the flag is missing (NaN).
    users_df = pd.DataFrame(users)
    is_active_human = (users_df["deleted"] == False) & (users_df["is_bot"] == False)  # noqa: E712
    # Copy so the column assignment below does not write into a view of the unfiltered frame.
    users_df = users_df[is_active_human].copy()
    users_df["email"] = users_df["profile"].apply(lambda profile: profile.get("email", None))
    users_df = users_df[output_columns]

    return output_dataframe(users_df)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import yaml | ||
from google.cloud import storage | ||
|
||
|
||
def yaml_blob_to_dict(yaml_blob: storage.Blob) -> dict:
    """
    Download the given GCS blob and parse its contents as YAML.

    The blob is decoded as UTF-8 and parsed with ``yaml.safe_load``, so only
    plain YAML data types are constructed.

    NOTE(review): ``yaml.safe_load`` returns whatever the document's top-level
    node is (and ``None`` for an empty document), so the ``dict`` return type
    presumably relies on the blob holding a YAML mapping - confirm with callers.
    """
    # `download_as_string` is a deprecated alias of `download_as_bytes`; call the supported method directly.
    yaml_string = yaml_blob.download_as_bytes().decode("utf-8")
    return yaml.safe_load(yaml_string)
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type hints on the `log` method are not accurate; they should be `str | None` instead of `str` for these two parameters.