Skip to content

Commit 7340a4a

Browse files
committed
move weird migrators to new framework
1 parent da79f6e commit 7340a4a

File tree

13 files changed

+87
-184
lines changed

13 files changed

+87
-184
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
2424
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2525
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
26-
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
2726
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
2827
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage;
2928
import io.airbyte.commons.exceptions.ConnectionErrorException;
@@ -34,9 +33,10 @@
3433
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
3534
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
3635
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
37-
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
3836
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
37+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
3938
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
39+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
4040
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
4141
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
4242
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
@@ -54,7 +54,8 @@
5454
import org.slf4j.Logger;
5555
import org.slf4j.LoggerFactory;
5656

57-
public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination {
57+
public abstract class AbstractJdbcDestination<DestinationState extends MinimumDestinationState>
58+
extends JdbcConnector implements Destination {
5859

5960
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);
6061

@@ -254,10 +255,20 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
254255

255256
protected abstract JdbcSqlGenerator getSqlGenerator();
256257

257-
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
258+
protected abstract JdbcDestinationHandler<DestinationState> getDestinationHandler(final String databaseName,
258259
final JdbcDatabase database,
259260
final String rawTableSchema);
260261

262+
/**
263+
* Provide any migrations that the destination needs to run. Most destinations will need to provide an instande of
264+
* {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum.
265+
*/
266+
protected abstract List<Migration<DestinationState>> getMigrations(
267+
final JdbcDatabase database,
268+
final String databaseName,
269+
final SqlGenerator sqlGenerator,
270+
final DestinationHandler<DestinationState> destinationHandler);
271+
261272
/**
262273
* "database" key at root of the config json, for any other variants in config, override this
263274
* method.
@@ -319,17 +330,15 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
319330
.orElse(new CatalogParser(sqlGenerator))
320331
.parseCatalog(catalog);
321332
final String databaseName = getDatabaseName(config);
322-
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
323-
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
324-
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
333+
final DestinationHandler<DestinationState> destinationHandler =
325334
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
335+
final List<Migration<DestinationState>> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler);
326336
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
327337
final TyperDeduper typerDeduper;
328338
if (disableTypeDedupe) {
329-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
339+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
330340
} else {
331-
typerDeduper =
332-
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
341+
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
333342
}
334343
return typerDeduper;
335344
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import io.airbyte.commons.exceptions.SQLRuntimeException;
1111
import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator;
1212
import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName;
13+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
1314
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
15+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
1416
import java.sql.ResultSet;
1517
import java.sql.SQLException;
1618
import java.util.Collection;
@@ -21,13 +23,18 @@
2123
* Largely based on
2224
* {@link io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator}.
2325
*/
24-
public class JdbcV1V2Migrator extends BaseDestinationV1V2Migrator<TableDefinition> {
26+
public abstract class JdbcV1V2Migrator<DestinationState extends MinimumDestinationState>
27+
extends BaseDestinationV1V2Migrator<TableDefinition, DestinationState> {
2528

2629
private final NamingConventionTransformer namingConventionTransformer;
2730
private final JdbcDatabase database;
2831
private final String databaseName;
2932

30-
public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer, final JdbcDatabase database, final String databaseName) {
33+
public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer,
34+
final SqlGenerator sqlGenerator,
35+
final JdbcDatabase database,
36+
final String databaseName) {
37+
super(sqlGenerator);
3138
this.namingConventionTransformer = namingConventionTransformer;
3239
this.database = database;
3340
this.databaseName = databaseName;
@@ -72,5 +79,4 @@ protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig
7279
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
7380
tableName);
7481
}
75-
7682
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
1717
import io.airbyte.commons.exceptions.ConfigErrorException;
1818
import io.airbyte.commons.json.Jsons;
19+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
20+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
21+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
22+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
23+
import java.util.Collections;
1924
import java.util.HashMap;
25+
import java.util.List;
2026
import java.util.Map;
2127
import org.junit.jupiter.api.Test;
2228

@@ -112,7 +118,7 @@ void testInvalidExtraParam() {
112118
() -> new TestJdbcDestination().getConnectionProperties(buildConfigWithExtraJdbcParameters(extraParam)));
113119
}
114120

