Skip to content

Commit 003fc6e

Browse files
authored
🐛 Source Github: increase discovery time (#36429)
1 parent 05f0c64 commit 003fc6e

File tree

8 files changed

+113
-124
lines changed

8 files changed

+113
-124
lines changed

airbyte-integrations/connectors/source-github/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ data:
1010
connectorSubtype: api
1111
connectorType: source
1212
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
13-
dockerImageTag: 1.7.0
13+
dockerImageTag: 1.7.1
1414
dockerRepository: airbyte/source-github
1515
documentationUrl: https://docs.airbyte.com/integrations/sources/github
1616
githubIssueLabel: source-github

airbyte-integrations/connectors/source-github/pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "1.7.0"
6+
version = "1.7.1"
77
name = "source-github"
88
description = "Source implementation for GitHub."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-github/source_github/source.py

+17-52
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
from os import getenv
6-
from typing import Any, Dict, List, Mapping, MutableMapping, Tuple
6+
from typing import Any, List, Mapping, MutableMapping, Optional, Tuple
77
from urllib.parse import urlparse
88

99
from airbyte_cdk import AirbyteLogger
@@ -65,7 +65,9 @@ class SourceGithub(AbstractSource):
6565
continue_sync_on_stream_failure = True
6666

6767
@staticmethod
68-
def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator) -> Tuple[List[str], List[str]]:
68+
def _get_org_repositories(
69+
config: Mapping[str, Any], authenticator: MultipleTokenAuthenticator
70+
) -> Tuple[List[str], List[str], Optional[str]]:
6971
"""
7072
Parse config/repositories and produce two lists: organizations, repositories.
7173
Args:
@@ -78,16 +80,19 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke
7880
organizations = set()
7981
unchecked_repos = set()
8082
unchecked_orgs = set()
83+
pattern = None
8184

8285
for org_repos in config_repositories:
83-
org, _, repos = org_repos.partition("/")
84-
if repos == "*":
85-
unchecked_orgs.add(org)
86+
_, _, repos = org_repos.partition("/")
87+
if "*" in repos:
88+
unchecked_orgs.add(org_repos)
8689
else:
8790
unchecked_repos.add(org_repos)
8891

8992
if unchecked_orgs:
90-
stream = Repositories(authenticator=authenticator, organizations=unchecked_orgs, api_url=config.get("api_url"))
93+
org_names = [org.split("/")[0] for org in unchecked_orgs]
94+
pattern = "|".join([f"({org.replace('*', '.*')})" for org in unchecked_orgs])
95+
stream = Repositories(authenticator=authenticator, organizations=org_names, api_url=config.get("api_url"), pattern=pattern)
9196
for record in read_full_refresh(stream):
9297
repositories.add(record["full_name"])
9398
organizations.add(record["organization"])
@@ -96,7 +101,7 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke
96101
if unchecked_repos:
97102
stream = RepositoryStats(
98103
authenticator=authenticator,
99-
repositories=unchecked_repos,
104+
repositories=list(unchecked_repos),
100105
api_url=config.get("api_url"),
101106
# This parameter is deprecated and in future will be used sane default, page_size: 10
102107
page_size_for_large_streams=config.get("page_size_for_large_streams", constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM),
@@ -107,7 +112,7 @@ def _get_org_repositories(config: Mapping[str, Any], authenticator: MultipleToke
107112
if organization:
108113
organizations.add(organization)
109114

110-
return list(organizations), list(repositories)
115+
return list(organizations), list(repositories), pattern
111116

112117
@staticmethod
113118
def get_access_token(config: Mapping[str, Any]):
@@ -169,45 +174,6 @@ def _validate_branches(self, config: MutableMapping[str, Any]) -> MutableMapping
169174
def _is_http_allowed() -> bool:
170175
return getenv("DEPLOYMENT_MODE", "").upper() != "CLOUD"
171176

172-
@staticmethod
173-
def _get_branches_data(
174-
selected_branches: List, full_refresh_args: Dict[str, Any] = None
175-
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
176-
selected_branches = set(selected_branches)
177-
178-
# Get the default branch for each repository
179-
default_branches = {}
180-
repository_stats_stream = RepositoryStats(**full_refresh_args)
181-
for stream_slice in repository_stats_stream.stream_slices(sync_mode=SyncMode.full_refresh):
182-
default_branches.update(
183-
{
184-
repo_stats["full_name"]: repo_stats["default_branch"]
185-
for repo_stats in repository_stats_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
186-
}
187-
)
188-
189-
all_branches = []
190-
branches_stream = Branches(**full_refresh_args)
191-
for stream_slice in branches_stream.stream_slices(sync_mode=SyncMode.full_refresh):
192-
for branch in branches_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
193-
all_branches.append(f"{branch['repository']}/{branch['name']}")
194-
195-
# Create mapping of repository to list of branches to pull commits for
196-
# If no branches are specified for a repo, use its default branch
197-
branches_to_pull: Dict[str, List[str]] = {}
198-
for repo in full_refresh_args["repositories"]:
199-
repo_branches = []
200-
for branch in selected_branches:
201-
branch_parts = branch.split("/", 2)
202-
if "/".join(branch_parts[:2]) == repo and branch in all_branches:
203-
repo_branches.append(branch_parts[-1])
204-
if not repo_branches:
205-
repo_branches = [default_branches[repo]]
206-
207-
branches_to_pull[repo] = repo_branches
208-
209-
return default_branches, branches_to_pull
210-
211177
def user_friendly_error_message(self, message: str) -> str:
212178
user_message = ""
213179
if "404 Client Error: Not Found for url: https://api.github.com/repos/" in message:
@@ -229,7 +195,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
229195
config = self._validate_and_transform_config(config)
230196
try:
231197
authenticator = self._get_authenticator(config)
232-
_, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
198+
_, repositories, _ = self._get_org_repositories(config=config, authenticator=authenticator)
233199
if not repositories:
234200
return (
235201
False,
@@ -246,7 +212,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
246212
authenticator = self._get_authenticator(config)
247213
config = self._validate_and_transform_config(config)
248214
try:
249-
organizations, repositories = self._get_org_repositories(config=config, authenticator=authenticator)
215+
organizations, repositories, pattern = self._get_org_repositories(config=config, authenticator=authenticator)
250216
except Exception as e:
251217
message = repr(e)
252218
user_message = self.user_friendly_error_message(message)
@@ -291,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
291257
}
292258
repository_args_with_start_date = {**repository_args, "start_date": start_date}
293259

294-
default_branches, branches_to_pull = self._get_branches_data(config.get("branch", []), repository_args)
295260
pull_requests_stream = PullRequests(**repository_args_with_start_date)
296261
projects_stream = Projects(**repository_args_with_start_date)
297262
project_columns_stream = ProjectColumns(projects_stream, **repository_args_with_start_date)
@@ -307,7 +272,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
307272
Comments(**repository_args_with_start_date),
308273
CommitCommentReactions(**repository_args_with_start_date),
309274
CommitComments(**repository_args_with_start_date),
310-
Commits(**repository_args_with_start_date, branches_to_pull=branches_to_pull, default_branches=default_branches),
275+
Commits(**repository_args_with_start_date, branches_to_pull=config.get("branches", [])),
311276
ContributorActivity(**repository_args),
312277
Deployments(**repository_args_with_start_date),
313278
Events(**repository_args_with_start_date),
@@ -327,7 +292,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
327292
ProjectsV2(**repository_args_with_start_date),
328293
pull_requests_stream,
329294
Releases(**repository_args_with_start_date),
330-
Repositories(**organization_args_with_start_date),
295+
Repositories(**organization_args_with_start_date, pattern=pattern),
331296
ReviewComments(**repository_args_with_start_date),
332297
Reviews(**repository_args_with_start_date),
333298
Stargazers(**repository_args_with_start_date),

airbyte-integrations/connectors/source-github/source_github/spec.json

+7-6
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,19 @@
8181
"type": "array",
8282
"items": {
8383
"type": "string",
84-
"pattern": "^([\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))\\s+)*[\\w.-]+/(\\*|[\\w.-]+(?<!\\.git))$"
84+
"pattern": "^[\\w.-]+/(([\\w.-]*\\*)|[\\w.-]+(?<!\\.git))$"
8585
},
8686
"minItems": 1,
8787
"examples": [
88-
"airbytehq/airbyte airbytehq/another-repo",
88+
"airbytehq/airbyte",
89+
"airbytehq/another-repo",
8990
"airbytehq/*",
90-
"airbytehq/airbyte"
91+
"airbytehq/a*"
9192
],
9293
"title": "GitHub Repositories",
93-
"description": "List of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/airbyte airbytehq/another-repo` for multiple repositories.",
94+
"description": "List of GitHub organizations/repositories, e.g. `airbytehq/airbyte` for single repository, `airbytehq/*` for get all repositories from organization and `airbytehq/a* for matching multiple repositories by pattern.",
9495
"order": 1,
95-
"pattern_descriptor": "org/repo org/another-repo org/*"
96+
"pattern_descriptor": "org/repo org/another-repo org/* org/a*"
9697
},
9798
"start_date": {
9899
"type": "string",
@@ -126,7 +127,7 @@
126127
"type": "string"
127128
},
128129
"title": "Branches",
129-
"examples": ["airbytehq/airbyte/master airbytehq/airbyte/my-branch"],
130+
"examples": ["airbytehq/airbyte/master", "airbytehq/airbyte/my-branch"],
130131
"description": "List of GitHub repository branches to pull commits for, e.g. `airbytehq/airbyte/master`. If no branches are specified for a repository, the default branch will be pulled.",
131132
"order": 4,
132133
"pattern_descriptor": "org/repo/branch1 org/repo/branch2"

airbyte-integrations/connectors/source-github/source_github/streams.py

+40-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5+
import re
56
import time
67
from abc import ABC, abstractmethod
78
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
@@ -441,12 +442,18 @@ class Repositories(SemiIncrementalMixin, Organizations):
441442
"direction": "desc",
442443
}
443444

445+
def __init__(self, *args, pattern: Optional[str] = None, **kwargs):
446+
self._pattern = re.compile(pattern) if pattern else pattern
447+
super().__init__(*args, **kwargs)
448+
444449
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
445450
return f"orgs/{stream_slice['organization']}/repos"
446451

447452
def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
448453
for record in response.json(): # GitHub puts records in an array.
449-
yield self.transform(record=record, stream_slice=stream_slice)
454+
record = self.transform(record=record, stream_slice=stream_slice)
455+
if not self._pattern or self._pattern.match(record["full_name"]):
456+
yield record
450457

451458

452459
class Tags(GithubStream):
@@ -676,10 +683,13 @@ class Commits(IncrementalMixin, GithubStream):
676683
cursor_field = "created_at"
677684
slice_keys = ["repository", "branch"]
678685

679-
def __init__(self, branches_to_pull: Mapping[str, List[str]], default_branches: Mapping[str, str], **kwargs):
686+
def __init__(self, branches_to_pull: List[str], **kwargs):
680687
super().__init__(**kwargs)
681-
self.branches_to_pull = branches_to_pull
682-
self.default_branches = default_branches
688+
kwargs.pop("start_date")
689+
self.branches_to_repos = {}
690+
self.branches_to_pull = set(branches_to_pull)
691+
self.branches_stream = Branches(**kwargs)
692+
self.repositories_stream = RepositoryStats(**kwargs)
683693

684694
def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
685695
params = super(IncrementalMixin, self).request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs)
@@ -690,9 +700,10 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[
690700
return params
691701

692702
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
703+
self._validate_branches_to_pull()
693704
for stream_slice in super().stream_slices(**kwargs):
694705
repository = stream_slice["repository"]
695-
for branch in self.branches_to_pull.get(repository, []):
706+
for branch in self.branches_to_repos.get(repository, []):
696707
yield {"branch": branch, "repository": repository}
697708

698709
def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
@@ -718,6 +729,30 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
718729
current_stream_state.setdefault(repository, {}).setdefault(branch, {})[self.cursor_field] = updated_state
719730
return current_stream_state
720731

732+
def _validate_branches_to_pull(self):
733+
# Get the default branch for each repository
734+
default_branches = {}
735+
for stream_slice in self.repositories_stream.stream_slices(sync_mode=SyncMode.full_refresh):
736+
for repo_stats in self.repositories_stream.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh):
737+
default_branches[repo_stats["full_name"]] = repo_stats["default_branch"]
738+
739+
all_branches = []
740+
for stream_slice in self.branches_stream.stream_slices(sync_mode=SyncMode.full_refresh):
741+
for branch in self.branches_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
742+
all_branches.append(f"{branch['repository']}/{branch['name']}")
743+
744+
# Create mapping of repository to list of branches to pull commits for
745+
# If no branches are specified for a repo, use its default branch
746+
for repo in self.repositories:
747+
repo_branches = []
748+
for branch in self.branches_to_pull:
749+
branch_parts = branch.split("/", 2)
750+
if "/".join(branch_parts[:2]) == repo and branch in all_branches:
751+
repo_branches.append(branch_parts[-1])
752+
if not repo_branches:
753+
repo_branches = [default_branches[repo]]
754+
self.branches_to_repos[repo] = repo_branches
755+
721756

722757
class Issues(IncrementalMixin, GithubStream):
723758
"""

