Skip to content

Commit 5bc8ec2

Browse files
authored
🐛 Source salesforce: processing of failed jobs (#10141)
1 parent b230543 commit 5bc8ec2

File tree

13 files changed

+263
-90
lines changed

13 files changed

+263
-90
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b117307c-14b6-41aa-9422-947e34922962.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "b117307c-14b6-41aa-9422-947e34922962",
33
"name": "Salesforce",
44
"dockerRepository": "airbyte/source-salesforce",
5-
"dockerImageTag": "0.1.22",
5+
"dockerImageTag": "0.1.23",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/salesforce",
77
"icon": "salesforce.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@
648648
- name: Salesforce
649649
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
650650
dockerRepository: airbyte/source-salesforce
651-
dockerImageTag: 0.1.22
651+
dockerImageTag: 0.1.23
652652
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
653653
icon: salesforce.svg
654654
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-11
Original file line numberDiff line numberDiff line change
@@ -6801,7 +6801,7 @@
68016801
supportsNormalization: false
68026802
supportsDBT: false
68036803
supported_destination_sync_modes: []
6804-
- dockerImage: "airbyte/source-salesforce:0.1.22"
6804+
- dockerImage: "airbyte/source-salesforce:0.1.23"
68056805
spec:
68066806
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
68076807
connectionSpecification:
@@ -6879,16 +6879,6 @@
68796879
title: "Streams filter criteria"
68806880
description: "Add selection criteria for streams to get only streams that\
68816881
\ are relevant to you"
6882-
wait_timeout:
6883-
title: "Response Waiting Time"
6884-
description: "Maximum wait time of Salesforce responses in minutes. This\
6885-
\ option is used for the BULK mode only. The default wait time of the\
6886-
\ Parent Batch in the Bulk Mode to wait for all the batches to finish\
6887-
\ processing is 20 minutes."
6888-
type: "integer"
6889-
minimum: 5
6890-
maximum: 60
6891-
default: 10
68926882
supportsNormalization: false
68936883
supportsDBT: false
68946884
supported_destination_sync_modes: []

airbyte-integrations/connectors/source-salesforce/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@ COPY source_salesforce ./source_salesforce
2525
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
2626
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
2727

28-
LABEL io.airbyte.version=0.1.22
28+
LABEL io.airbyte.version=0.1.23
2929
LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/acceptance-test-config.yml

+3
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ tests:
77
connection:
88
- config_path: "secrets/config.json"
99
status: "succeed"
10+
- config_path: "secrets/config_sandbox.json"
11+
status: "succeed"
1012
- config_path: "integration_tests/invalid_config.json"
1113
status: "failed"
14+
1215
discovery:
1316
- config_path: "secrets/config.json"
1417
basic_read:

airbyte-integrations/connectors/source-salesforce/integration_tests/bulk_error_test.py

+48
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
import json
66
import logging
7+
import re
78
from pathlib import Path
89
from typing import Any, Mapping
910

1011
import pytest
12+
import requests_mock
13+
from airbyte_cdk.models import SyncMode
1114
from airbyte_cdk.sources.streams import Stream
1215
from source_salesforce.source import SourceSalesforce
1316

@@ -20,6 +23,12 @@ def parse_input_config():
2023
return json.loads(file.read())
2124

2225

26+
@pytest.fixture(name="input_sandbox_config")
27+
def parse_input_sandbox_config():
28+
with open(HERE.parent / "secrets/config_sandbox.json", "r") as file:
29+
return json.loads(file.read())
30+
31+
2332
def get_stream(input_config: Mapping[str, Any], stream_name: str) -> Stream:
2433
stream_cls = type("a", (object,), {"name": stream_name})
2534
configured_stream_cls = type("b", (object,), {"stream": stream_cls()})
@@ -42,3 +51,42 @@ def test_not_queryable_stream(caplog, input_config):
4251

4352
# check logs
4453
assert "is not queryable" in caplog.records[-1].message
54+
55+
56+
@pytest.mark.parametrize(
57+
"stream_name,log_messages",
58+
(
59+
(
60+
"Dashboard",
61+
["switch to STANDARD(non-BULK) sync"],
62+
),
63+
# CategoryNode has access limitation thus SF returns failed job statuses
64+
(
65+
"CategoryNode",
66+
["insufficient access rights on cross-reference id", "switch to STANDARD(non-BULK) sync"],
67+
),
68+
),
69+
ids=["successful_switching", "failed_switching"],
70+
)
71+
def test_failed_jobs_with_successful_switching(caplog, input_sandbox_config, stream_name, log_messages):
72+
stream = get_stream(input_sandbox_config, stream_name)
73+
expected_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
74+
75+
create_query_matcher = re.compile(r"jobs/query$")
76+
job_matcher = re.compile(r"jobs/query/fake_id$")
77+
loaded_record_ids = []
78+
with requests_mock.Mocker(real_http=True) as m:
79+
m.register_uri(
80+
"POST",
81+
create_query_matcher,
82+
json={
83+
"id": "fake_id",
84+
},
85+
)
86+
m.register_uri("GET", job_matcher, json={"state": "Failed", "errorMessage": "unknown error"})
87+
m.register_uri("DELETE", job_matcher, json={})
88+
with caplog.at_level(logging.WARNING):
89+
loaded_record_ids = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.full_refresh))
90+
for i, log_message in enumerate(log_messages, 1):
91+
assert log_message in caplog.records[-i].message
92+
assert loaded_record_ids == expected_record_ids

airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py

+26-17
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from .exceptions import TypeSalesforceException
1313
from .rate_limiting import default_backoff_handler
14-
from .utils import filter_streams
14+
from .utils import filter_streams_by_criteria
1515

1616
STRING_TYPES = [
1717
"byte",
@@ -191,7 +191,9 @@ def __init__(
191191
self.access_token = None
192192
self.instance_url = None
193193
self.session = requests.Session()
194-
self.is_sandbox = is_sandbox is True or (isinstance(is_sandbox, str) and is_sandbox.lower() == "true")
194+
self.is_sandbox = is_sandbox in [True, "true"]
195+
if self.is_sandbox:
196+
self.logger.info("using SANDBOX of Salesforce")
195197
self.start_date = start_date
196198

197199
def _get_standard_headers(self):
@@ -206,30 +208,37 @@ def filter_streams(self, stream_name: str) -> bool:
206208
return False
207209
return True
208210

209-
def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None):
210-
salesforce_objects = self.describe()["sobjects"]
211-
stream_objects = []
212-
for stream_object in salesforce_objects:
211+
def get_validated_streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None) -> Mapping[str, Any]:
212+
"""Selects all validated streams with additional filtering:
213+
1) skip all sobjects with negative value of the flag "queryable"
214+
2) user can set search criterias of necessary streams
215+
3) selection by catalog settings
216+
"""
217+
stream_objects = {}
218+
for stream_object in self.describe()["sobjects"]:
213219
if stream_object["queryable"]:
214-
stream_objects.append(stream_object)
220+
stream_objects[stream_object.pop("name")] = stream_object
215221
else:
216222
self.logger.warn(f"Stream {stream_object['name']} is not queryable and will be ignored.")
217223

218-
stream_names = [stream_object["name"] for stream_object in stream_objects]
219224
if catalog:
220-
return [configured_stream.stream.name for configured_stream in catalog.streams], stream_objects
225+
return {
226+
configured_stream.stream.name: stream_objects[configured_stream.stream.name]
227+
for configured_stream in catalog.streams
228+
if configured_stream.stream.name in stream_objects
229+
}
221230

