Skip to content

Source S3: updates for compatibility with the concurrent CDK #34591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,95 +4,95 @@ acceptance_tests:
- config_path: secrets/config.json
expect_records:
path: integration_tests/expected_records/csv.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/config_iam_role.json
expect_records:
path: integration_tests/expected_records/csv.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_custom_encoding_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_custom_encoding.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_custom_format_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_custom_format.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_user_schema_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_user_schema.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_no_header_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_no_header.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_skip_rows_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_skip_rows.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_skip_rows_no_header_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_skip_rows_no_header.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_with_nulls_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_with_nulls.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_csv_with_null_bools_config.json
expect_records:
path: integration_tests/expected_records/legacy_csv_with_null_bools.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_parquet_config.json
expect_records:
path: integration_tests/expected_records/parquet.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/parquet_dataset_config.json
expect_records:
path: integration_tests/expected_records/parquet_dataset.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
Expand All @@ -107,71 +107,71 @@ acceptance_tests:
- config_path: secrets/v4_avro_config.json
expect_records:
path: integration_tests/expected_records/avro.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_jsonl_config.json
expect_records:
path: integration_tests/expected_records/jsonl.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/v4_jsonl_newlines_config.json
expect_records:
path: integration_tests/expected_records/jsonl_newlines.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/zip_config_csv.json
expect_records:
path: integration_tests/expected_records/zip_csv.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/zip_config_csv_custom_encoding.json
expect_records:
path: integration_tests/expected_records/zip_csv_custom_encoding.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/zip_config_jsonl.json
expect_records:
path: integration_tests/expected_records/zip_jsonl.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/zip_config_avro.json
expect_records:
path: integration_tests/expected_records/zip_avro.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/zip_config_parquet.json
expect_records:
path: integration_tests/expected_records/zip_parquet.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800
file_types:
skip_test: true
bypass_reason: "To be testes with the last config"
- config_path: secrets/unstructured_config.json
expect_records:
path: integration_tests/expected_records/unstructured.jsonl
exact_order: true
exact_order: false
timeout_seconds: 1800

connection:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 4.4.1
dockerImageTag: 4.5.0
dockerRepository: airbyte/source-s3
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
githubIssueLabel: source-s3
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk[file-based]==0.59.2", # temporarily pin until concurrency can be released
"airbyte-cdk[file-based]>=0.60.1",
"smart-open[s3]==5.1.0",
"wcmatch==8.4",
"dill==0.3.4",
Expand Down
11 changes: 10 additions & 1 deletion airbyte-integrations/connectors/source-s3/source_s3/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@

def get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
return SourceS3(
SourceS3StreamReader(),
Config,
SourceS3.read_catalog(catalog_path) if catalog_path else None,
SourceS3.read_config(config_path) if config_path else None,
SourceS3.read_state(state_path) if state_path else None,
cursor_cls=Cursor,
)
except Exception:
print(
AirbyteMessage(
Expand Down
12 changes: 8 additions & 4 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from airbyte_cdk.config_observation import emit_configuration_as_airbyte_control_message
from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_CONCURRENCY, FileBasedSource
from airbyte_cdk.utils import is_cloud_environment
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer
Expand All @@ -21,14 +21,17 @@


class SourceS3(FileBasedSource):
def read_config(self, config_path: str) -> Mapping[str, Any]:
_concurrency_level = DEFAULT_CONCURRENCY
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there value in allowing the user to configure the concurrency level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to allow each source to turn on concurrency intentionally, instead of accidentally deploying it when the next version of the connector is deployed.


@classmethod
def read_config(cls, config_path: str) -> Mapping[str, Any]:
"""
Used to override the default read_config so that when the new file-based S3 connector processes a config
in the legacy format, it can be transformed into the new config. This happens in entrypoint before we
validate the config against the new spec.
"""
config = super().read_config(config_path)
if not self._is_v4_config(config):
if not SourceS3._is_v4_config(config):
parsed_legacy_config = SourceS3Spec(**config)
converted_config = LegacyConfigTransformer.convert(parsed_legacy_config)
emit_configuration_as_airbyte_control_message(converted_config)
Expand Down Expand Up @@ -66,7 +69,8 @@ def spec(self, *args: Any, **kwargs: Any) -> ConnectorSpecification:
connectionSpecification=s4_spec,
)

def _is_v4_config(self, config: Mapping[str, Any]) -> bool:
@staticmethod
def _is_v4_config(config: Mapping[str, Any]) -> bool:
return "streams" in config

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
class SourceTest(unittest.TestCase):
def setUp(self) -> None:
self._stream_reader = Mock(spec=SourceS3StreamReader)
self._source = SourceS3(self._stream_reader, Config, str(TEST_FILES_FOLDER.joinpath("catalog.json")))
self._source = SourceS3(
self._stream_reader,
Config,
SourceS3.read_catalog(str(TEST_FILES_FOLDER.joinpath("catalog.json"))),
SourceS3.read_config(str(TEST_FILES_FOLDER.joinpath("v3_config.json"))),
None,
)

@patch("source_s3.v4.source.emit_configuration_as_airbyte_control_message")
def test_given_config_is_v3_when_read_config_then_emit_new_config(self, emit_config_mock) -> None:
Expand Down
11 changes: 10 additions & 1 deletion airbyte-integrations/connectors/source-s3/v4_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@

def get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceS3(SourceS3StreamReader(), Config, catalog_path, cursor_cls=Cursor)
return SourceS3(
SourceS3StreamReader(),
Config,
SourceS3.read_catalog(catalog_path) if catalog_path else None,
SourceS3.read_config(config_path) if config_path else None,
SourceS3.read_state(state_path) if state_path else None,
cursor_cls=Cursor,
)
except Exception:
print(
AirbyteMessage(
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ To perform the text extraction from PDF and Docx files, the connector uses the [

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------|
| 4.5.0 | 2024-02-01 | [34591](https://github.com/airbytehq/airbyte/pull/34591) | Run full refresh syncs concurrently |
| 4.4.1 | 2024-01-30 | [34665](https://github.com/airbytehq/airbyte/pull/34665) | Pin moto & CDK version |
| 4.4.0 | 2024-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
| 4.4.0 | 2023-01-12 | [33818](https://github.com/airbytehq/airbyte/pull/33818) | Add IAM Role Authentication |
| 4.3.1 | 2024-01-04 | [33937](https://github.com/airbytehq/airbyte/pull/33937) | Prepare for airbyte-lib |
| 4.3.0 | 2023-12-14 | [33411](https://github.com/airbytehq/airbyte/pull/33411) | Bump CDK version to auto-set primary key for document file streams and support raw txt files |
| 4.2.4 | 2023-12-06 | [33187](https://github.com/airbytehq/airbyte/pull/33187) | Bump CDK version to hide source-defined primary key |
Expand Down