1
1
#
2
- # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2
+ # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3
3
#
4
4
5
5
6
- import os
7
6
import logging
8
- from pathlib import Path
7
+ import shutil
9
8
import subprocess
10
- from typing import Optional
9
+ import tempfile
10
+ from pathlib import Path
11
+ from typing import Iterable , Optional
11
12
12
13
import git
14
+ import requests
13
15
14
- from .models import ConnectorQAReport
15
16
from .constants import (
16
- AIRBYTE_CLOUD_GITHUB_REPO_URL ,
17
- AIRBYTE_CLOUD_MAIN_BRANCH_NAME
17
+ AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL ,
18
+ AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME ,
19
+ AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT ,
20
+ AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER ,
21
+ GITHUB_API_COMMON_HEADERS ,
18
22
)
23
+ from .models import ConnectorQAReport
19
24
20
25
logger = logging .getLogger (__name__ )
21
- logger .setLevel (logging .INFO )
22
26
23
27
24
28
def clone_airbyte_cloud_repo (local_repo_path : Path ) -> git .Repo :
25
- logging .info (f"Cloning { AIRBYTE_CLOUD_GITHUB_REPO_URL } to { local_repo_path } " )
26
- return git .Repo .clone_from (AIRBYTE_CLOUD_GITHUB_REPO_URL , local_repo_path , branch = AIRBYTE_CLOUD_MAIN_BRANCH_NAME )
29
+ logger .info (f"Cloning { AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL } to { local_repo_path } " )
30
+ return git .Repo .clone_from (
31
+ AIRBYTE_PLATFORM_INTERNAL_GITHUB_REPO_URL , local_repo_path , branch = AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME
32
+ )
33
+
27
34
28
35
def get_definitions_mask_path (local_repo_path , definition_type : str ) -> Path :
29
- definitions_mask_path = local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{ definition_type } _definitions_mask.yaml"
36
+ definitions_mask_path = (
37
+ local_repo_path / f"cloud-config/cloud-config-seed/src/main/resources/seed/{ definition_type } _definitions_mask.yaml"
38
+ )
30
39
if not definitions_mask_path .exists ():
31
40
raise FileNotFoundError (f"Can't find the { definition_type } definitions mask" )
32
41
return definitions_mask_path
33
42
43
+
34
44
def checkout_new_branch (airbyte_cloud_repo : git .Repo , new_branch_name : str ) -> git .Head :
35
45
new_branch = airbyte_cloud_repo .create_head (new_branch_name )
36
46
new_branch .checkout ()
37
- logging .info (f"Checked out branch { new_branch_name } ." )
47
+ logger .info (f"Checked out branch { new_branch_name } ." )
38
48
return new_branch
39
49
50
+
40
51
def update_definitions_mask (connector : ConnectorQAReport , definitions_mask_path : Path ) -> Optional [Path ]:
41
52
with open (definitions_mask_path , "r" ) as definition_mask :
42
53
connector_already_in_mask = connector .connector_definition_id in definition_mask .read ()
43
54
if connector_already_in_mask :
44
- logging .warning (f"{ connector .connector_name } 's definition id is already in { definitions_mask_path } ." )
55
+ logger .warning (f"{ connector .connector_name } 's definition id is already in { definitions_mask_path } ." )
45
56
return None
46
57
47
58
to_append = f"""# { connector .connector_name } (from cloud availability updater)
@@ -50,31 +61,64 @@ def update_definitions_mask(connector: ConnectorQAReport, definitions_mask_path:
50
61
51
62
with open (definitions_mask_path , "a" ) as f :
52
63
f .write (to_append )
53
- logging .info (f"Updated { definitions_mask_path } with { connector .connector_name } 's definition id." )
64
+ logger .info (f"Updated { definitions_mask_path } with { connector .connector_name } 's definition id." )
54
65
return definitions_mask_path
55
66
67
+
56
68
def run_generate_cloud_connector_catalog (airbyte_cloud_repo_path : Path ) -> str :
57
69
result = subprocess .check_output (
58
- f"cd { airbyte_cloud_repo_path } && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog" ,
59
- shell = True
60
- )
61
- logging .info ("Ran generateCloudConnectorCatalog Gradle Task" )
70
+ f"cd { airbyte_cloud_repo_path } && ./gradlew :cloud-config:cloud-config-seed:generateCloudConnectorCatalog" , shell = True
71
+ )
72
+ logger .info ("Ran generateCloudConnectorCatalog Gradle Task" )
62
73
return result .decode ()
63
74
75
+
64
76
def commit_all_files (airbyte_cloud_repo : git .Repo , commit_message : str ):
65
- airbyte_cloud_repo .git .add (' --all' )
77
+ airbyte_cloud_repo .git .add (" --all" )
66
78
airbyte_cloud_repo .git .commit (m = commit_message )
67
- logging .info (f"Committed file changes." )
79
+ logger .info ("Committed file changes." )
80
+
68
81
69
- def push_branch (airbyte_cloud_repo : git .Repo , branch :str ):
70
- airbyte_cloud_repo .git .push ("--set-upstream" , "origin" , branch )
71
- logging .info (f"Pushed branch { branch } to origin" )
82
+ def push_branch (airbyte_cloud_repo : git .Repo , branch : str ):
83
+ airbyte_cloud_repo .git .push ("--force" , "-- set-upstream" , "origin" , branch )
84
+ logger .info (f"Pushed branch { branch } to origin" )
72
85
73
- def deploy_new_connector_to_cloud_repo (
74
- airbyte_cloud_repo_path : Path ,
75
- airbyte_cloud_repo : git .Repo ,
76
- connector : ConnectorQAReport
77
- ):
86
+
87
+ def pr_already_created_for_branch (head_branch : str ) -> bool :
88
+ response = requests .get (
89
+ AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT ,
90
+ headers = GITHUB_API_COMMON_HEADERS ,
91
+ params = {"head" : f"{ AIRBYTE_PLATFORM_INTERNAL_REPO_OWNER } :{ head_branch } " , "state" : "open" },
92
+ )
93
+ response .raise_for_status ()
94
+ return len (response .json ()) > 0
95
+
96
+
97
+ def create_pr (connector : ConnectorQAReport , branch : str ) -> Optional [requests .Response ]:
98
+ body = f"""The Cloud Availability Updater decided that it's the right time to make { connector .connector_name } available on Cloud!
99
+ - Technical name: { connector .connector_technical_name }
100
+ - Version: { connector .connector_version }
101
+ - Definition ID: { connector .connector_definition_id }
102
+ - OSS sync success rate: { connector .sync_success_rate }
103
+ - OSS number of connections: { connector .number_of_connections }
104
+ """
105
+ data = {
106
+ "title" : f"🤖 Add { connector .connector_technical_name } to cloud" ,
107
+ "body" : body ,
108
+ "head" : branch ,
109
+ "base" : AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME ,
110
+ }
111
+ if not pr_already_created_for_branch (branch ):
112
+ response = requests .post (AIRBYTE_PLATFORM_INTERNAL_PR_ENDPOINT , headers = GITHUB_API_COMMON_HEADERS , json = data )
113
+ response .raise_for_status ()
114
+ pr_url = response .json ().get ("url" )
115
+ logger .info (f"A PR was opened for { connector .connector_technical_name } : { pr_url } " )
116
+ return response
117
+ else :
118
+ logger .warning (f"A PR already exists for branch { branch } " )
119
+
120
+
121
+ def deploy_new_connector_to_cloud_repo (airbyte_cloud_repo_path : Path , airbyte_cloud_repo : git .Repo , connector : ConnectorQAReport ):
78
122
"""Updates the local definitions mask on Airbyte cloud repo.
79
123
Calls the generateCloudConnectorCatalog gradle task.
80
124
Commits these changes on a new branch.
@@ -85,15 +129,22 @@ def deploy_new_connector_to_cloud_repo(
85
129
airbyte_cloud_repo (git.Repo): The Airbyte Cloud repo instance.
86
130
connector (ConnectorQAReport): The connector to add to a definitions mask.
87
131
"""
88
- airbyte_cloud_repo .git .checkout (AIRBYTE_CLOUD_MAIN_BRANCH_NAME )
132
+ airbyte_cloud_repo .git .checkout (AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME )
89
133
new_branch_name = f"cloud-availability-updater/deploy-{ connector .connector_technical_name } "
90
134
checkout_new_branch (airbyte_cloud_repo , new_branch_name )
91
135
definitions_mask_path = get_definitions_mask_path (airbyte_cloud_repo_path , connector .connector_type )
92
- update_definitions_mask (connector , definitions_mask_path )
93
- run_generate_cloud_connector_catalog (airbyte_cloud_repo_path )
94
- commit_all_files (
95
- airbyte_cloud_repo ,
96
- f"🤖 Add { connector .connector_name } connector to cloud"
97
- )
98
- push_branch (airbyte_cloud_repo , new_branch_name )
99
- airbyte_cloud_repo .git .checkout (AIRBYTE_CLOUD_MAIN_BRANCH_NAME )
136
+ updated_files = update_definitions_mask (connector , definitions_mask_path )
137
+ if updated_files :
138
+ run_generate_cloud_connector_catalog (airbyte_cloud_repo_path )
139
+ commit_all_files (airbyte_cloud_repo , f"🤖 Add { connector .connector_name } connector to cloud" )
140
+ push_branch (airbyte_cloud_repo , new_branch_name )
141
+ create_pr (connector , new_branch_name )
142
+ airbyte_cloud_repo .git .checkout (AIRBYTE_PLATFORM_INTERNAL_MAIN_BRANCH_NAME )
143
+
144
+
145
+ def deploy_eligible_connectors_to_cloud_repo (eligible_connectors : Iterable ):
146
+ cloud_repo_path = Path (tempfile .mkdtemp ())
147
+ airbyte_cloud_repo = clone_airbyte_cloud_repo (cloud_repo_path )
148
+ for connector in eligible_connectors :
149
+ deploy_new_connector_to_cloud_repo (cloud_repo_path , airbyte_cloud_repo , connector )
150
+ shutil .rmtree (cloud_repo_path )
0 commit comments