Skip to content

Commit ce5c007

Browse files
authored
🎉 Salesforce Source: Support "Replicate Incremental Deletes" (#10454)
1 parent f82dd2d commit ce5c007

File tree

2 files changed

+166
-686
lines changed

2 files changed

+166
-686
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import base64
2+
import json
3+
from datetime import datetime
4+
from pathlib import Path
5+
6+
import pytest
7+
import requests
8+
from airbyte_cdk.models import SyncMode
9+
from source_salesforce.api import Salesforce
10+
from source_salesforce.source import SourceSalesforce
11+
12+
HERE = Path(__file__).parent
13+
14+
NOTE_CONTENT = "It's the note for integration test"
15+
UPDATED_NOTE_CONTENT = "It's the updated note for integration test"
16+
17+
18+
@pytest.fixture(scope="module")
19+
def input_sandbox_config():
20+
with open(HERE.parent / "secrets/config_sandbox.json", "r") as file:
21+
return json.loads(file.read())
22+
23+
24+
@pytest.fixture(scope="module")
25+
def sf(input_sandbox_config):
26+
sf = Salesforce(**input_sandbox_config)
27+
sf.login()
28+
return sf
29+
30+
31+
@pytest.fixture(scope="module")
32+
def stream_name():
33+
return "ContentNote"
34+
35+
36+
@pytest.fixture(scope="module")
37+
def stream(input_sandbox_config, stream_name, sf):
38+
return SourceSalesforce.generate_streams(input_sandbox_config, {stream_name: None}, sf)[0]
39+
40+
41+
def _encode_content(text):
42+
base64_bytes = base64.b64encode(text.encode('utf-8'))
43+
return base64_bytes.decode('utf-8')
44+
45+
46+
def create_note(stream, headers):
47+
url = stream.url_base + f"/services/data/{stream.sf_api.version}/sobjects/{stream.name}"
48+
note_data = {
49+
"Title": "Integration Test",
50+
"Content": _encode_content(NOTE_CONTENT)
51+
}
52+
return requests.post(url, headers=headers, json=note_data)
53+
54+
55+
def delete_note(stream, note_id, headers):
56+
url = stream.url_base + f"/services/data/{stream.sf_api.version}/sobjects/{stream.name}/{note_id}"
57+
return requests.delete(url, headers=headers)
58+
59+
60+
def update_note(stream, note_id, headers):
61+
url = stream.url_base + f"/services/data/{stream.sf_api.version}/sobjects/{stream.name}/{note_id}"
62+
note_data = {
63+
"Content": _encode_content(UPDATED_NOTE_CONTENT)
64+
}
65+
return requests.patch(url, headers=headers, json=note_data)
66+
67+
68+
def get_stream_state():
69+
state_date = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
70+
return {"LastModifiedDate": state_date}
71+
72+
73+
def test_update_for_deleted_record(stream):
74+
headers = stream.authenticator.get_auth_header()
75+
response = create_note(stream, headers)
76+
assert response.status_code == 201, "Note was note created"
77+
78+
created_note_id = response.json()['id']
79+
80+
notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
81+
assert created_note_id in notes, "No created note during the sync"
82+
83+
stream_state = get_stream_state()
84+
response = delete_note(stream, created_note_id, headers)
85+
assert response.status_code == 204, "Note was not deleted"
86+
87+
is_note_updated = False
88+
is_deleted = False
89+
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
90+
if created_note_id == record["Id"]:
91+
is_note_updated = True
92+
is_deleted = record["IsDeleted"]
93+
break
94+
assert is_note_updated, "No deleted note during the sync"
95+
assert is_deleted, "Wrong field value for deleted note during the sync"
96+
97+
stream_state = get_stream_state()
98+
response = update_note(stream, created_note_id, headers)
99+
assert response.status_code == 404, "Note was updated, but should not"
100+
101+
notes = set(record["Id"] for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state))
102+
assert created_note_id not in notes, "Note was updated, but should not"
103+
104+
105+
def test_deleted_record(stream):
106+
headers = stream.authenticator.get_auth_header()
107+
response = create_note(stream, headers)
108+
assert response.status_code == 201, "Note was note created"
109+
110+
created_note_id = response.json()['id']
111+
112+
notes = set(record["Id"] for record in stream.read_records(sync_mode=None))
113+
assert created_note_id in notes, "No created note during the sync"
114+
115+
response = update_note(stream, created_note_id, headers)
116+
assert response.status_code == 204, "Note was not updated"
117+
118+
stream_state = get_stream_state()
119+
response = delete_note(stream, created_note_id, headers)
120+
assert response.status_code == 204, "Note was not deleted"
121+
122+
record = None
123+
for record in stream.read_records(sync_mode=SyncMode.incremental, stream_state=stream_state):
124+
if created_note_id == record["Id"]:
125+
break
126+
127+
assert record, "No updated note during the sync"
128+
assert record["IsDeleted"], "Wrong field value for deleted note during the sync"
129+
assert record["TextPreview"] == UPDATED_NOTE_CONTENT and record["TextPreview"] != NOTE_CONTENT, \
130+
"Note Content was not updated"

0 commit comments

Comments
 (0)