Skip to content

Commit 6c3555b

Browse files
author
Dmytro
authored
Salesforce oAuth backend. (#7212)
1 parent 7ae250d commit 6c3555b

File tree

4 files changed

+329
-0
lines changed

4 files changed

+329
-0
lines changed

airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.collect.ImmutableMap;
88
import io.airbyte.config.persistence.ConfigRepository;
99
import io.airbyte.oauth.flows.AsanaOAuthFlow;
10+
import io.airbyte.oauth.flows.SalesforceOAuthFlow;
1011
import io.airbyte.oauth.flows.FacebookMarketingOAuthFlow;
1112
import io.airbyte.oauth.flows.TrelloOAuthFlow;
1213
import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow;
@@ -25,6 +26,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository) {
2526
.put("airbyte/source-google-ads", new GoogleAdsOAuthFlow(configRepository))
2627
.put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOAuthFlow(configRepository))
2728
.put("airbyte/source-google-search-console", new GoogleSearchConsoleOAuthFlow(configRepository))
29+
.put("airbyte/source-salesforce", new SalesforceOAuthFlow(configRepository))
2830
.put("airbyte/source-trello", new TrelloOAuthFlow(configRepository))
2931
.build();
3032
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.oauth.flows;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.google.common.annotations.VisibleForTesting;
9+
import com.google.common.collect.ImmutableMap;
10+
import io.airbyte.commons.json.Jsons;
11+
import io.airbyte.config.persistence.ConfigRepository;
12+
import io.airbyte.oauth.BaseOAuthFlow;
13+
import java.io.IOException;
14+
import java.net.URISyntaxException;
15+
import java.net.http.HttpClient;
16+
import java.util.Map;
17+
import java.util.UUID;
18+
import java.util.function.Supplier;
19+
import org.apache.http.client.utils.URIBuilder;
20+
21+
/**
22+
* Following docs from https://help.salesforce.com/s/articleView?language=en_US&id=sf.remoteaccess_oauth_web_server_flow.htm
23+
*/
24+
public class SalesforceOAuthFlow extends BaseOAuthFlow {
25+
26+
private static final String AUTHORIZE_URL = "https://login.salesforce.com/services/oauth2/authorize";
27+
private static final String ACCESS_TOKEN_URL = "https://login.salesforce.com/services/oauth2/token";
28+
29+
public SalesforceOAuthFlow(ConfigRepository configRepository) {
30+
super(configRepository);
31+
}
32+
33+
@VisibleForTesting
34+
SalesforceOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, Supplier<String> stateSupplier) {
35+
super(configRepository, httpClient, stateSupplier);
36+
}
37+
38+
@Override
39+
protected String formatConsentUrl(UUID definitionId, String clientId, String redirectUrl) throws IOException {
40+
try {
41+
return new URIBuilder(AUTHORIZE_URL)
42+
.addParameter("client_id", clientId)
43+
.addParameter("redirect_uri", redirectUrl)
44+
.addParameter("response_type", "code")
45+
.addParameter("state", getState())
46+
.build().toString();
47+
} catch (URISyntaxException e) {
48+
throw new IOException("Failed to format Consent URL for OAuth flow", e);
49+
}
50+
}
51+
52+
@Override
53+
protected String getAccessTokenUrl() {
54+
return ACCESS_TOKEN_URL;
55+
}
56+
57+
@Override
58+
protected Map<String, String> getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) {
59+
return ImmutableMap.<String, String>builder()
60+
.putAll(super.getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))
61+
.put("grant_type", "authorization_code")
62+
.build();
63+
}
64+
65+
@Override
66+
protected Map<String, Object> extractRefreshToken(JsonNode data) throws IOException {
67+
System.out.println(Jsons.serialize(data));
68+
if (data.has("refresh_token")) {
69+
final String refreshToken = data.get("refresh_token").asText();
70+
return Map.of("refresh_token", refreshToken);
71+
} else {
72+
throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL));
73+
}
74+
}
75+
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.oauth.flows;
6+
7+
import static org.junit.jupiter.api.Assertions.assertTrue;
8+
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.when;
10+
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.google.common.collect.ImmutableMap;
13+
import com.sun.net.httpserver.HttpExchange;
14+
import com.sun.net.httpserver.HttpHandler;
15+
import com.sun.net.httpserver.HttpServer;
16+
import io.airbyte.commons.json.Jsons;
17+
import io.airbyte.config.SourceOAuthParameter;
18+
import io.airbyte.config.persistence.ConfigNotFoundException;
19+
import io.airbyte.config.persistence.ConfigRepository;
20+
import io.airbyte.validation.json.JsonValidationException;
21+
import java.io.IOException;
22+
import java.io.OutputStream;
23+
import java.net.InetSocketAddress;
24+
import java.nio.file.Files;
25+
import java.nio.file.Path;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.UUID;
30+
import org.junit.jupiter.api.AfterEach;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
public class SalesforceOAuthFlowIntegrationTest {
37+
38+
private static final Logger LOGGER = LoggerFactory.getLogger(SalesforceOAuthFlowIntegrationTest.class);
39+
private static final String REDIRECT_URL = "http://localhost:8000/code";
40+
private static final Path CREDENTIALS_PATH = Path.of("secrets/salesforce.json");
41+
42+
private ConfigRepository configRepository;
43+
private SalesforceOAuthFlow salesforceOAuthFlow;
44+
private HttpServer server;
45+
private ServerHandler serverHandler;
46+
47+
@BeforeEach
48+
public void setup() throws IOException {
49+
if (!Files.exists(CREDENTIALS_PATH)) {
50+
throw new IllegalStateException(
51+
"Must provide path to a oauth credentials file.");
52+
}
53+
configRepository = mock(ConfigRepository.class);
54+
salesforceOAuthFlow = new SalesforceOAuthFlow(configRepository);
55+
56+
server = HttpServer.create(new InetSocketAddress(8000), 0);
57+
server.setExecutor(null); // creates a default executor
58+
server.start();
59+
serverHandler = new ServerHandler("code");
60+
server.createContext("/code", serverHandler);
61+
}
62+
63+
@AfterEach
64+
void tearDown() {
65+
server.stop(1);
66+
}
67+
68+
@Test
69+
public void testFullSalesforceOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
70+
int limit = 20;
71+
final UUID workspaceId = UUID.randomUUID();
72+
final UUID definitionId = UUID.randomUUID();
73+
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
74+
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
75+
final String clientId = credentialsJson.get("client_id").asText();
76+
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
77+
.withOauthParameterId(UUID.randomUUID())
78+
.withSourceDefinitionId(definitionId)
79+
.withWorkspaceId(workspaceId)
80+
.withConfiguration(Jsons.jsonNode(ImmutableMap.builder()
81+
.put("client_id", clientId)
82+
.put("client_secret", credentialsJson.get("client_secret").asText())
83+
.build()))));
84+
final String url = salesforceOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
85+
LOGGER.info("Waiting for user consent at: {}", url);
86+
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
87+
// access...
88+
while (!serverHandler.isSucceeded() && limit > 0) {
89+
Thread.sleep(1000);
90+
limit -= 1;
91+
}
92+
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
93+
final Map<String, Object> params = salesforceOAuthFlow.completeSourceOAuth(workspaceId, definitionId,
94+
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);
95+
LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
96+
assertTrue(params.containsKey("refresh_token"));
97+
assertTrue(params.get("refresh_token").toString().length() > 0);
98+
}
99+
100+
static class ServerHandler implements HttpHandler {
101+
102+
final private String expectedParam;
103+
private Map responseQuery;
104+
private String paramValue;
105+
private boolean succeeded;
106+
107+
public ServerHandler(String expectedParam) {
108+
this.expectedParam = expectedParam;
109+
this.paramValue = "";
110+
this.succeeded = false;
111+
}
112+
113+
public boolean isSucceeded() {
114+
return succeeded;
115+
}
116+
117+
public String getParamValue() {
118+
return paramValue;
119+
}
120+
121+
public Map getResponseQuery() {
122+
return responseQuery;
123+
}
124+
125+
@Override
126+
public void handle(HttpExchange t) {
127+
final String query = t.getRequestURI().getQuery();
128+
LOGGER.info("Received query: '{}'", query);
129+
final Map<String, String> data;
130+
try {
131+
data = deserialize(query);
132+
final String response;
133+
if (data != null && data.containsKey(expectedParam)) {
134+
paramValue = data.get(expectedParam);
135+
response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...",
136+
expectedParam, paramValue);
137+
responseQuery = data;
138+
LOGGER.info(response);
139+
t.sendResponseHeaders(200, response.length());
140+
succeeded = true;
141+
} else {
142+
response = String.format("Unable to parse query params from redirected url: %s", query);
143+
t.sendResponseHeaders(500, response.length());
144+
}
145+
final OutputStream os = t.getResponseBody();
146+
os.write(response.getBytes());
147+
os.close();
148+
} catch (RuntimeException | IOException e) {
149+
LOGGER.error("Failed to parse from body {}", query, e);
150+
}
151+
}
152+
153+
private static Map<String, String> deserialize(String query) {
154+
if (query == null) {
155+
return null;
156+
}
157+
final Map<String, String> result = new HashMap<>();
158+
for (String param : query.split("&")) {
159+
String[] entry = param.split("=", 2);
160+
if (entry.length > 1) {
161+
result.put(entry[0], entry[1]);
162+
} else {
163+
result.put(entry[0], "");
164+
}
165+
}
166+
return result;
167+
}
168+
169+
}
170+
171+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.oauth.flows;
6+
7+
import static org.junit.jupiter.api.Assertions.assertEquals;
8+
import static org.mockito.ArgumentMatchers.any;
9+
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.when;
11+
12+
import com.google.common.collect.ImmutableMap;
13+
import io.airbyte.commons.json.Jsons;
14+
import io.airbyte.config.SourceOAuthParameter;
15+
import io.airbyte.config.persistence.ConfigNotFoundException;
16+
import io.airbyte.config.persistence.ConfigRepository;
17+
import io.airbyte.validation.json.JsonValidationException;
18+
import java.io.IOException;
19+
import java.net.http.HttpClient;
20+
import java.net.http.HttpResponse;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.UUID;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
27+
public class SalesforceOAuthFlowTest {
28+
29+
private UUID workspaceId;
30+
private UUID definitionId;
31+
private ConfigRepository configRepository;
32+
private SalesforceOAuthFlow salesforceoAuthFlow;
33+
private HttpClient httpClient;
34+
35+
private static final String REDIRECT_URL = "https://airbyte.io";
36+
37+
private static String getConstantState() {
38+
return "state";
39+
}
40+
41+
@BeforeEach
42+
public void setup() throws IOException, JsonValidationException {
43+
workspaceId = UUID.randomUUID();
44+
definitionId = UUID.randomUUID();
45+
configRepository = mock(ConfigRepository.class);
46+
httpClient = mock(HttpClient.class);
47+
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
48+
.withOauthParameterId(UUID.randomUUID())
49+
.withSourceDefinitionId(definitionId)
50+
.withWorkspaceId(workspaceId)
51+
.withConfiguration(Jsons.jsonNode(ImmutableMap.builder()
52+
.put("client_id", "test_client_id")
53+
.put("client_secret", "test_client_secret")
54+
.build()))));
55+
salesforceoAuthFlow = new SalesforceOAuthFlow(configRepository, httpClient, SalesforceOAuthFlowTest::getConstantState);
56+
57+
}
58+
59+
@Test
60+
public void testGetSourceConcentUrl() throws IOException, InterruptedException, ConfigNotFoundException {
61+
final String concentUrl =
62+
salesforceoAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
63+
assertEquals(concentUrl,
64+
"https://login.salesforce.com/services/oauth2/authorize?client_id=test_client_id&redirect_uri=https%3A%2F%2Fairbyte.io&response_type=code&state=state");
65+
}
66+
67+
@Test
68+
public void testCompleteSourceOAuth() throws IOException, JsonValidationException, InterruptedException, ConfigNotFoundException {
69+
70+
Map<String, String> returnedCredentials = Map.of("refresh_token", "refresh_token_response");
71+
final HttpResponse response = mock(HttpResponse.class);
72+
when(response.body()).thenReturn(Jsons.serialize(returnedCredentials));
73+
when(httpClient.send(any(), any())).thenReturn(response);
74+
final Map<String, Object> queryParams = Map.of("code", "test_code");
75+
final Map<String, Object> actualQueryParams =
76+
salesforceoAuthFlow.completeSourceOAuth(workspaceId, definitionId, queryParams, REDIRECT_URL);
77+
assertEquals(Jsons.serialize(returnedCredentials), Jsons.serialize(actualQueryParams));
78+
}
79+
80+
}

0 commit comments

Comments
 (0)