|
8 | 8 | import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
|
9 | 9 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
|
10 | 10 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
|
| 11 | +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY; |
11 | 12 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
|
12 | 13 | import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
|
13 | 14 |
|
|
22 | 23 | import io.airbyte.config.Configs.WorkerEnvironment;
|
23 | 24 | import io.airbyte.config.DestinationConnection;
|
24 | 25 | import io.airbyte.config.FailureReason;
|
| 26 | +import io.airbyte.config.FailureReason.FailureOrigin; |
25 | 27 | import io.airbyte.config.JobConfig;
|
26 | 28 | import io.airbyte.config.JobOutput;
|
27 | 29 | import io.airbyte.config.JobSyncConfig;
|
|
56 | 58 | import io.airbyte.workers.run.TemporalWorkerRunFactory;
|
57 | 59 | import io.airbyte.workers.run.WorkerRun;
|
58 | 60 | import io.micronaut.context.annotation.Requires;
|
| 61 | +import io.micronaut.core.util.CollectionUtils; |
59 | 62 | import jakarta.inject.Singleton;
|
60 | 63 | import java.io.IOException;
|
61 | 64 | import java.nio.file.Path;
|
|
67 | 70 | import java.util.OptionalLong;
|
68 | 71 | import java.util.Set;
|
69 | 72 | import java.util.UUID;
|
| 73 | +import java.util.stream.Collectors; |
70 | 74 | import lombok.extern.slf4j.Slf4j;
|
71 | 75 |
|
72 | 76 | @Slf4j
|
@@ -287,12 +291,16 @@ public void jobFailure(final JobFailureInput input) {
|
287 | 291 | @Override
|
288 | 292 | public void attemptFailure(final AttemptFailureInput input) {
|
289 | 293 | try {
|
290 |
| - ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); |
291 |
| - |
292 | 294 | final int attemptId = input.getAttemptId();
|
293 | 295 | final long jobId = input.getJobId();
|
294 | 296 | final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();
|
295 | 297 |
|
| 298 | + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId)); |
| 299 | + if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) { |
| 300 | + ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, failureSummary.getFailures().stream().map(FailureReason::getFailureOrigin).map( |
| 301 | + FailureOrigin::name).collect(Collectors.joining(",")))); |
| 302 | + } |
| 303 | + |
296 | 304 | jobPersistence.failAttempt(jobId, attemptId);
|
297 | 305 | jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);
|
298 | 306 |
|
|
0 commit comments