Skip to content

Save streams to reset in job config when creating reset job #13703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ description: configuration of the reset source
type: object
additionalProperties: true
required:
- streamDescriptors
- streamsToReset
properties:
streamDescriptors:
streamsToReset:
type: array
items:
"$ref": StreamDescriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
Expand Down Expand Up @@ -93,7 +95,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
final StandardSync standardSync,
final String destinationDockerImage,
final List<StandardSyncOperation> standardSyncOperations)
final List<StandardSyncOperation> standardSyncOperations,
final List<StreamDescriptor> streamsToReset)
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
Expand All @@ -110,7 +113,8 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements));
workerResourceRequirements))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -41,13 +42,15 @@ Optional<Long> createSyncJob(SourceConnection source,
* @param destination db model representing where data goes
* @param standardSync sync options
* @param destinationDockerImage docker image to use for the destination
* @param streamsToReset
* @return the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something wrong happens
*/
Optional<Long> createResetConnectionJob(DestinationConnection destination,
StandardSync standardSync,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations)
List<StandardSyncOperation> standardSyncOperations,
List<StreamDescriptor> streamsToReset)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class DefaultJobCreatorTest {
private static final StandardSync STANDARD_SYNC;
private static final StandardSyncOperation STANDARD_SYNC_OPERATION;
private static final long JOB_ID = 12L;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private JobPersistence jobPersistence;
private ConfigRepository configRepository;
Expand Down Expand Up @@ -337,7 +341,8 @@ void testCreateResetConnectionJob() throws IOException {
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -350,7 +355,8 @@ void testCreateResetConnectionJob() throws IOException {
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION)).orElseThrow();
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow();
assertEquals(JOB_ID, jobId);
}

Expand All @@ -371,7 +377,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -384,7 +391,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION)).isEmpty());
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty());
}

}
10 changes: 8 additions & 2 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
Expand Down Expand Up @@ -133,6 +134,7 @@ public class WorkerApp {
private final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig;
private final JobNotifier jobNotifier;
private final JobTracker jobTracker;
private final StreamResetPersistence streamResetPersistence;

public void start() {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand Down Expand Up @@ -190,7 +192,8 @@ private void registerConnectionManager(final WorkerFactory factory) {
jobNotifier,
jobTracker,
configRepository,
jobCreator),
jobCreator,
streamResetPersistence),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
new CheckConnectionActivityImpl(
Expand Down Expand Up @@ -436,6 +439,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);

final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);

new WorkerApp(
workspaceRoot,
defaultProcessFactory,
Expand All @@ -462,7 +467,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
connectionHelper,
containerOrchestratorConfig,
jobNotifier,
jobTracker).start();
jobTracker,
streamResetPersistence).start();
}

public static void main(final String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta
private final JobTracker jobTracker;
private final ConfigRepository configRepository;
private final JobCreator jobCreator;
private final StreamResetPersistence streamResetPersistence;

@Override
public JobCreationOutput createNewJob(final JobCreationInput input) {
Expand All @@ -83,8 +86,9 @@ public JobCreationOutput createNewJob(final JobCreationInput input) {
standardSyncOperations.add(standardSyncOperation);
}

final List<StreamDescriptor> streamsToReset = streamResetPersistence.getStreamResets(input.getConnectionId());
final Optional<Long> jobIdOptional =
jobCreator.createResetConnectionJob(destination, standardSync, destinationImageName, standardSyncOperations);
jobCreator.createResetConnectionJob(destination, standardSync, destinationImageName, standardSyncOperations, streamsToReset);

final long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,32 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
Expand All @@ -50,6 +56,7 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -90,13 +97,27 @@ public class JobCreationAndStatusUpdateActivityTest {
@Mock
private ConfigRepository mConfigRepository;

@Mock
private JobCreator mJobCreator;

@Mock
private StreamResetPersistence mStreamResetPersistence;

@InjectMocks
private JobCreationAndStatusUpdateActivityImpl jobCreationAndStatusUpdateActivity;

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final UUID DESTINATION_ID = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
private static final String DOCKER_REPOSITORY = "docker-repo";
private static final String DOCKER_IMAGE_TAG = "0.0.1";
private static final String DOCKER_IMAGE_NAME = DockerUtils.getTaggedImageName(DOCKER_REPOSITORY, DOCKER_IMAGE_TAG);
private static final long JOB_ID = 123L;
private static final int ATTEMPT_ID = 0;
private static final int ATTEMPT_NUMBER = 1;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
Expand Down Expand Up @@ -127,6 +148,28 @@ public void createJob() throws JsonValidationException, ConfigNotFoundException,
Assertions.assertThat(output.getJobId()).isEqualTo(JOB_ID);
}

@Test
@DisplayName("Test reset job creation")
public void createResetJob() throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = new StandardSync().withDestinationId(DESTINATION_ID);
Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
final DestinationConnection destination = new DestinationConnection().withDestinationDefinitionId(DESTINATION_DEFINITION_ID);
Mockito.when(mConfigRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destination);
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_IMAGE_TAG);
Mockito.when(mConfigRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(destinationDefinition);
final List<StreamDescriptor> streamsToReset = List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2);
Mockito.when(mStreamResetPersistence.getStreamResets(CONNECTION_ID)).thenReturn(streamsToReset);

Mockito.when(mJobCreator.createResetConnectionJob(destination, standardSync, DOCKER_IMAGE_NAME, List.of(), streamsToReset))
.thenReturn(Optional.of(JOB_ID));

final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, true));

Assertions.assertThat(output.getJobId()).isEqualTo(JOB_ID);
}

@Test
@DisplayName("Test attempt creation")
public void createAttempt() throws IOException {
Expand Down