Skip to content

fix: inject oauth params when generating sync input #22635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.AttemptApi;
Expand Down Expand Up @@ -39,6 +40,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -63,19 +65,22 @@ public class GenerateInputActivityImpl implements GenerateInputActivity {
private final AttemptApi attemptApi;
private final StateApi stateApi;
private final FeatureFlags featureFlags;
private final OAuthConfigSupplier oAuthConfigSupplier;

private static final Logger LOGGER = LoggerFactory.getLogger(GenerateInputActivity.class);

public GenerateInputActivityImpl(final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final StateApi stateApi,
final AttemptApi attemptApi,
final FeatureFlags featureFlags) {
final FeatureFlags featureFlags,
final OAuthConfigSupplier oAuthConfigSupplier) {
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.stateApi = stateApi;
this.attemptApi = attemptApi;
this.featureFlags = featureFlags;
this.oAuthConfigSupplier = oAuthConfigSupplier;
}

private Optional<State> getCurrentConnectionState(final UUID connectionId) {
Expand Down Expand Up @@ -122,7 +127,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
if (ConfigType.SYNC.equals(jobConfigType)) {
config = job.getConfig().getSync();
final SourceConnection source = configRepository.getSourceConnection(standardSync.getSourceId());
attemptSyncConfig.setSourceConfiguration(source.getConfiguration());
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
source.getSourceDefinitionId(),
source.getWorkspaceId(),
source.getConfiguration());
attemptSyncConfig.setSourceConfiguration(sourceConfiguration);
} else if (ConfigType.RESET_CONNECTION.equals(jobConfigType)) {
final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection();
final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration();
Expand Down Expand Up @@ -156,7 +165,11 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);

final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());
attemptSyncConfig.setDestinationConfiguration(destination.getConfiguration());
final JsonNode destinationConfiguration = oAuthConfigSupplier.injectDestinationOAuthParameters(
destination.getDestinationDefinitionId(),
destination.getWorkspaceId(),
destination.getConfiguration());
attemptSyncConfig.setDestinationConfiguration(destinationConfiguration);

final StandardSourceDefinition sourceDefinition =
configRepository.getSourceDefinitionFromSource(standardSync.getSourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -58,15 +59,18 @@ class GenerateInputActivityTest {
static private JobPersistence jobPersistence;
static private ConfigRepository configRepository;
static private GenerateInputActivityImpl generateInputActivity;
static private OAuthConfigSupplier oAuthConfigSupplier;
static private Job job;

static private final JsonNode SOURCE_CONFIGURATION = Jsons.jsonNode(Map.of("source_key", "source_value"));
static private final JsonNode DESTINATION_CONFIGURATION = Jsons.jsonNode(Map.of("destination_key", "destination_value"));
static private final State STATE = new State().withState(Jsons.jsonNode(Map.of("state_key", "state_value")));

static private final UUID WORKSPACE_ID = UUID.randomUUID();
static private final long JOB_ID = 1;
static private final int ATTEMPT_ID = 1;
static private final UUID SOURCE_ID = UUID.randomUUID();
static private final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
static private final UUID DESTINATION_ID = UUID.randomUUID();
static private final UUID CONNECTION_ID = UUID.randomUUID();

Expand All @@ -75,24 +79,26 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
final StateApi stateApi = mock(StateApi.class);
final FeatureFlags featureFlags = mock(FeatureFlags.class);

oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
attemptApi = mock(AttemptApi.class);
jobPersistence = mock(JobPersistence.class);
configRepository = mock(ConfigRepository.class);
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags);
generateInputActivity = new GenerateInputActivityImpl(jobPersistence, configRepository, stateApi, attemptApi, featureFlags, oAuthConfigSupplier);

job = mock(Job.class);

when(jobPersistence.getJob(JOB_ID)).thenReturn(job);

final UUID destinationDefinitionId = UUID.randomUUID();

final DestinationConnection destinationConnection = new DestinationConnection()
.withDestinationId(DESTINATION_ID)
.withDestinationDefinitionId(destinationDefinitionId)
.withWorkspaceId(WORKSPACE_ID)
.withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
.withConfiguration(DESTINATION_CONFIGURATION);
when(configRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destinationConnection);
when(configRepository.getStandardDestinationDefinition(destinationDefinitionId)).thenReturn(mock(StandardDestinationDefinition.class));
when(configRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(mock(StandardDestinationDefinition.class));
when(configRepository.getSourceDefinitionFromSource(SOURCE_ID)).thenReturn(mock(StandardSourceDefinition.class));
when(oAuthConfigSupplier.injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION))
.thenReturn(DESTINATION_CONFIGURATION);

final StandardSync standardSync = new StandardSync()
.withSourceId(SOURCE_ID)
Expand All @@ -109,10 +115,15 @@ void setUp() throws IOException, JsonValidationException, ConfigNotFoundExceptio
void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundException, IOException, ApiException {
final SyncInput syncInput = new SyncInput(ATTEMPT_ID, JOB_ID);

final UUID sourceDefinitionId = UUID.randomUUID();
final SourceConnection sourceConnection = new SourceConnection()
.withSourceId(SOURCE_ID)
.withSourceDefinitionId(sourceDefinitionId)
.withWorkspaceId(WORKSPACE_ID)
.withConfiguration(SOURCE_CONFIGURATION);
when(configRepository.getSourceConnection(SOURCE_ID)).thenReturn(sourceConnection);
when(oAuthConfigSupplier.injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION))
.thenReturn(SOURCE_CONFIGURATION);

final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withWorkspaceId(UUID.randomUUID())
Expand Down Expand Up @@ -165,6 +176,9 @@ void testGetSyncWorkflowInput() throws JsonValidationException, ConfigNotFoundEx
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withState(STATE);

verify(oAuthConfigSupplier).injectSourceOAuthParameters(sourceDefinitionId, WORKSPACE_ID, SOURCE_CONFIGURATION);
verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);

verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
.jobId(JOB_ID)
.attemptNumber(ATTEMPT_ID)
Expand Down Expand Up @@ -225,6 +239,8 @@ void testGetResetSyncWorkflowInput() throws IOException, ApiException {
.withDestinationConfiguration(DESTINATION_CONFIGURATION)
.withState(STATE);

verify(oAuthConfigSupplier).injectDestinationOAuthParameters(DESTINATION_DEFINITION_ID, WORKSPACE_ID, DESTINATION_CONFIGURATION);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could this test ensure that a secret was decrypted?

Copy link
Contributor Author

@pedroslopez pedroslopez Feb 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test to actually check that we're using the config returned by the injection method in 716fe1c. Internals of how the injection works left up to unit tests for the OAuthConfigSupplier.


verify(attemptApi).saveSyncConfig(new SaveAttemptSyncConfigRequestBody()
.jobId(JOB_ID)
.attemptNumber(ATTEMPT_ID)
Expand Down