Skip to content

Commit 255a5bb

Browse files
authored
🐛 Source Github: validate input organizations and repositories (#15730)
Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent d800e3d commit 255a5bb

File tree

9 files changed

+91
-78
lines changed

9 files changed

+91
-78
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@
323323
- name: GitHub
324324
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
325325
dockerRepository: airbyte/source-github
326-
dockerImageTag: 0.2.45
326+
dockerImageTag: 0.2.46
327327
documentationUrl: https://docs.airbyte.io/integrations/sources/github
328328
icon: github.svg
329329
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2701,7 +2701,7 @@
27012701
supportsNormalization: false
27022702
supportsDBT: false
27032703
supported_destination_sync_modes: []
2704-
- dockerImage: "airbyte/source-github:0.2.45"
2704+
- dockerImage: "airbyte/source-github:0.2.46"
27052705
spec:
27062706
documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
27072707
connectionSpecification:

airbyte-integrations/connectors/source-github/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.2.45
15+
LABEL io.airbyte.version=0.2.46
1616
LABEL io.airbyte.name=airbyte/source-github

airbyte-integrations/connectors/source-github/source_github/source.py

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#
44

55

6-
import re
76
from typing import Any, Dict, List, Mapping, Tuple
87

98
from airbyte_cdk import AirbyteLogger
@@ -50,43 +49,57 @@
5049
WorkflowRuns,
5150
Workflows,
5251
)
52+
from .utils import read_full_refresh
5353

5454
TOKEN_SEPARATOR = ","
5555
DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM = 10
56-
# To scan all the repos within an organization, the organization name can be
57-
# specified by using an asterisk, i.e. "airbytehq/*"
58-
ORGANIZATION_PATTERN = re.compile("^.*/\\*$")
5956

6057

6158
class SourceGithub(AbstractSource):
6259
@staticmethod
63-
def _generate_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
60+
def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
6461
"""
65-
Parse repositories config line and produce two lists of repositories.
62+
Parse config.repository and produce two lists: organizations, repositories.
6663
Args:
6764
config (dict): Dict representing connector's config
6865
authenticator(MultipleTokenAuthenticator): authenticator object
69-
Returns:
70-
Tuple[List[str], List[str]]: Tuple of two lists: first representing
71-
repositories directly mentioned in config and second is
72-
organization repositories from orgs/{org}/repos request.
7366
"""
74-
repositories = list(filter(None, config["repository"].split(" ")))
75-
76-
if not repositories:
67+
config_repositories = set(filter(None, config["repository"].split(" ")))
68+
if not config_repositories:
7769
raise Exception("Field `repository` required to be provided for connect to Github API")
7870

79-
repositories_list: set = {repo for repo in repositories if not ORGANIZATION_PATTERN.match(repo)}
80-
organizations = [org.split("/")[0] for org in repositories if org not in repositories_list]
81-
organisation_repos = set()
82-
if organizations:
83-
repos = Repositories(authenticator=authenticator, organizations=organizations)
84-
for stream in repos.stream_slices(sync_mode=SyncMode.full_refresh):
85-
organisation_repos = organisation_repos.union(
86-
{r["full_name"] for r in repos.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream)}
87-
)
71+
repositories = set()
72+
organizations = set()
73+
unchecked_repos = set()
74+
unchecked_orgs = set()
75+
76+
for org_repos in config_repositories:
77+
org, _, repos = org_repos.partition("/")
78+
if repos == "*":
79+
unchecked_orgs.add(org)
80+
else:
81+
unchecked_repos.add(org_repos)
82+
83+
if unchecked_orgs:
84+
stream = Repositories(authenticator=authenticator, organizations=unchecked_orgs)
85+
for record in read_full_refresh(stream):
86+
repositories.add(record["full_name"])
87+
organizations.add(record["organization"])
88+
89+
unchecked_repos = unchecked_repos - repositories
90+
if unchecked_repos:
91+
stream = RepositoryStats(
92+
authenticator=authenticator,
93+
repositories=unchecked_repos,
94+
page_size_for_large_streams=config.get("page_size_for_large_streams", DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM),
95+
)
96+
for record in read_full_refresh(stream):
97+
repositories.add(record["full_name"])
98+
organization = record.get("organization", {}).get("login")
99+
if organization:
100+
organizations.add(organization)
88101

89-
return list(repositories_list), list(organisation_repos)
102+
return list(organizations), list(repositories)
90103

