Skip to content

Commit 14a29a0

Browse files
authored
Auto Detect Schema Change environment variable (#19312)
* auto detect schema environment variable
1 parent fcf264e commit 14a29a0

File tree

21 files changed

+54
-7
lines changed

21 files changed

+54
-7
lines changed

.env

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,5 @@ METRIC_CLIENT=
113113
OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"
114114

115115
USE_STREAM_CAPABLE_STATE=true
116+
AUTO_DETECT_SCHEMA=false
117+

.env.dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SYNC_JOB_MAX_ATTEMPTS=3
2828
SYNC_JOB_MAX_TIMEOUT_DAYS=3
2929
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
3030
CRON_MICRONAUT_ENVIRONMENTS=control-plane
31+
AUTO_DETECT_SCHEMA=false
3132

3233
# Sentry
3334
SENTRY_DSN=""

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public class OrchestratorConstants {
6565
EnvConfigs.STATE_STORAGE_S3_ACCESS_KEY,
6666
EnvConfigs.STATE_STORAGE_S3_SECRET_ACCESS_KEY,
6767
EnvConfigs.STATE_STORAGE_S3_REGION,
68-
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE))
68+
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE,
69+
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA))
6970
.build();
7071

7172
public static final String INIT_FILE_ENV_MAP = "envMap.json";

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ private Map<String, String> getWorkerMetadata() {
212212
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
213213
WorkerEnvConstants.WORKER_JOB_ID, jobId,
214214
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
215-
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()));
215+
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
216+
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()));
216217
}
217218

218219
}

airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ class AirbyteIntegrationLauncherTest {
5353
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
5454
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
5555
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
56-
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()));
56+
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
57+
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()));
5758

5859
private WorkerConfigs workerConfigs;
5960
@Mock

airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
public class EnvVariableFeatureFlags implements FeatureFlags {
1212

1313
public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
14+
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
1415
public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";
1516
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
1617

@@ -31,6 +32,11 @@ public boolean useStreamCapableState() {
3132
return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean);
3233
}
3334

35+
@Override
36+
public boolean autoDetectSchema() {
37+
return getEnvOrDefault(AUTO_DETECT_SCHEMA, false, Boolean::parseBoolean);
38+
}
39+
3440
@Override
3541
public boolean logConnectorMessages() {
3642
return getEnvOrDefault(LOG_CONNECTOR_MESSAGES, false, Boolean::parseBoolean);

airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public interface FeatureFlags {
1616

1717
boolean useStreamCapableState();
1818

19+
boolean autoDetectSchema();
20+
1921
boolean logConnectorMessages();
2022

2123
boolean needStateValidation();

airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,8 @@ public interface Configs {
687687
*/
688688
int getActivityNumberOfAttempt();
689689

690+
boolean getAutoDetectSchema();
691+
690692
enum TrackingStrategy {
691693
SEGMENT,
692694
LOGGING

airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ public class EnvConfigs implements Configs {
202202
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
203203
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
204204
private static final String DEFAULT_NETWORK = "host";
205+
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
205206

206207
public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
207208
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
@@ -1050,6 +1051,11 @@ public int getWorkflowFailureRestartDelaySeconds() {
10501051
return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60)));
10511052
}
10521053

1054+
@Override
1055+
public boolean getAutoDetectSchema() {
1056+
return getEnvOrDefault(AUTO_DETECT_SCHEMA, false);
1057+
}
1058+
10531059
@Override
10541060
public int getActivityNumberOfAttempt() {
10551061
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));

airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
7676
environmentVariables.put(DD_DOGSTATSD_PORT_ENV_VAR, dataDogStatsdPort);
7777
environmentVariables.put(PUBLISH_METRICS_ENV_VAR, shouldPublishMetrics);
7878
environmentVariables.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(featureFlags.useStreamCapableState()));
79+
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
7980
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);
8081

8182
if (System.getenv(DD_ENV_ENV_VAR) != null) {

0 commit comments

Comments
 (0)