115-
static class TestJdbcDestination extends AbstractJdbcDestination {
121+
static class TestJdbcDestination extends AbstractJdbcDestination<MinimumDestinationState.Impl> {
116122

117123
private final Map<String, String> defaultProperties;
118124

@@ -137,15 +143,22 @@ public JsonNode toJdbcConfig(final JsonNode config) {
137143

138144
@Override
139145
protected JdbcSqlGenerator getSqlGenerator() {
140-
// TODO do we need to populate this?
141146
return null;
142147
}
143148

144149
@Override
145-
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
150+
protected JdbcDestinationHandler<MinimumDestinationState.Impl> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
146151
return null;
147152
}
148153

154+
@Override
155+
protected List<Migration<MinimumDestinationState.Impl>> getMigrations(JdbcDatabase database,
156+
String databaseName,
157+
SqlGenerator sqlGenerator,
158+
DestinationHandler<MinimumDestinationState.Impl> destinationHandler) {
159+
return Collections.emptyList();
160+
}
161+
149162
}
150163

151164
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,53 @@
77
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
88
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES;
99

10+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
11+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
1012
import io.airbyte.protocol.models.v0.DestinationSyncMode;
1113
import java.util.Collection;
1214
import java.util.Optional;
15+
import org.jetbrains.annotations.NotNull;
1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
1518

16-
public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {
19+
public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition, DestinationState extends MinimumDestinationState>
20+
implements Migration<DestinationState> {
1721

1822
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);
1923

24+
private final SqlGenerator sqlGenerator;
25+
26+
/**
27+
* Should never be called. Exists so that we can mock this object, because Mockito
28+
* requires a no-args constructor.
29+
*/
30+
protected BaseDestinationV1V2Migrator() {
31+
this(null);
32+
}
33+
34+
protected BaseDestinationV1V2Migrator(SqlGenerator sqlGenerator) {
35+
this.sqlGenerator = sqlGenerator;
36+
}
37+
38+
protected abstract DestinationState setV1V2MigrationDone(DestinationState state);
39+
40+
@NotNull
2041
@Override
21-
public void migrateIfNecessary(
22-
final SqlGenerator sqlGenerator,
23-
final DestinationHandler<?> destinationHandler,
24-
final StreamConfig streamConfig)
25-
throws Exception {
42+
public MigrationResult<DestinationState> migrateIfNecessary(
43+
@NotNull DestinationHandler<DestinationState> destinationHandler,
44+
@NotNull StreamConfig streamConfig,
45+
@NotNull DestinationInitialState<DestinationState> state) {
2646
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
2747
if (shouldMigrate(streamConfig)) {
2848
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
2949
migrate(sqlGenerator, destinationHandler, streamConfig);
3050
LOGGER.info("V2 Migration completed successfully for stream {}", streamConfig.id().finalName());
51+
final DestinationState updatedState = setV1V2MigrationDone(state.destinationState());
52+
// The v2 raw table now exists. We should refetch the initial state.
53+
return new MigrationResult<>(updatedState, true);
3154
} else {
3255
LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName());
56+
return new MigrationResult<>(state.destinationState(), false);
3357
}
3458

3559
}
@@ -40,12 +64,18 @@ public void migrateIfNecessary(
4064
* @param streamConfig the stream in question
4165
* @return whether to migrate the stream
4266
*/
43-
protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exception {
67+
protected boolean shouldMigrate(final StreamConfig streamConfig) {
4468
final var v1RawTable = convertToV1RawName(streamConfig);
4569
LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
4670
final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
47-
final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
48-
final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
71+
final boolean noValidV2RawTableExists;
72+
final boolean aValidV1RawTableExists;
73+
try {
74+
noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
75+
aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
76+
} catch (Exception e) {
77+
throw new RuntimeException(e);
78+
}
4979
LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}",
5080
syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
5181
return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists;

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat
6161

6262
private final SqlGenerator sqlGenerator;
6363
private final DestinationHandler<DestinationState> destinationHandler;
64-
65-
private final DestinationV1V2Migrator v1V2Migrator;
66-
private final V2TableMigrator v2TableMigrator;
6764
private final List<Migration<DestinationState>> migrations;
6865
private final ParsedCatalog parsedCatalog;
6966
private Set<StreamId> overwriteStreamsWithTmpTable;
@@ -86,14 +83,10 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat
8683
public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
8784
final DestinationHandler<DestinationState> destinationHandler,
8885
final ParsedCatalog parsedCatalog,
89-
final DestinationV1V2Migrator v1V2Migrator,
90-
final V2TableMigrator v2TableMigrator,
9186
final List<Migration<DestinationState>> migrations) {
9287
this.sqlGenerator = sqlGenerator;
9388
this.destinationHandler = destinationHandler;
9489
this.parsedCatalog = parsedCatalog;
95-
this.v1V2Migrator = v1V2Migrator;
96-
this.v2TableMigrator = v2TableMigrator;
9790
this.migrations = migrations;
9891
this.initialRawTableStateByStream = new ConcurrentHashMap<>();
9992
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
@@ -103,30 +96,13 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
10396
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
10497
}
10598

106-
public DefaultTyperDeduper(
107-
final SqlGenerator sqlGenerator,
108-
final DestinationHandler<DestinationState> destinationHandler,
109-
final ParsedCatalog parsedCatalog,
110-
final DestinationV1V2Migrator v1V2Migrator,
111-
final List<Migration<DestinationState>> migrations) {
112-
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), migrations);
113-
}
114-
11599
@Override
116100
public void prepareSchemasAndRunMigrations() throws Exception {
117101
// Technically kind of weird to call this here, but it's the best place we have.
118102
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
119103
// until prepareFinalTables... but it doesn't really matter.
120104
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
121105

122-
TyperDeduperUtil.executeWeirdMigrations(
123-
executorService,
124-
sqlGenerator,
125-
destinationHandler,
126-
v1V2Migrator,
127-
v2TableMigrator,
128-
parsedCatalog);
129-
130106
destinationInitialStatuses = TyperDeduperUtil.executeRawTableMigrations(
131107
executorService,
132108
destinationHandler,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
@Slf4j
2929
public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends MinimumDestinationState> implements TyperDeduper {
3030

31-
private final DestinationV1V2Migrator v1V2Migrator;
32-
private final V2TableMigrator v2TableMigrator;
3331
private final List<Migration<DestinationState>> migrations;
3432
private final ExecutorService executorService;
3533
private final ParsedCatalog parsedCatalog;
@@ -39,14 +37,10 @@ public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends Minimum
3937
public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
4038
final DestinationHandler<DestinationState> destinationHandler,
4139
final ParsedCatalog parsedCatalog,
42-
final DestinationV1V2Migrator v1V2Migrator,
43-
final V2TableMigrator v2TableMigrator,
4440
final List<Migration<DestinationState>> migrations) {
4541
this.sqlGenerator = sqlGenerator;
4642
this.destinationHandler = destinationHandler;
4743
this.parsedCatalog = parsedCatalog;
48-
this.v1V2Migrator = v1V2Migrator;
49-
this.v2TableMigrator = v2TableMigrator;
5044
this.migrations = migrations;
5145
this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(),
5246
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
@@ -56,14 +50,6 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
5650
public void prepareSchemasAndRunMigrations() throws Exception {
5751
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
5852

59-
TyperDeduperUtil.executeWeirdMigrations(
60-
executorService,
61-
sqlGenerator,
62-
destinationHandler,
63-
v1V2Migrator,
64-
v2TableMigrator,
65-
parsedCatalog);
66-
6753
List<DestinationInitialStatus<DestinationState>> destinationInitialStatuses = TyperDeduperUtil.executeRawTableMigrations(
6854
executorService,
6955
destinationHandler,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java

Lines changed: 0 additions & 14 deletions
This file was deleted.

0 commit comments

Comments
 (0)