91104
@staticmethod
92105
def _get_authenticator(config: Dict[str, Any]):
@@ -139,20 +152,9 @@ def _get_branches_data(selected_branches: str, full_refresh_args: Dict[str, Any]
139152
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
140153
try:
141154
authenticator = self._get_authenticator(config)
142-
# In case of getting repository list for given organization was
143-
# successful, there is no need to check stats for every repository within
144-
# that organization.
145-
# Since we have "repo" scope requested it should grant access to private repos as well:
146-
# https://docs.github.com/en/developers/apps/building-oauth-apps/scopes-for-oauth-apps#available-scopes
147-
repositories, _ = self._generate_repositories(config=config, authenticator=authenticator)
148-
149-
repository_stats_stream = RepositoryStats(
150-
authenticator=authenticator,
151-
repositories=repositories,
152-
page_size_for_large_streams=config.get("page_size_for_large_streams", DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM),
153-
)
154-
for stream_slice in repository_stats_stream.stream_slices(sync_mode=SyncMode.full_refresh):
155-
next(repository_stats_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)
155+
_, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
156+
if not repositories:
157+
return False, "no valid repositories found"
156158
return True, None
157159

158160
except Exception as e:
@@ -172,10 +174,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
172174

173175
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
174176
authenticator = self._get_authenticator(config)
175-
repos, organization_repos = self._generate_repositories(config=config, authenticator=authenticator)
176-
repositories = repos + organization_repos
177-
178-
organizations = list({org.split("/")[0] for org in repositories})
177+
organizations, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
179178
page_size = config.get("page_size_for_large_streams", DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM)
180179

181180
organization_args = {"authenticator": authenticator, "organizations": organizations}

airbyte-integrations/connectors/source-github/source_github/utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,21 @@
33
#
44

55

6+
from airbyte_cdk.models import SyncMode
7+
from airbyte_cdk.sources.streams import Stream
8+
9+
610
def getter(D: dict, key_or_keys):
711
if not isinstance(key_or_keys, list):
812
key_or_keys = [key_or_keys]
913
for k in key_or_keys:
1014
D = D[k]
1115
return D
16+
17+
18+
def read_full_refresh(stream_instance: Stream):
19+
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
20+
for _slice in slices:
21+
records = stream_instance.read_records(stream_slice=_slice, sync_mode=SyncMode.full_refresh)
22+
for record in records:
23+
yield record

airbyte-integrations/connectors/source-github/unit_tests/test_source.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def check_source(repo_line: str) -> AirbyteConnectionStatus:
1919

2020
@responses.activate
2121
def test_check_connection_repos_only():
22-
responses.add("GET", "https://api.github.com/repos/airbyte", json={})
22+
responses.add("GET", "https://api.github.com/repos/airbyte", json={"full_name": "airbyte"})
2323

2424
status = check_source("airbyte airbyte airbyte")
2525
assert not status.message
@@ -31,8 +31,12 @@ def test_check_connection_repos_only():
3131
@responses.activate
3232
def test_check_connection_repos_and_org_repos():
3333
repos = [{"name": f"name {i}", "full_name": f"full name {i}", "updated_at": "2020-01-01T00:00:00Z"} for i in range(1000)]
34-
responses.add("GET", "https://api.github.com/repos/airbyte/test", json={})
35-
responses.add("GET", "https://api.github.com/repos/airbyte/test2", json={})
34+
responses.add(
35+
"GET", "https://api.github.com/repos/airbyte/test", json={"full_name": "airbyte/test", "organization": {"login": "airbyte"}}
36+
)
37+
responses.add(
38+
"GET", "https://api.github.com/repos/airbyte/test2", json={"full_name": "airbyte/test2", "organization": {"login": "airbyte"}}
39+
)
3640
responses.add("GET", "https://api.github.com/orgs/airbytehq/repos", json=repos)
3741
responses.add("GET", "https://api.github.com/orgs/org/repos", json=repos)
3842

@@ -95,13 +99,19 @@ def test_get_branches_data():
9599

96100

97101
@responses.activate
98-
def test_generate_repositories():
102+
def test_get_org_repositories():
99103

100104
source = SourceGithub()
101105

102106
with pytest.raises(Exception):
103107
config = {"repository": ""}
104-
source._generate_repositories(config, authenticator=None)
108+
source._get_org_repositories(config, authenticator=None)
109+
110+
responses.add(
111+
"GET",
112+
"https://api.github.com/repos/airbytehq/integration-test",
113+
json={"full_name": "airbytehq/integration-test", "organization": {"login": "airbytehq"}},
114+
)
105115

106116
responses.add(
107117
"GET",
@@ -113,9 +123,7 @@ def test_generate_repositories():
113123
)
114124

115125
config = {"repository": "airbytehq/integration-test docker/*"}
116-
repositories_list, organisation_repos = source._generate_repositories(config, authenticator=None)
126+
organisations, repositories = source._get_org_repositories(config, authenticator=None)
117127

118-
assert repositories_list == ["airbytehq/integration-test"]
119-
assert len(organisation_repos) == 2
120-
assert "docker/compose" in organisation_repos
121-
assert "docker/docker-py" in organisation_repos
128+
assert set(repositories) == {"airbytehq/integration-test", "docker/docker-py", "docker/compose"}
129+
assert set(organisations) == {"airbytehq", "docker"}

airbyte-integrations/connectors/source-github/unit_tests/test_stream.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@
4343
Users,
4444
WorkflowRuns,
4545
)
46+
from source_github.utils import read_full_refresh
4647

