Skip to content

Commit e21ec5a

Browse files
authored
Add attempt status by release stage metrics. (#10659)
Add, - attempt_created_by_release_stage - attempt_failed_by_release_stage - attempt_succeeded_by_release_stage
1 parent 29095c6 commit e21ec5a

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,18 @@
3030
*/
3131
public enum MetricsRegistry {
3232

33+
ATTEMPT_CREATED_BY_RELEASE_STAGE(
34+
MetricEmittingApps.WORKER,
35+
"attempt_created_by_release_stage",
36+
"increments when a new attempt is created. attempts are double counted as this is tagged by release stage."),
37+
ATTEMPT_FAILED_BY_RELEASE_STAGE(
38+
MetricEmittingApps.WORKER,
39+
"attempt_failed_by_release_stage",
40+
"increments when an attempt fails. attempts are double counted as this is tagged by release stage."),
41+
ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE(
42+
MetricEmittingApps.WORKER,
43+
"attempt_succeeded_by_release_stage",
44+
"increments when an attempts succeeds. attempts are double counted as this is tagged by release stage."),
3345
JOB_CANCELLED_BY_RELEASE_STAGE(
3446
MetricEmittingApps.WORKER,
3547
"job_cancelled_by_release_stage",

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,15 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
109109
@Override
110110
public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
111111
try {
112-
final Job createdJob = jobPersistence.getJob(input.getJobId());
112+
final long jobId = input.getJobId();
113+
final Job createdJob = jobPersistence.getJob(jobId);
113114

114115
final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
115116
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
116-
final int persistedAttemptId = jobPersistence.createAttempt(input.getJobId(), logFilePath);
117+
final int persistedAttemptId = jobPersistence.createAttempt(jobId, logFilePath);
118+
emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId);
117119

118120
LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot());
119-
120121
return new AttemptCreationOutput(persistedAttemptId);
121122
} catch (final IOException e) {
122123
throw new RetryableException(e);
@@ -136,6 +137,7 @@ public void jobSuccess(final JobSuccessInput input) {
136137
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId);
137138
}
138139
jobPersistence.succeedAttempt(jobId, attemptId);
140+
emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE, jobId);
139141
final Job job = jobPersistence.getJob(jobId);
140142

141143
jobNotifier.successJob(job);
@@ -164,14 +166,18 @@ public void jobFailure(final JobFailureInput input) {
164166
@Override
165167
public void attemptFailure(final AttemptFailureInput input) {
166168
try {
167-
jobPersistence.failAttempt(input.getJobId(), input.getAttemptId());
168-
jobPersistence.writeAttemptFailureSummary(input.getJobId(), input.getAttemptId(), input.getAttemptFailureSummary());
169+
final int attemptId = input.getAttemptId();
170+
final long jobId = input.getJobId();
171+
172+
jobPersistence.failAttempt(jobId, attemptId);
173+
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
169174

170175
if (input.getStandardSyncOutput() != null) {
171176
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
172-
jobPersistence.writeOutput(input.getJobId(), input.getAttemptId(), jobOutput);
177+
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
173178
}
174179

180+
emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
175181
} catch (final IOException e) {
176182
throw new RetryableException(e);
177183
}

0 commit comments

Comments
 (0)