Skip to content

Commit 1143a6e

Browse files
committed
Update slack to use resource and bot
1 parent 08e7404 commit 1143a6e

File tree

8 files changed

+65
-16
lines changed

8 files changed

+65
-16
lines changed

airbyte-ci/connectors/metadata_service/orchestrator/.env.template

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ GITHUB_METADATA_SERVICE_TOKEN=""
44
NIGHTLY_REPORT_SLACK_WEBHOOK_URL=""
55
# METADATA_CDN_BASE_URL="https://connectors.airbyte.com/files"
66
DOCKER_HUB_USERNAME=""
7-
DOCKER_HUB_PASSWORD=""
7+
DOCKER_HUB_PASSWORD=""
8+
SLACK_TOKEN = ""
9+
PUBLISH_UPDATE_CHANNEL="#ben-test"

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
from dagster import Definitions, ScheduleDefinition, load_assets_from_modules
4+
from dagster import Definitions, ScheduleDefinition, EnvVar, load_assets_from_modules
5+
from dagster_slack import SlackResource
56

67
from orchestrator.resources.gcp import gcp_gcs_client, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
78
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory, github_workflow_runs
@@ -54,6 +55,10 @@
5455
]
5556
)
5657

58+
SLACK_RESOURCE_TREE = {
59+
"slack": SlackResource(token=EnvVar("SLACK_TOKEN")),
60+
}
61+
5762
GITHUB_RESOURCE_TREE = {
5863
"github_client": github_client.configured({"github_token": {"env": "GITHUB_METADATA_SERVICE_TOKEN"}}),
5964
"github_connector_repo": github_connector_repo.configured({"connector_repo_name": CONNECTOR_REPO_NAME}),
@@ -79,6 +84,7 @@
7984
}
8085

8186
METADATA_RESOURCE_TREE = {
87+
**SLACK_RESOURCE_TREE,
8288
**GCS_RESOURCE_TREE,
8389
"all_metadata_file_blobs": gcs_directory_blobs.configured(
8490
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*/{METADATA_FILE_NAME}$"}
@@ -89,6 +95,7 @@
8995
}
9096

9197
REGISTRY_RESOURCE_TREE = {
98+
**SLACK_RESOURCE_TREE,
9299
**GCS_RESOURCE_TREE,
93100
"latest_oss_registry_gcs_blob": gcs_file_blob.configured(
94101
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REGISTRIES_FOLDER, "gcs_filename": "oss_registry.json"}
@@ -99,6 +106,7 @@
99106
}
100107

101108
REGISTRY_ENTRY_RESOURCE_TREE = {
109+
**SLACK_RESOURCE_TREE,
102110
**GCS_RESOURCE_TREE,
103111
"latest_cloud_registry_entries_file_blobs": gcs_directory_blobs.configured(
104112
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": METADATA_FOLDER, "match_regex": f".*latest/cloud.json$"}

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def generate_and_persist_registry(
9191
# Registry Generation
9292

9393

94-
@asset(required_resource_keys={"registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
94+
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_oss_registry_entries_file_blobs"}, group_name=GROUP_NAME)
9595
def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
9696
"""
9797
This asset is used to generate the oss registry from the registry entries.
@@ -108,7 +108,7 @@ def persisted_oss_registry(context: OpExecutionContext) -> Output[ConnectorRegis
108108
)
109109

110110

111-
@asset(required_resource_keys={"registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
111+
@asset(required_resource_keys={"slack", "registry_directory_manager", "latest_cloud_registry_entries_file_blobs"}, group_name=GROUP_NAME)
112112
def persisted_cloud_registry(context: OpExecutionContext) -> Output[ConnectorRegistryV0]:
113113
"""
114114
This asset is used to generate the cloud registry from the registry entries.

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta
305305

306306

307307
@asset(
308-
required_resource_keys={"all_metadata_file_blobs"},
308+
required_resource_keys={"slack", "all_metadata_file_blobs"},
309309
group_name=GROUP_NAME,
310310
partitions_def=metadata_partitions_def,
311311
output_required=False,
@@ -377,7 +377,7 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
377377

378378

379379
@asset(
380-
required_resource_keys={"root_metadata_directory_manager"},
380+
required_resource_keys={"slack", "root_metadata_directory_manager"},
381381
group_name=GROUP_NAME,
382382
partitions_def=metadata_partitions_def,
383383
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/logging/publish_connector_lifecycle.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from enum import Enum
44
from dagster import OpExecutionContext
5-
from orchestrator.ops.slack import send_slack_webhook
5+
from orchestrator.ops.slack import send_slack_message
66

77

88
class StageStatus(str, Enum):
@@ -36,6 +36,14 @@ def __str__(self) -> str:
3636

3737

3838
class PublishConnectorLifecycle:
39+
"""
40+
This class is used to log the lifecycle of a publishing a connector to the registries.
41+
42+
It is used to log to the logger and slack (if enabled).
43+
44+
This is nessesary as this lifecycle is not a single job, asset, resource, schedule, or sensor.
45+
"""
46+
3947
@staticmethod
4048
def stage_to_log_level(stage_status: StageStatus) -> str:
4149
if stage_status == StageStatus.FAILED:
@@ -60,8 +68,7 @@ def log(context: OpExecutionContext, lifecycle_stage: PublishConnectorLifecycleS
6068
level = PublishConnectorLifecycle.stage_to_log_level(stage_status)
6169
log_method = getattr(context.log, level)
6270
log_method(message)
63-
64-
slack_webhook_url = os.getenv("PUBLISH_UPDATE_SLACK_WEBHOOK_URL")
65-
if slack_webhook_url:
71+
channel = os.getenv("PUBLISH_UPDATE_CHANNEL")
72+
if channel:
6673
slack_message = f"🤖 {message}"
67-
send_slack_webhook(slack_webhook_url, slack_message)
74+
send_slack_message(context, channel, slack_message)

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/ops/slack.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1+
import os
2+
3+
from dagster import op, OpExecutionContext
14
from slack_sdk import WebhookClient
5+
from dagster_slack import SlackResource
26

37

48
def chunk_messages(report):
@@ -12,10 +16,22 @@ def chunk_messages(report):
1216
yield msg
1317

1418

15-
def send_slack_webhook(webhook_url, report, wrap_in_code_block=False):
19+
@op
20+
def send_slack_webhook(webhook_url, report):
1621
webhook = WebhookClient(webhook_url)
1722
for msg in chunk_messages(report):
1823
# Wrap in code block as slack does not support markdown in webhooks
19-
if wrap_in_code_block:
20-
msg = f"```{msg}```"
21-
webhook.send(msg)
24+
webhook.send(f"```{msg}```")
25+
26+
27+
def send_slack_message(context: OpExecutionContext, channel: str, message: str):
28+
"""
29+
Send a slack message to the given channel.
30+
31+
Args:
32+
context (OpExecutionContext): The execution context.
33+
channel (str): The channel to send the message to.
34+
message (str): The message to send.
35+
"""
36+
if os.getenv("SLACK_TOKEN"):
37+
context.resources.slack.get_client().chat_postMessage(channel=channel, text=message)

airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ poetry2setup = "^1.1.0"
2626
slack-sdk = "^3.21.3"
2727
poetry = "^1.5.1"
2828
pydantic = "^1.10.6"
29+
dagster-slack = "^0.20.2"
2930

3031

3132
[tool.poetry.group.dev.dependencies]

0 commit comments

Comments
 (0)