47-
from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental
48+
from .utils import ProjectsResponsesAPI, read_incremental
4849

4950
DEFAULT_BACKOFF_DELAYS = [5, 10, 20, 40, 80]
5051

@@ -108,7 +109,7 @@ def request_callback(request):
108109
)
109110

110111
stream = Organizations(organizations=["airbytehq"])
111-
read_full_refresh(stream)
112+
list(read_full_refresh(stream))
112113
assert len(responses.calls) == 2
113114
assert responses.calls[0].request.url == "https://api.github.com/orgs/airbytehq?per_page=100"
114115
assert responses.calls[1].request.url == "https://api.github.com/orgs/airbytehq?per_page=100"
@@ -139,7 +140,7 @@ def test_graphql_rate_limited(time_mock, sleep_mock):
139140
)
140141

141142
stream = PullRequestStats(repositories=["airbytehq/airbyte"], page_size_for_large_streams=30)
142-
records = read_full_refresh(stream)
143+
records = list(read_full_refresh(stream))
143144
assert records == []
144145
assert len(responses.calls) == 2
145146
assert responses.calls[0].request.url == "https://api.github.com/graphql"
@@ -159,7 +160,7 @@ def test_stream_teams_404():
159160
json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/teams#list-teams"},
160161
)
161162

162-
assert read_full_refresh(stream) == []
163+
assert list(read_full_refresh(stream)) == []
163164
assert len(responses.calls) == 1
164165
assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/teams?per_page=100"
165166

@@ -170,7 +171,7 @@ def test_stream_organizations_read():
170171
stream = Organizations(**organization_args)
171172
responses.add("GET", "https://api.github.com/orgs/org1", json={"id": 1})
172173
responses.add("GET", "https://api.github.com/orgs/org2", json={"id": 2})
173-
records = read_full_refresh(stream)
174+
records = list(read_full_refresh(stream))
174175
assert records == [{"id": 1}, {"id": 2}]
175176

176177

@@ -180,7 +181,7 @@ def test_stream_teams_read():
180181
stream = Teams(**organization_args)
181182
responses.add("GET", "https://api.github.com/orgs/org1/teams", json=[{"id": 1}, {"id": 2}])
182183
responses.add("GET", "https://api.github.com/orgs/org2/teams", json=[{"id": 3}])
183-
records = read_full_refresh(stream)
184+
records = list(read_full_refresh(stream))
184185
assert records == [{"id": 1, "organization": "org1"}, {"id": 2, "organization": "org1"}, {"id": 3, "organization": "org2"}]
185186
assert len(responses.calls) == 2
186187
assert responses.calls[0].request.url == "https://api.github.com/orgs/org1/teams?per_page=100"
@@ -193,7 +194,7 @@ def test_stream_users_read():
193194
stream = Users(**organization_args)
194195
responses.add("GET", "https://api.github.com/orgs/org1/members", json=[{"id": 1}, {"id": 2}])
195196
responses.add("GET", "https://api.github.com/orgs/org2/members", json=[{"id": 3}])
196-
records = read_full_refresh(stream)
197+
records = list(read_full_refresh(stream))
197198
assert records == [{"id": 1, "organization": "org1"}, {"id": 2, "organization": "org1"}, {"id": 3, "organization": "org2"}]
198199
assert len(responses.calls) == 2
199200
assert responses.calls[0].request.url == "https://api.github.com/orgs/org1/members?per_page=100"
@@ -212,7 +213,7 @@ def test_stream_repositories_404():
212213
json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/repos#list-organization-repositories"},
213214
)
214215

215-
assert read_full_refresh(stream) == []
216+
assert list(read_full_refresh(stream)) == []
216217
assert len(responses.calls) == 1
217218
assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/repos?per_page=100&sort=updated&direction=desc"
218219

