Skip to content

Commit 511587c

Browse files
committed
pipe through to worker
1 parent ba88c44 commit 511587c

File tree

14 files changed

+147
-23
lines changed

14 files changed

+147
-23
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"$schema": http://json-schema.org/draft-07/schema#
3+
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorType.yaml
4+
title: ActorType
5+
description: enum that describes different types of actors
6+
type: string
7+
enum:
8+
- source
9+
- destination

airbyte-config/models/src/main/resources/types/JobSyncConfig.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ properties:
3939
destinationDockerImage:
4040
description: Image name of the destination with tag.
4141
type: string
42+
sourceResourceRequirements:
43+
"$ref": ActorDefinitionResourceRequirements.yaml
44+
destinationResourceRequirements:
45+
"$ref": ActorDefinitionResourceRequirements.yaml
4246
operationSequence:
4347
description: Sequence of configurations of operations to apply as part of the sync
4448
type: array

airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,16 @@ properties:
3535
catalog:
3636
description: the configured airbyte catalog
3737
type: object
38+
# necessary because the configuration declaration is in a separate package.
3839
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
3940
state:
4041
description: optional state of the previous run. this object is defined per integration.
4142
"$ref": State.yaml
4243
resourceRequirements:
43-
description: optional resource requirements to run sync workers
44+
description: optional resource requirements to run sync workers (confusing - overrides source and destination). takes precedence over source and destination resource requirements.
4445
type: object
45-
existingJavaType: io.airbyte.config.ResourceRequirements
46+
"$ref": ResourceRequirements.yaml
47+
sourceResourceRequirements:
48+
"$ref": ActorDefinitionResourceRequirements.yaml
49+
destinationResourceRequirements:
50+
"$ref": ActorDefinitionResourceRequirements.yaml

airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.scheduler.client;
66

7+
import io.airbyte.config.ActorDefinitionResourceRequirements;
78
import io.airbyte.config.DestinationConnection;
89
import io.airbyte.config.SourceConnection;
910
import io.airbyte.config.StandardSync;
@@ -35,15 +36,19 @@ public Job createOrGetActiveSyncJob(final SourceConnection source,
3536
final StandardSync standardSync,
3637
final String sourceDockerImage,
3738
final String destinationDockerImage,
38-
final List<StandardSyncOperation> standardSyncOperations)
39+
final List<StandardSyncOperation> standardSyncOperations,
40+
final ActorDefinitionResourceRequirements sourceResourceRequirements,
41+
final ActorDefinitionResourceRequirements destinationResourceRequirements)
3942
throws IOException {
4043
final Optional<Long> jobIdOptional = jobCreator.createSyncJob(
4144
source,
4245
destination,
4346
standardSync,
4447
sourceDockerImage,
4548
destinationDockerImage,
46-
standardSyncOperations);
49+
standardSyncOperations,
50+
sourceResourceRequirements,
51+
destinationResourceRequirements);
4752

4853
final long jobId = jobIdOptional.isEmpty()
4954
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()

airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SchedulerJobClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.scheduler.client;
66

7+
import io.airbyte.config.ActorDefinitionResourceRequirements;
78
import io.airbyte.config.DestinationConnection;
89
import io.airbyte.config.SourceConnection;
910
import io.airbyte.config.StandardSync;
@@ -24,7 +25,9 @@ Job createOrGetActiveSyncJob(SourceConnection source,
2425
StandardSync standardSync,
2526
String sourceDockerImage,
2627
String destinationDockerImage,
27-
List<StandardSyncOperation> standardSyncOperations)
28+
List<StandardSyncOperation> standardSyncOperations,
29+
ActorDefinitionResourceRequirements sourceResourceRequirements,
30+
ActorDefinitionResourceRequirements destinationResourceRequirements)
2831
throws IOException;
2932

3033
Job createOrGetActiveResetConnectionJob(DestinationConnection destination,

airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,13 @@ void testCreateSyncJob() throws IOException {
4646
final DestinationConnection destination = mock(DestinationConnection.class);
4747
final StandardSync standardSync = mock(StandardSync.class);
4848
final String destinationDockerImage = "airbyte/spaceport";
49-
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()))
49+
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null))
5050
.thenReturn(Optional.of(JOB_ID));
5151
when(jobPersistence.getJob(JOB_ID)).thenReturn(job);
5252

53-
assertEquals(job, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()));
53+
assertEquals(
54+
job,
55+
client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null));
5456
}
5557

5658
@Test
@@ -61,14 +63,23 @@ void testCreateSyncJobAlreadyExist() throws IOException {
6163
final UUID connectionUuid = UUID.randomUUID();
6264
when(standardSync.getConnectionId()).thenReturn(connectionUuid);
6365
final String destinationDockerImage = "airbyte/spaceport";
64-
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())).thenReturn(Optional.empty());
66+
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null))
67+
.thenReturn(Optional.empty());
6568

6669
final Job currentJob = mock(Job.class);
6770
when(currentJob.getId()).thenReturn(42L);
6871
when(jobPersistence.getLastReplicationJob(connectionUuid)).thenReturn(Optional.of(currentJob));
6972
when(jobPersistence.getJob(42L)).thenReturn(currentJob);
7073

71-
assertEquals(currentJob, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()));
74+
assertEquals(currentJob, client.createOrGetActiveSyncJob(
75+
source,
76+
destination,
77+
standardSync,
78+
DOCKER_IMAGE,
79+
destinationDockerImage,
80+
List.of(),
81+
null,
82+
null));
7283
}
7384

7485
@Test

airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.scheduler.persistence;
66

7+
import io.airbyte.config.ActorDefinitionResourceRequirements;
78
import io.airbyte.config.DestinationConnection;
89
import io.airbyte.config.JobConfig;
910
import io.airbyte.config.JobConfig.ConfigType;
@@ -41,7 +42,9 @@ public Optional<Long> createSyncJob(final SourceConnection source,
4142
final StandardSync standardSync,
4243
final String sourceDockerImageName,
4344
final String destinationDockerImageName,
44-
final List<StandardSyncOperation> standardSyncOperations)
45+
final List<StandardSyncOperation> standardSyncOperations,
46+
final ActorDefinitionResourceRequirements sourceResourceReqs,
47+
final ActorDefinitionResourceRequirements destinationResourceReqs)
4548
throws IOException {
4649
// reusing this isn't going to quite work.
4750
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
@@ -55,7 +58,9 @@ public Optional<Long> createSyncJob(final SourceConnection source,
5558
.withOperationSequence(standardSyncOperations)
5659
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
5760
.withState(null)
58-
.withResourceRequirements(getJobResourceRequirements(standardSync));
61+
.withResourceRequirements(getJobResourceRequirements(standardSync))
62+
.withSourceResourceRequirements(sourceResourceReqs)
63+
.withDestinationResourceRequirements(destinationResourceReqs);
5964

6065
configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
6166

airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobCreator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.scheduler.persistence;
66

7+
import io.airbyte.config.ActorDefinitionResourceRequirements;
78
import io.airbyte.config.DestinationConnection;
89
import io.airbyte.config.SourceConnection;
910
import io.airbyte.config.StandardSync;
@@ -29,7 +30,9 @@ Optional<Long> createSyncJob(SourceConnection source,
2930
StandardSync standardSync,
3031
String sourceDockerImage,
3132
String destinationDockerImage,
32-
List<StandardSyncOperation> standardSyncOperations)
33+
List<StandardSyncOperation> standardSyncOperations,
34+
ActorDefinitionResourceRequirements sourceResourceReqs,
35+
ActorDefinitionResourceRequirements destinationResourceReqs)
3336
throws IOException;
3437

3538
/**

airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ public Long create(final UUID connectionId) {
5050
destinationConnection.getWorkspaceId(),
5151
destinationConnection.getConfiguration());
5252
destinationConnection.withConfiguration(destinationConfiguration);
53-
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
54-
final StandardDestinationDefinition destinationDefinition =
55-
configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId());
53+
final StandardSourceDefinition sourceDefinition = configRepository
54+
.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
55+
final StandardDestinationDefinition destinationDefinition = configRepository
56+
.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId());
5657

5758
final String sourceImageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
5859
final String destinationImageName =
@@ -70,7 +71,9 @@ public Long create(final UUID connectionId) {
7071
standardSync,
7172
sourceImageName,
7273
destinationImageName,
73-
standardSyncOperations)
74+
standardSyncOperations,
75+
sourceDefinition.getResourceRequirements(),
76+
destinationDefinition.getResourceRequirements())
7477
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));
7578

7679
} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {

airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ void testCreateSyncJob() throws IOException {
151151
STANDARD_SYNC,
152152
SOURCE_IMAGE_NAME,
153153
DESTINATION_IMAGE_NAME,
154-
List.of(STANDARD_SYNC_OPERATION)).orElseThrow();
154+
List.of(STANDARD_SYNC_OPERATION),
155+
null,
156+
null).orElseThrow();
155157
assertEquals(JOB_ID, jobId);
156158
}
157159

@@ -182,7 +184,9 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
182184
STANDARD_SYNC,
183185
SOURCE_IMAGE_NAME,
184186
DESTINATION_IMAGE_NAME,
185-
List.of(STANDARD_SYNC_OPERATION)).isEmpty());
187+
List.of(STANDARD_SYNC_OPERATION),
188+
null,
189+
null).isEmpty());
186190
}
187191

188192
@Test

0 commit comments

Comments
 (0)