Skip to content

Commit 01363dd

Browse files
vpipktoctavia-squidington-iiimarcosmarxm
authored
✨ Source Azure Blob Storage: add client_credentials auth (#50398)
Co-authored-by: Octavia Squidington III <[email protected]> Co-authored-by: Marcos Marx <[email protected]> Co-authored-by: marcosmarxm <[email protected]>
1 parent 8fbeb7b commit 01363dd

File tree

8 files changed

+186
-48
lines changed

8 files changed

+186
-48
lines changed

airbyte-integrations/connectors/source-azure-blob-storage/integration_tests/spec.json

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@
3535
"default": ["**"],
3636
"order": 1,
3737
"type": "array",
38-
"items": {
39-
"type": "string"
40-
}
38+
"items": { "type": "string" }
4139
},
4240
"legacy_prefix": {
4341
"title": "Legacy Prefix",
@@ -136,9 +134,7 @@
136134
"description": "A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
137135
"default": [],
138136
"type": "array",
139-
"items": {
140-
"type": "string"
141-
},
137+
"items": { "type": "string" },
142138
"uniqueItems": true
143139
},
144140
"strings_can_be_null": {
@@ -162,9 +158,7 @@
162158
"header_definition": {
163159
"title": "CSV Header Definition",
164160
"description": "How headers will be defined. `User Provided` assumes the CSV does not have a header row and uses the headers provided and `Autogenerated` assumes the CSV does not have a header row and the CDK will generate headers using for `f{i}` where `i` is the index starting from 0. Else, the default behavior is to use the header from the CSV file. If a user wants to autogenerate or provide column names for a CSV having headers, they can skip rows.",
165-
"default": {
166-
"header_definition_type": "From CSV"
167-
},
161+
"default": { "header_definition_type": "From CSV" },
168162
"oneOf": [
169163
{
170164
"title": "From CSV",
@@ -206,9 +200,7 @@
206200
"title": "Column Names",
207201
"description": "The column names that will be used while emitting the CSV records",
208202
"type": "array",
209-
"items": {
210-
"type": "string"
211-
}
203+
"items": { "type": "string" }
212204
}
213205
},
214206
"required": ["column_names", "header_definition_type"]
@@ -221,19 +213,15 @@
221213
"description": "A set of case-sensitive strings that should be interpreted as true values.",
222214
"default": ["y", "yes", "t", "true", "on", "1"],
223215
"type": "array",
224-
"items": {
225-
"type": "string"
226-
},
216+
"items": { "type": "string" },
227217
"uniqueItems": true
228218
},
229219
"false_values": {
230220
"title": "False Values",
231221
"description": "A set of case-sensitive strings that should be interpreted as false values.",
232222
"default": ["n", "no", "f", "false", "off", "0"],
233223
"type": "array",
234-
"items": {
235-
"type": "string"
236-
},
224+
"items": { "type": "string" },
237225
"uniqueItems": true
238226
},
239227
"inference_type": {
@@ -313,9 +301,7 @@
313301
"processing": {
314302
"title": "Processing",
315303
"description": "Processing configuration",
316-
"default": {
317-
"mode": "local"
318-
},
304+
"default": { "mode": "local" },
319305
"type": "object",
320306
"oneOf": [
321307
{
@@ -401,6 +387,43 @@
401387
"auth_type"
402388
]
403389
},
390+
{
391+
"title": "Authenticate via Client Credentials",
392+
"type": "object",
393+
"properties": {
394+
"auth_type": {
395+
"title": "Auth Type",
396+
"default": "client_credentials",
397+
"const": "client_credentials",
398+
"enum": ["client_credentials"],
399+
"type": "string"
400+
},
401+
"app_tenant_id": {
402+
"title": "Tenant ID",
403+
"description": "Tenant ID of the Microsoft Azure Application",
404+
"airbyte_secret": true,
405+
"type": "string"
406+
},
407+
"app_client_id": {
408+
"title": "Client ID",
409+
"description": "Client ID of your Microsoft developer application",
410+
"airbyte_secret": true,
411+
"type": "string"
412+
},
413+
"app_client_secret": {
414+
"title": "Client Secret",
415+
"description": "Client Secret of your Microsoft developer application",
416+
"airbyte_secret": true,
417+
"type": "string"
418+
}
419+
},
420+
"required": [
421+
"app_tenant_id",
422+
"app_client_id",
423+
"app_client_secret",
424+
"auth_type"
425+
]
426+
},
404427
{
405428
"title": "Authenticate via Storage Account Key",
406429
"type": "object",
@@ -485,12 +508,8 @@
485508
"type": "object",
486509
"additionalProperties": false,
487510
"properties": {
488-
"client_id": {
489-
"type": "string"
490-
},
491-
"client_secret": {
492-
"type": "string"
493-
}
511+
"client_id": { "type": "string" },
512+
"client_secret": { "type": "string" }
494513
}
495514
},
496515
"complete_oauth_server_output_specification": {

airbyte-integrations/connectors/source-azure-blob-storage/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ data:
1212
connectorSubtype: file
1313
connectorType: source
1414
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
15-
dockerImageTag: 0.4.4
15+
dockerImageTag: 0.5.0
1616
dockerRepository: airbyte/source-azure-blob-storage
1717
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
1818
githubIssueLabel: source-azure-blob-storage

airbyte-integrations/connectors/source-azure-blob-storage/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
33
build-backend = "poetry.core.masonry.api"
44

55
[tool.poetry]
6-
version = "0.4.4"
6+
version = "0.5.0"
77
name = "source-azure-blob-storage"
88
description = "Source implementation for Azure Blob Storage."
99
authors = [ "Airbyte <[email protected]>",]

airbyte-integrations/connectors/source-azure-blob-storage/source_azure_blob_storage/spec.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,25 @@ class Config(OneOfOptionConfig):
3636
)
3737

3838

39+
class ClientCredentials(BaseModel):
40+
class Config(OneOfOptionConfig):
41+
title = "Authenticate via Client Credentials"
42+
discriminator = "auth_type"
43+
44+
auth_type: Literal["client_credentials"] = Field("client_credentials", const=True)
45+
app_tenant_id: str = Field(title="Tenant ID", description="Tenant ID of the Microsoft Azure Application", airbyte_secret=True)
46+
app_client_id: str = Field(
47+
title="Client ID",
48+
description="Client ID of your Microsoft developer application",
49+
airbyte_secret=True,
50+
)
51+
app_client_secret: str = Field(
52+
title="Client Secret",
53+
description="Client Secret of your Microsoft developer application",
54+
airbyte_secret=True,
55+
)
56+
57+
3958
class StorageAccountKey(BaseModel):
4059
class Config(OneOfOptionConfig):
4160
title = "Authenticate via Storage Account Key"
@@ -61,7 +80,7 @@ class SourceAzureBlobStorageSpec(AbstractFileBasedSpec):
6180
def documentation_url(cls) -> AnyUrl:
6281
return AnyUrl("https://docs.airbyte.com/integrations/sources/azure-blob-storage", scheme="https")
6382

64-
credentials: Union[Oauth2, StorageAccountKey] = Field(
83+
credentials: Union[Oauth2, ClientCredentials, StorageAccountKey] = Field(
6584
title="Authentication",
6685
description="Credentials for connecting to the Azure Blob Storage",
6786
discriminator="auth_type",

airbyte-integrations/connectors/source-azure-blob-storage/source_azure_blob_storage/stream_reader.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
import logging
55
from io import IOBase
6-
from typing import Iterable, List, Optional, Union
6+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
77

88
import pytz
9-
from azure.core.credentials import AccessToken
9+
from azure.core.credentials import AccessToken, TokenCredential
1010
from azure.core.exceptions import ResourceNotFoundError
1111
from azure.storage.blob import BlobServiceClient, ContainerClient
1212
from smart_open import open
@@ -19,7 +19,46 @@
1919
from .spec import SourceAzureBlobStorageSpec
2020

2121

22-
class AzureOauth2Authenticator(Oauth2Authenticator):
22+
class AzureClientCredentialsAuthenticator(Oauth2Authenticator, TokenCredential):
23+
def __init__(self, tenant_id: str, client_id: str, client_secret: str, **kwargs):
24+
super().__init__(
25+
token_refresh_endpoint=f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token",
26+
client_id=client_id,
27+
client_secret=client_secret,
28+
grant_type="client_credentials",
29+
scopes=["https://storage.azure.com/.default"],
30+
refresh_token=None,
31+
)
32+
33+
def build_refresh_request_body(self) -> Mapping[str, Any]:
34+
"""
35+
Returns the request body to set on the refresh request
36+
37+
Override to define additional parameters
38+
"""
39+
payload: MutableMapping[str, Any] = {
40+
"grant_type": self.get_grant_type(),
41+
"client_id": self.get_client_id(),
42+
"client_secret": self.get_client_secret(),
43+
}
44+
45+
if self.get_scopes():
46+
payload["scope"] = " ".join(self.get_scopes())
47+
48+
if self.get_refresh_request_body():
49+
for key, val in self.get_refresh_request_body().items():
50+
# We defer to existing oauth constructs over custom configured fields
51+
if key not in payload:
52+
payload[key] = val
53+
54+
return payload
55+
56+
def get_token(self, *args, **kwargs) -> AccessToken:
57+
"""Parent class handles Oauth Refresh token logic."""
58+
return AccessToken(token=self.get_access_token(), expires_on=int(self.get_token_expiry_date().timestamp()))
59+
60+
61+
class AzureOauth2Authenticator(Oauth2Authenticator, TokenCredential):
2362
"""
2463
Authenticator for Azure Blob Storage SDK to align with azure.core.credentials.TokenCredential protocol
2564
"""
@@ -63,17 +102,24 @@ def azure_blob_service_client(self):
63102
return BlobServiceClient(self.account_url, credential=self._credentials)
64103

65104
@property
66-
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator]:
105+
def azure_credentials(self) -> Union[str, AzureOauth2Authenticator, AzureClientCredentialsAuthenticator]:
67106
if not self._credentials:
68107
if self.config.credentials.auth_type == "storage_account_key":
69108
self._credentials = self.config.credentials.azure_blob_storage_account_key
70-
else:
109+
elif self.config.credentials.auth_type == "oauth2":
71110
self._credentials = AzureOauth2Authenticator(
72111
token_refresh_endpoint=f"https://login.microsoftonline.com/{self.config.credentials.tenant_id}/oauth2/v2.0/token",
73112
client_id=self.config.credentials.client_id,
74113
client_secret=self.config.credentials.client_secret,
75114
refresh_token=self.config.credentials.refresh_token,
76115
)
116+
elif self.config.credentials.auth_type == "client_credentials":
117+
self._credentials = AzureClientCredentialsAuthenticator(
118+
tenant_id=self.config.credentials.app_tenant_id,
119+
client_id=self.config.credentials.app_client_id,
120+
client_secret=self.config.credentials.app_client_secret,
121+
)
122+
77123
return self._credentials
78124

79125
def get_matching_files(

airbyte-integrations/connectors/source-azure-blob-storage/unit_tests/test_authenticator.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
from azure.core.credentials import AccessToken
5-
from source_azure_blob_storage.stream_reader import AzureOauth2Authenticator
5+
from source_azure_blob_storage.stream_reader import AzureClientCredentialsAuthenticator, AzureOauth2Authenticator
66

77

88
def test_custom_authenticator(requests_mock):
@@ -24,3 +24,23 @@ def test_custom_authenticator(requests_mock):
2424
new_token = authenticator.get_token()
2525
assert isinstance(new_token, AccessToken)
2626
assert new_token.token == "access_token"
27+
28+
29+
def test_client_authenticator(requests_mock):
30+
authenticator = AzureClientCredentialsAuthenticator(
31+
token_refresh_endpoint="https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token",
32+
tenant_id="tenant_id",
33+
client_id="client_id",
34+
client_secret="client_secret",
35+
)
36+
token_response = {
37+
"token_type": "Bearer",
38+
"scope": "https://storage.azure.com/.default",
39+
"expires_in": 3600,
40+
"ext_expires_in": 3600,
41+
"access_token": "access_token_123",
42+
}
43+
requests_mock.post("https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token", json=token_response)
44+
new_token = authenticator.get_token()
45+
assert isinstance(new_token, AccessToken)
46+
assert new_token.token == "access_token_123"

airbyte-integrations/connectors/source-azure-blob-storage/unit_tests/test_config_migration.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,35 @@
33

44
import json
55
import os
6+
from pathlib import Path
7+
from shutil import copytree
8+
from tempfile import TemporaryDirectory
69
from typing import Any, Mapping
710

11+
from pytest import fixture
812
from source_azure_blob_storage import SourceAzureBlobStorage, SourceAzureBlobStorageSpec, SourceAzureBlobStorageStreamReader
913
from source_azure_blob_storage.config_migrations import MigrateCredentials, MigrateLegacyConfig
1014

1115
from airbyte_cdk.sources.file_based.stream.cursor import DefaultFileBasedCursor
1216

1317

18+
@fixture
19+
def temp_configs():
20+
config_path = f"{os.path.dirname(__file__)}/test_configs/"
21+
with TemporaryDirectory() as _tempdir:
22+
configs_dir = Path(_tempdir) / "test_configs"
23+
copytree(config_path, configs_dir)
24+
yield configs_dir
25+
26+
1427
# HELPERS
1528
def load_config(config_path: str) -> Mapping[str, Any]:
1629
with open(config_path, "r") as config:
1730
return json.load(config)
1831

1932

20-
def test_legacy_config_migration():
21-
config_path = f"{os.path.dirname(__file__)}/test_configs/test_legacy_config.json"
33+
def test_legacy_config_migration(temp_configs):
34+
config_path = str((Path(temp_configs) / "test_legacy_config.json").resolve())
2235
migration_instance = MigrateLegacyConfig
2336
source = SourceAzureBlobStorage(
2437
SourceAzureBlobStorageStreamReader(),
@@ -47,9 +60,11 @@ def test_legacy_config_migration():
4760
assert test_migrated_config == expected_config
4861

4962

50-
def test_credentials_config_migration():
51-
config_path = f"{os.path.dirname(__file__)}/test_configs/test_config_without_credentials.json"
63+
def test_credentials_config_migration(temp_configs):
64+
config_path = str((Path(temp_configs) / "test_config_without_credentials.json").resolve())
5265
initial_config = load_config(config_path)
66+
expected = initial_config["azure_blob_storage_account_key"]
67+
5368
migration_instance = MigrateCredentials
5469
source = SourceAzureBlobStorage(
5570
SourceAzureBlobStorageStreamReader(),
@@ -61,4 +76,4 @@ def test_credentials_config_migration():
6176
)
6277
migration_instance.migrate(["check", "--config", config_path], source)
6378
test_migrated_config = load_config(config_path)
64-
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == initial_config["azure_blob_storage_account_key"]
79+
assert test_migrated_config["credentials"]["azure_blob_storage_account_key"] == expected

0 commit comments

Comments
 (0)