Skip to content

make status checks configurable from env vars + use shorter replication interval for testing #10368

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.storage.CloudStorageConfigs;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -293,6 +294,41 @@ public interface Configs {
*/
String getJobKubeNamespace();

/**
* Define the interval between Kubernetes pod status checks for a worker of an unspecified type.
*
* <p>
* When supplied through an environment variable, the value is expressed in seconds.
*
* @return the interval between pod status checks
*/
Duration getDefaultWorkerStatusCheckInterval();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you go through the class doc string and follow that accordingly?

I'm guessing these should be marked as internal-use only since we don't want to expose them to OSS users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops missed this.

Added in #10390


/**
* Define the interval between status checks of "get spec" Kubernetes pods.
*
* <p>
* When supplied through an environment variable, the value is expressed in seconds.
*
* @return the interval between pod status checks for "get spec" workers
*/
Duration getSpecWorkerStatusCheckInterval();

/**
* Define the interval between status checks of "check connection" Kubernetes pods.
*
* <p>
* When supplied through an environment variable, the value is expressed in seconds.
*
* @return the interval between pod status checks for "check connection" workers
*/
Duration getCheckWorkerStatusCheckInterval();

/**
* Define the interval between status checks of "discover" Kubernetes pods.
*
* <p>
* When supplied through an environment variable, the value is expressed in seconds.
*
* @return the interval between pod status checks for "discover" workers
*/
Duration getDiscoverWorkerStatusCheckInterval();

/**
* Define the interval between status checks of "replication" Kubernetes pods.
*
* <p>
* When supplied through an environment variable, the value is expressed in seconds.
*
* @return the interval between pod status checks for "replication" workers
*/
Duration getReplicationWorkerStatusCheckInterval();

// Logging/Monitoring/Tracking
/**
* Define either S3, Minio or GCS as a logging backend. Kubernetes only. Multiple variables are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -117,6 +118,12 @@ public class EnvConfigs implements Configs {
private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST";
private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT";

// Names of the environment variables that configure the Kubernetes pod status check interval
// for each worker type. Their values are parsed as a whole number of seconds.
private static final String DEFAULT_WORKER_STATUS_CHECK_INTERVAL = "DEFAULT_WORKER_STATUS_CHECK_INTERVAL";
private static final String SPEC_WORKER_STATUS_CHECK_INTERVAL = "SPEC_WORKER_STATUS_CHECK_INTERVAL";
private static final String CHECK_WORKER_STATUS_CHECK_INTERVAL = "CHECK_WORKER_STATUS_CHECK_INTERVAL";
private static final String DISCOVER_WORKER_STATUS_CHECK_INTERVAL = "DISCOVER_WORKER_STATUS_CHECK_INTERVAL";
private static final String REPLICATION_WORKER_STATUS_CHECK_INTERVAL = "REPLICATION_WORKER_STATUS_CHECK_INTERVAL";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
Expand All @@ -133,6 +140,12 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000;

// Status check intervals handed to getEnvOrDefault as the default for each worker type:
// 1 second for spec, check and discover workers; 30 seconds for replication workers and
// for workers of an unspecified type.
private static final Duration DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final Duration DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
public static final long DEFAULT_MAX_CHECK_WORKERS = 5;
public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5;
Expand Down Expand Up @@ -532,6 +545,46 @@ public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

/**
 * Resolves the pod status check interval for workers of an unspecified type, reading
 * DEFAULT_WORKER_STATUS_CHECK_INTERVAL as a whole number of seconds.
 */
@Override
public Duration getDefaultWorkerStatusCheckInterval() {
  final Duration interval = getEnvOrDefault(
      DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
      DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL,
      rawSeconds -> {
        final int seconds = Integer.parseInt(rawSeconds);
        return Duration.ofSeconds(seconds);
      });
  return interval;
}

/**
 * Resolves the pod status check interval for "get spec" workers, reading
 * SPEC_WORKER_STATUS_CHECK_INTERVAL as a whole number of seconds.
 */
@Override
public Duration getSpecWorkerStatusCheckInterval() {
  final Duration interval = getEnvOrDefault(
      SPEC_WORKER_STATUS_CHECK_INTERVAL,
      DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL,
      rawSeconds -> {
        final int seconds = Integer.parseInt(rawSeconds);
        return Duration.ofSeconds(seconds);
      });
  return interval;
}

/**
 * Resolves the pod status check interval for "check connection" workers, reading
 * CHECK_WORKER_STATUS_CHECK_INTERVAL as a whole number of seconds.
 */
@Override
public Duration getCheckWorkerStatusCheckInterval() {
  final Duration interval = getEnvOrDefault(
      CHECK_WORKER_STATUS_CHECK_INTERVAL,
      DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL,
      rawSeconds -> {
        final int seconds = Integer.parseInt(rawSeconds);
        return Duration.ofSeconds(seconds);
      });
  return interval;
}

/**
 * Resolves the pod status check interval for "discover" workers, reading
 * DISCOVER_WORKER_STATUS_CHECK_INTERVAL as a whole number of seconds.
 */
@Override
public Duration getDiscoverWorkerStatusCheckInterval() {
  final Duration interval = getEnvOrDefault(
      DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
      DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL,
      rawSeconds -> {
        final int seconds = Integer.parseInt(rawSeconds);
        return Duration.ofSeconds(seconds);
      });
  return interval;
}

/**
 * Resolves the pod status check interval for "replication" workers, reading
 * REPLICATION_WORKER_STATUS_CHECK_INTERVAL as a whole number of seconds.
 */
@Override
public Duration getReplicationWorkerStatusCheckInterval() {
  final Duration interval = getEnvOrDefault(
      REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
      DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL,
      rawSeconds -> {
        final int seconds = Integer.parseInt(rawSeconds);
        return Duration.ofSeconds(seconds);
      });
  return interval;
}

@Override
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
@AllArgsConstructor
public class WorkerConfigs {

private static final Duration DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);
private static final Duration SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30);