airbyte-integrations/connectors/source-github/unit_tests/test_source.py

+5-47
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def test_check_connection_repos_and_org_repos(rate_limit_mock_response):
105105

106106
@responses.activate
107107
def test_check_connection_org_only(rate_limit_mock_response):
108-
repos = [{"name": f"name {i}", "full_name": f"full name {i}", "updated_at": "2020-01-01T00:00:00Z"} for i in range(1000)]
108+
repos = [{"name": f"name {i}", "full_name": f"airbytehq/full name {i}", "updated_at": "2020-01-01T00:00:00Z"} for i in range(1000)]
109109
responses.add("GET", "https://api.github.com/orgs/airbytehq/repos", json=repos)
110110

111111
status = check_source("airbytehq/*")
@@ -115,49 +115,6 @@ def test_check_connection_org_only(rate_limit_mock_response):
115115
assert len(responses.calls) == 2
116116

117117

118-
@responses.activate
119-
def test_get_branches_data():
120-
121-
repository_args = {"repositories": ["airbytehq/integration-test"], "page_size_for_large_streams": 10}
122-
123-
source = SourceGithub()
124-
125-
responses.add(
126-
"GET",
127-
"https://api.github.com/repos/airbytehq/integration-test",
128-
json={"full_name": "airbytehq/integration-test", "default_branch": "master"},
129-
)
130-
131-
responses.add(
132-
"GET",
133-
"https://api.github.com/repos/airbytehq/integration-test/branches",
134-
json=[
135-
{"repository": "airbytehq/integration-test", "name": "feature/branch_0"},
136-
{"repository": "airbytehq/integration-test", "name": "feature/branch_1"},
137-
{"repository": "airbytehq/integration-test", "name": "feature/branch_2"},
138-
{"repository": "airbytehq/integration-test", "name": "master"},
139-
],
140-
)
141-
142-
default_branches, branches_to_pull = source._get_branches_data([], repository_args)
143-
assert default_branches == {"airbytehq/integration-test": "master"}
144-
assert branches_to_pull == {"airbytehq/integration-test": ["master"]}
145-
146-
default_branches, branches_to_pull = source._get_branches_data(
147-
[
148-
"airbytehq/integration-test/feature/branch_0",
149-
"airbytehq/integration-test/feature/branch_1",
150-
"airbytehq/integration-test/feature/branch_3",
151-
],
152-
repository_args,
153-
)
154-
155-
assert default_branches == {"airbytehq/integration-test": "master"}
156-
assert len(branches_to_pull["airbytehq/integration-test"]) == 2
157-
assert "feature/branch_0" in branches_to_pull["airbytehq/integration-test"]
158-
assert "feature/branch_1" in branches_to_pull["airbytehq/integration-test"]
159-
160-
161118
@responses.activate
162119
def test_get_org_repositories():
163120
responses.add(
@@ -178,15 +135,15 @@ def test_get_org_repositories():
178135
config = {"repositories": ["airbytehq/integration-test", "docker/*"]}
179136
source = SourceGithub()
180137
config = source._ensure_default_values(config)
181-
organisations, repositories = source._get_org_repositories(config, authenticator=None)
138+
organisations, repositories, _ = source._get_org_repositories(config, authenticator=None)
182139

183140
assert set(repositories) == {"airbytehq/integration-test", "docker/docker-py", "docker/compose"}
184141
assert set(organisations) == {"airbytehq", "docker"}
185142

186143

187144
@responses.activate
188145
def test_organization_or_repo_available(monkeypatch, rate_limit_mock_response):
189-
monkeypatch.setattr(SourceGithub, "_get_org_repositories", MagicMock(return_value=(False, False)))
146+
monkeypatch.setattr(SourceGithub, "_get_org_repositories", MagicMock(return_value=(False, False, None)))
190147
source = SourceGithub()
191148
with pytest.raises(Exception) as exc_info:
192149
config = {"access_token": "test_token", "repository": ""}
@@ -209,6 +166,7 @@ def test_check_config_repository():
209166
"airbyte_hq/airbyte",
210167
"airbytehq/123",
211168
"airbytehq/airbytexgit",
169+
"airbytehq/a*",
212170
]
213171

214172
repos_fail = [
@@ -242,7 +200,7 @@ def test_check_config_repository():
242200

243201
@responses.activate
244202
def test_streams_no_streams_available_error(monkeypatch, rate_limit_mock_response):
245-
monkeypatch.setattr(SourceGithub, "_get_org_repositories", MagicMock(return_value=(False, False)))
203+
monkeypatch.setattr(SourceGithub, "_get_org_repositories", MagicMock(return_value=(False, False, None)))
246204
with pytest.raises(AirbyteTracedException) as e:
247205
SourceGithub().streams(config={"access_token": "test_token", "repository": "airbytehq/airbyte-test"})
248206
assert str(e.value) == "No streams available. Please check permissions"

0 commit comments

Comments
 (0)