Skip to content

Commit e5c3f4b

Browse files
authored
Remove unused job persistence methods. (#18952)
Closes #12823. Also remove unused dump methods.
1 parent 612b3f1 commit e5c3f4b

File tree

5 files changed

+0
-185
lines changed

5 files changed

+0
-185
lines changed

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ public class DefaultJobPersistence implements JobPersistence {
9191
private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS;
9292

9393
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class);
94-
private static final Set<String> SYSTEM_SCHEMA = Set
95-
.of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal",
96-
"catalog_history");
9794
public static final String ATTEMPT_NUMBER = "attempt_number";
9895
private static final String JOB_ID = "job_id";
9996
private static final String WHERE = "WHERE ";
@@ -797,18 +794,6 @@ public void setSecretMigrationDone() throws IOException {
797794
setMetadata(SECRET_MIGRATION_STATUS, "true");
798795
}
799796

800-
private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";
801-
802-
@Override
803-
public boolean isSchedulerMigrated() throws IOException {
804-
return getMetadata(SCHEDULER_MIGRATION_STATUS).count() == 1;
805-
}
806-
807-
@Override
808-
public void setSchedulerMigrationDone() throws IOException {
809-
setMetadata(SCHEDULER_MIGRATION_STATUS, "true");
810-
}
811-
812797
@Override
813798
public Optional<String> getVersion() throws IOException {
814799
return getMetadata(AirbyteVersion.AIRBYTE_VERSION_KEY_NAME).findFirst();
@@ -915,27 +900,6 @@ public Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOExcep
915900
return exportDatabase(DEFAULT_SCHEMA);
916901
}
917902

918-
/**
919-
* This is different from {@link #exportDatabase()} cause it exports all the tables in all the
920-
* schemas available
921-
*/
922-
@Override
923-
public Map<String, Stream<JsonNode>> dump() throws IOException {
924-
final Map<String, Stream<JsonNode>> result = new HashMap<>();
925-
for (final String schema : listSchemas()) {
926-
final List<String> tables = listAllTables(schema);
927-
928-
for (final String table : tables) {
929-
if (result.containsKey(table)) {
930-
throw new RuntimeException("Multiple tables found with the same name " + table);
931-
}
932-
result.put(table.toUpperCase(), exportTable(schema, table));
933-
}
934-
}
935-
936-
return result;
937-
}
938-
939903
private Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase(final String schema) throws IOException {
940904
final List<String> tables = listTables(schema);
941905
final Map<JobsDatabaseSchema, Stream<JsonNode>> result = new HashMap<>();
@@ -982,25 +946,6 @@ public void purgeJobHistory(final LocalDateTime asOfDate) {
982946
}
983947
}
984948

