Skip to content

Commit c6a8f55

Browse files
committed
Add datadog tracking to record schema validation errors
1 parent 7b33619 commit c6a8f55

File tree

4 files changed

+37
-6
lines changed

4 files changed

+37
-6
lines changed

airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public Optional<String> runJob() throws Exception {
9797
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
9898
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
9999
new AirbyteMessageTracker(),
100-
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)));
100+
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
101+
configs);
101102

102103
log.info("Running replication worker...");
103104
final Path jobRoot = WorkerUtils.getJobRoot(configs.getWorkspaceRoot(), jobRunConfig.getJobId(), jobRunConfig.getAttemptId());

airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
9191
"oldest running job in seconds"),
9292
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(MetricEmittingApps.METRICS_REPORTER,
9393
"overall_job_runtime_in_last_hour_by_terminal_state_secs",
94-
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states.");
94+
"overall job runtime - scheduling and execution for all attempts - for jobs that reach terminal states in the last hour. tagged by terminal states."),
95+
NUM_RECORD_SCHEMA_VALIDATION_ERRORS(MetricEmittingApps.WORKER,
96+
"record_schema_validation_error",
97+
"number of record schema validation errors");
9598

9699
private final MetricEmittingApp application;
97100
private final String metricName;

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.workers.general;
66

7+
import io.airbyte.config.Configs;
78
import io.airbyte.config.FailureReason;
89
import io.airbyte.config.ReplicationAttemptSummary;
910
import io.airbyte.config.ReplicationOutput;
@@ -14,6 +15,10 @@
1415
import io.airbyte.config.SyncStats;
1516
import io.airbyte.config.WorkerDestinationConfig;
1617
import io.airbyte.config.WorkerSourceConfig;
18+
import io.airbyte.metrics.lib.DatadogClientConfiguration;
19+
import io.airbyte.metrics.lib.DogStatsDMetricSingleton;
20+
import io.airbyte.metrics.lib.MetricEmittingApps;
21+
import io.airbyte.metrics.lib.OssMetricsRegistry;
1722
import io.airbyte.protocol.models.AirbyteMessage;
1823
import io.airbyte.protocol.models.AirbyteRecordMessage;
1924
import io.airbyte.workers.*;
@@ -77,14 +82,16 @@ public class DefaultReplicationWorker implements ReplicationWorker {
7782
private final AtomicBoolean cancelled;
7883
private final AtomicBoolean hasFailed;
7984
private final RecordSchemaValidator recordSchemaValidator;
85+
private final Configs configs;
8086

8187
public DefaultReplicationWorker(final String jobId,
8288
final int attempt,
8389
final AirbyteSource source,
8490
final AirbyteMapper mapper,
8591
final AirbyteDestination destination,
8692
final MessageTracker messageTracker,
87-
final RecordSchemaValidator recordSchemaValidator) {
93+
final RecordSchemaValidator recordSchemaValidator,
94+
final Configs configs) {
8895
this.jobId = jobId;
8996
this.attempt = attempt;
9097
this.source = source;
@@ -93,11 +100,22 @@ public DefaultReplicationWorker(final String jobId,
93100
this.messageTracker = messageTracker;
94101
this.executors = Executors.newFixedThreadPool(2);
95102
this.recordSchemaValidator = recordSchemaValidator;
103+
this.configs = configs;
96104

97105
this.cancelled = new AtomicBoolean(false);
98106
this.hasFailed = new AtomicBoolean(false);
99107
}
100108

109+
public DefaultReplicationWorker(final String jobId,
110+
final int attempt,
111+
final AirbyteSource source,
112+
final AirbyteMapper mapper,
113+
final AirbyteDestination destination,
114+
final MessageTracker messageTracker,
115+
final RecordSchemaValidator recordSchemaValidator) {
116+
this(jobId, attempt, source, mapper, destination, messageTracker, recordSchemaValidator, null);
117+
}
118+
101119
/**
102120
* Run executes two threads. The first pipes data from STDOUT of the source to STDIN of the
103121
* destination. The second listen on STDOUT of the destination. The goal of this second thread is to
@@ -111,7 +129,7 @@ public DefaultReplicationWorker(final String jobId,
111129
* @throws WorkerException
112130
*/
113131
@Override
114-
public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
132+
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
115133
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);
116134

117135
// todo (cgardens) - this should not be happening in the worker. this is configuration information
@@ -153,7 +171,7 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
153171
});
154172

155173
final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
156-
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator),
174+
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, configs),
157175
executors).whenComplete((msg, ex) -> {
158176
if (ex != null) {
159177
if (ex.getCause() instanceof SourceException) {
@@ -292,7 +310,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
292310
final AirbyteMapper mapper,
293311
final MessageTracker messageTracker,
294312
final Map<String, String> mdc,
295-
final RecordSchemaValidator recordSchemaValidator) {
313+
final RecordSchemaValidator recordSchemaValidator,
314+
final Configs configs) {
296315
return () -> {
297316
MDC.setContextMap(mdc);
298317
LOGGER.info("Replication thread started.");
@@ -354,8 +373,15 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
354373
}
355374
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
356375
if (!validationErrors.isEmpty()) {
376+
DogStatsDMetricSingleton.initialize(MetricEmittingApps.WORKER, new DatadogClientConfiguration(configs));
357377
validationErrors.forEach((stream, errorPair) -> {
358378
LOGGER.warn("Schema validation errors found for stream {}. Error messages: {}", stream, errorPair.getLeft());
379+
final String[] validationErrorMetadata = {
380+
"docker_repo:airbyte/test", // dockerImage.split(":")[0]
381+
"docker_version:0.0.0", // dockerImage.split(":")[1]
382+
"stream:" + stream
383+
};
384+
DogStatsDMetricSingleton.count(OssMetricsRegistry.NUM_RECORD_SCHEMA_VALIDATION_ERRORS, 1, validationErrorMetadata);
359385
});
360386
}
361387

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/OrchestratorConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class OrchestratorConstants {
4141
EnvConfigs.JOB_MAIN_CONTAINER_MEMORY_LIMIT,
4242
EnvConfigs.JOB_DEFAULT_ENV_MAP,
4343
EnvConfigs.LOCAL_ROOT,
44+
EnvConfigs.PUBLISH_METRICS,
4445
LOG_LEVEL,
4546
LogClientSingleton.GCS_LOG_BUCKET,
4647
LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS,

0 commit comments

Comments
 (0)