From 90055665568404532b15d31fa52e5619b971e7af Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 15 Feb 2022 16:12:17 -0800 Subject: [PATCH 1/4] make status check interval env-configurable --- .../main/java/io/airbyte/config/Configs.java | 36 +++++++++++++ .../java/io/airbyte/config/EnvConfigs.java | 53 +++++++++++++++++++ .../io/airbyte/workers/WorkerConfigs.java | 16 ++---- 3 files changed, 94 insertions(+), 11 deletions(-) diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 564bdf16cd614..a41d561622377 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -8,6 +8,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.storage.CloudStorageConfigs; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -293,6 +294,41 @@ public interface Configs { */ String getJobKubeNamespace(); + /** + * Define the interval for checking for a Kubernetes pod status for a worker of an unspecified type. + * + * In seconds if specified by environment variable. + */ + Duration getDefaultWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "get spec" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getSpecWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "check connection" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getCheckWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "discover" Kubernetes pod statuses. + * + * In seconds if specified by environment variable. + */ + Duration getDiscoverWorkerStatusCheckInterval(); + + /** + * Define the interval for checking for "replication" Kubernetes pod statuses. 
+ * + * In seconds if specified by environment variable. + */ + Duration getReplicationWorkerStatusCheckInterval(); + // Logging/Monitoring/Tracking /** * Define either S3, Minio or GCS as a logging backend. Kubernetes only. Multiple variables are diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 985d40cb17eb2..958d62447c587 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -15,6 +15,7 @@ import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig; import io.airbyte.config.storage.CloudStorageConfigs.S3Config; import java.nio.file.Path; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -117,6 +118,12 @@ public class EnvConfigs implements Configs { private static final String REPLICATION_ORCHESTRATOR_MEMORY_REQUEST = "REPLICATION_ORCHESTRATOR_MEMORY_REQUEST"; private static final String REPLICATION_ORCHESTRATOR_MEMORY_LIMIT = "REPLICATION_ORCHESTRATOR_MEMORY_LIMIT"; + private static final String DEFAULT_WORKER_STATUS_CHECK_INTERVAL = "DEFAULT_WORKER_STATUS_CHECK_INTERVAL"; + private static final String SPEC_WORKER_STATUS_CHECK_INTERVAL = "SPEC_WORKER_STATUS_CHECK_INTERVAL"; + private static final String CHECK_WORKER_STATUS_CHECK_INTERVAL = "CHECK_WORKER_STATUS_CHECK_INTERVAL"; + private static final String DISCOVER_WORKER_STATUS_CHECK_INTERVAL = "DISCOVER_WORKER_STATUS_CHECK_INTERVAL"; + private static final String REPLICATION_WORKER_STATUS_CHECK_INTERVAL = "REPLICATION_WORKER_STATUS_CHECK_INTERVAL"; + // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; @@ -133,6 +140,12 @@ public class EnvConfigs implements Configs { private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB 
= 5000; private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000; + private static final Duration DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); + private static final Duration DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); + public static final long DEFAULT_MAX_SPEC_WORKERS = 5; public static final long DEFAULT_MAX_CHECK_WORKERS = 5; public static final long DEFAULT_MAX_DISCOVER_WORKERS = 5; @@ -532,6 +545,46 @@ public String getJobKubeNamespace() { return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE); } + @Override + public Duration getDefaultWorkerStatusCheckInterval() { + return getEnvOrDefault( + DEFAULT_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_DEFAULT_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getSpecWorkerStatusCheckInterval() { + return getEnvOrDefault( + SPEC_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_SPEC_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getCheckWorkerStatusCheckInterval() { + return getEnvOrDefault( + CHECK_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_CHECK_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getDiscoverWorkerStatusCheckInterval() { + return getEnvOrDefault( + DISCOVER_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_DISCOVER_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + + @Override + public Duration getReplicationWorkerStatusCheckInterval() { + return getEnvOrDefault( + 
REPLICATION_WORKER_STATUS_CHECK_INTERVAL, + DEFAULT_REPLICATION_WORKER_STATUS_CHECK_INTERVAL, + value -> Duration.ofSeconds(Integer.parseInt(value))); + } + @Override public String getJobMainContainerCpuRequest() { return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java index 39b785957b23c..10d85682f73e2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerConfigs.java @@ -16,12 +16,6 @@ @AllArgsConstructor public class WorkerConfigs { - private static final Duration DEFAULT_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); - private static final Duration SPEC_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration CHECK_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration DISCOVER_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(1); - private static final Duration REPLICATION_WORKER_STATUS_CHECK_INTERVAL = Duration.ofSeconds(30); - private final Configs.WorkerEnvironment workerEnvironment; private final ResourceRequirements resourceRequirements; private final List workerKubeTolerations; @@ -54,7 +48,7 @@ public WorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - DEFAULT_WORKER_STATUS_CHECK_INTERVAL); + configs.getDefaultWorkerStatusCheckInterval()); } /** @@ -80,7 +74,7 @@ public static WorkerConfigs buildSpecWorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - SPEC_WORKER_STATUS_CHECK_INTERVAL); + configs.getSpecWorkerStatusCheckInterval()); } /** @@ -106,7 +100,7 @@ public static WorkerConfigs buildCheckWorkerConfigs(final Configs configs) { 
configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - CHECK_WORKER_STATUS_CHECK_INTERVAL); + configs.getCheckWorkerStatusCheckInterval()); } /** @@ -132,7 +126,7 @@ public static WorkerConfigs buildDiscoverWorkerConfigs(final Configs configs) { configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - DISCOVER_WORKER_STATUS_CHECK_INTERVAL); + configs.getDiscoverWorkerStatusCheckInterval()); } public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) { @@ -151,7 +145,7 @@ public static WorkerConfigs buildReplicationWorkerConfigs(final Configs configs) configs.getJobKubeBusyboxImage(), configs.getJobKubeCurlImage(), configs.getJobDefaultEnvMap(), - REPLICATION_WORKER_STATUS_CHECK_INTERVAL); + configs.getReplicationWorkerStatusCheckInterval()); } public Configs.WorkerEnvironment getWorkerEnvironment() { From 97159e3a8b3b9e93c211e42446908444cfd1f12e Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 15 Feb 2022 16:15:32 -0800 Subject: [PATCH 2/4] apply to test files to get the speed improvements --- .../dev-integration-test-schedulerv2/.env | 3 +++ .../worker-patch.yaml | 5 +++++ kube/overlays/dev-integration-test/.env | 3 +++ .../dev-integration-test/kustomization.yaml | 1 + .../dev-integration-test/speed-up-worker.yaml | 15 +++++++++++++++ 5 files changed, 27 insertions(+) create mode 100644 kube/overlays/dev-integration-test/speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test-schedulerv2/.env b/kube/overlays/dev-integration-test-schedulerv2/.env index c60ac581bd11f..a2f0978096671 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/.env +++ b/kube/overlays/dev-integration-test-schedulerv2/.env @@ -68,3 +68,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= NEW_SCHEDULER=true CONTAINER_ORCHESTRATOR_ENABLED=true +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git 
a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 32a6bbff83b8e..8e790435392ee 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -13,3 +13,8 @@ spec: configMapKeyRef: name: airbyte-env key: CONTAINER_ORCHESTRATOR_ENABLED + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index a09e9b9ae23b0..b7bf847ea7537 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -67,3 +67,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=false +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git a/kube/overlays/dev-integration-test/kustomization.yaml b/kube/overlays/dev-integration-test/kustomization.yaml index 04f0725377a98..d66fa91f5075e 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -32,3 +32,4 @@ secretGenerator: patchesStrategicMerge: - parallelize-worker.yaml + - speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test/speed-up-worker.yaml b/kube/overlays/dev-integration-test/speed-up-worker.yaml new file mode 100644 index 0000000000000..ce9e6a00a32ec --- /dev/null +++ b/kube/overlays/dev-integration-test/speed-up-worker.yaml @@ -0,0 +1,15 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-worker +spec: + template: + spec: + containers: + - name: airbyte-worker-container + env: + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: 
REPLICATION_WORKER_STATUS_CHECK_INTERVAL From bf3c6a56125504ab78c43aaa040e1219660aa082 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 15 Feb 2022 16:19:00 -0800 Subject: [PATCH 3/4] Revert "apply to test files to get the speed improvements" This reverts commit 97159e3a8b3b9e93c211e42446908444cfd1f12e. --- .../dev-integration-test-schedulerv2/.env | 3 --- .../worker-patch.yaml | 5 ----- kube/overlays/dev-integration-test/.env | 3 --- .../dev-integration-test/kustomization.yaml | 1 - .../dev-integration-test/speed-up-worker.yaml | 15 --------------- 5 files changed, 27 deletions(-) delete mode 100644 kube/overlays/dev-integration-test/speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test-schedulerv2/.env b/kube/overlays/dev-integration-test-schedulerv2/.env index a2f0978096671..c60ac581bd11f 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/.env +++ b/kube/overlays/dev-integration-test-schedulerv2/.env @@ -68,6 +68,3 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= NEW_SCHEDULER=true CONTAINER_ORCHESTRATOR_ENABLED=true -# Set this lower to speed up tests significantly -REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 - diff --git a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 8e790435392ee..32a6bbff83b8e 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -13,8 +13,3 @@ spec: configMapKeyRef: name: airbyte-env key: CONTAINER_ORCHESTRATOR_ENABLED - - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL - valueFrom: - configMapKeyRef: - name: airbyte-env - key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index b7bf847ea7537..a09e9b9ae23b0 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -67,6 +67,3 @@
JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=false -# Set this lower to speed up tests significantly -REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 - diff --git a/kube/overlays/dev-integration-test/kustomization.yaml b/kube/overlays/dev-integration-test/kustomization.yaml index d66fa91f5075e..04f0725377a98 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -32,4 +32,3 @@ secretGenerator: patchesStrategicMerge: - parallelize-worker.yaml - - speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test/speed-up-worker.yaml b/kube/overlays/dev-integration-test/speed-up-worker.yaml deleted file mode 100644 index ce9e6a00a32ec..0000000000000 --- a/kube/overlays/dev-integration-test/speed-up-worker.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: airbyte-worker -spec: - template: - spec: - containers: - - name: airbyte-worker-container - env: - - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL - valueFrom: - configMapKeyRef: - name: airbyte-env - key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL From 4642804df5da68f661eadff75ddf6339c9e79beb Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 15 Feb 2022 16:19:17 -0800 Subject: [PATCH 4/4] Revert "Revert "apply to test files to get the speed improvements"" This reverts commit bf3c6a56125504ab78c43aaa040e1219660aa082.
--- .../dev-integration-test-schedulerv2/.env | 3 +++ .../worker-patch.yaml | 5 +++++ kube/overlays/dev-integration-test/.env | 3 +++ .../dev-integration-test/kustomization.yaml | 1 + .../dev-integration-test/speed-up-worker.yaml | 15 +++++++++++++++ 5 files changed, 27 insertions(+) create mode 100644 kube/overlays/dev-integration-test/speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test-schedulerv2/.env b/kube/overlays/dev-integration-test-schedulerv2/.env index c60ac581bd11f..a2f0978096671 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/.env +++ b/kube/overlays/dev-integration-test-schedulerv2/.env @@ -68,3 +68,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= NEW_SCHEDULER=true CONTAINER_ORCHESTRATOR_ENABLED=true +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 32a6bbff83b8e..8e790435392ee 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -13,3 +13,8 @@ spec: configMapKeyRef: name: airbyte-env key: CONTAINER_ORCHESTRATOR_ENABLED + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index a09e9b9ae23b0..b7bf847ea7537 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -67,3 +67,6 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=false +# Set this lower to speed up tests significantly +REPLICATION_WORKER_STATUS_CHECK_INTERVAL=1 + diff --git a/kube/overlays/dev-integration-test/kustomization.yaml 
b/kube/overlays/dev-integration-test/kustomization.yaml index 04f0725377a98..d66fa91f5075e 100644 --- a/kube/overlays/dev-integration-test/kustomization.yaml +++ b/kube/overlays/dev-integration-test/kustomization.yaml @@ -32,3 +32,4 @@ secretGenerator: patchesStrategicMerge: - parallelize-worker.yaml + - speed-up-worker.yaml diff --git a/kube/overlays/dev-integration-test/speed-up-worker.yaml b/kube/overlays/dev-integration-test/speed-up-worker.yaml new file mode 100644 index 0000000000000..ce9e6a00a32ec --- /dev/null +++ b/kube/overlays/dev-integration-test/speed-up-worker.yaml @@ -0,0 +1,15 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-worker +spec: + template: + spec: + containers: + - name: airbyte-worker-container + env: + - name: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL