Skip to content

Add Slack Alert lifecycle to Dagster for Metadata publish #28759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ GITHUB_METADATA_SERVICE_TOKEN=""
NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
# METADATA_CDN_BASE_URL="https://connectors.airbyte.com/files"
DOCKER_HUB_USERNAME=""
DOCKER_HUB_PASSWORD=""
DOCKER_HUB_PASSWORD=""
SLACK_TOKEN=""
PUBLISH_UPDATE_CHANNEL="#ben-test"
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dagster import Definitions, ScheduleDefinition, load_assets_from_modules
from dagster import Definitions, ScheduleDefinition, EnvVar, load_assets_from_modules
from dagster_slack import SlackResource

from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory, github_workflow_runs
Expand Down Expand Up @@ -54,6 +55,10 @@
]
)

SLACK_RESOURCE_TREE = {
"slack": SlackResource(token=EnvVar("SLACK_TOKEN")),
}

GITHUB_RESOURCE_TREE = {
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
Expand All @@ -79,6 +84,7 @@
}

METADATA_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"all_metadata_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*/{METADATA_FILE_NAME}$"}
Expand All @@ -89,6 +95,7 @@
}

REGISTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_oss_registry_gcs_blob": gcs_file_blob.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}
Expand All @@ -99,6 +106,7 @@
}

REGISTRY_ENTRY_RESOURCE_TREE = {
**SLACK_RESOURCE_TREE,
**GCS_RESOURCE_TREE,
"latest_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/cloud.json$"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def generate_nightly_report(context: OpExecutionContext) -> Output[pd.DataFrame]
nightly_report_complete_md = render_connector_nightly_report_md(nightly_report_connector_matrix_df, nightly_report_complete_df)
slack_webhook_url = os.getenv("NIGHTLY_REPORT_SLACK_WEBHOOK_URL")
if slack_webhook_url:
send_slack_webhook(slack_webhook_url, nightly_report_complete_md)
send_slack_webhook(slack_webhook_url, nightly_report_complete_md, wrap_in_code_block=True)

return Output(
nightly_report_connector_matrix_df,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from metadata_service.models.generated.ConnectorRegistryV0 import ConnectorRegistryV0
from metadata_service.utils import to_json_sanitized_dict
from orchestrator.assets.registry_entry import read_registry_entry_blob
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus

from typing import List

Expand Down Expand Up @@ -38,6 +39,7 @@ def persist_registry_to_json(


def generate_and_persist_registry(
context: OpExecutionContext,
registry_entry_file_blobs: List[storage.Blob],
registry_directory_manager: GCSFileManager,
registry_name: str,
Expand All @@ -51,6 +53,12 @@ def generate_and_persist_registry(
Returns:
Output[ConnectorRegistryV0]: The registry.
"""
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating {registry_name} registry...",
)
registry_dict = {"sources": [], "destinations": []}
for blob in registry_entry_file_blobs:
registry_entry, connector_type = read_registry_entry_blob(blob)
Expand All @@ -70,13 +78,20 @@ def generate_and_persist_registry(
"gcs_path": MetadataValue.url(file_handle.public_url),
}

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_GENERATION,
StageStatus.SUCCESS,
f"New {registry_name} registry available at {file_handle.public_url}",
)

return Output(metadata=metadata, value=registry_model)


# Registry Generation


@asset(required_resource_keys={"registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the oss registry from the registry entries.
Expand All @@ -86,13 +101,14 @@ def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegis
latest_oss_registry_entries_file_blobs = context.resources.latest_oss_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_oss_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
)


@asset(required_resource_keys={"registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
"""
This asset is used to generate the cloud registry from the registry entries.
Expand All @@ -102,6 +118,7 @@ def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorReg
latest_cloud_registry_entries_file_blobs = context.resources.latest_cloud_registry_entries_file_blobs

return generate_and_persist_registry(
context=context,
registry_entry_file_blobs=latest_cloud_registry_entries_file_blobs,
registry_directory_manager=registry_directory_manager,
registry_name=registry_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.models.metadata import MetadataDefinition, LatestMetadataEntry
from orchestrator.config import get_public_url_for_gcs_file, VALID_REGISTRIES, MAX_METADATA_PARTITION_RUN_REQUEST
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus

from typing import List, Optional, Tuple, Union

Expand Down Expand Up @@ -304,7 +305,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta


@asset(
required_resource_keys={"all_metadata_file_blobs"},
required_resource_keys={"slack", "all_metadata_file_blobs"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
output_required=False,
Expand All @@ -322,7 +323,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
raise Exception(f"Could not find blob with etag {etag}")

metadata_file_path = matching_blob.name
context.log.info(f"Found metadata file with path {metadata_file_path} for etag {etag}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.IN_PROGRESS,
f"Found metadata file with path {metadata_file_path} for etag {etag}",
)

# read the matching_blob into a metadata definition
metadata_def = safe_parse_metadata_definition(matching_blob)
Expand All @@ -336,7 +342,12 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat

# return only if the metadata definition is valid
if not metadata_def:
context.log.warn(f"Could not parse metadata definition for {metadata_file_path}")
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.FAILED,
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files",
)
return Output(value=None, metadata=dagster_metadata)

icon_file_path = metadata_file_path.replace(METADATA_FILE_NAME, ICON_FILE_NAME)
Expand All @@ -355,11 +366,18 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
file_path=metadata_file_path,
)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.SUCCESS,
f"Successfully parsed metadata definition for {metadata_file_path}",
)

return Output(value=metadata_entry, metadata=dagster_metadata)


@asset(
required_resource_keys={"root_metadata_directory_manager"},
required_resource_keys={"slack", "root_metadata_directory_manager"},
group_name=GROUP_NAME,
partitions_def=metadata_partitions_def,
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
Expand All @@ -372,6 +390,13 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
# if the metadata entry is invalid, return an empty dict
return Output(metadata={"empty_metadata": True}, value=None)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating registry entry for {metadata_entry.file_path}",
)

cached_specs = pd.DataFrame(list_cached_specs())

root_metadata_directory_manager = context.resources.root_metadata_directory_manager
Expand Down Expand Up @@ -400,4 +425,22 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
**dagster_metadata_delete,
}

# Log the registry entries that were created
for registry_name, registry_url in persisted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}",
)

# Log the registry entries that were deleted
for registry_name, registry_url in deleted_registry_entries.items():
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}",
)

