
Commit 644ace4

🐛 [On call / 171] Source Salesforce: fixed a bug where Bulk fetch took all the memory of Kubernetes pods (#11692)
1 parent 1613904 commit 644ace4
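
This commit replaces the connector's previous behaviour of decoding an entire Bulk job result in memory with a two-step pattern: stream the CSV payload into a temporary file, then read it back a chunk of rows at a time with pandas. The sketch below illustrates the pattern in isolation; it is not the connector's code, and the helper names, temp-file handling and buffer sizes are illustrative only (the real implementation is `download_data` and `read_with_chunks` in the streams.py diff further down).

```python
import os
import tempfile
from contextlib import closing
from typing import Any, Iterable, Mapping

import pandas as pd
import requests
from numpy import nan


def download_to_tmp_file(url: str, chunk_size: int = 1024) -> str:
    """Stream the HTTP response to disk in small chunks so memory usage stays flat."""
    fd, tmp_path = tempfile.mkstemp(suffix=".csv")
    with closing(requests.get(url, stream=True)) as response, os.fdopen(fd, "w") as tmp:
        for chunk in response.iter_content(chunk_size=chunk_size):
            tmp.write(chunk.decode("utf-8"))
    return tmp_path


def read_in_chunks(path: str, chunk_size: int = 100) -> Iterable[Mapping[str, Any]]:
    """Yield records a hundred rows at a time instead of materialising the whole CSV."""
    try:
        for frame in pd.read_csv(path, chunksize=chunk_size, dialect="unix"):
            yield from frame.replace({nan: None}).to_dict(orient="records")
    finally:
        os.remove(path)  # drop the temporary file once it has been fully consumed
```

In the connector these two steps back `BulkSalesforceStream.read_records`, and the new memory test added in this commit asserts that peak usage stays within a few MiB even for 200k records.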

File tree

10 files changed: +134 −47 lines


airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1 −1

@@ -689,7 +689,7 @@
 - name: Salesforce
   sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
   dockerRepository: airbyte/source-salesforce
-  dockerImageTag: 1.0.2
+  dockerImageTag: 1.0.3
   documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
   icon: salesforce.svg
   sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1 −1

@@ -7298,7 +7298,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-salesforce:1.0.2"
+- dockerImage: "airbyte/source-salesforce:1.0.3"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
     connectionSpecification:
airbyte-integrations/connectors/source-salesforce/Dockerfile

@@ -1,29 +1,17 @@
-FROM python:3.9.11-alpine3.15 as base
-FROM base as builder
+FROM python:3.9-slim
 
+# Bash is installed for more convenient debugging.
+RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
 
-RUN apk --no-cache upgrade \
-    && pip install --upgrade pip \
-    && apk --no-cache add tzdata build-base
+ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 
 WORKDIR /airbyte/integration_code
+COPY source_salesforce ./source_salesforce
 COPY setup.py ./
-RUN pip install --prefix=/install .
-
-
-FROM base
-COPY --from=builder /install /usr/local
-# add default timezone settings
-COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
-RUN echo "Etc/UTC" > /etc/timezone
-
-WORKDIR /airbyte/integration_code
 COPY main.py ./
-COPY source_salesforce ./source_salesforce
+RUN pip install .
 
-
-ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=1.0.2
+LABEL io.airbyte.version=1.0.3
 LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/README.md

+2 −1

@@ -101,7 +101,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
 If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
 To run your integration tests with acceptance tests, from the connector root, run
 ```
-python -m pytest integration_tests -p integration_tests.acceptance
+docker build . --no-cache -t airbyte/source-salesforce:dev \
+&& python -m pytest -p source_acceptance_test.plugin
 ```
 To run your integration tests with docker
 

airbyte-integrations/connectors/source-salesforce/setup.py

+2 −2

@@ -5,9 +5,9 @@
 
 from setuptools import find_packages, setup
 
-MAIN_REQUIREMENTS = ["airbyte-cdk", "vcrpy==4.1.1"]
+MAIN_REQUIREMENTS = ["airbyte-cdk", "vcrpy==4.1.1", "pandas"]
 
-TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests_mock", "pytest-timeout"]
+TEST_REQUIREMENTS = ["pytest~=6.1", "requests_mock", "source-acceptance-test", "pytest-timeout"]
 
 setup(
     name="source_salesforce",

airbyte-integrations/connectors/source-salesforce/source_salesforce/exceptions.py

+15

@@ -3,6 +3,16 @@
 #
 
 
+from airbyte_cdk.logger import AirbyteLogger
+
+
+class Error(Exception):
+    """Base Error class for other exceptions"""
+
+    # Define the instance of the Native Airbyte Logger
+    logger = AirbyteLogger()
+
+
 class SalesforceException(Exception):
     """
     Default Salesforce exception.
@@ -13,3 +23,8 @@ class TypeSalesforceException(SalesforceException):
     """
     We use this exception for unknown input data types for Salesforce.
     """
+
+
+class TmpFileIOError(Error):
+    def __init__(self, msg: str, err: str = None):
+        self.logger.fatal(f"{msg}. Error: {err}")
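
A hypothetical usage sketch of the new exception (the real call sites are in the streams.py diff below). The path and message here are invented; the point is that `TmpFileIOError.__init__` emits a fatal log through the shared `AirbyteLogger` defined in exceptions.py before the exception propagates.

```python
import os

from source_salesforce.exceptions import TmpFileIOError

tmp_file = "/tmp/7504W00000bkgnpQAA.csv"  # illustrative path only
if not os.path.isfile(tmp_file):
    # Constructing the exception already logs a fatal message via the logger.
    raise TmpFileIOError(f"The IO/Error occurred while verifying binary data, file {tmp_file} doesn't exist.")
```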

airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py

+51 −15

@@ -4,22 +4,25 @@
 
 import csv
 import ctypes
-import io
 import math
+import os
 import time
 from abc import ABC
+from contextlib import closing
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
 
+import pandas as pd
 import pendulum
 import requests  # type: ignore[import]
 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
+from numpy import nan
 from pendulum import DateTime  # type: ignore[attr-defined]
 from requests import codes, exceptions
 
 from .api import UNSUPPORTED_FILTERING_STREAMS, Salesforce
-from .exceptions import SalesforceException
+from .exceptions import SalesforceException, TmpFileIOError
 from .rate_limiting import default_backoff_handler
 
 # https://stackoverflow.com/a/54517228
@@ -136,17 +139,17 @@ def path(self, next_page_token: Mapping[str, Any] = None, **kwargs: Any) -> str:
     transformer = TypeTransformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)
 
     @default_backoff_handler(max_tries=5, factor=15)
-    def _send_http_request(self, method: str, url: str, json: dict = None):
+    def _send_http_request(self, method: str, url: str, json: dict = None, stream: bool = False):
         headers = self.authenticator.get_auth_header()
-        response = self._session.request(method, url=url, headers=headers, json=json)
+        response = self._session.request(method, url=url, headers=headers, json=json, stream=stream)
         if response.status_code not in [200, 204]:
             self.logger.error(f"error body: {response.text}, sobject options: {self.sobject_options}")
             response.raise_for_status()
         return response
 
     def create_stream_job(self, query: str, url: str) -> Optional[str]:
         """
-        docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.htm
+        docs: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/create_job.html
         """
         json = {"operation": "queryAll", "query": query, "contentType": "CSV", "columnDelimiter": "COMMA", "lineEnding": "LF"}
         try:
@@ -210,7 +213,7 @@ def wait_for_job(self, url: str) -> str:
                     # this is only job metadata without payload
                     error_message = job_info.get("errorMessage")
                     if not error_message:
-                        # not all failed response can have "errorMessage" and we need to print full response body
+                        # not all failed response can have "errorMessage" and we need to show full response body
                         error_message = job_info
                     self.logger.error(f"JobStatus: {job_status}, sobject options: {self.sobject_options}, error message: '{error_message}'")
 
@@ -257,13 +260,47 @@ def filter_null_bytes(self, s: str):
         self.logger.warning("Filter 'null' bytes from string, size reduced %d -> %d chars", len(s), len(res))
         return res
 
-    def download_data(self, url: str) -> Iterable[Tuple[int, Mapping[str, Any]]]:
-        job_data = self._send_http_request("GET", f"{url}/results")
-        decoded_content = self.filter_null_bytes(job_data.content.decode("utf-8"))
-        fp = io.StringIO(decoded_content, newline="")
-        csv_data = csv.DictReader(fp, dialect="unix")
-        for n, row in enumerate(csv_data, 1):
-            yield n, row
+    def download_data(self, url: str, chunk_size: float = 1024) -> os.PathLike:
+        """
+        Retrieves binary data result from successfully `executed_job`, using chunks, to avoid local memory limitations.
+        @ url: string - the url of the `executed_job`
+        @ chunk_size: float - the buffer size for each chunk to fetch from stream, in bytes, default: 1024 bytes
+
+        Returns the string with file path of downloaded binary data. Saved temporarily.
+        """
+        # set filepath for binary data from response
+        tmp_file = os.path.realpath(os.path.basename(url))
+        with closing(self._send_http_request("GET", f"{url}/results", stream=True)) as response:
+            with open(tmp_file, "w") as data_file:
+                for chunk in response.iter_content(chunk_size=chunk_size):
+                    data_file.writelines(self.filter_null_bytes(chunk.decode("utf-8")))
+        # check the file exists
+        if os.path.isfile(tmp_file):
+            return tmp_file
+        else:
+            raise TmpFileIOError(f"The IO/Error occurred while verifying binary data. Stream: {self.name}, file {tmp_file} doesn't exist.")
+
+    def read_with_chunks(self, path: str = None, chunk_size: int = 100) -> Iterable[Tuple[int, Mapping[str, Any]]]:
+        """
+        Reads the downloaded binary data, using lines chunks, set by `chunk_size`.
+        @ path: string - the path to the downloaded temporarily binary data.
+        @ chunk_size: int - the number of lines to read at a time, default: 100 lines / time.
+        """
+        try:
+            with open(path, "r", encoding="utf-8") as data:
+                chunks = pd.read_csv(data, chunksize=chunk_size, iterator=True, dialect="unix")
+                for chunk in chunks:
+                    chunk = chunk.replace({nan: None}).to_dict(orient="records")
+                    for n, row in enumerate(chunk, 1):
+                        yield n, row
+        except pd.errors.EmptyDataError as e:
+            self.logger.info(f"Empty data received. {e}")
+            yield from []
+        except IOError as ioe:
+            raise TmpFileIOError(f"The IO/Error occurred while reading tmp data. Called: {path}. Stream: {self.name}", ioe)
+        finally:
+            # remove binary tmp file, after data is read
+            os.remove(path)
 
     def abort_job(self, url: str):
         data = {"state": "Aborted"}
@@ -292,7 +329,6 @@ def request_params(
 
         if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS:
             query += f"ORDER BY {self.primary_key} ASC LIMIT {self.page_size}"
-
         return {"q": query}
 
     def read_records(
@@ -325,7 +361,7 @@ def read_records(
 
             count = 0
             record: Mapping[str, Any] = {}
-            for count, record in self.download_data(url=job_full_url):
+            for count, record in self.read_with_chunks(self.download_data(url=job_full_url)):
                 yield record
             self.delete_job(url=job_full_url)
 
airbyte-integrations/connectors/source-salesforce/unit_tests/api_test.py

+9 −8

@@ -2,6 +2,7 @@
 # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
 
+
 import csv
 import io
 import re
@@ -71,7 +72,7 @@ def test_stream_has_no_state_bulk_api_should_be_used(stream_config, stream_api):
     assert isinstance(stream, BulkSalesforceStream)
 
 
-@pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 193434])
+@pytest.mark.parametrize("item_number", [0, 15, 2000, 2324, 3000])
 def test_bulk_sync_pagination(item_number, stream_config, stream_api):
     stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
     test_ids = [i for i in range(1, item_number)]
@@ -203,12 +204,12 @@ def test_download_data_filter_null_bytes(stream_config, stream_api):
 
     with requests_mock.Mocker() as m:
         m.register_uri("GET", f"{job_full_url}/results", content=b"\x00")
-        res = list(stream.download_data(url=job_full_url))
+        res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
         assert res == []
 
         m.register_uri("GET", f"{job_full_url}/results", content=b'"Id","IsDeleted"\n\x00"0014W000027f6UwQAI","false"\n\x00\x00')
-        res = list(stream.download_data(url=job_full_url))
-        assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": "false"})]
+        res = list(stream.read_with_chunks(stream.download_data(url=job_full_url)))
+        assert res == [(1, {"Id": "0014W000027f6UwQAI", "IsDeleted": False})]
 
 
 def test_check_connection_rate_limit(stream_config):
@@ -406,9 +407,9 @@ def test_csv_reader_dialect_unix():
     url = "https://fake-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"
 
     data = [
-        {"Id": "1", "Name": '"first_name" "last_name"'},
-        {"Id": "2", "Name": "'" + 'first_name"\n' + "'" + 'last_name\n"'},
-        {"Id": "3", "Name": "first_name last_name"},
+        {"Id": 1, "Name": '"first_name" "last_name"'},
+        {"Id": 2, "Name": "'" + 'first_name"\n' + "'" + 'last_name\n"'},
+        {"Id": 3, "Name": "first_name last_name"},
     ]
 
     with io.StringIO("", newline="") as csvfile:
@@ -420,7 +421,7 @@
 
     with requests_mock.Mocker() as m:
         m.register_uri("GET", url + "/results", text=text)
-        result = [dict(i[1]) for i in stream.download_data(url)]
+        result = [dict(i[1]) for i in stream.read_with_chunks(stream.download_data(url))]
         assert result == data
 
 
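
The updated expectations in these tests ("false" becomes `False`, string ids become integers) follow from replacing `csv.DictReader` with pandas: `DictReader` returns every field as a string, while the pandas CSV parser infers column dtypes. A quick standalone check of that behaviour (illustrative only, not part of the test suite):

```python
import io

import pandas as pd

frame = pd.read_csv(io.StringIO('"Id","IsDeleted"\n"0014W000027f6UwQAI","false"\n'))
print(frame["Id"].tolist(), frame["IsDeleted"].tolist())  # ['0014W000027f6UwQAI'] [False]
```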

airbyte-integrations/connectors/source-salesforce/unit_tests/test_memory.py

@@ -0,0 +1,45 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+
+import tracemalloc
+
+import pytest
+import requests_mock
+from conftest import generate_stream
+from source_salesforce.streams import BulkIncrementalSalesforceStream
+
+
+@pytest.mark.parametrize(
+    "n_records, first_size, first_peak",
+    (
+        (1000, 0.4, 1),
+        (10000, 1, 2),
+        (100000, 4, 7),
+        (200000, 7, 12),
+    ),
+    ids=[
+        "1k records",
+        "10k records",
+        "100k records",
+        "200k records",
+    ],
+)
+def test_memory_download_data(stream_config, stream_api, n_records, first_size, first_peak):
+    job_full_url: str = "https://fase-account.salesforce.com/services/data/v52.0/jobs/query/7504W00000bkgnpQAA"
+    stream: BulkIncrementalSalesforceStream = generate_stream("Account", stream_config, stream_api)
+    content = b'"Id","IsDeleted"'
+    for _ in range(n_records):
+        content += b'"0014W000027f6UwQAI","false"\n'
+
+    with requests_mock.Mocker() as m:
+        m.register_uri("GET", f"{job_full_url}/results", content=content)
+        tracemalloc.start()
+        for x in stream.read_with_chunks(stream.download_data(url=job_full_url)):
+            pass
+        fs, fp = tracemalloc.get_traced_memory()
+        first_size_in_mb, first_peak_in_mb = fs / 1024**2, fp / 1024**2
+
+        assert first_size_in_mb < first_size
+        assert first_peak_in_mb < first_peak
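
The new test bounds peak memory with `tracemalloc`. The same measurement pattern, reduced to a standalone sketch (the iterator and printed units are arbitrary and purely illustrative):

```python
import tracemalloc


def consume(records):
    # Drain an iterator without keeping references, mimicking how the test drains read_with_chunks.
    for _ in records:
        pass


tracemalloc.start()
consume(iter(range(1_000_000)))
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"current={current / 1024 ** 2:.2f} MiB, peak={peak / 1024 ** 2:.2f} MiB")
```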

docs/integrations/sources/salesforce.md

+1

@@ -122,6 +122,7 @@ Now that you have set up the Salesforce source connector, check out the followin
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :--- |:---------------------------------------------------------------------------------------------------------------------------------|
+| 1.0.3 | 2022-04-04 | [11692](https://github.com/airbytehq/airbyte/pull/11692) | Optimised memory usage for `BULK` API calls |
 | 1.0.2 | 2022-03-01 | [10751](https://github.com/airbytehq/airbyte/pull/10751) | Fix broken link anchor in connector configuration |
 | 1.0.1 | 2022-02-27 | [10679](https://github.com/airbytehq/airbyte/pull/10679) | Reorganize input parameter order on the UI |
 | 1.0.0 | 2022-02-27 | [10516](https://github.com/airbytehq/airbyte/pull/10516) | Speed up schema discovery by using parallelism |
