10
10
import orchestrator .hacks as HACKS
11
11
import pandas as pd
12
12
import sentry_sdk
13
- import yaml
14
13
from dagster import AutoMaterializePolicy , DynamicPartitionsDefinition , MetadataValue , OpExecutionContext , Output , asset
15
14
from dagster_gcp .gcs .file_manager import GCSFileHandle , GCSFileManager
16
15
from google .cloud import storage
17
16
from metadata_service .constants import ICON_FILE_NAME , METADATA_FILE_NAME
18
17
from metadata_service .models .generated .ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
19
18
from metadata_service .models .generated .ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition
19
+ from metadata_service .models .transform import to_json_sanitized_dict
20
20
from metadata_service .spec_cache import SpecCache
21
21
from orchestrator .config import MAX_METADATA_PARTITION_RUN_REQUEST , VALID_REGISTRIES , get_public_url_for_gcs_file
22
22
from orchestrator .logging import sentry
23
23
from orchestrator .logging .publish_connector_lifecycle import PublishConnectorLifecycle , PublishConnectorLifecycleStage , StageStatus
24
24
from orchestrator .models .metadata import LatestMetadataEntry , MetadataDefinition
25
+ from orchestrator .utils .blob_helpers import yaml_blob_to_dict
25
26
from orchestrator .utils .dagster_helpers import OutputDataFrame
26
27
from orchestrator .utils .object_helpers import deep_copy_params
27
- from pydantic import ValidationError
28
+ from pydantic import BaseModel , ValidationError
28
29
from pydash .objects import get
29
30
30
31
PolymorphicRegistryEntry = Union [ConnectorRegistrySourceDefinition , ConnectorRegistryDestinationDefinition ]
@@ -332,25 +333,73 @@ def delete_registry_entry(registry_name, metadata_entry: LatestMetadataEntry, me
332
333
333
334
334
335
@sentry_sdk .trace
335
- def safe_parse_metadata_definition (metadata_blob : storage . Blob ) -> Optional [MetadataDefinition ]:
336
+ def safe_parse_metadata_definition (file_name : str , metadata_dict : dict ) -> Optional [MetadataDefinition ]:
336
337
"""
337
338
Safely parse the metadata definition from the given metadata entry.
338
339
Handles the case where the metadata definition is invalid for in old versions of the metadata.
339
340
"""
340
- yaml_string = metadata_blob .download_as_string ().decode ("utf-8" )
341
- metadata_dict = yaml .safe_load (yaml_string )
341
+
342
342
try :
343
343
return MetadataDefinition .parse_obj (metadata_dict )
344
344
345
345
except ValidationError as e :
346
346
# only raise the error if "latest" is in the path
347
- if "latest" in metadata_blob . name :
347
+ if "latest" in file_name :
348
348
raise e
349
349
else :
350
- print (f"WARNING: Could not parse metadata definition for { metadata_blob . name } . Error: { e } " )
350
+ print (f"WARNING: Could not parse metadata definition for { file_name } . Error: { e } " )
351
351
return None
352
352
353
353
354
+ def safe_get_slack_user_identifier (airbyte_slack_users : pd .DataFrame , metadata_dict : Union [dict , BaseModel ]) -> Optional [str ]:
355
+ """
356
+ Safely get the slack user identifier from the given git info in the metadata file.
357
+ """
358
+ if isinstance (metadata_dict , BaseModel ):
359
+ metadata_dict = to_json_sanitized_dict (metadata_dict )
360
+
361
+ # if the slack users is empty or none, return none
362
+ if airbyte_slack_users is None or airbyte_slack_users .empty :
363
+ return None
364
+
365
+ commit_author = get (metadata_dict , "data.generated.git.commit_author" )
366
+ commit_author_email = get (metadata_dict , "data.generated.git.commit_author_email" )
367
+
368
+ # if the commit author email is not present, return author name or none
369
+ if not commit_author_email :
370
+ return commit_author
371
+
372
+ # if the commit author email is present, try to find the user in the slack users dataframe
373
+ # if the user is not found, return the author name or none
374
+ slack_user = airbyte_slack_users [airbyte_slack_users ["email" ] == commit_author_email ]
375
+ if slack_user .empty :
376
+ slack_user = airbyte_slack_users [airbyte_slack_users ["real_name" ] == commit_author ]
377
+
378
+ if slack_user .empty :
379
+ return commit_author
380
+
381
+ # if the user is found, return the slack real_name and id e.g. "John Doe (U12345678)"
382
+ slack_id = slack_user ["id" ].iloc [0 ]
383
+ slack_real_name = slack_user ["real_name" ].iloc [0 ]
384
+ return f"{ slack_real_name } (<@{ slack_id } >)"
385
+
386
+
387
+ def safe_get_commit_sha (metadata_dict : Union [dict , BaseModel ]) -> Optional [str ]:
388
+ """
389
+ Safely get the git commit sha from the given git info in the metadata file.
390
+ """
391
+ if isinstance (metadata_dict , BaseModel ):
392
+ metadata_dict = to_json_sanitized_dict (metadata_dict )
393
+
394
+ # if the git commit sha is not present, return none
395
+ commit_sha = get (metadata_dict , "data.generated.git.commit_sha" )
396
+ if not commit_sha :
397
+ return None
398
+
399
+ # if the git commit sha is present, return the commit sha
400
+ return commit_sha
401
+
402
+
354
403
# ASSETS
355
404
356
405
@@ -362,7 +411,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta
362
411
auto_materialize_policy = AutoMaterializePolicy .eager (max_materializations_per_minute = MAX_METADATA_PARTITION_RUN_REQUEST ),
363
412
)
364
413
@sentry .instrument_asset_op
365
- def metadata_entry (context : OpExecutionContext ) -> Output [Optional [LatestMetadataEntry ]]:
414
+ def metadata_entry (context : OpExecutionContext , airbyte_slack_users : pd . DataFrame ) -> Output [Optional [LatestMetadataEntry ]]:
366
415
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
367
416
etag = context .partition_key
368
417
context .log .info (f"Processing metadata file with etag { etag } " )
@@ -373,16 +422,22 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
373
422
if not matching_blob :
374
423
raise Exception (f"Could not find blob with etag { etag } " )
375
424
425
+ metadata_dict = yaml_blob_to_dict (matching_blob )
426
+ user_identifier = safe_get_slack_user_identifier (airbyte_slack_users , metadata_dict )
427
+ commit_sha = safe_get_commit_sha (metadata_dict )
428
+
376
429
metadata_file_path = matching_blob .name
377
430
PublishConnectorLifecycle .log (
378
431
context ,
379
432
PublishConnectorLifecycleStage .METADATA_VALIDATION ,
380
433
StageStatus .IN_PROGRESS ,
381
434
f"Found metadata file with path { metadata_file_path } for etag { etag } " ,
435
+ user_identifier = user_identifier ,
436
+ commit_sha = commit_sha ,
382
437
)
383
438
384
439
# read the matching_blob into a metadata definition
385
- metadata_def = safe_parse_metadata_definition (matching_blob )
440
+ metadata_def = safe_parse_metadata_definition (matching_blob . name , metadata_dict )
386
441
387
442
dagster_metadata = {
388
443
"bucket_name" : matching_blob .bucket .name ,
@@ -398,6 +453,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
398
453
PublishConnectorLifecycleStage .METADATA_VALIDATION ,
399
454
StageStatus .FAILED ,
400
455
f"Could not parse metadata definition for { metadata_file_path } , dont panic, this can be expected for old metadata files" ,
456
+ user_identifier = user_identifier ,
457
+ commit_sha = commit_sha ,
401
458
)
402
459
return Output (value = None , metadata = dagster_metadata )
403
460
@@ -422,6 +479,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
422
479
PublishConnectorLifecycleStage .METADATA_VALIDATION ,
423
480
StageStatus .SUCCESS ,
424
481
f"Successfully parsed metadata definition for { metadata_file_path } " ,
482
+ user_identifier = user_identifier ,
483
+ commit_sha = commit_sha ,
425
484
)
426
485
427
486
return Output (value = metadata_entry , metadata = dagster_metadata )
@@ -434,19 +493,26 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
434
493
auto_materialize_policy = AutoMaterializePolicy .eager (max_materializations_per_minute = MAX_METADATA_PARTITION_RUN_REQUEST ),
435
494
)
436
495
@sentry .instrument_asset_op
437
- def registry_entry (context : OpExecutionContext , metadata_entry : Optional [LatestMetadataEntry ]) -> Output [Optional [dict ]]:
496
+ def registry_entry (
497
+ context : OpExecutionContext , metadata_entry : Optional [LatestMetadataEntry ], airbyte_slack_users : pd .DataFrame
498
+ ) -> Output [Optional [dict ]]:
438
499
"""
439
500
Generate the registry entry files from the given metadata file, and persist it to GCS.
440
501
"""
441
502
if not metadata_entry :
442
503
# if the metadata entry is invalid, return an empty dict
443
504
return Output (metadata = {"empty_metadata" : True }, value = None )
444
505
506
+ user_identifier = safe_get_slack_user_identifier (airbyte_slack_users , metadata_entry .metadata_definition )
507
+ commit_sha = safe_get_commit_sha (metadata_entry .metadata_definition )
508
+
445
509
PublishConnectorLifecycle .log (
446
510
context ,
447
511
PublishConnectorLifecycleStage .REGISTRY_ENTRY_GENERATION ,
448
512
StageStatus .IN_PROGRESS ,
449
513
f"Generating registry entry for { metadata_entry .file_path } " ,
514
+ user_identifier = user_identifier ,
515
+ commit_sha = commit_sha ,
450
516
)
451
517
452
518
spec_cache = SpecCache ()
@@ -488,7 +554,9 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
488
554
context ,
489
555
PublishConnectorLifecycleStage .REGISTRY_ENTRY_GENERATION ,
490
556
StageStatus .SUCCESS ,
491
- f"Successfully generated { registry_name } registry entry for { metadata_entry .file_path } at { registry_url } " ,
557
+ f"Successfully generated { registry_name } registry entry for { metadata_entry .file_path } at { registry_url } .\n \n *This new Connector will be available for use in the platform on the next release (1-3 min)*" ,
558
+ user_identifier = user_identifier ,
559
+ commit_sha = commit_sha ,
492
560
)
493
561
494
562
# Log the registry entries that were deleted
@@ -498,6 +566,8 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
498
566
PublishConnectorLifecycleStage .REGISTRY_ENTRY_GENERATION ,
499
567
StageStatus .SUCCESS ,
500
568
f"Successfully deleted { registry_name } registry entry for { metadata_entry .file_path } " ,
569
+ user_identifier = user_identifier ,
570
+ commit_sha = commit_sha ,
501
571
)
502
572
503
573
return Output (metadata = dagster_metadata , value = persisted_registry_entries )
0 commit comments