From 7150a185a0f47dbe87038b2978c6794835ba2d06 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 4 May 2022 09:54:54 -0700 Subject: [PATCH 1/4] use utf8 by default and iso as fallback --- .../source_salesforce/streams.py | 18 ++++++++++++++++-- .../source-salesforce/unit_tests/api_test.py | 6 ++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 45dbc96a2d4ee..842f65a7b43de 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -30,11 +30,13 @@ CSV_FIELD_SIZE_LIMIT = int(ctypes.c_ulong(-1).value // 2) csv.field_size_limit(CSV_FIELD_SIZE_LIMIT) +DEFAULT_ENCODING = "utf-8" + class SalesforceStream(HttpStream, ABC): page_size = 2000 transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - encoding = "ISO-8859-1" + encoding = DEFAULT_ENCODING def __init__( self, sf_api: Salesforce, pk: str, stream_name: str, sobject_options: Mapping[str, Any] = None, schema: dict = None, **kwargs @@ -46,6 +48,18 @@ def __init__( self.schema: Mapping[str, Any] = schema # type: ignore[assignment] self.sobject_options = sobject_options + def decode(self, chunk): + if self.encoding == DEFAULT_ENCODING: + try: + decoded = chunk.decode(self.encoding) + return decoded + except UnicodeDecodeError as e: + self.encoding = "ISO-8859-1" + self.logger.info(f"Could not decode chunk. Falling back to {self.encoding} encoding. Error: {e}") + return self.decode(chunk) + else: + return chunk.decode(self.encoding) + @property def name(self) -> str: return self.stream_name @@ -275,7 +289,7 @@ def download_data(self, url: str, chunk_size: float = 1024) -> os.PathLike: with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response: with open(tmp_file, "w") as data_file: for chunk in response.iter_content(chunk_size=chunk_size): - data_file.writelines(self.filter_null_bytes(chunk.decode(self.encoding))) + data_file.writelines(self.filter_null_bytes(self.decode(chunk))) # check the file exists if os.path.isfile(tmp_file): return tmp_file diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index c95150d08daca..9b497be5ff28d 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -527,5 +527,7 @@ def test_convert_to_standard_instance(stream_config, stream_api): assert isinstance(rest_stream, IncrementalSalesforceStream) -def test_decoding(): - assert b"0\xe5".decode(SalesforceStream.encoding) == "0å" +def test_decoding(stream_config, stream_api): + stream_name = "AcceptedEventRelation" + stream = generate_stream(stream_name, stream_config, stream_api) + assert stream.decode(b"0\xe5") == "0å" From 1c5e29a3a1df8ca0c410cf2114de0afb9c333921 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 4 May 2022 09:59:50 -0700 Subject: [PATCH 2/4] test both --- .../connectors/source-salesforce/unit_tests/api_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py index 9b497be5ff28d..d8641723eb175 100644 --- a/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py +++ b/airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py @@ -530,4 +530,5 @@ def test_convert_to_standard_instance(stream_config, stream_api): def test_decoding(stream_config, stream_api): stream_name = "AcceptedEventRelation" stream = generate_stream(stream_name, stream_config, stream_api) + assert stream.decode(b"\xe9\x97\xb4\xe5\x8d\x95\xe7\x9a\x84\xe8\xaf\xb4 \xf0\x9f\xaa\x90") == "间单的说 🪐" assert stream.decode(b"0\xe5") == "0å" From 22270cc14193435fa72aa8e38bac513390c638ee Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 4 May 2022 10:39:11 -0700 Subject: [PATCH 3/4] add comment --- .../source-salesforce/source_salesforce/streams.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 842f65a7b43de..72c3580a02761 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -49,6 +49,11 @@ def __init__( self.sobject_options = sobject_options def decode(self, chunk): + """ + Most Salesforce instances use UTF-8, but some use ISO-8859-1. + By default, we'll decode using UTF-8, and fallback to ISO-8859-1 if it doesn't work. + See implementation considerations for more details https://developer.salesforce.com/docs/atlas.en-us.api.meta/api/implementation_considerations.htm + """ if self.encoding == DEFAULT_ENCODING: try: decoded = chunk.decode(self.encoding) From 8ad3a1df227a6a3e66acc3b77bd8999688f42521 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 4 May 2022 10:46:59 -0700 Subject: [PATCH 4/4] Bump version --- airbyte-integrations/connectors/source-salesforce/Dockerfile | 2 +- docs/integrations/sources/salesforce.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/Dockerfile b/airbyte-integrations/connectors/source-salesforce/Dockerfile index 232af7c81765b..6b49f0c006eee 100644 --- a/airbyte-integrations/connectors/source-salesforce/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=1.0.7 +LABEL io.airbyte.version=1.0.8 LABEL io.airbyte.name=airbyte/source-salesforce diff --git a/docs/integrations/sources/salesforce.md b/docs/integrations/sources/salesforce.md index c6f39609b0c2d..30ebd3625d218 100644 --- a/docs/integrations/sources/salesforce.md +++ b/docs/integrations/sources/salesforce.md @@ -117,7 +117,8 @@ Now that you have set up the Salesforce source connector, check out the followin | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------| -| 1.0.7 | 2022-04-27 | [12552](https://github.com/airbytehq/airbyte/pull/12552) | Decode responses as ISO-8859-1 instead of utf-8 | +| 1.0.8 | 2022-05-04 | [12576](https://github.com/airbytehq/airbyte/pull/12576) | Decode responses as utf-8 and fallback to ISO-8859-1 if needed | +| 1.0.7 | 2022-05-03 | [12552](https://github.com/airbytehq/airbyte/pull/12552) | Decode responses as ISO-8859-1 instead of utf-8 | | 1.0.4 | 2022-04-27 | [12335](https://github.com/airbytehq/airbyte/pull/12335) | Adding fixtures to mock time.sleep for connectors that explicitly sleep | | 1.0.3 | 2022-04-04 | [11692](https://github.com/airbytehq/airbyte/pull/11692) | Optimised memory usage for `BULK` API calls | | 1.0.2 | 2022-03-01 | [10751](https://github.com/airbytehq/airbyte/pull/10751) | Fix broken link anchor in connector configuration |