Skip to content

Commit 6a6e54b

Browse files
authored
metadata-service [orchestrator]: improve stale metadata detection (#42962)
1 parent b43b111 commit 6a6e54b

File tree

4 files changed

+65
-58
lines changed

4 files changed

+65
-58
lines changed

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@
189189
),
190190
ScheduleDefinition(job=generate_connector_test_summary_reports, cron_schedule="@hourly"),
191191
ScheduleDefinition(
192-
cron_schedule="0 8 * * *", # Daily at 8am US/Pacific
192+
cron_schedule="0 * * * *", # Every hour
193193
execution_timezone="US/Pacific",
194194
job=generate_stale_gcs_latest_metadata_file,
195195
),

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/github.py

+62-55
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
import hashlib
88
import os
99

10-
import dateutil
11-
import humanize
1210
import pandas as pd
11+
import yaml
1312
from dagster import OpExecutionContext, Output, asset
1413
from github import Repository
1514
from orchestrator.logging import sentry
15+
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition, PartialMetadataDefinition
1616
from orchestrator.ops.slack import send_slack_message
1717
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
1818

1919
GROUP_NAME = "github"
20+
TOOLING_TEAM_SLACK_TEAM_ID = "S077R8636CV"
2021

2122

2223
def _get_md5_of_github_file(context: OpExecutionContext, github_connector_repo: Repository, path: str) -> str:
@@ -34,6 +35,11 @@ def _get_md5_of_github_file(context: OpExecutionContext, github_connector_repo:
3435
return base_64_value
3536

3637

38+
def _get_content_of_github_file(context: OpExecutionContext, github_connector_repo: Repository, path: str) -> str:
39+
context.log.debug(f"retrieving contents of {path}")
40+
return github_connector_repo.get_contents(path)
41+
42+
3743
@asset(required_resource_keys={"github_connectors_directory"}, group_name=GROUP_NAME)
3844
@sentry.instrument_asset_op
3945
def github_connector_folders(context):
@@ -65,75 +71,76 @@ def github_metadata_file_md5s(context):
6571
return Output(metadata_file_paths, metadata={"preview": metadata_file_paths})
6672

6773

68-
def _should_publish_have_ran(datetime_string: str) -> bool:
69-
"""
70-
Return true if the datetime is 2 hours old.
71-
72-
"""
73-
dt = dateutil.parser.parse(datetime_string)
74-
now = datetime.datetime.now(datetime.timezone.utc)
75-
two_hours_ago = now - datetime.timedelta(hours=2)
76-
return dt < two_hours_ago
77-
78-
79-
def _to_time_ago(datetime_string: str) -> str:
74+
@asset(required_resource_keys={"github_connector_repo", "github_connectors_metadata_files"}, group_name=GROUP_NAME)
75+
def github_metadata_definitions(context):
8076
"""
81-
Return a string of how long ago the datetime is human readable format. 10 min
77+
Return a list of all metadata definitions hosted on our github repo
8278
"""
83-
dt = dateutil.parser.parse(datetime_string)
84-
return humanize.naturaltime(dt)
79+
github_connector_repo = context.resources.github_connector_repo
80+
github_connectors_metadata_files = context.resources.github_connectors_metadata_files
8581

82+
metadata_definitions = []
83+
for metadata_file in github_connectors_metadata_files:
84+
metadata_raw = _get_content_of_github_file(context, github_connector_repo, metadata_file["path"])
85+
metadata_dict = yaml.safe_load(metadata_raw.decoded_content)
86+
metadata_definitions.append(
87+
LatestMetadataEntry(
88+
metadata_definition=MetadataDefinition.parse_obj(metadata_dict), last_modified=metadata_file["last_modified"]
89+
)
90+
)
8691

87-
def _is_stale(github_file_info: dict, latest_gcs_metadata_md5s: dict) -> bool:
88-
"""
89-
Return true if the github info is stale.
90-
"""
91-
not_in_gcs = latest_gcs_metadata_md5s.get(github_file_info["md5"]) is None
92-
return not_in_gcs and _should_publish_have_ran(github_file_info["last_modified"])
92+
return Output(metadata_definitions, metadata={"preview": [md.json() for md in metadata_definitions]})
9393

9494

95-
@asset(required_resource_keys={"slack", "latest_metadata_file_blobs"}, group_name=GROUP_NAME)
96-
def stale_gcs_latest_metadata_file(context, github_metadata_file_md5s: dict) -> OutputDataFrame:
95+
@asset(required_resource_keys={"slack"}, group_name=GROUP_NAME)
96+
def stale_gcs_latest_metadata_file(context, github_metadata_definitions: list, metadata_definitions: list) -> OutputDataFrame:
9797
"""
9898
Return a list of all metadata files in the github repo and denote whether they are stale or not.
9999
100100
Stale means that the file in the github repo is not in the latest metadata file blobs.
101101
"""
102-
human_readable_stale_bools = {True: "🚨 YES!!!", False: "No"}
103-
latest_gcs_metadata_file_blobs = context.resources.latest_metadata_file_blobs
104-
latest_gcs_metadata_md5s = {blob.md5_hash: blob.name for blob in latest_gcs_metadata_file_blobs}
105-
106-
stale_report = [
107-
{
108-
"stale": _is_stale(github_file_info, latest_gcs_metadata_md5s),
109-
"github_path": github_path,
110-
"github_md5": github_file_info["md5"],
111-
"github_last_modified": _to_time_ago(github_file_info["last_modified"]),
112-
"gcs_md5": latest_gcs_metadata_md5s.get(github_file_info["md5"]),
113-
"gcs_path": latest_gcs_metadata_md5s.get(github_file_info["md5"]),
114-
}
115-
for github_path, github_file_info in github_metadata_file_md5s.items()
116-
]
102+
latest_versions_on_gcs = {
103+
metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag
104+
for metadata_entry in metadata_definitions
105+
if metadata_entry.metadata_definition.data.supportLevel != "archived"
106+
}
107+
108+
now = datetime.datetime.now(datetime.timezone.utc)
109+
latest_versions_on_github = {
110+
metadata_entry.metadata_definition.data.dockerRepository: metadata_entry.metadata_definition.data.dockerImageTag
111+
for metadata_entry in github_metadata_definitions
112+
if metadata_entry.metadata_definition.data.supportLevel
113+
!= "archived" # We give a 2 hour grace period for the metadata to be updated
114+
and datetime.datetime.strptime(metadata_entry.last_modified, "%a, %d %b %Y %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc)
115+
> now - datetime.timedelta(hours=2)
116+
}
117117

118-
stale_metadata_files_df = pd.DataFrame(stale_report)
118+
stale_connectors = []
119+
for docker_repository, github_docker_image_tag in latest_versions_on_github.items():
120+
gcs_docker_image_tag = latest_versions_on_gcs.get(docker_repository)
121+
if gcs_docker_image_tag != github_docker_image_tag:
122+
stale_connectors.append(
123+
{"connector": docker_repository, "master_version": github_docker_image_tag, "gcs_version": gcs_docker_image_tag}
124+
)
119125

120-
# sort by stale true to false, then by github_path
121-
stale_metadata_files_df = stale_metadata_files_df.sort_values(
122-
by=["stale", "github_path"],
123-
ascending=[False, True],
124-
)
126+
stale_connectors_df = pd.DataFrame(stale_connectors)
125127

126128
# If any stale files exist, report to slack
127129
channel = os.getenv("STALE_REPORT_CHANNEL")
128-
any_stale = stale_metadata_files_df["stale"].any()
129-
if channel and any_stale:
130-
only_stale_df = stale_metadata_files_df[stale_metadata_files_df["stale"] == True]
131-
pretty_stale_df = only_stale_df.replace(human_readable_stale_bools)
132-
stale_report_md = pretty_stale_df.to_markdown(index=False)
133-
send_slack_message(context, channel, stale_report_md, enable_code_block_wrapping=True)
134-
135-
stale_metadata_files_df.replace(human_readable_stale_bools, inplace=True)
136-
return output_dataframe(stale_metadata_files_df)
130+
any_stale = len(stale_connectors_df) > 0
131+
if channel:
132+
if any_stale:
133+
stale_report_md = stale_connectors_df.to_markdown(index=False)
134+
send_slack_message(context, channel, f"🚨 Stale metadata detected! (cc. <!subteam^{TOOLING_TEAM_SLACK_TEAM_ID}>)")
135+
send_slack_message(context, channel, stale_report_md, enable_code_block_wrapping=True)
136+
else:
137+
message = f"""
138+
Analyzed {len(github_metadata_definitions)} metadata files on our master branch and {len(metadata_definitions)} latest metadata files hosted in GCS.All MD5 hashes of these files.
139+
All MD5 hashes of our metadata files on master match the latest metadata files on GCS.
140+
No stale metadata: GCS metadata are up to date with metadata hosted on GCS.
141+
"""
142+
send_slack_message(context, channel, message)
143+
return output_dataframe(stale_connectors_df)
137144

138145

139146
@asset(required_resource_keys={"github_connector_nightly_workflow_successes"}, group_name=GROUP_NAME)

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/metadata.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ def metadata_definitions(context: OpExecutionContext) -> List[LatestMetadataEntr
130130
metadata_entry = LatestMetadataEntry(
131131
metadata_definition=metadata_def,
132132
icon_url=icon_url,
133-
last_modified=blob.last_modified,
133+
last_modified=blob.updated.isoformat(),
134134
etag=blob.etag,
135135
file_path=blob.name,
136136
bucket_name=blob.bucket.name,

airbyte-ci/connectors/metadata_service/orchestrator/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "orchestrator"
3-
version = "0.1.4"
3+
version = "0.1.5"
44
description = ""
55
authors = ["Ben Church <[email protected]>"]
66
readme = "README.md"

0 commit comments

Comments
 (0)