private final Configs.WorkerEnvironment workerEnvironment;
private final ResourceRequirements resourceRequirements;
private final List<TolerationPOJO> workerKubeTolerations;
Expand Down Expand Up @@ -54,7 +48,7 @@ public WorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
DEFAULT_WORKER_STATUS_CHECK_INTERVAL);
configs.getDefaultWorkerStatusCheckInterval());
}

/**
Expand All @@ -80,7 +74,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
SPEC_WORKER_STATUS_CHECK_INTERVAL);
configs.getSpecWorkerStatusCheckInterval());
}

/**
Expand All @@ -106,7 +100,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
CHECK_WORKER_STATUS_CHECK_INTERVAL);
configs.getCheckWorkerStatusCheckInterval());
}

/**
Expand All @@ -132,7 +126,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) {
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
DISCOVER_WORKER_STATUS_CHECK_INTERVAL);
configs.getDiscoverWorkerStatusCheckInterval());
}

public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) {
Expand All @@ -151,7 +145,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs)
configs.getJobKubeBusyboxImage(),
configs.getJobKubeCurlImage(),
configs.getJobDefaultEnvMap(),
REPLICATION_WORKER_STATUS_CHECK_INTERVAL);
configs.getReplicationWorkerStatusCheckInterval());
}

public Configs.WorkerEnvironment getWorkerEnvironment() {
Expand Down
3 changes: 3 additions & 0 deletions kube/overlays/dev-integration-test-schedulerv2/.env
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
NEW_SCHEDULER=true
CONTAINER_ORCHESTRATOR_ENABLED=true

# Interval in seconds between replication pod status checks. Set this lower than the 30-second default to speed up tests significantly
REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1

Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ spec:
configMapKeyRef:
name: airbyte-env
key: CONTAINER_ORCHESTRATOR_ENABLED
- name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
valueFrom:
configMapKeyRef:
name: airbyte-env
key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
3 changes: 3 additions & 0 deletions kube/overlays/dev-integration-test/.env
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
# Launch a separate pod to orchestrate sync steps
CONTAINER_ORCHESTRATOR_ENABLED=false

# Interval in seconds between replication pod status checks. Set this lower than the 30-second default to speed up tests significantly
REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1

1 change: 1 addition & 0 deletions kube/overlays/dev-integration-test/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ secretGenerator:

patchesStrategicMerge:
- parallelize-worker.yaml
- speed-up-worker.yaml
15 changes: 15 additions & 0 deletions kube/overlays/dev-integration-test/speed-up-worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Patch for the airbyte-worker Deployment, listed under patchesStrategicMerge in this
# overlay's kustomization.yaml. It exposes REPLICATION_WORKER_STATUS_CHECK_INTERVAL to the
# airbyte-worker-container, taking the value from the airbyte-env ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
name: airbyte-worker
spec:
template:
spec:
containers:
- name: airbyte-worker-container
env:
# Value comes from the key of the same name in the airbyte-env ConfigMap.
- name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL
valueFrom:
configMapKeyRef:
name: airbyte-env
key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL