Skip to content

Commit 44a85a9

Browse files
authored
fix: inject oauth params when generating sync input (#22635)
* fix: inject oauth params when generating sync inpt * make it final * rm oauth injection from job creator factory * keep it * keep it 2 * test that we're actually using the oauth values * fmt
1 parent a70d6e8 commit 44a85a9

File tree

2 files changed

+46
-14
lines changed

2 files changed

+46
-14
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
99
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
1010

11+
import com.fasterxml.jackson.databind.JsonNode;
1112
import datadog.trace.api.Trace;
1213
import io.airbyte.api.client.AirbyteApiClient;
1314
import io.airbyte.api.client.generated.AttemptApi;
@@ -39,6 +40,7 @@
3940
import io.airbyte.config.persistence.ConfigRepository;
4041
import io.airbyte.metrics.lib.ApmTraceUtils;
4142
import io.airbyte.persistence.job.JobPersistence;
43+
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
4244
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
4345
import io.airbyte.persistence.job.models.Job;
4446
import io.airbyte.persistence.job.models.JobRunConfig;
@@ -63,19 +65,22 @@ public class GenerateInputActivityImpl implements GenerateInputActivity {
6365
private final AttemptApi attemptApi;
6466
private final StateApi stateApi;
6567
private final FeatureFlags featureFlags;
68+
private final OAuthConfigSupplier oAuthConfigSupplier;
6669

6770
private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class);
6871

6972
public GenerateInputActivityImpl(final JobPersistence jobPersistence,
7073
final ConfigRepository configRepository,
7174
final StateApi stateApi,
7275
final AttemptApi attemptApi,
73-
final FeatureFlags featureFlags) {
76+
final FeatureFlags featureFlags,
77+
final OAuthConfigSupplier oAuthConfigSupplier) {
7478
this.jobPersistence = jobPersistence;
7579
this.configRepository = configRepository;
7680
this.stateApi = stateApi;
7781
this.attemptApi = attemptApi;
7882
this.featureFlags = featureFlags;
83+
this.oAuthConfigSupplier = oAuthConfigSupplier;
7984
}
8085

8186
private Optional<State> getCurrentConnectionState(final UUID connectionId) {
@@ -122,7 +127,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
122127
if (ConfigType.SYNC.equals(jobConfigType)) {
123128
config = job.getConfig().getSync();
124129
final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId());
125-
attemptSyncConfig.setSourceConfiguration(source.getConfiguration());
130+
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
131+
source.getSourceDefinitionId(),
132+
source.getWorkspaceId(),
133+
source.getConfiguration());
134+
attemptSyncConfig.setSourceConfiguration(sourceConfiguration);
126135
} else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) {
127136
final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection();
128137
final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration();
@@ -156,7 +165,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
156165
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
157166

158167
final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());
159-
attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration());
168+
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
169+
destination.getDestinationDefinitionId(),
170+
destination.getWorkspaceId(),
171+
destination.getConfiguration());
172+
attemptSyncConfig.setDestinationConfiguration(destinationConfiguration);
160173

161174
final StandardSourceDefinition sourceDefinition =
162175
configRepository.getSourceDefinitionFromSource(standardSync.getSourceId());

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityTest.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.airbyte.config.persistence.ConfigNotFoundException;
3636
import io.airbyte.config.persistence.ConfigRepository;
3737
import io.airbyte.persistence.job.JobPersistence;
38+
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
3839
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
3940
import io.airbyte.persistence.job.models.Job;
4041
import io.airbyte.persistence.job.models.JobRunConfig;
@@ -58,15 +59,21 @@ class GenerateInputActivityTest {
5859
static private JobPersistence jobPersistence;
5960
static private ConfigRepository configRepository;
6061
static private GenerateInputActivityImpl generateInputActivity;
62+
static private OAuthConfigSupplier oAuthConfigSupplier;
6163
static private Job job;
6264

6365
static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value"));
66+
static private final JsonNode SOURCE_CONFIG_WITH_OAUTH = Jsons.jsonNode(Map.of("source_key", "source_value", "oauth", "oauth_value"));
6467
static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value"));
68+
static private final JsonNode DESTINATION_CONFIG_WITH_OAUTH =
69+
Jsons.jsonNode(Map.of("destination_key", "destination_value", "oauth", "oauth_value"));
6570
static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value")));
6671

72+
static private final UUID WORKSPACE_ID = UUID.randomUUID();
6773
static private final long JOB_ID = 1;
6874
static private final int ATTEMPT_ID = 1;
6975
static private final UUID SOURCE_ID = UUID.randomUUID();
76+
static private final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
7077
static private final UUID DESTINATION_ID = UUID.randomUUID();
7178
static private final UUID CONNECTION_ID = UUID.randomUUID();
7279

@@ -75,24 +82,26 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
7582
final StateApi stateApi = mock(StateApi.class);
7683
final FeatureFlags featureFlags = mock(FeatureFlags.class);
7784

85+
oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
7886
attemptApi = mock(AttemptApi.class);
7987
jobPersistence = mock(JobPersistence.class);
8088
configRepository = mock(ConfigRepository.class);
81-
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags);
89+
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags, oAuthConfigSupplier);
8290

8391
job = mock(Job.class);
8492

8593
when(jobPersistence.getJob(JOB_ID)).thenReturn(job);
8694

87-
final UUID destinationDefinitionId = UUID.randomUUID();
88-
8995
final DestinationConnection destinationConnection = new DestinationConnection()
9096
.withDestinationId(DESTINATION_ID)
91-
.withDestinationDefinitionId(destinationDefinitionId)
97+
.withWorkspaceId(WORKSPACE_ID)
98+
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
9299
.withConfiguration(DESTINATION_CONFIGURATION);
93100
when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection);
94-
when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class));
101+
when(configRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(mock(StandardDestinationDefinition.class));
95102
when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class));
103+
when(oAuthConfigSupplier.injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION))
104+
.thenReturn(DESTINATION_CONFIG_WITH_OAUTH);
96105

97106
final StandardSync standardSync = new StandardSync()
98107
.withSourceId(SOURCE_ID)
@@ -109,10 +118,15 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
109118
void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException {
110119
final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID);
111120

121+
final UUID sourceDefinitionId = UUID.randomUUID();
112122
final SourceConnection sourceConnection = new SourceConnection()
113123
.withSourceId(SOURCE_ID)
124+
.withSourceDefinitionId(sourceDefinitionId)
125+
.withWorkspaceId(WORKSPACE_ID)
114126
.withConfiguration(SOURCE_CONFIGURATION);
115127
when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection);
128+
when(oAuthConfigSupplier.injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION))
129+
.thenReturn(SOURCE_CONFIG_WITH_OAUTH);
116130

117131
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
118132
.withWorkspaceId(UUID.randomUUID())
@@ -131,8 +145,8 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
131145
.withWorkspaceId(jobSyncConfig.getWorkspaceId())
132146
.withSourceId(SOURCE_ID)
133147
.withDestinationId(DESTINATION_ID)
134-
.withSourceConfiguration(SOURCE_CONFIGURATION)
135-
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
148+
.withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH)
149+
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
136150
.withState(STATE)
137151
.withCatalog(jobSyncConfig.getConfiguredAirbyteCatalog())
138152
.withWorkspaceId(jobSyncConfig.getWorkspaceId());
@@ -161,10 +175,13 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
161175
assertEquals(expectedGeneratedJobInput, generatedJobInput);
162176

163177
final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig()
164-
.withSourceConfiguration(SOURCE_CONFIGURATION)
165-
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
178+
.withSourceConfiguration(SOURCE_CONFIG_WITH_OAUTH)
179+
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
166180
.withState(STATE);
167181

182+
verify(oAuthConfigSupplier).injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION);
183+
verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);
184+
168185
verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
169186
.jobId(JOB_ID)
170187
.attemptNumber(ATTEMPT_ID)
@@ -192,7 +209,7 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException {
192209
.withSourceId(SOURCE_ID)
193210
.withDestinationId(DESTINATION_ID)
194211
.withSourceConfiguration(Jsons.emptyObject())
195-
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
212+
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
196213
.withState(STATE)
197214
.withCatalog(jobResetConfig.getConfiguredAirbyteCatalog())
198215
.withWorkspaceId(jobResetConfig.getWorkspaceId());
@@ -222,9 +239,11 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException {
222239

223240
final AttemptSyncConfig expectedAttemptSyncConfig = new AttemptSyncConfig()
224241
.withSourceConfiguration(Jsons.emptyObject())
225-
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
242+
.withDestinationConfiguration(DESTINATION_CONFIG_WITH_OAUTH)
226243
.withState(STATE);
227244

245+
verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);
246+
228247
verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
229248
.jobId(JOB_ID)
230249
.attemptNumber(ATTEMPT_ID)

0 commit comments

Comments
 (0)