Skip to content

fix(python-cdk): add user friendly message for encoding errors #44438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
71819bb
Add user friendly message for encoding errors
Aug 20, 2024
674a643
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 20, 2024
d9919d6
Fix mypy warning incompatible types in curried function _cast_types
Aug 20, 2024
7659e8a
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 20, 2024
c855178
fix formatting
Aug 20, 2024
992a413
Bump source-gcs patch version
Aug 20, 2024
deff1d7
Raise EncodingError with clean context
Aug 21, 2024
891140c
Make exception throwing chain not broken
Aug 21, 2024
721afb7
Ignore mypy error, reduce filename size in error messages
Aug 21, 2024
9f58cb0
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 21, 2024
43cbd61
Raise AirbyteTracedException in get_json_schema
Aug 21, 2024
ccc8cce
Raise Trace message for good UI display
Aug 21, 2024
c4436e3
Revert source-gcs patch bump
Aug 21, 2024
c82739c
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 21, 2024
8871f0c
Tell configured encoding in error message
Aug 22, 2024
9e8f80f
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 22, 2024
d16eda4
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 22, 2024
4a1ef93
Raise AirbyteTracedException and summarize at the end
Aug 23, 2024
86f5518
Use ATE instead of EncodingError, add test encoding exception test case
Aug 24, 2024
8262c04
Removed unused code
Aug 26, 2024
03f19c8
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 26, 2024
6da1d7a
Delete airbyte-integrations/connectors/source-gcs/unit_tests/test_sou…
strosek Aug 26, 2024
84a34bb
Make mypy ignore hint clearer
Aug 26, 2024
98d91fb
Fix index error when traceback not saved
Aug 26, 2024
63409c5
Fix index only reason available
Aug 26, 2024
404959b
Fix broken tests, fix dead code block in verify_check()
Aug 27, 2024
4c653d9
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
strosek Aug 27, 2024
7555981
Merge branch 'master' into strosek/gcs_decode_error_issue_8952
girarda Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.file_based.availability_strategy import AbstractFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import CheckAvailabilityError, CustomFileBasedException, EncodingError, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema
Expand Down Expand Up @@ -66,6 +66,8 @@ def check_availability_and_parsability(
# If the parser is set to not check parsability, we still want to check that we can open the file.
handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
handle.close()
except EncodingError as encoding_error:
raise encoding_error
except CheckAvailabilityError:
return False, "".join(traceback.format_exc())

Expand Down Expand Up @@ -98,6 +100,8 @@ def _check_parse_record(self, stream: "AbstractFileBasedStream", file: RemoteFil
# consider the connection check successful even though it means
# we skip the schema validation check.
return
except EncodingError as encoding_error:
raise encoding_error
except Exception as exc:
raise CheckAvailabilityError(FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri) from exc

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class FileBasedSourceError(Enum):
GLOB_PARSE_ERROR = (
"Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
)
ENCODING_ERROR = (
"Error decoding file. Please check that the configured encoding matches the encoding of the file. Default encoding is UTF-8."
)
ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
Expand Down Expand Up @@ -76,6 +79,10 @@ class ConfigValidationError(BaseFileBasedSourceError):
pass


class EncodingError(BaseFileBasedSourceError):
pass


class InvalidSchemaError(BaseFileBasedSourceError):
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, ValidationPolicy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedErrorsCollector, FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, EncodingError, FileBasedErrorsCollector, FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types import default_parsers
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
Expand Down Expand Up @@ -130,6 +130,13 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
stream_is_available,
reason,
) = stream.availability_strategy.check_availability_and_parsability(stream, logger, self)
except EncodingError as encoding_error:
raise AirbyteTracedException(
internal_message="File encoding does not match configuration.",
message=FileBasedSourceError.ENCODING_ERROR.value,
exception=encoding_error,
failure_type=FailureType.config_error,
)
except Exception:
errors.append(f"Unable to connect to stream {stream.name} - {''.join(traceback.format_exc())}")
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, CsvHeaderAutogenerated, CsvHeaderUserProvided, InferenceType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.exceptions import EncodingError, FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
Expand Down Expand Up @@ -52,7 +52,14 @@ def read_data(
quoting=csv.QUOTE_MINIMAL,
)
with stream_reader.open_file(file, file_read_mode, config_format.encoding, logger) as fp:
headers = self._get_headers(fp, config_format, dialect_name)
try:
headers = self._get_headers(fp, config_format, dialect_name)
except UnicodeError:
file_name = file.uri
# Reduce huge file names in error messages
if "?" in file.uri:
file_name = file.uri[: file.uri.find("?")]
raise EncodingError(FileBasedSourceError.ENCODING_ERROR, filename=file_name) from None

rows_to_skip = (
config_format.skip_rows_before_header
Expand Down Expand Up @@ -274,7 +281,7 @@ def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str,

@staticmethod
def _cast_types(
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
row: Dict[str, str], deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
EncodingError,
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
Expand Down Expand Up @@ -54,7 +55,7 @@ def state(self, value: MutableMapping[str, Any]) -> None:
"""State setter, accept state serialized by state getter."""
self._cursor.set_initial_state(value)

@property
@property # type: ignore # no suitable parent cursor type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it not suitable / why can't the typing error be fixed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at several of the Cursor offered by the IDE, and they didn't seem right. The only Cursor object that fixed the problem was the GCS Cursor, but this is a more generic file as it belongs to the CDK and not the specific GCS source.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the error exactly? Is the GCS cursor not an AbstractFileBasedCursor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that GCS cursor is not generic, as this file belongs to the CDK. Error is error: Signature of "cursor" incompatible with supertype "Stream" [override]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What’s the expected type from the parent class? Is it really a mismatch? If not, can the comment be more clear about why this is a mypy problem, not with the type

Copy link
Contributor Author

@strosek strosek Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a mismatch. gcs.Cursor inherits from DefualtFileBasedCursor
, which inherits from AbstractFileBasedCursor. Improved comment.

def cursor(self) -> Optional[AbstractFileBasedCursor]:
return self._cursor

Expand Down Expand Up @@ -172,13 +173,19 @@ def get_json_schema(self) -> JsonSchema:
try:
schema = self._get_raw_json_schema()
except InvalidSchemaError as config_exception:
self.logger.exception(FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, exc_info=config_exception)
raise AirbyteTracedException(
internal_message="Please check the logged errors for more information.",
message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value,
exception=AirbyteTracedException(exception=config_exception),
failure_type=FailureType.config_error,
)
except EncodingError as encoding_error:
raise AirbyteTracedException(
internal_message="File encoding does not match configuration.",
message=FileBasedSourceError.ENCODING_ERROR.value,
exception=encoding_error,
failure_type=FailureType.config_error,
)
except Exception as exc:
raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc
else:
Expand Down Expand Up @@ -279,6 +286,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
for task in done:
try:
base_schema = merge_schemas(base_schema, task.result())
except EncodingError as encoding_error:
raise encoding_error
except Exception as exc:
self.logger.error(f"An error occurred inferring the schema. \n {traceback.format_exc()}", exc_info=exc)

Expand All @@ -287,6 +296,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
try:
return await self.get_parser().infer_schema(self.config, file, self.stream_reader, self.logger)
except EncodingError as encoding_error:
raise encoding_error
except Exception as exc:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
Expand Down
Loading