return Output(metadata=dagster_metadata, value=persisted_registry_entries)
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os

from enum import Enum
from dagster import OpExecutionContext
from orchestrator.ops.slack import send_slack_message


class StageStatus(str, Enum):
    """Status of a single stage in the connector publish lifecycle."""

    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        # e.g. "in_progress" -> "IN PROGRESS"
        return self.value.replace("_", " ").upper()

    def to_emoji(self) -> str:
        """Return a colored-circle emoji for this status, or "" if unmapped."""
        emoji_by_status = {
            StageStatus.IN_PROGRESS: "🟡",
            StageStatus.SUCCESS: "🟢",
            StageStatus.FAILED: "🔴",
        }
        return emoji_by_status.get(self, "")


class PublishConnectorLifecycleStage(str, Enum):
    """The stages a connector passes through while being published to the registries."""

    METADATA_VALIDATION = "metadata_validation"
    REGISTRY_ENTRY_GENERATION = "registry_entry_generation"
    REGISTRY_GENERATION = "registry_generation"

    def __str__(self) -> str:
        # e.g. "metadata_validation" -> "Metadata Validation"
        words = self.value.split("_")
        return " ".join(word.capitalize() for word in words)


class PublishConnectorLifecycle:
    """
    Log the lifecycle of publishing a connector to the registries.

    Emits each event to the Dagster logger and, when the PUBLISH_UPDATE_CHANNEL
    environment variable is set, to Slack as well.

    This is necessary because this lifecycle is not a single job, asset,
    resource, schedule, or sensor — it spans several assets, so a shared
    logging helper is the only common seam.
    """

    @staticmethod
    def stage_to_log_level(stage_status: "StageStatus") -> str:
        """Map a stage status to the name of a ``context.log`` method ("error" or "info")."""
        if stage_status == StageStatus.FAILED:
            return "error"
        else:
            return "info"

    @staticmethod
    def create_log_message(
        lifecycle_stage: "PublishConnectorLifecycleStage",
        stage_status: "StageStatus",
        message: str,
    ) -> str:
        """Render a single Slack/markdown-friendly line for a lifecycle event."""
        emoji = stage_status.to_emoji()
        return f"*{emoji} _{lifecycle_stage}_ {stage_status}*: {message}"

    @staticmethod
    def log(context: "OpExecutionContext", lifecycle_stage: "PublishConnectorLifecycleStage", stage_status: "StageStatus", message: str):
        """Publish a connector notification log to logger and slack (if enabled)."""
        # Keep the formatted line in its own name instead of rebinding `message`.
        full_message = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message)

        level = PublishConnectorLifecycle.stage_to_log_level(stage_status)
        log_method = getattr(context.log, level)
        log_method(full_message)

        # Slack delivery is opt-in via PUBLISH_UPDATE_CHANNEL (e.g. "#ben-test").
        channel = os.getenv("PUBLISH_UPDATE_CHANNEL")
        if channel:
            slack_message = f"🤖 {full_message}"
            send_slack_message(context, channel, slack_message)
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from dagster import op
import os

from dagster import op, OpExecutionContext
from slack_sdk import WebhookClient
from dagster_slack import SlackResource


def chunk_messages(report):
def send_slack_webhook(webhook_url, report, wrap_in_code_block=False):
    """
    Send a report to a Slack incoming webhook, split into chunks that fit
    Slack's message size limit.

    Args:
        webhook_url (str): The Slack incoming-webhook URL.
        report (str): The (possibly long) message body to deliver.
        wrap_in_code_block (bool): When True, wrap each chunk in a triple-backtick
            code block (webhooks do not render markdown tables; callers opt in).
    """
    webhook = WebhookClient(webhook_url)
    for msg in chunk_messages(report):
        text = f"```{msg}```" if wrap_in_code_block else msg
        # WebhookClient.send takes keyword-only arguments; passing the message
        # positionally raises TypeError.
        webhook.send(text=text)


def send_slack_message(context: "OpExecutionContext", channel: str, message: str):
    """
    Send a slack message to the given channel.

    Silently does nothing unless the SLACK_TOKEN environment variable is set,
    so runs without Slack credentials (e.g. local development) stay quiet.

    Args:
        context (OpExecutionContext): The execution context; must provide the
            "slack" resource (a dagster_slack SlackResource).
        channel (str): The channel to send the message to.
        message (str): The message to send.
    """
    if not os.getenv("SLACK_TOKEN"):
        # Slack is opt-in; without a token this helper is a deliberate no-op.
        return
    slack_client = context.resources.slack.get_client()
    slack_client.chat_postMessage(channel=channel, text=message)
Copy link
Contributor

@erohmensing erohmensing Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used to work on a slack integration in python a long time ago and the blast from the past that is chat_postMessage hits so hard lol

Loading