Skip to content

Commit c8a7cb3

Browse files
authored
Handle null JobSyncConfig (#18969)
* Handle null JobSyncConfig * Add unit test * Fix PMD warning * Do not update jobs in terminal state * Fix failing tests * Fix compile error
1 parent 6e94249 commit c8a7cb3

File tree

6 files changed

+53
-21
lines changed

6 files changed

+53
-21
lines changed

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ public void failJob(final long jobId) throws IOException {
222222

223223
private void updateJobStatus(final DSLContext ctx, final long jobId, final JobStatus newStatus, final LocalDateTime now) {
224224
final Job job = getJob(ctx, jobId);
225+
if (job.isJobInTerminalState()) {
226+
// If the job is already terminal, no need to set a new status
227+
return;
228+
}
225229
job.validateStatusTransition(newStatus);
226230
ctx.execute(
227231
"UPDATE jobs SET status = CAST(? as JOB_STATUS), updated_at = ? WHERE id = ?",

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReportingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public interface JobErrorReportingClient {
1919
*/
2020
void reportJobFailureReason(@Nullable StandardWorkspace workspace,
2121
final FailureReason reason,
22-
final String dockerImage,
22+
@Nullable final String dockerImage,
2323
Map<String, String> metadata);
2424

2525
}

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/SentryJobErrorReportingClient.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,22 @@ static IHub createSentryHubWithDSN(final String sentryDSN) {
6161
@Override
6262
public void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
6363
final FailureReason failureReason,
64-
final String dockerImage,
64+
@Nullable final String dockerImage,
6565
final Map<String, String> metadata) {
6666
final SentryEvent event = new SentryEvent();
6767

68-
// Remove invalid characters from the release name, use @ so sentry knows how to grab the tag
69-
// e.g. airbyte/source-xyz:1.2.0 -> [email protected]
70-
// More info at https://docs.sentry.io/product/cli/releases/#creating-releases
71-
final String release = dockerImage.replace("/", "-").replace(":", "@");
72-
event.setRelease(release);
73-
74-
// enhance event fingerprint to ensure separate grouping per connector
75-
final String[] releaseParts = release.split("@");
76-
if (releaseParts.length > 0) {
77-
event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
68+
if (dockerImage != null) {
69+
// Remove invalid characters from the release name, use @ so sentry knows how to grab the tag
70+
// e.g. airbyte/source-xyz:1.2.0 -> [email protected]
71+
// More info at https://docs.sentry.io/product/cli/releases/#creating-releases
72+
final String release = dockerImage.replace("/", "-").replace(":", "@");
73+
event.setRelease(release);
74+
75+
// enhance event fingerprint to ensure separate grouping per connector
76+
final String[] releaseParts = release.split("@");
77+
if (releaseParts.length > 0) {
78+
event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
79+
}
7880
}
7981

8082
// set workspace as the user in sentry to get impact and priority

airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
99
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
1010
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;
11+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1112
import static org.junit.jupiter.api.Assertions.assertEquals;
1213
import static org.junit.jupiter.api.Assertions.assertFalse;
1314
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -713,13 +714,13 @@ void testCancelJob() throws IOException {
713714
}
714715

715716
@Test
716-
@DisplayName("Should raise an exception if job is already succeeded")
717+
@DisplayName("Should not raise an exception if job is already succeeded")
717718
void testCancelJobAlreadySuccessful() throws IOException {
718719
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
719720
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
720721
jobPersistence.succeedAttempt(jobId, attemptNumber);
721722

722-
assertThrows(IllegalStateException.class, () -> jobPersistence.cancelJob(jobId));
723+
assertDoesNotThrow(() -> jobPersistence.cancelJob(jobId));
723724

724725
final Job updated = jobPersistence.getJob(jobId);
725726
assertEquals(JobStatus.SUCCEEDED, updated.getStatus());
@@ -867,13 +868,13 @@ void failJob() throws IOException {
867868
}
868869

869870
@Test
870-
@DisplayName("Should raise an exception if job is already succeeded")
871+
@DisplayName("Should not raise an exception if job is already succeeded")
871872
void testFailJobAlreadySucceeded() throws IOException {
872873
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
873874
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
874875
jobPersistence.succeedAttempt(jobId, attemptNumber);
875876

876-
assertThrows(IllegalStateException.class, () -> jobPersistence.failJob(jobId));
877+
assertDoesNotThrow(() -> jobPersistence.failJob(jobId));
877878

878879
final Job updated = jobPersistence.getJob(jobId);
879880
assertEquals(JobStatus.SUCCEEDED, updated.getStatus());
@@ -1653,7 +1654,7 @@ void testResetJobCancelled() throws IOException {
16531654
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
16541655

16551656
jobPersistence.cancelJob(jobId);
1656-
assertThrows(IllegalStateException.class, () -> jobPersistence.resetJob(jobId));
1657+
assertDoesNotThrow(() -> jobPersistence.resetJob(jobId));
16571658

16581659
final Job updated = jobPersistence.getJob(jobId);
16591660
assertEquals(JobStatus.CANCELLED, updated.getStatus());

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,9 @@ public void jobFailure(final JobFailureInput input) {
269269
final UUID connectionId = UUID.fromString(job.getScope());
270270
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
271271
final JobSyncConfig jobSyncConfig = job.getConfig().getSync();
272-
final SyncJobReportingContext jobContext =
273-
new SyncJobReportingContext(jobId, jobSyncConfig.getSourceDockerImage(), jobSyncConfig.getDestinationDockerImage());
272+
final String sourceDockerImage = jobSyncConfig != null ? jobSyncConfig.getSourceDockerImage() : null;
273+
final String destinationDockerImage = jobSyncConfig != null ? jobSyncConfig.getDestinationDockerImage() : null;
274+
final SyncJobReportingContext jobContext = new SyncJobReportingContext(jobId, sourceDockerImage, destinationDockerImage);
274275
job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary)
275276
.ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, jobContext));
276277
trackCompletion(job, JobStatus.FAILED);

airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
8383
class JobCreationAndStatusUpdateActivityTest {
8484

85+
public static final String REASON = "reason";
8586
@Mock
8687
private SyncJobFactory mJobFactory;
8788

@@ -393,10 +394,10 @@ void setJobFailure() throws IOException {
393394
Mockito.when(mJobPersistence.getJob(JOB_ID))
394395
.thenReturn(mJob);
395396

396-
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, "reason"));
397+
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON));
397398

398399
verify(mJobPersistence).failJob(JOB_ID);
399-
verify(mJobNotifier).failJob(eq("reason"), Mockito.any());
400+
verify(mJobNotifier).failJob(eq(REASON), Mockito.any());
400401
verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any());
401402
}
402403

@@ -414,6 +415,29 @@ void setJobFailureWrapException() throws IOException {
414415
verify(mJobtracker, times(1)).trackSyncForInternalFailure(JOB_ID, CONNECTION_ID, ATTEMPT_NUMBER, JobState.FAILED, exception);
415416
}
416417

418+
@Test
419+
void setJobFailureWithNullJobSyncConfig() throws IOException {
420+
final Attempt mAttempt = Mockito.mock(Attempt.class);
421+
Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary));
422+
423+
final JobConfig mJobConfig = Mockito.mock(JobConfig.class);
424+
Mockito.when(mJobConfig.getSync()).thenReturn(null);
425+
426+
final Job mJob = Mockito.mock(Job.class);
427+
Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString());
428+
Mockito.when(mJob.getConfig()).thenReturn(mJobConfig);
429+
Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt));
430+
431+
Mockito.when(mJobPersistence.getJob(JOB_ID))
432+
.thenReturn(mJob);
433+
434+
jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, 1, CONNECTION_ID, REASON));
435+
436+
verify(mJobPersistence).failJob(JOB_ID);
437+
verify(mJobNotifier).failJob(eq(REASON), Mockito.any());
438+
verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any());
439+
}
440+
417441
@Test
418442
void setAttemptFailure() throws IOException {
419443
jobCreationAndStatusUpdateActivity

0 commit comments

Comments
 (0)