-
Notifications
You must be signed in to change notification settings - Fork 4.6k
fix(python-cdk): add user friendly message for encoding errors #44438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
71819bb
674a643
d9919d6
7659e8a
c855178
992a413
deff1d7
891140c
721afb7
9f58cb0
43cbd61
ccc8cce
c4436e3
c82739c
8871f0c
9e8f80f
d16eda4
4a1ef93
86f5518
8262c04
03f19c8
6da1d7a
84a34bb
98d91fb
63409c5
404959b
4c653d9
7555981
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
from airbyte_cdk.models import Type as MessageType | ||
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType | ||
from airbyte_cdk.sources.file_based.exceptions import ( | ||
EncodingError, | ||
FileBasedSourceError, | ||
InvalidSchemaError, | ||
MissingSchemaError, | ||
|
@@ -54,7 +55,7 @@ def state(self, value: MutableMapping[str, Any]) -> None: | |
"""State setter, accept state serialized by state getter.""" | ||
self._cursor.set_initial_state(value) | ||
|
||
@property | ||
@property # type: ignore # no suitable parent cursor type | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is it not suitable / why can't the typing error be fixed? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I looked at several of the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the error exactly? Is the GCS cursor not an AbstractFileBasedCursor? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The problem is that GCS cursor is not generic, as this file belongs to the CDK. Error is There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What’s the expected type from the parent class? Is it really a mismatch? If not, can the comment be clearer about why this is a mypy problem, not a problem with the type? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not a mismatch. |
||
def cursor(self) -> Optional[AbstractFileBasedCursor]: | ||
return self._cursor | ||
|
||
|
@@ -172,13 +173,19 @@ def get_json_schema(self) -> JsonSchema: | |
try: | ||
schema = self._get_raw_json_schema() | ||
except InvalidSchemaError as config_exception: | ||
self.logger.exception(FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, exc_info=config_exception) | ||
raise AirbyteTracedException( | ||
internal_message="Please check the logged errors for more information.", | ||
message=FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, | ||
exception=AirbyteTracedException(exception=config_exception), | ||
failure_type=FailureType.config_error, | ||
) | ||
except EncodingError as encoding_error: | ||
raise AirbyteTracedException( | ||
internal_message="File encoding does not match configuration.", | ||
message=f"{FileBasedSourceError.ENCODING_ERROR.value} Expected encoding: {encoding_error.expected_encoding}.", | ||
exception=encoding_error, | ||
failure_type=FailureType.config_error, | ||
) | ||
except Exception as exc: | ||
raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc | ||
else: | ||
|
@@ -279,6 +286,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: | |
for task in done: | ||
try: | ||
base_schema = merge_schemas(base_schema, task.result()) | ||
except EncodingError as encoding_error: | ||
raise encoding_error | ||
except Exception as exc: | ||
self.logger.error(f"An error occurred inferring the schema. \n {traceback.format_exc()}", exc_info=exc) | ||
|
||
|
@@ -287,6 +296,8 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: | |
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: | ||
try: | ||
return await self.get_parser().infer_schema(self.config, file, self.stream_reader, self.logger) | ||
except EncodingError as encoding_error: | ||
raise encoding_error | ||
except Exception as exc: | ||
raise SchemaInferenceError( | ||
FileBasedSourceError.SCHEMA_INFERENCE_ERROR, | ||
|
Uh oh!
There was an error while loading. Please reload this page.