231+
stream_names = list(stream_objects.keys())
222232
if config.get("streams_criteria"):
223233
filtered_stream_list = []
224234
for stream_criteria in config["streams_criteria"]:
225-
filtered_stream_list += filter_streams(
235+
filtered_stream_list += filter_streams_by_criteria(
226236
streams_list=stream_names, search_word=stream_criteria["value"], search_criteria=stream_criteria["criteria"]
227237
)
228238
stream_names = list(set(filtered_stream_list))
229239

230240
validated_streams = [stream_name for stream_name in stream_names if self.filter_streams(stream_name)]
231-
validated_stream_objects = [stream_object for stream_object in stream_objects if stream_object["name"] in validated_streams]
232-
return validated_streams, validated_stream_objects
241+
return {stream_name: sobject_options for stream_name, sobject_options in stream_objects.items() if stream_name in validated_streams}
233242

234243
@default_backoff_handler(max_tries=5, factor=15)
235244
def _make_request(
@@ -261,20 +270,20 @@ def login(self):
261270
self.access_token = auth["access_token"]
262271
self.instance_url = auth["instance_url"]
263272

264-
def describe(self, sobject: str = None, stream_objects: List = None) -> Mapping[str, Any]:
273+
def describe(self, sobject: str = None, sobject_options: Mapping[str, Any] = None) -> Mapping[str, Any]:
265274
"""Describes all objects or a specific object"""
266275
headers = self._get_standard_headers()
267276

268277
endpoint = "sobjects" if not sobject else f"sobjects/{sobject}/describe"
269278

270279
url = f"{self.instance_url}/services/data/{self.version}/{endpoint}"
271280
resp = self._make_request("GET", url, headers=headers)
272-
if resp.status_code == 404:
273-
self.logger.error(f"Filtered stream objects: {stream_objects}")
281+
if resp.status_code == 404 and sobject:
282+
self.logger.error(f"not found a description for the sobject '{sobject}'. Sobject options: {sobject_options}")
274283
return resp.json()
275284

276-
def generate_schema(self, stream_name: str = None, stream_objects: List = None) -> Mapping[str, Any]:
277-
response = self.describe(stream_name, stream_objects)
285+
def generate_schema(self, stream_name: str = None, stream_options: Mapping[str, Any] = None) -> Mapping[str, Any]:
286+
response = self.describe(stream_name, stream_options)
278287
schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True, "properties": {}}
279288
for field in response["fields"]:
280289
schema["properties"][field["name"]] = self.field_to_property_schema(field)

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,18 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
3939
def generate_streams(
4040
cls,
4141
config: Mapping[str, Any],
42-
stream_names: List[str],
42+
stream_objects: Mapping[str, Any],
4343
sf_object: Salesforce,
4444
state: Mapping[str, Any] = None,
45-
stream_objects: List = None,
4645
) -> List[Stream]:
4746
""" "Generates a list of stream by their names. It can be used for different tests too"""
4847
authenticator = TokenAuthenticator(sf_object.access_token)
4948
streams = []
50-
for stream_name in stream_names:
51-
streams_kwargs = {}
49+
for stream_name, sobject_options in stream_objects.items():
50+
streams_kwargs = {"sobject_options": sobject_options}
5251
stream_state = state.get(stream_name, {}) if state else {}
5352

54-
selected_properties = sf_object.generate_schema(stream_name, stream_objects).get("properties", {})
53+
selected_properties = sf_object.generate_schema(stream_name, sobject_options).get("properties", {})
5554
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
5655
properties_not_supported_by_bulk = {
5756
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
@@ -63,7 +62,6 @@ def generate_streams(
6362
else:
6463
# Use BULK API
6564
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
66-
streams_kwargs["wait_timeout"] = config.get("wait_timeout")
6765

6866
json_schema = sf_object.generate_schema(stream_name, stream_objects)
6967
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)
@@ -77,8 +75,8 @@ def generate_streams(
7775

7876
def streams(self, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog = None, state: Mapping[str, Any] = None) -> List[Stream]:
7977
sf = self._get_sf_object(config)
80-
stream_names, stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
81-
return self.generate_streams(config, stream_names, sf, state=state, stream_objects=stream_objects)
78+
stream_objects = sf.get_validated_streams(config=config, catalog=catalog)
79+
return self.generate_streams(config, stream_objects, sf, state=state)
8280

8381
def read(
8482
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None

airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.json

+28-16
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
"$schema": "http://json-schema.org/draft-07/schema#",
55
"title": "Salesforce Source Spec",
66
"type": "object",
7-
"required": ["client_id", "client_secret", "refresh_token"],
7+
"required": [
8+
"client_id",
9+
"client_secret",
10+
"refresh_token"
11+
],
812
"additionalProperties": true,
913
"properties": {
1014
"auth_type": {
@@ -33,7 +37,10 @@
3337
"description": "Date in the format 2017-01-25. Any data before this date will not be replicated. This field uses the \"updated\" field if available, otherwise the \"created\" fields if they are available for a stream. If not set, then by default all your data is replicated.",
3438
"type": "string",
3539
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z|[0-9]{4}-[0-9]{2}-[0-9]{2}$",
36-
"examples": ["2021-07-25", "2021-07-25T00:00:00Z"]
40+
"examples": [
41+
"2021-07-25",
42+
"2021-07-25T00:00:00Z"
43+
]
3744
},
3845
"is_sandbox": {
3946
"title": "Sandbox",
@@ -45,7 +52,10 @@
4552
"type": "array",
4653
"items": {
4754
"type": "object",
48-
"required": ["criteria", "value"],
55+
"required": [
56+
"criteria",
57+
"value"
58+
],
4959
"properties": {
5060
"criteria": {
5161
"type": "string",
@@ -70,20 +80,14 @@
7080
},
7181
"title": "Streams filter criteria",
7282
"description": "Add selection criteria for streams to get only streams that are relevant to you"
73-
},
74-
"wait_timeout": {
75-
"title": "Response Waiting Time",
76-
"description": "Maximum wait time of Salesforce responses in minutes. This option is used for the BULK mode only. The default wait time of the Parent Batch in the Bulk Mode to wait for all the batches to finish processing is 20 minutes.",
77-
"type": "integer",
78-
"minimum": 5,
79-
"maximum": 60,
80-
"default": 10
8183
}
8284
}
8385
},
8486
"advanced_auth": {
8587
"auth_flow_type": "oauth2.0",
86-
"predicate_key": ["auth_type"],
88+
"predicate_key": [
89+
"auth_type"
90+
],
8791
"predicate_value": "Client",
8892
"oauth_config_specification": {
8993
"oauth_user_input_from_connector_config_specification": {
@@ -92,7 +96,9 @@
9296
"properties": {
9397
"is_sandbox": {
9498
"type": "boolean",
95-
"path_in_connector_config": ["is_sandbox"]
99+
"path_in_connector_config": [
100+
"is_sandbox"
101+
]
96102
}
97103
}
98104
},
@@ -102,7 +108,9 @@
102108
"properties": {
103109
"refresh_token": {
104110
"type": "string",
105-
"path_in_connector_config": ["refresh_token"]
111+
"path_in_connector_config": [
112+
"refresh_token"
113+
]
106114
}
107115
}
108116
},
@@ -124,11 +132,15 @@
124132
"properties": {
125133
"client_id": {
126134
"type": "string",
127-
"path_in_connector_config": ["client_id"]
135+
"path_in_connector_config": [
136+
"client_id"
137+
]
128138
},
129139
"client_secret": {
130140
"type": "string",
131-
"path_in_connector_config": ["client_secret"]
141+
"path_in_connector_config": [
142+
"client_secret"
143+
]
132144
}
133145
}
134146
}

0 commit comments

Comments
 (0)