|
| 1 | +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. |
| 2 | + |
| 3 | +import re |
| 4 | +from pathlib import Path |
| 5 | +from unittest.mock import Mock |
| 6 | + |
| 7 | +import pytest |
| 8 | +from airbyte_cdk import AirbyteTracedException |
| 9 | +from airbyte_cdk.sources.file_based.availability_strategy import DefaultFileBasedAvailabilityStrategy |
| 10 | +from source_gcs import Config, Cursor, SourceGCS, SourceGCSStreamReader |
| 11 | + |
| 12 | + |
| 13 | +def _source_gcs(catalog, config): |
| 14 | + return SourceGCS( |
| 15 | + SourceGCSStreamReader(), |
| 16 | + Config, |
| 17 | + catalog, |
| 18 | + config, |
| 19 | + None, |
| 20 | + cursor_cls=Cursor, |
| 21 | + ) |
| 22 | + |
| 23 | + |
| 24 | +def _catalog_path(): |
| 25 | + return Path(__file__).resolve().parent.joinpath("resource/catalog/") |
| 26 | + |
| 27 | + |
| 28 | +def _config_path(): |
| 29 | + return Path(__file__).resolve().parent.joinpath("resource/config/") |
| 30 | + |
| 31 | + |
| 32 | +def _mock_encoding_error(): |
| 33 | + DefaultFileBasedAvailabilityStrategy.check_availability_and_parsability = Mock(side_effect=AirbyteTracedException(message="encoding error")) |
| 34 | + |
| 35 | + |
| 36 | +def test_check_connection_with_airbyte_traced_exception(logger): |
| 37 | + config = SourceGCS.read_config(f"{_config_path()}/config_bad_encoding_single_stream.json") |
| 38 | + catalog = SourceGCS.read_catalog(f"{_catalog_path()}/catalog_bad_encoding.json") |
| 39 | + source = _source_gcs(catalog, config) |
| 40 | + _mock_encoding_error() |
| 41 | + |
| 42 | + with pytest.raises(AirbyteTracedException) as ate: |
| 43 | + source.check_connection(logger, config) |
| 44 | + |
| 45 | + assert re.search(r"^Unable to connect.+encoding error", ate.value.message) |
| 46 | + |
| 47 | + |
| 48 | +def test_check_connection_with_airbyte_traced_exception_multiple_failures(logger): |
| 49 | + config = SourceGCS.read_config(f"{_config_path()}/config_bad_encoding.json") |
| 50 | + catalog = SourceGCS.read_catalog(f"{_catalog_path()}/catalog_bad_encoding.json") |
| 51 | + source = _source_gcs(catalog, config) |
| 52 | + _mock_encoding_error() |
| 53 | + |
| 54 | + with pytest.raises(AirbyteTracedException) as ate: |
| 55 | + source.check_connection(logger, config) |
| 56 | + |
| 57 | + assert re.search(r"2 streams with errors.+encoding error", ate.value.message) |
0 commit comments