Skip to content

Commit a42532a

Browse files
authored
bug(registry): fix auto materialize (#38094)
1 parent 9d95dc0 commit a42532a

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/assets/registry_entry.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
464464
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
465465
)
466466
@sentry.instrument_asset_op
467-
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]:
467+
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
468468
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
469469
etag = context.partition_key
470470
context.log.info(f"Processing metadata file with etag {etag}")
@@ -475,6 +475,8 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
475475
if not matching_blob:
476476
raise Exception(f"Could not find blob with etag {etag}")
477477

478+
airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)
479+
478480
metadata_dict = yaml_blob_to_dict(matching_blob)
479481
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)
480482
commit_sha = safe_get_commit_sha(metadata_dict)
@@ -548,16 +550,16 @@ def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFram
548550
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
549551
)
550552
@sentry.instrument_asset_op
551-
def registry_entry(
552-
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame
553-
) -> Output[Optional[dict]]:
553+
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
554554
"""
555555
Generate the registry entry files from the given metadata file, and persist it to GCS.
556556
"""
557557
if not metadata_entry:
558558
# if the metadata entry is invalid, return an empty dict
559559
return Output(metadata={"empty_metadata": True}, value=None)
560560

561+
airbyte_slack_users = HACKS.get_airbyte_slack_users_from_graph(context)
562+
561563
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition)
562564
commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition)
563565

airbyte-ci/connectors/metadata_service/orchestrator/orchestrator/hacks.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
from typing import Union
5+
from typing import Optional, Union
66

7+
import pandas as pd
8+
from dagster import OpExecutionContext
79
from metadata_service.constants import METADATA_FILE_NAME
810
from metadata_service.gcs_upload import get_metadata_remote_file_path
911
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
@@ -109,3 +111,29 @@ def sanitize_docker_repo_name_for_dependency_file(docker_repo_name: str) -> str:
109111
"""
110112

111113
return docker_repo_name.replace("airbyte/", "")
114+
115+
116+
def get_airbyte_slack_users_from_graph(context: OpExecutionContext) -> Optional[pd.DataFrame]:
117+
"""
118+
Get the airbyte slack users from the graph.
119+
120+
Important: Directly relates to the airbyte_slack_users asset. Requires the asset to be materialized in the graph.
121+
122+
Problem:
123+
I guess having dynamic partitioned assets that automatically materialize depending on another asset is a bit too much to ask for.
124+
125+
Solution:
126+
Just get the asset from the graph, but dont declare it as a dependency.
127+
128+
Context:
129+
https://airbytehq-team.slack.com/archives/C048P9GADFW/p1715276222825929
130+
"""
131+
try:
132+
from orchestrator import defn
133+
134+
airbyte_slack_users = defn.load_asset_value("airbyte_slack_users", instance=context.instance)
135+
context.log.info(f"Got airbyte slack users from graph: {airbyte_slack_users}")
136+
return airbyte_slack_users
137+
except Exception as e:
138+
context.log.error(f"Failed to get airbyte slack users from graph: {e}")
139+
return None

0 commit comments

Comments
 (0)