Skip to content

Commit 2423c7d

Browse files
authored
Add failure origins to APM trace (#19665)
1 parent 373ba6a commit 2423c7d

File tree

7 files changed

+73
-16
lines changed

7 files changed

+73
-16
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/helper/FailureHelper.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -173,6 +173,7 @@ public static FailureReason dbtFailure(final Throwable t, final Long jobId, fina
173173

174174
public static FailureReason unknownOriginFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
175175
return genericFailure(t, jobId, attemptNumber)
176+
.withFailureOrigin(FailureOrigin.UNKNOWN)
176177
.withExternalMessage("An unknown failure occurred");
177178
}
178179

airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/FailureHelperTest.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -179,4 +179,14 @@ void testOrderedFailures() throws Exception {
179179
assertEquals(failureReasonList.get(0), TRACE_FAILURE_REASON);
180180
}
181181

182+
@Test
183+
void testUnknownOriginFailure() {
184+
final Throwable t = new RuntimeException();
185+
final Long jobId = 12345L;
186+
final Integer attemptNumber = 1;
187+
final FailureReason failureReason = FailureHelper.unknownOriginFailure(t, jobId, attemptNumber);
188+
assertEquals(FailureOrigin.UNKNOWN, failureReason.getFailureOrigin());
189+
assertEquals("An unknown failure occurred", failureReason.getExternalMessage());
190+
}
191+
182192
}

airbyte-config/config-models/src/main/resources/types/FailureReason.yaml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ properties:
1818
- normalization
1919
- dbt
2020
- airbyte_platform
21+
- unknown
2122
failureType:
2223
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
2324
type: string

airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceConstants.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,11 @@ public static final class Tags {
6363
*/
6464
public static final String DOCKER_IMAGE_KEY = "docker_image";
6565

66+
/**
67+
* Name of the APM trace tag that holds the failure origin(s) associated with the trace.
68+
*/
69+
public static final String FAILURE_ORIGINS_KEY = "failure_origins";
70+
6671
/**
6772
* Name of the APM trace tag that holds the job ID value associated with the trace.
6873
*/

airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ public static String getReleaseStage(final ReleaseStage stage) {
2929
}
3030

3131
/**
 * Converts a failure origin into the string used as a tag value.
 *
 * In this diff view the statement after the "32-" marker is the removed version (null origins
 * fell back to the {@code UNKNOWN} constant) and the statement after the "32+" marker is its
 * replacement (null origins fall back to {@code FailureOrigin.UNKNOWN.value()}).
 *
 * @param origin the failure origin; may be {@code null}
 * @return {@code origin.value()} when the origin is non-null, otherwise the fallback value
 */
public static String getFailureOrigin(final FailureOrigin origin) {
32-
return origin != null ? origin.value() : UNKNOWN;
32+
return origin != null ? origin.value() : FailureOrigin.UNKNOWN.value();
3333
}
3434

3535
public static String getJobStatus(final JobStatus status) {

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java

Lines changed: 45 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
99
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
1010
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
11+
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY;
1112
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
1213
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
1314

@@ -22,6 +23,7 @@
2223
import io.airbyte.config.Configs.WorkerEnvironment;
2324
import io.airbyte.config.DestinationConnection;
2425
import io.airbyte.config.FailureReason;
26+
import io.airbyte.config.FailureReason.FailureOrigin;
2527
import io.airbyte.config.JobConfig;
2628
import io.airbyte.config.JobOutput;
2729
import io.airbyte.config.JobSyncConfig;
@@ -56,6 +58,7 @@
5658
import io.airbyte.workers.run.TemporalWorkerRunFactory;
5759
import io.airbyte.workers.run.WorkerRun;
5860
import io.micronaut.context.annotation.Requires;
61+
import io.micronaut.core.util.CollectionUtils;
5962
import jakarta.inject.Singleton;
6063
import java.io.IOException;
6164
import java.nio.file.Path;
@@ -67,6 +70,7 @@
6770
import java.util.OptionalLong;
6871
import java.util.Set;
6972
import java.util.UUID;
73+
import java.util.stream.Collectors;
7074
import lombok.extern.slf4j.Slf4j;
7175

7276
@Slf4j
@@ -179,9 +183,8 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
179183
@Override
180184
public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
181185
try {
182-
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
183-
184186
final long jobId = input.getJobId();
187+
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
185188
final Job createdJob = jobPersistence.getJob(jobId);
186189

187190
final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -200,9 +203,8 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
200203
@Override
201204
public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
202205
try {
203-
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));
204-
205206
final long jobId = input.getJobId();
207+
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
206208
final Job createdJob = jobPersistence.getJob(jobId);
207209

208210
final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
@@ -221,10 +223,9 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
221223
@Override
222224
public void jobSuccess(final JobSuccessInput input) {
223225
try {
224-
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
225-
226226
final long jobId = input.getJobId();
227227
final int attemptId = input.getAttemptId();
228+
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
228229

229230
if (input.getStandardSyncOutput() != null) {
230231
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
@@ -287,12 +288,13 @@ public void jobFailure(final JobFailureInput input) {
287288
@Override
288289
public void attemptFailure(final AttemptFailureInput input) {
289290
try {
290-
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
291-
292291
final int attemptId = input.getAttemptId();
293292
final long jobId = input.getJobId();
294293
final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();
295294

295+
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
296+
traceFailures(failureSummary);
297+
296298
jobPersistence.failAttempt(jobId, attemptId);
297299
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);
298300

@@ -302,11 +304,7 @@ public void attemptFailure(final AttemptFailureInput input) {
302304
}
303305

304306
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
305-
for (final FailureReason reason : failureSummary.getFailures()) {
306-
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
307-
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
308-
}
309-
307+
trackFailures(failureSummary);
310308
} catch (final IOException e) {
311309
throw new RetryableException(e);
312310
}
@@ -329,10 +327,9 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
329327
@Override
330328
public void jobCancelled(final JobCancelledInput input) {
331329
try {
332-
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));
333-
334330
final long jobId = input.getJobId();
335331
final int attemptId = input.getAttemptId();
332+
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
336333
jobPersistence.failAttempt(jobId, attemptId);
337334
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
338335
jobPersistence.cancelJob(jobId);
@@ -487,4 +484,37 @@ private void trackCompletionForInternalFailure(final Long jobId,
487484
jobTracker.trackSyncForInternalFailure(jobId, connectionId, attemptId, Enums.convertTo(status, JobState.class), e);
488485
}
489486

487+
/**
488+
* Adds the failure origins to the APM trace.
489+
*
490+
* @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s).
491+
*/
492+
private void traceFailures(final AttemptFailureSummary failureSummary) {
493+
if (failureSummary != null) {
494+
if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
495+
ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, failureSummary.getFailures().stream().map(FailureReason::getFailureOrigin).map(
496+
FailureOrigin::name).collect(Collectors.joining(","))));
497+
}
498+
} else {
499+
ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, FailureOrigin.UNKNOWN.value()));
500+
}
501+
}
502+
503+
/**
504+
* Records a metric for each failure reason.
505+
*
506+
* @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s).
507+
*/
508+
private void trackFailures(final AttemptFailureSummary failureSummary) {
509+
if (failureSummary != null) {
510+
for (final FailureReason reason : failureSummary.getFailures()) {
511+
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
512+
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
513+
}
514+
} else {
515+
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
516+
new MetricAttribute(MetricTags.FAILURE_ORIGIN, FailureOrigin.UNKNOWN.value()));
517+
}
518+
}
519+
490520
}

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -448,6 +448,16 @@ void setAttemptFailure() throws IOException {
448448
verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
449449
}
450450

451+
@Test
452+
void setAttemptFailureManuallyTerminated() throws IOException {
453+
jobCreationAndStatusUpdateActivity
454+
.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput, null));
455+
456+
verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
457+
verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
458+
verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, null);
459+
}
460+
451461
@Test
452462
void setAttemptFailureWrapException() throws IOException {
453463
final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE);

0 commit comments

Comments
 (0)