Skip to content

Commit ccfbcc6

Browse files
committed
refactor: remove unused methods in JobPersistence (#16958)
1 parent ecef0c0 commit ccfbcc6

File tree

3 files changed

+3
-71
lines changed

3 files changed

+3
-71
lines changed

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -638,17 +638,6 @@ public Optional<Long> enqueueJob(final String scope, final JobConfig jobConfig,
638638
.map(r -> r.getValue("id", Long.class));
639639
}
640640

641-
// TODO: This is unused outside of test. Need to remove it.
642-
@Override
643-
public void resetJob(final long jobId) throws IOException {
644-
// TODO: stop using LocalDateTime
645-
// https://github.com/airbytehq/airbyte-platform-internal/issues/10815
646-
jobDatabase.query(ctx -> {
647-
updateJobStatus(ctx, jobId, JobStatus.PENDING);
648-
return null;
649-
});
650-
}
651-
652641
@Override
653642
public void cancelJob(final long jobId) throws IOException {
654643
// TODO: stop using LocalDateTime
@@ -960,7 +949,7 @@ public Long getJobCount(final Set<ConfigType> configTypes,
960949
.fetchOne().into(Long.class));
961950
}
962951

963-
public Result<Record> listJobsQuery(final Set<ConfigType> configTypes, final String configId, final int pagesize, String orderByString)
952+
private Result<Record> listJobsQuery(final Set<ConfigType> configTypes, final String configId, final int pagesize, String orderByString)
964953
throws IOException {
965954
return jobDatabase.query(ctx -> {
966955
final String jobsSubquery = "(" + ctx.select(DSL.asterisk()).from(JOBS)
@@ -1395,19 +1384,6 @@ public Optional<Job> getFirstReplicationJob(final UUID connectionId) throws IOEx
13951384
.flatMap(r -> getJobOptional(ctx, r.get("id", Long.class))));
13961385
}
13971386

1398-
@VisibleForTesting
1399-
List<AttemptWithJobInfo> listAttemptsWithJobInfo(final ConfigType configType, final Instant attemptEndedAtTimestamp, final int limit)
1400-
throws IOException {
1401-
// TODO: stop using LocalDateTime
1402-
// https://github.com/airbytehq/airbyte-platform-internal/issues/10815
1403-
final LocalDateTime timeConvertedIntoLocalDateTime = convertInstantToLocalDataTime(attemptEndedAtTimestamp);
1404-
return jobDatabase.query(ctx -> getAttemptsWithJobsFromResult(ctx.fetch(
1405-
BASE_JOB_SELECT_AND_JOIN + WHERE + "CAST(config_type AS VARCHAR) = ? AND " + " attempts.ended_at > ? ORDER BY attempts.ended_at ASC LIMIT ?",
1406-
toSqlName(configType),
1407-
timeConvertedIntoLocalDateTime,
1408-
limit)));
1409-
}
1410-
14111387
@Override
14121388
public Optional<String> getVersion() throws IOException {
14131389
return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst();

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,6 @@ public interface JobPersistence {
8989
*/
9090
Optional<Long> enqueueJob(String scope, JobConfig jobConfig, boolean isScheduled) throws IOException;
9191

92-
/**
93-
* Set job status from current status to PENDING. Throws {@link IllegalStateException} if the job is
94-
* in a terminal state.
95-
*
96-
* @param jobId job to reset
97-
* @throws IOException exception due to interaction with persistence
98-
*/
99-
void resetJob(long jobId) throws IOException;
100-
10192
//
10293
// JOB LIFECYCLE
10394
//

airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,9 +1671,8 @@ void testGetFirstSyncJobForConnectionIdEmpty() throws IOException {
16711671
void testGetFirstSyncJobForConnectionId() throws IOException {
16721672
final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG, true).orElseThrow();
16731673
jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH));
1674-
final List<AttemptWithJobInfo> attemptsWithJobInfo =
1675-
jobPersistence.listAttemptsWithJobInfo(SYNC_JOB_CONFIG.getConfigType(), Instant.EPOCH, 1000);
1676-
final List<Attempt> attempts = Collections.singletonList(attemptsWithJobInfo.get(0).attempt);
1674+
final List<Attempt> attemptsWithJobInfo = jobPersistence.getJob(jobId1).attempts;
1675+
final List<Attempt> attempts = Collections.singletonList(attemptsWithJobInfo.get(0));
16771676

16781677
final Instant afterNow = NOW.plusSeconds(1000);
16791678
when(timeSupplier.get()).thenReturn(afterNow);
@@ -2261,38 +2260,4 @@ void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, I
22612260

22622261
}
22632262

2264-
@Nested
2265-
@DisplayName("When resetting job")
2266-
class ResetJob {
2267-
2268-
@Test
2269-
@DisplayName("Should reset job and put job in pending state")
2270-
void testResetJob() throws IOException {
2271-
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG, true).orElseThrow();
2272-
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
2273-
final Job created = jobPersistence.getJob(jobId);
2274-
2275-
jobPersistence.failAttempt(jobId, attemptNumber);
2276-
when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
2277-
jobPersistence.resetJob(jobId);
2278-
2279-
final Job updated = jobPersistence.getJob(jobId);
2280-
assertEquals(JobStatus.PENDING, updated.status);
2281-
assertNotEquals(created.updatedAtInSecond, updated.updatedAtInSecond);
2282-
}
2283-
2284-
@Test
2285-
@DisplayName("Should not be able to reset a cancelled job")
2286-
void testResetJobCancelled() throws IOException {
2287-
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG, true).orElseThrow();
2288-
2289-
jobPersistence.cancelJob(jobId);
2290-
assertDoesNotThrow(() -> jobPersistence.resetJob(jobId));
2291-
2292-
final Job updated = jobPersistence.getJob(jobId);
2293-
assertEquals(JobStatus.CANCELLED, updated.status);
2294-
}
2295-
2296-
}
2297-
22982263
}

0 commit comments

Comments
 (0)