Skip to content

Commit d48dbf9

Browse files
clnolljatinyadav-cc
authored andcommitted
Update file-based connectors for compatibility with concurrent CDK (airbytehq#34681)
1 parent b8d5193 commit d48dbf9

File tree

20 files changed

+98
-38
lines changed

20 files changed

+98
-38
lines changed

airbyte-integrations/connectors/source-azure-blob-storage/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: fdaaba68-4875-4ed9-8fcd-4ae1e0a25093
10-
dockerImageTag: 0.3.2
10+
dockerImageTag: 0.3.3
1111
dockerRepository: airbyte/source-azure-blob-storage
1212
documentationUrl: https://docs.airbyte.com/integrations/sources/azure-blob-storage
1313
githubIssueLabel: source-azure-blob-storage

airbyte-integrations/connectors/source-azure-blob-storage/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from setuptools import find_packages, setup
77

88
MAIN_REQUIREMENTS = [
9-
"airbyte-cdk[file-based]==0.59.2", # pinned until compatible with https://github.com/airbytehq/airbyte/pull/34411
9+
"airbyte-cdk[file-based]>=0.60.1",
1010
"smart_open[azure]",
1111
"pytz",
1212
]

airbyte-integrations/connectors/source-azure-blob-storage/source_azure_blob_storage/run.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,16 @@
1414
def run():
1515
args = sys.argv[1:]
1616
catalog_path = AirbyteEntrypoint.extract_catalog(args)
17+
config_path = AirbyteEntrypoint.extract_config(args)
18+
state_path = AirbyteEntrypoint.extract_state(args)
1719
try:
18-
source = SourceAzureBlobStorage(SourceAzureBlobStorageStreamReader(), Config, catalog_path)
20+
source = SourceAzureBlobStorage(
21+
SourceAzureBlobStorageStreamReader(),
22+
Config,
23+
SourceAzureBlobStorage.read_catalog(catalog_path) if catalog_path else None,
24+
SourceAzureBlobStorage.read_config(config_path) if catalog_path else None,
25+
SourceAzureBlobStorage.read_state(state_path) if catalog_path else None,
26+
)
1927
except Exception:
2028
print(
2129
AirbyteMessage(

airbyte-integrations/connectors/source-azure-blob-storage/source_azure_blob_storage/source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111

1212

1313
class SourceAzureBlobStorage(FileBasedSource):
14-
def read_config(self, config_path: str) -> Mapping[str, Any]:
14+
@classmethod
15+
def read_config(cls, config_path: str) -> Mapping[str, Any]:
1516
"""
1617
Used to override the default read_config so that when the new file-based Azure Blob Storage connector processes a config
1718
in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
1819
validate the config against the new spec.
1920
"""
20-
config = super().read_config(config_path)
21-
if not self._is_v1_config(config):
21+
config = FileBasedSource.read_config(config_path)
22+
if not cls._is_v1_config(config):
2223
converted_config = LegacyConfigTransformer.convert(config)
2324
emit_configuration_as_airbyte_control_message(converted_config)
2425
return converted_config

airbyte-integrations/connectors/source-gcs/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: 2a8c41ae-8c23-4be0-a73f-2ab10ca1a820
10-
dockerImageTag: 0.3.5
10+
dockerImageTag: 0.3.6
1111
dockerRepository: airbyte/source-gcs
1212
documentationUrl: https://docs.airbyte.com/integrations/sources/gcs
1313
githubIssueLabel: source-gcs

airbyte-integrations/connectors/source-gcs/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from setuptools import find_packages, setup
77

88
MAIN_REQUIREMENTS = [
9-
"airbyte-cdk[file-based]==0.59.2", # pinned until compatible with https://github.com/airbytehq/airbyte/pull/34411
9+
"airbyte-cdk[file-based]>=0.60.1",
1010
"google-cloud-storage==2.12.0",
1111
"smart-open[s3]==5.1.0",
1212
"pandas==1.5.3",

airbyte-integrations/connectors/source-gcs/source_gcs/run.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,41 @@
44

55

66
import sys
7+
import traceback
8+
from datetime import datetime
79

810
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
11+
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type
912
from source_gcs import Config, Cursor, SourceGCS, SourceGCSStreamReader
1013

1114

1215
def run():
1316
_args = sys.argv[1:]
14-
catalog_path = AirbyteEntrypoint.extract_catalog(_args)
15-
source = SourceGCS(SourceGCSStreamReader(), Config, catalog_path, cursor_cls=Cursor)
16-
launch(source, sys.argv[1:])
17+
try:
18+
catalog_path = AirbyteEntrypoint.extract_catalog(_args)
19+
config_path = AirbyteEntrypoint.extract_config(_args)
20+
state_path = AirbyteEntrypoint.extract_state(_args)
21+
source = SourceGCS(
22+
SourceGCSStreamReader(),
23+
Config,
24+
SourceGCS.read_catalog(catalog_path) if catalog_path else None,
25+
SourceGCS.read_config(config_path) if config_path else None,
26+
SourceGCS.read_state(state_path) if state_path else None,
27+
cursor_cls=Cursor,
28+
)
29+
except Exception:
30+
print(
31+
AirbyteMessage(
32+
type=Type.TRACE,
33+
trace=AirbyteTraceMessage(
34+
type=TraceType.ERROR,
35+
emitted_at=int(datetime.now().timestamp() * 1000),
36+
error=AirbyteErrorTraceMessage(
37+
message="Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance.",
38+
stack_trace=traceback.format_exc(),
39+
),
40+
),
41+
).json()
42+
)
43+
else:
44+
launch(source, sys.argv[1:])

airbyte-integrations/connectors/source-gcs/source_gcs/source.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212

1313

1414
class SourceGCS(FileBasedSource):
15-
def read_config(self, config_path: str) -> Mapping[str, Any]:
15+
@classmethod
16+
def read_config(cls, config_path: str) -> Mapping[str, Any]:
1617
"""
1718
Override the default read_config to transform the legacy config format
1819
into the new one before validating it against the new spec.
1920
"""
20-
config = super().read_config(config_path)
21-
if not self._is_file_based_config(config):
21+
config = FileBasedSource.read_config(config_path)
22+
if not cls._is_file_based_config(config):
2223
parsed_legacy_config = SourceGCSSpec(**config)
2324
converted_config = LegacyConfigTransformer.convert(parsed_legacy_config)
2425
emit_configuration_as_airbyte_control_message(converted_config)

airbyte-integrations/connectors/source-google-drive/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: 9f8dda77-1048-4368-815b-269bf54ee9b8
10-
dockerImageTag: 0.0.7
10+
dockerImageTag: 0.0.8
1111
dockerRepository: airbyte/source-google-drive
1212
githubIssueLabel: source-google-drive
1313
icon: google-drive.svg

airbyte-integrations/connectors/source-google-drive/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from setuptools import find_packages, setup
77

88
MAIN_REQUIREMENTS = [
9-
"airbyte-cdk[file-based]==0.59.2", # pinned until compatible with https://github.com/airbytehq/airbyte/pull/34411
9+
"airbyte-cdk[file-based]>=0.60.1",
1010
"google-api-python-client==2.104.0",
1111
"google-auth-httplib2==0.1.1",
1212
"google-auth-oauthlib==1.1.0",

0 commit comments

Comments
 (0)