@@ -226,7 +227,7 @@ def test_stream_repositories_read():
226227
"GET", "https://api.github.com/orgs/org1/repos", json=[{"id": 1, "updated_at": updated_at}, {"id": 2, "updated_at": updated_at}]
227228
)
228229
responses.add("GET", "https://api.github.com/orgs/org2/repos", json=[{"id": 3, "updated_at": updated_at}])
229-
records = read_full_refresh(stream)
230+
records = list(read_full_refresh(stream))
230231
assert records == [
231232
{"id": 1, "organization": "org1", "updated_at": updated_at},
232233
{"id": 2, "organization": "org1", "updated_at": updated_at},
@@ -250,7 +251,7 @@ def test_stream_projects_disabled():
250251
json={"message": "Projects are disabled for this repository", "documentation_url": "https://docs.github.com/v3/projects"},
251252
)
252253

253-
assert read_full_refresh(stream) == []
254+
assert list(read_full_refresh(stream)) == []
254255
assert len(responses.calls) == 1
255256
assert responses.calls[0].request.url == "https://api.github.com/repos/test_repo/projects?per_page=100&state=all"
256257

@@ -429,7 +430,7 @@ def test_stream_pull_request_commits():
429430
json=[{"sha": 3}, {"sha": 4}],
430431
)
431432

432-
records = read_full_refresh(stream)
433+
records = list(read_full_refresh(stream))
433434
assert records == [
434435
{"sha": 1, "repository": "organization/repository", "pull_number": 2},
435436
{"sha": 2, "repository": "organization/repository", "pull_number": 2},
@@ -785,7 +786,7 @@ def get_records(cursor_field):
785786
]:
786787
stream = cls(**repository_args_with_start_date)
787788
responses.add("GET", url, json=get_json_response(stream.cursor_field))
788-
records = read_full_refresh(stream)
789+
records = list(read_full_refresh(stream))
789790
assert records == get_records(stream.cursor_field)[1:2]
790791

791792
for cls, url in [
@@ -796,7 +797,7 @@ def get_records(cursor_field):
796797
]:
797798
stream = cls(**repository_args)
798799
responses.add("GET", url, json=get_json_response(stream.cursor_field))
799-
records = read_full_refresh(stream)
800+
records = list(read_full_refresh(stream))
800801
assert records == get_records(stream.cursor_field)
801802

802803
responses.add(
@@ -809,7 +810,7 @@ def get_records(cursor_field):
809810
)
810811

811812
stream = Stargazers(**repository_args_with_start_date)
812-
records = read_full_refresh(stream)
813+
records = list(read_full_refresh(stream))
813814
assert records == [{"repository": "organization/repository", "starred_at": "2022-02-02T00:00:00Z", "user": {"id": 2}, "user_id": 2}]
814815

815816

@@ -863,7 +864,7 @@ def test_stream_team_members_full_refresh():
863864
responses.add("GET", "https://api.github.com/orgs/org1/teams/team2/memberships/login2", json={"username": "login2"})
864865

865866
stream = TeamMembers(parent=Teams(**organization_args), **repository_args)
866-
records = read_full_refresh(stream)
867+
records = list(read_full_refresh(stream))
867868

868869
assert records == [
869870
{"login": "login1", "organization": "org1", "team_slug": "team1"},
@@ -872,7 +873,7 @@ def test_stream_team_members_full_refresh():
872873
]
873874

874875
stream = TeamMemberships(parent=stream, **repository_args)
875-
records = read_full_refresh(stream)
876+
records = list(read_full_refresh(stream))
876877

877878
assert records == [
878879
{"username": "login1", "organization": "org1", "team_slug": "team1"},

airbyte-integrations/connectors/source-github/unit_tests/utils.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,6 @@
99
from airbyte_cdk.sources.streams import Stream
1010

1111

12-
def read_full_refresh(stream_instance: Stream):
13-
records = []
14-
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh)
15-
for slice in slices:
16-
records.extend(list(stream_instance.read_records(stream_slice=slice, sync_mode=SyncMode.full_refresh)))
17-
return records
18-
19-
2012
def read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
2113
res = []
2214
slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)

docs/integrations/sources/github.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa
141141

142142
| Version | Date | Pull Request | Subject |
143143
|:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
144+
| 0.2.46 | 2022-08-17 | [15730](https://github.com/airbytehq/airbyte/pull/15730) | Validate input organizations and repositories |
144145
| 0.2.45 | 2022-08-11 | [15420](https://github.com/airbytehq/airbyte/pull/15420) | "User" object can be "null" |
145146
| 0.2.44 | 2022-08-01 | [14795](https://github.com/airbytehq/airbyte/pull/14795) | Use GraphQL for `pull_request_comment_reactions` stream |
146147
| 0.2.43 | 2022-07-26 | [15049](https://github.com/airbytehq/airbyte/pull/15049) | Bugfix schemas for streams `deployments`, `workflow_runs`, `teams` |

0 commit comments

Comments
 (0)