Skip to content

fix(python-cdk): add user friendly message for encoding errors #44438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
71819bb
Add user friendly message for encoding errors
Aug 20, 2024
674a643
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 20, 2024
d9919d6
Fix mypy warning incompatible types in curried function _cast_types
Aug 20, 2024
7659e8a
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 20, 2024
c855178
fix formatting
Aug 20, 2024
992a413
Bump source-gcs patch version
Aug 20, 2024
deff1d7
Raise EncodingError with clean context
Aug 21, 2024
891140c
Make exception throwing chain not broken
Aug 21, 2024
721afb7
Ignore mypy error, reduce filename size in error messages
Aug 21, 2024
9f58cb0
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 21, 2024
43cbd61
Raise AirbyteTracedException in get_json_schema
Aug 21, 2024
ccc8cce
Raise Trace message for good UI display
Aug 21, 2024
c4436e3
Revert source-gcs patch bump
Aug 21, 2024
c82739c
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 21, 2024
8871f0c
Tell configured encoding in error message
Aug 22, 2024
9e8f80f
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 22, 2024
d16eda4
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 22, 2024
4a1ef93
Raise AirbyteTracedException and summarize at the end
Aug 23, 2024
86f5518
Use ATE instead of EncodingError, add test encoding exception test case
Aug 24, 2024
8262c04
Removed unused code
Aug 26, 2024
03f19c8
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 26, 2024
6da1d7a
Delete airbyte-integrations/connectors/source-gcs/unit_tests/test_sou…
strosek Aug 26, 2024
84a34bb
Make mypy ignore hint clearer
Aug 26, 2024
98d91fb
Fix index error when traceback not saved
Aug 26, 2024
63409c5
Fix index only reason available
Aug 26, 2024
404959b
Fix broken tests, fix dead code block in verify_check()
Aug 27, 2024
4c653d9
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 27, 2024
7555981
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
girarda Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import traceback
from typing import TYPE_CHECKING, Optional, Tuple

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, FileBasedSourceError
Expand Down Expand Up @@ -66,6 +67,8 @@ def check_availability_and_parsability(
# If the parser is set to not check parsability, we still want to check that we can open the file.
handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
handle.close()
except AirbyteTracedException as ate:
raise ate
except CheckAvailabilityError:
return False, "".join(traceback.format_exc())

Expand Down Expand Up @@ -98,6 +101,8 @@ def _check_parse_record(self, stream: "AbstractFileBasedStream", file: RemoteFil
# consider the connection check successful even though it means
# we skip the schema validation check.
return
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise CheckAvailabilityError(FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri) from exc

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class FileBasedSourceError(Enum):
GLOB_PARSE_ERROR = (
"Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
)
ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
)

errors = []
tracebacks = []
for stream in streams:
if not isinstance(stream, AbstractFileBasedStream):
raise ValueError(f"Stream {stream} is not a file-based stream.")
Expand All @@ -130,12 +131,34 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
stream_is_available,
reason,
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
except AirbyteTracedException as ate:
errors.append(f"Unable to connect to stream {stream.name} - {ate.message}")
tracebacks.append(traceback.format_exc())
except Exception:
errors.append(f"Unable to connect to stream {stream.name} - {''.join(traceback.format_exc())}")
errors.append(f"Unable to connect to stream {stream.name}")
tracebacks.append(traceback.format_exc())
else:
if not stream_is_available and reason:
errors.append(reason)

if len(errors) == 1 and len(tracebacks) == 1:
raise AirbyteTracedException(
internal_message=tracebacks[0],
message=f"{errors[0]}",
failure_type=FailureType.config_error,
)
if len(errors) == 1 and len(tracebacks) == 0:
raise AirbyteTracedException(
message=f"{errors[0]}",
failure_type=FailureType.config_error,
)
elif len(errors) > 1:
raise AirbyteTracedException(
internal_message="\n".join(tracebacks),
message=f"{len(errors)} streams with errors: {', '.join(error for error in errors)}",
failure_type=FailureType.config_error,
)

return not bool(errors), (errors or None)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def read_data(
quoting=csv.QUOTE_MINIMAL,
)
with stream_reader.open_file(file, file_read_mode, config_format.encoding, logger) as fp:
headers = self._get_headers(fp, config_format, dialect_name)
try:
headers = self._get_headers(fp, config_format, dialect_name)
except UnicodeError:
raise AirbyteTracedException(
message=f"{FileBasedSourceError.ENCODING_ERROR.value} Expected encoding: {config_format.encoding}",
)

rows_to_skip = (
config_format.skip_rows_before_header
Expand Down Expand Up @@ -274,7 +279,7 @@ def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str,

@staticmethod
def _cast_types(
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
row: Dict[str, str], deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""
self._cursor.set_initial_state(value)

@property
@property # type: ignore # mypy complains wrong type, but AbstractFileBasedCursor is parent of file-based cursors
def cursor(self) -> Optional[AbstractFileBasedCursor]:
return self._cursor

Expand Down Expand Up @@ -172,13 +172,14 @@ def get_json_schema(self) -> JsonSchema:
try:
schema = self._get_raw_json_schema()
except InvalidSchemaError as config_exception:
self.logger.exception(FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, exc_info=config_exception)
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc
else:
Expand Down Expand Up @@ -279,6 +280,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
for task in done:
try:
base_schema = merge_schemas(base_schema, task.result())
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
self.logger.error(f"An error occurred inferring the schema. \n {traceback.format_exc()}", exc_info=exc)

Expand All @@ -287,6 +290,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
try:
return await self.get_parser().infer_schema(self.config, file, self.stream_reader, self.logger)
except AirbyteTracedException as ate:
raise ate
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,17 @@ def test_parse_field_size_larger_than_default_python_maximum(self) -> None:
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "1", "header2": long_string}]

def test_read_data_with_encoding_error(self) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

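Tests that an AirbyteTracedException (ATE) is raised when reading the CSV headers fails with a Unicode decoding error.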

self._stream_reader.open_file.return_value = CsvFileBuilder().with_data(["something"]).build()
self._csv_reader._get_headers = Mock(side_effect=UnicodeDecodeError("encoding", b"", 0, 1, "reason"))

with pytest.raises(AirbyteTracedException) as ate:
data_generator = self._read_data()
assert len(list(data_generator)) == 0

assert "encoding" in ate.value.message
assert self._csv_reader._get_headers.called

def _read_data(self) -> Generator[Dict[str, str], None, None]:
data_generator = self._csv_reader.read_data(
self._config,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk import AirbyteTracedException
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError
from unit_tests.sources.file_based.helpers import (
FailingSchemaValidationPolicy,
Expand Down Expand Up @@ -130,7 +130,7 @@
_base_failure_scenario.copy()
.set_name("error_empty_stream_scenario")
.set_source_builder(_base_failure_scenario.copy().source_builder.copy().set_files({}))
.set_expected_check_error(None, FileBasedSourceError.EMPTY_STREAM.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.EMPTY_STREAM.value)
).build()


Expand All @@ -142,7 +142,7 @@
TestErrorListMatchingFilesInMemoryFilesStreamReader(files=_base_failure_scenario.source_builder._files, file_type="csv")
)
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_LISTING_FILES.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_LISTING_FILES.value)
).build()


Expand All @@ -154,7 +154,7 @@
TestErrorOpenFileInMemoryFilesStreamReader(files=_base_failure_scenario.source_builder._files, file_type="csv")
)
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_READING_FILE.value)
).build()


Expand Down Expand Up @@ -216,5 +216,5 @@
],
}
)
.set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value)
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.ERROR_READING_FILE.value)
).build()
Original file line number Diff line number Diff line change
Expand Up @@ -3240,6 +3240,7 @@
}
)
.set_expected_records(None)
.set_expected_check_error(AirbyteTracedException, None)
).build()

csv_no_records_scenario: TestScenario[InMemoryFilesSource] = (
Expand Down Expand Up @@ -3343,4 +3344,5 @@
}
)
.set_expected_records(None)
.set_expected_check_error(AirbyteTracedException, None)
).build()
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ def verify_check(capsys: CaptureFixture[str], tmp_path: PosixPath, scenario: Tes
expected_exc, expected_msg = scenario.expected_check_error

if expected_exc:
with pytest.raises(expected_exc):
output = check(capsys, tmp_path, scenario)
if expected_msg:
# expected_msg is a string. what's the expected value field?
assert expected_msg in output["message"] # type: ignore
assert output["status"] == scenario.expected_check_status
with pytest.raises(expected_exc) as exc:
check(capsys, tmp_path, scenario)

if expected_msg:
Copy link
Contributor Author

@strosek strosek Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block was never executed because the preceding `check(...)` call raised inside the `pytest.raises` context, which exits the `with` block immediately. Also, `output` was never assigned, since `check(...)` never returned. The tests should be more thorough now.

assert expected_msg in exc.value.message

else:
output = check(capsys, tmp_path, scenario)
Expand Down
Loading