985-
private List<String> listAllTables(final String schema) throws IOException {
986-
if (schema != null) {
987-
return jobDatabase.query(context -> context.meta().getSchemas(schema).stream()
988-
.flatMap(s -> context.meta(s).getTables().stream())
989-
.map(Named::getName)
990-
.collect(Collectors.toList()));
991-
} else {
992-
return List.of();
993-
}
994-
}
995-
996-
private List<String> listSchemas() throws IOException {
997-
return jobDatabase.query(context -> context.meta().getSchemas().stream()
998-
.map(Named::getName)
999-
.filter(c -> !SYSTEM_SCHEMA.contains(c))
1000-
.collect(Collectors.toList()));
1001-
1002-
}
1003-
1004949
private Stream<JsonNode> exportTable(final String schema, final String tableName) throws IOException {
1005950
final Table<Record> tableSql = getTable(schema, tableName);
1006951
try (final Stream<Record> records = jobDatabase.query(ctx -> ctx.select(DSL.asterisk()).from(tableSql).fetchStream())) {

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
280280
*/
281281
Map<JobsDatabaseSchema, Stream<JsonNode>> exportDatabase() throws IOException;
282282

283-
Map<String, Stream<JsonNode>> dump() throws IOException;
284-
285283
/**
286284
* Import all SQL tables from streams of JsonNode objects.
287285
*
@@ -306,22 +304,6 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
306304
*/
307305
void setSecretMigrationDone() throws IOException;
308306

309-
/**
310-
* Check if the scheduler has been migrated to temporal.
311-
*
312-
* TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
313-
* "major" version bump as it will no longer be needed.
314-
*/
315-
boolean isSchedulerMigrated() throws IOException;
316-
317-
/**
318-
* Set that the scheduler migration has been performed.
319-
*
320-
* TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
321-
* "major" version bump as it will no longer be needed.
322-
*/
323-
void setSchedulerMigrationDone() throws IOException;
324-
325307
List<AttemptNormalizationStatus> getAttemptNormalizationStatusesForJob(final Long jobId) throws IOException;
326308

327309
}

airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -566,15 +566,6 @@ void testSecretMigrationMetadata() throws IOException {
566566
assertTrue(isMigrated);
567567
}
568568

569-
@Test
570-
void testSchedulerMigrationMetadata() throws IOException {
571-
boolean isMigrated = jobPersistence.isSchedulerMigrated();
572-
assertFalse(isMigrated);
573-
jobPersistence.setSchedulerMigrationDone();
574-
isMigrated = jobPersistence.isSchedulerMigrated();
575-
assertTrue(isMigrated);
576-
}
577-
578569
@Test
579570
void testAirbyteProtocolVersionMaxMetadata() throws IOException {
580571
assertTrue(jobPersistence.getAirbyteProtocolVersionMax().isEmpty());

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package io.airbyte.server;
66

7-
import com.google.common.annotations.VisibleForTesting;
87
import io.airbyte.analytics.Deployment;
98
import io.airbyte.analytics.TrackingClient;
109
import io.airbyte.analytics.TrackingClientSingleton;
@@ -18,10 +17,7 @@
1817
import io.airbyte.commons.version.AirbyteVersion;
1918
import io.airbyte.config.Configs;
2019
import io.airbyte.config.EnvConfigs;
21-
import io.airbyte.config.StandardSync;
22-
import io.airbyte.config.StandardSync.Status;
2320
import io.airbyte.config.helpers.LogClientSingleton;
24-
import io.airbyte.config.persistence.ConfigNotFoundException;
2521
import io.airbyte.config.persistence.ConfigRepository;
2622
import io.airbyte.config.persistence.SecretsRepositoryReader;
2723
import io.airbyte.config.persistence.SecretsRepositoryWriter;
@@ -70,16 +66,12 @@
7066
import io.airbyte.server.scheduler.EventRunner;
7167
import io.airbyte.server.scheduler.TemporalEventRunner;
7268
import io.airbyte.validation.json.JsonSchemaValidator;
73-
import io.airbyte.validation.json.JsonValidationException;
7469
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
7570
import io.temporal.serviceclient.WorkflowServiceStubs;
76-
import java.io.IOException;
7771
import java.net.http.HttpClient;
7872
import java.util.Map;
7973
import java.util.Optional;
8074
import java.util.Set;
81-
import java.util.UUID;
82-
import java.util.stream.Collectors;
8375
import javax.sql.DataSource;
8476
import org.eclipse.jetty.server.Server;
8577
import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -256,14 +248,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
256248
final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
257249
final EventRunner eventRunner = new TemporalEventRunner(temporalClient);
258250

259-
// It is important that the migration to the temporal scheduler is performed before the server
260-
// accepts any requests.
261-
// This is why this migration is performed here instead of in the bootloader - so that the server
262-
// blocks on this.
263-
// TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
264-
// "major" version bump as it will no longer be needed.
265-
migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);
266-
267251
final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);
268252

269253
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
@@ -370,28 +354,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
370354
workspacesHandler);
371355
}
372356

373-
@VisibleForTesting
374-
static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository configRepository,
375-
final JobPersistence jobPersistence,
376-
final EventRunner eventRunner)
377-
throws JsonValidationException, ConfigNotFoundException, IOException {
378-
// Skip the migration if it was already performed, to save on resources/startup time
379-
if (jobPersistence.isSchedulerMigrated()) {
380-
LOGGER.info("Migration to temporal scheduler has already been performed");
381-
return;
382-
}
383-
384-
LOGGER.info("Start migration to the new scheduler...");
385-
final Set<UUID> connectionIds =
386-
configRepository.listStandardSyncs().stream()
387-
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
388-
.map(StandardSync::getConnectionId)
389-
.collect(Collectors.toSet());
390-
eventRunner.migrateSyncIfNeeded(connectionIds);
391-
jobPersistence.setSchedulerMigrationDone();
392-
LOGGER.info("Done migrating to the new scheduler...");
393-
}
394-
395357
public static void main(final String[] args) {
396358
try {
397359
final Configs configs = new EnvConfigs();

airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java

Lines changed: 0 additions & 65 deletions
This file was deleted.

0 commit comments

Comments
 (0)