Skip to content

Add datadog tracking to record schema validation errors #13393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
29cc06b
Add datadog tracking to record schema validation errors
alovew Jun 1, 2022
08a0e38
add dockerImage
alovew Jun 2, 2022
824bd6e
protect against incomplete docker info
alovew Jun 7, 2022
dfeb991
add dd env vars
alovew Jun 8, 2022
81b0d9a
add new datadog metric reporter class
alovew Jun 9, 2022
e158c7f
pass in new class
alovew Jun 9, 2022
bd7b98b
make param optional
alovew Jun 9, 2022
fe54c4b
check for env vars before creating metrics reporter
alovew Jun 9, 2022
4450038
update name of metric
alovew Jun 9, 2022
ac3f7c6
Use generic metric client
alovew Jun 27, 2022
6f663cf
add metric client env var
alovew Jun 27, 2022
775fa10
initialize metric factory
alovew Jun 28, 2022
3382f69
log metric client
alovew Jun 28, 2022
06ebebe
formatting
alovew Jul 5, 2022
8164867
add test record schema validation error
alovew Jul 21, 2022
30cd31b
formatting
alovew Jul 21, 2022
f3a6a36
fix spotbugs
alovew Jul 22, 2022
3746e1d
remove reinitialization
alovew Jul 22, 2022
d547bf4
fix after rebase
alovew Jul 22, 2022
91f9697
remove another reinitialization
alovew Jul 22, 2022
acd87de
Warn instead of throwing error
alovew Jul 22, 2022
23f1f80
add logging
alovew Jul 26, 2022
90447ac
initialize metric client
alovew Jul 26, 2022
0794344
better logging
alovew Jul 26, 2022
6774d4d
formatting
alovew Jul 26, 2022
fb6a29e
add metric client env var
alovew Jul 27, 2022
13b7118
logging
alovew Jul 27, 2022
915fc4a
update count signature
alovew Aug 4, 2022
4543272
throw error
alovew Aug 4, 2022
2bb66e1
add dd host and port
alovew Aug 5, 2022
647949f
add publish metrics env var
alovew Aug 5, 2022
4206773
remove logging
alovew Aug 5, 2022
0c4c3fc
update errors
alovew Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class EnvConfigs implements Configs {
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
public static final String PUBLISH_METRICS = "PUBLISH_METRICS";
public static final String DD_AGENT_HOST = "DD_AGENT_HOST";
public static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT";
private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS";
private static final String JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
Expand All @@ -97,8 +99,6 @@ public class EnvConfigs implements Configs {
private static final String CONTAINER_ORCHESTRATOR_SECRET_NAME = "CONTAINER_ORCHESTRATOR_SECRET_NAME";
private static final String CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH = "CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH";
private static final String CONTAINER_ORCHESTRATOR_IMAGE = "CONTAINER_ORCHESTRATOR_IMAGE";
private static final String DD_AGENT_HOST = "DD_AGENT_HOST";
private static final String DD_DOGSTATSD_PORT = "DD_DOGSTATSD_PORT";
public static final String DD_CONSTANT_TAGS = "DD_CONSTANT_TAGS";
public static final String STATE_STORAGE_S3_BUCKET_NAME = "STATE_STORAGE_S3_BUCKET_NAME";
public static final String STATE_STORAGE_S3_REGION = "STATE_STORAGE_S3_REGION";
Expand Down Expand Up @@ -132,7 +132,7 @@ public class EnvConfigs implements Configs {
private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE";
private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE";

private static final String METRIC_CLIENT = "METRIC_CLIENT";
public static final String METRIC_CLIENT = "METRIC_CLIENT";
private static final String OTEL_COLLECTOR_ENDPOINT = "OTEL_COLLECTOR_ENDPOINT";

// job-type-specific overrides
Expand Down Expand Up @@ -798,7 +798,7 @@ public String getDDDogStatsDPort() {

@Override
public List<String> getDDConstantTags() {
String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, "");
final String tagsString = getEnvOrDefault(DD_CONSTANT_TAGS, "");
return Splitter.on(",")
.splitToStream(tagsString)
.filter(s -> !s.trim().isBlank())
Expand Down
1 change: 1 addition & 0 deletions airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-scheduler:scheduler-persistence')
implementation project(':airbyte-workers')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'org.mockito:mockito-inline:2.13.0'
testImplementation libs.postgresql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import io.airbyte.config.Configs;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.general.ReplicationWorker;
Expand Down Expand Up @@ -96,6 +100,10 @@ public Optional<String> runJob() throws Exception {
featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);

MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

log.info("Setting up replication worker...");
final ReplicationWorker replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Expand All @@ -104,7 +112,8 @@ public Optional<String> runJob() throws Exception {
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)));
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter);

log.info("Running replication worker...");
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ public synchronized static MetricClient getMetricClient() {
*/
public static synchronized void initialize(final MetricEmittingApp metricEmittingApp) {
if (metricClient != null) {
throw new RuntimeException("You cannot initialize configuration more than once.");
LOGGER.warn("Metric client is already initialized to " + configs.getMetricClient());
return;
}

if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) {
initializeDatadogMetricClient(metricEmittingApp);
if (configs.getDDAgentHost() == null || configs.getDDDogStatsDPort() == null) {
throw new RuntimeException("DD_AGENT_HOST is null or DD_DOGSTATSD_PORT is null. Both are required to use the DataDog Metric Client");
} else {
initializeDatadogMetricClient(metricEmittingApp);
}
} else if (OTEL_METRIC_CLIENT.equals(configs.getMetricClient())) {
initializeOpenTelemetryMetricClient(metricEmittingApp);
} else {
Expand Down Expand Up @@ -90,7 +95,7 @@ public String host() {
* Returning null for default get function because the host has been overridden above.
*/
@Override
public String get(String key) {
public String get(final String key) {
return null;
}

Expand All @@ -105,7 +110,7 @@ public String get(String key) {
public static MeterRegistry getMeterRegistry() {

if (DATADOG_METRIC_CLIENT.equals(configs.getMetricClient())) {
StatsdConfig config = getDatadogStatsDConfig();
final StatsdConfig config = getDatadogStatsDConfig();
return new StatsdMeterRegistry(config, Clock.SYSTEM);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,18 @@ public enum OssMetricsRegistry implements MetricsRegistry {
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),

TEMPORAL_WORKFLOW_ATTEMPT(MetricEmittingApps.WORKER,
"temporal_workflow_attempt",
"count of the number of workflow attempts"),

TEMPORAL_WORKFLOW_SUCCESS(MetricEmittingApps.WORKER,
"temporal_workflow_success",
"count of the number of successful workflow syncs."),

TEMPORAL_WORKFLOW_FAILURE(MetricEmittingApps.WORKER,
"temporal_workflow_failure",
"count of the number of workflow failures");
"count of the number of workflow failures"),
NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
"record_schema_validation_error",
"number of record schema validation errors");

private final MetricEmittingApp application;
private final String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ void testMetricClientFactoryCreateSuccess() {
});
}

@Test
@DisplayName("Should throw error if MetricClientFactory create a metric client multiple times;")
void testMetricClientFactoryCreateMultipleTimesThrows() {
Assertions.assertThrows(RuntimeException.class, () -> {
MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
MetricClientFactory.initialize(MetricEmittingApps.METRICS_REPORTER);
});
}

@Test
@DisplayName("Should not return null if metric client not specified;")
void testMicroMeterRegistryRuturnsNullForEmptyClientConfig() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;

import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.OssMetricsRegistry;

public class WorkerMetricReporter {

private final String dockerRepo;
private final String dockerVersion;
private final MetricClient metricClient;

public WorkerMetricReporter(final MetricClient metricClient, final String dockerImage) {
final String[] dockerImageInfo = dockerImage.split(":");
this.dockerRepo = dockerImageInfo[0];
this.dockerVersion = dockerImageInfo.length > 1 ? dockerImageInfo[1] : "";
this.metricClient = metricClient;
}

public void trackSchemaValidationError(final String stream) {
metricClient.count(OssMetricsRegistry.NUM_SOURCE_STREAMS_WITH_RECORD_SCHEMA_VALIDATION_ERRORS, 1, new MetricAttribute("docker_repo", dockerRepo),
new MetricAttribute("docker_version", dockerVersion), new MetricAttribute("stream", stream));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AtomicBoolean cancelled;
private final AtomicBoolean hasFailed;
private final RecordSchemaValidator recordSchemaValidator;
private final WorkerMetricReporter metricReporter;

public DefaultReplicationWorker(final String jobId,
final int attempt,
final AirbyteSource source,
final AirbyteMapper mapper,
final AirbyteDestination destination,
final MessageTracker messageTracker,
final RecordSchemaValidator recordSchemaValidator) {
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
this.jobId = jobId;
this.attempt = attempt;
this.source = source;
Expand All @@ -94,6 +96,7 @@ public DefaultReplicationWorker(final String jobId,
this.messageTracker = messageTracker;
this.executors = Executors.newFixedThreadPool(2);
this.recordSchemaValidator = recordSchemaValidator;
this.metricReporter = metricReporter;

this.cancelled = new AtomicBoolean(false);
this.hasFailed = new AtomicBoolean(false);
Expand All @@ -112,7 +115,7 @@ public DefaultReplicationWorker(final String jobId,
* @throws WorkerException
*/
@Override
public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);

// todo (cgardens) - this should not be happening in the worker. this is configuration information
Expand Down Expand Up @@ -154,7 +157,7 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
executors).whenComplete((msg, ex) -> {
if (ex != null) {
if (ex.getCause() instanceof SourceException) {
Expand Down Expand Up @@ -293,7 +296,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteMapper mapper,
final MessageTracker messageTracker,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator) {
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Expand Down Expand Up @@ -337,6 +341,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
if (!validationErrors.isEmpty()) {
validationErrors.forEach((stream, errorPair) -> {
LOGGER.warn("Schema validation errors found for stream {}. Error messages: {}", stream, errorPair.getLeft());
metricReporter.trackSchemaValidationError(stream);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.workers.WorkerApp;
Expand Down Expand Up @@ -282,6 +283,11 @@ public void create(final Map<String, String> allLabels,

}

final EnvConfigs envConfigs = new EnvConfigs();
envVars.add(new EnvVar(EnvConfigs.METRIC_CLIENT, envConfigs.getMetricClient(), null));
envVars.add(new EnvVar(EnvConfigs.DD_AGENT_HOST, envConfigs.getDDAgentHost(), null));
envVars.add(new EnvVar(EnvConfigs.DD_DOGSTATSD_PORT, envConfigs.getDDDogStatsDPort(), null));
envVars.add(new EnvVar(EnvConfigs.PUBLISH_METRICS, Boolean.toString(envConfigs.getPublishMetrics()), null));
envVars.add(new EnvVar(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, Boolean.toString(useStreamCapableState), null));
final List<ContainerPort> containerPorts = KubePodProcess.createContainerPortList(portMap);
containerPorts.add(new ContainerPort(WorkerApp.KUBE_HEARTBEAT_PORT, null, null, null, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public LauncherWorker(final UUID connectionId,
final ResourceRequirements resourceRequirements,
final Class<OUTPUT> outputClass,
final Supplier<ActivityExecutionContext> activityContext) {

this.connectionId = connectionId;
this.application = application;
this.podNamePrefix = podNamePrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class OrchestratorConstants {
EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT,
EnvConfigs.JOB_DEFAULT_ENV_MAP,
EnvConfigs.LOCAL_ROOT,
EnvConfigs.PUBLISH_METRICS,
EnvConfigs.DD_AGENT_HOST,
EnvConfigs.DD_DOGSTATSD_PORT,
EnvConfigs.METRIC_CLIENT,
LOG_LEVEL,
LogClientSingleton.GCS_LOG_BUCKET,
LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.scheduler.persistence.JobPersistence;
Expand All @@ -26,6 +29,7 @@
import io.airbyte.workers.WorkerApp.ContainerOrchestratorConfig;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.internal.AirbyteMessageTracker;
Expand Down Expand Up @@ -210,6 +214,9 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage())
? new EmptyAirbyteSource(useStreamCapableState)
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);
MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

return new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Expand All @@ -218,7 +225,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)));
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter);
};
}

Expand Down
Loading