-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Use worker config resource requirements by default in jobs #10231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
lmossman
merged 6 commits into
master
from
lmossman/use-worker-config-resource-requirements-by-default
Feb 9, 2022
Merged
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
268a2f1
use WorkerConfigs resource requirements by default for Jobs
lmossman d689c8d
format
lmossman 2438f2a
fix test
lmossman 39d21f0
set default resource requirements in EnvConfigs
lmossman e83b4db
Revert "set default resource requirements in EnvConfigs"
lmossman 6841f97
format
lmossman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import io.airbyte.config.JobConfig.ConfigType; | ||
import io.airbyte.config.JobResetConnectionConfig; | ||
import io.airbyte.config.JobSyncConfig; | ||
import io.airbyte.config.ResourceRequirements; | ||
import io.airbyte.config.SourceConnection; | ||
import io.airbyte.config.StandardSync; | ||
import io.airbyte.config.StandardSyncOperation; | ||
|
@@ -24,10 +25,14 @@ public class DefaultJobCreator implements JobCreator { | |
|
||
private final JobPersistence jobPersistence; | ||
private final ConfigRepository configRepository; | ||
private final ResourceRequirements workerResourceRequirements; | ||
|
||
public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigRepository configRepository) { | ||
public DefaultJobCreator(final JobPersistence jobPersistence, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the class with the main changes of using the WorkerConfig resource requirements if they are not set on the StandardSync |
||
final ConfigRepository configRepository, | ||
final ResourceRequirements workerResourceRequirements) { | ||
this.jobPersistence = jobPersistence; | ||
this.configRepository = configRepository; | ||
this.workerResourceRequirements = workerResourceRequirements; | ||
} | ||
|
||
@Override | ||
|
@@ -50,7 +55,7 @@ public Optional<Long> createSyncJob(final SourceConnection source, | |
.withOperationSequence(standardSyncOperations) | ||
.withConfiguredAirbyteCatalog(standardSync.getCatalog()) | ||
.withState(null) | ||
.withResourceRequirements(standardSync.getResourceRequirements()); | ||
.withResourceRequirements(getJobResourceRequirements(standardSync)); | ||
|
||
configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); | ||
|
||
|
@@ -86,12 +91,34 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti | |
.withDestinationConfiguration(destination.getConfiguration()) | ||
.withOperationSequence(standardSyncOperations) | ||
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog) | ||
.withResourceRequirements(standardSync.getResourceRequirements()); | ||
.withResourceRequirements(getJobResourceRequirements(standardSync)); | ||
|
||
final JobConfig jobConfig = new JobConfig() | ||
.withConfigType(ConfigType.RESET_CONNECTION) | ||
.withResetConnection(resetConnectionConfig); | ||
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); | ||
} | ||
|
||
private ResourceRequirements getJobResourceRequirements(final StandardSync standardSync) { | ||
if (standardSync.getResourceRequirements() == null) { | ||
return workerResourceRequirements; | ||
} | ||
|
||
final ResourceRequirements jobResourceRequirements = standardSync.getResourceRequirements(); | ||
if (jobResourceRequirements.getCpuRequest() == null) { | ||
jobResourceRequirements.setCpuRequest(workerResourceRequirements.getCpuRequest()); | ||
} | ||
if (jobResourceRequirements.getCpuLimit() == null) { | ||
jobResourceRequirements.setCpuLimit(workerResourceRequirements.getCpuLimit()); | ||
} | ||
if (jobResourceRequirements.getMemoryRequest() == null) { | ||
jobResourceRequirements.setMemoryRequest(workerResourceRequirements.getMemoryRequest()); | ||
} | ||
if (jobResourceRequirements.getMemoryLimit() == null) { | ||
jobResourceRequirements.setMemoryLimit(workerResourceRequirements.getMemoryLimit()); | ||
} | ||
|
||
return jobResourceRequirements; | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this should be safe to do here in the SchedulerApp, because I see that the resource requirement env variables exist in each of these environments:
airbyte/docker-compose.yaml
Lines 67 to 70 in 84d1456
airbyte/kube/overlays/stable/.env
Lines 57 to 60 in 84d1456
airbyte/charts/airbyte/templates/scheduler/deployment.yaml
Lines 127 to 146 in 84d1456