|
8 | 8 | import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
|
9 | 9 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
|
10 | 10 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
|
11 |
| -import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY; |
12 | 11 | import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
|
13 | 12 | import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
|
14 | 13 |
|
|
23 | 22 | import io.airbyte.config.Configs.WorkerEnvironment;
|
24 | 23 | import io.airbyte.config.DestinationConnection;
|
25 | 24 | import io.airbyte.config.FailureReason;
|
26 |
| -import io.airbyte.config.FailureReason.FailureOrigin; |
27 | 25 | import io.airbyte.config.JobConfig;
|
28 | 26 | import io.airbyte.config.JobOutput;
|
29 | 27 | import io.airbyte.config.JobSyncConfig;
|
|
58 | 56 | import io.airbyte.workers.run.TemporalWorkerRunFactory;
|
59 | 57 | import io.airbyte.workers.run.WorkerRun;
|
60 | 58 | import io.micronaut.context.annotation.Requires;
|
61 |
| -import io.micronaut.core.util.CollectionUtils; |
62 | 59 | import jakarta.inject.Singleton;
|
63 | 60 | import java.io.IOException;
|
64 | 61 | import java.nio.file.Path;
|
|
70 | 67 | import java.util.OptionalLong;
|
71 | 68 | import java.util.Set;
|
72 | 69 | import java.util.UUID;
|
73 |
| -import java.util.stream.Collectors; |
74 | 70 | import lombok.extern.slf4j.Slf4j;
|
75 | 71 |
|
76 | 72 | @Slf4j
|
@@ -291,16 +287,12 @@ public void jobFailure(final JobFailureInput input) {
|
291 | 287 | @Override
|
292 | 288 | public void attemptFailure(final AttemptFailureInput input) {
|
293 | 289 | try {
|
| 290 | + ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId())); |
| 291 | + |
294 | 292 | final int attemptId = input.getAttemptId();
|
295 | 293 | final long jobId = input.getJobId();
|
296 | 294 | final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();
|
297 | 295 |
|
298 |
| - ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId)); |
299 |
| - if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) { |
300 |
| - ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, failureSummary.getFailures().stream().map(FailureReason::getFailureOrigin).map( |
301 |
| - FailureOrigin::name).collect(Collectors.joining(",")))); |
302 |
| - } |
303 |
| - |
304 | 296 | jobPersistence.failAttempt(jobId, attemptId);
|
305 | 297 | jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);
|
306 | 298 |
|
|
0 commit comments