Skip to content

Commit 501b2d2

Browse files
committed
maybe move everything to new classes?
1 parent 045fec6 commit 501b2d2

File tree

13 files changed

+50
-181
lines changed

13 files changed

+50
-181
lines changed

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
2424
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2525
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
26-
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
2726
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
2827
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage;
2928
import io.airbyte.commons.exceptions.ConnectionErrorException;
@@ -34,9 +33,9 @@
3433
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
3534
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
3635
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
37-
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
3836
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
3937
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
38+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
4039
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
4140
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
4241
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
@@ -54,7 +53,8 @@
5453
import org.slf4j.Logger;
5554
import org.slf4j.LoggerFactory;
5655

57-
public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination {
56+
public abstract class AbstractJdbcDestination<DestinationState extends MinimumDestinationState>
57+
extends JdbcConnector implements Destination {
5858

5959
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);
6060

@@ -254,10 +254,20 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri
254254

255255
protected abstract JdbcSqlGenerator getSqlGenerator();
256256

257-
protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
257+
protected abstract JdbcDestinationHandler<DestinationState> getDestinationHandler(final String databaseName,
258258
final JdbcDatabase database,
259259
final String rawTableSchema);
260260

261+
/**
262+
* Provide any migrations that the destination needs to run. Most destinations will need to provide an instande of
263+
* {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum.
264+
*/
265+
protected abstract List<Migration<DestinationState>> getMigrations(
266+
final NamingConventionTransformer namingResolver,
267+
final JdbcDatabase database,
268+
final String databaseName,
269+
final JdbcSqlGenerator sqlGenerator);
270+
261271
/**
262272
* "database" key at root of the config json, for any other variants in config, override this
263273
* method.
@@ -319,17 +329,15 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
319329
.orElse(new CatalogParser(sqlGenerator))
320330
.parseCatalog(catalog);
321331
final String databaseName = getDatabaseName(config);
322-
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
323-
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
324-
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
332+
final DestinationHandler<DestinationState> destinationHandler =
325333
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
334+
final List<Migration<DestinationState>> migrations = getMigrations(namingResolver, database, databaseName, sqlGenerator);
326335
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
327336
final TyperDeduper typerDeduper;
328337
if (disableTypeDedupe) {
329-
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
338+
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
330339
} else {
331-
typerDeduper =
332-
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
340+
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
333341
}
334342
return typerDeduper;
335343
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import io.airbyte.commons.exceptions.SQLRuntimeException;
1111
import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator;
1212
import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName;
13+
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
1314
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
15+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
1416
import java.sql.ResultSet;
1517
import java.sql.SQLException;
1618
import java.util.Collection;
@@ -21,13 +23,18 @@
2123
* Largely based on
2224
* {@link io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator}.
2325
*/
24-
public class JdbcV1V2Migrator extends BaseDestinationV1V2Migrator<TableDefinition> {
26+
public abstract class JdbcV1V2Migrator<DestinationState extends MinimumDestinationState>
27+
extends BaseDestinationV1V2Migrator<TableDefinition, DestinationState> {
2528

2629
private final NamingConventionTransformer namingConventionTransformer;
2730
private final JdbcDatabase database;
2831
private final String databaseName;
2932

30-
public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer, final JdbcDatabase database, final String databaseName) {
33+
public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer,
34+
final SqlGenerator sqlGenerator,
35+
final JdbcDatabase database,
36+
final String databaseName) {
37+
super(sqlGenerator);
3138
this.namingConventionTransformer = namingConventionTransformer;
3239
this.database = database;
3340
this.databaseName = databaseName;
@@ -72,5 +79,4 @@ protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig
7279
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
7380
tableName);
7481
}
75-
7682
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@
1111
import com.google.common.collect.ImmutableMap;
1212
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
1313
import io.airbyte.cdk.db.jdbc.JdbcUtils;
14+
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
1415
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
1516
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
1617
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
1718
import io.airbyte.commons.exceptions.ConfigErrorException;
1819
import io.airbyte.commons.json.Jsons;
20+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
21+
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
22+
import java.util.Collections;
1923
import java.util.HashMap;
24+
import java.util.List;
2025
import java.util.Map;
2126
import org.junit.jupiter.api.Test;
2227

@@ -146,6 +151,11 @@ protected JdbcDestinationHandler getDestinationHandler(String databaseName, Jdbc
146151
return null;
147152
}
148153

154+
@Override
155+
protected List<Migration<?>> getMigrations(NamingConventionTransformer namingResolver, JdbcDatabase database, String databaseName, JdbcSqlGenerator sqlGenerator) {
156+
return Collections.emptyList();
157+
}
158+
149159
}
150160

151161
}

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,6 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat
5757

5858
private final SqlGenerator sqlGenerator;
5959
private final DestinationHandler<DestinationState> destinationHandler;
60-
61-
private final DestinationV1V2Migrator v1V2Migrator;
62-
private final V2TableMigrator v2TableMigrator;
6360
private final List<Migration<DestinationState>> migrations;
6461
private final ParsedCatalog parsedCatalog;
6562
private Set<StreamId> overwriteStreamsWithTmpTable;
@@ -82,14 +79,10 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat
8279
public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
8380
final DestinationHandler<DestinationState> destinationHandler,
8481
final ParsedCatalog parsedCatalog,
85-
final DestinationV1V2Migrator v1V2Migrator,
86-
final V2TableMigrator v2TableMigrator,
8782
final List<Migration<DestinationState>> migrations) {
8883
this.sqlGenerator = sqlGenerator;
8984
this.destinationHandler = destinationHandler;
9085
this.parsedCatalog = parsedCatalog;
91-
this.v1V2Migrator = v1V2Migrator;
92-
this.v2TableMigrator = v2TableMigrator;
9386
this.migrations = migrations;
9487
this.initialRawTableStateByStream = new ConcurrentHashMap<>();
9588
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
@@ -99,30 +92,13 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
9992
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
10093
}
10194

102-
public DefaultTyperDeduper(
103-
final SqlGenerator sqlGenerator,
104-
final DestinationHandler<DestinationState> destinationHandler,
105-
final ParsedCatalog parsedCatalog,
106-
final DestinationV1V2Migrator v1V2Migrator,
107-
final List<Migration<DestinationState>> migrations) {
108-
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), migrations);
109-
}
110-
11195
@Override
11296
public void prepareSchemasAndRawTables() throws Exception {
11397
// Technically kind of weird to call this here, but it's the best place we have.
11498
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
11599
// until prepareFinalTables... but it doesn't really matter.
116100
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
117101

118-
TyperDeduperUtil.executeWeirdMigrations(
119-
executorService,
120-
sqlGenerator,
121-
destinationHandler,
122-
v1V2Migrator,
123-
v2TableMigrator,
124-
parsedCatalog);
125-
126102
destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations(
127103
executorService,
128104
destinationHandler,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java

Lines changed: 0 additions & 25 deletions
This file was deleted.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
@Slf4j
2929
public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends MinimumDestinationState> implements TyperDeduper {
3030

31-
private final DestinationV1V2Migrator v1V2Migrator;
32-
private final V2TableMigrator v2TableMigrator;
3331
private final List<Migration<DestinationState>> migrations;
3432
private final ExecutorService executorService;
3533
private final ParsedCatalog parsedCatalog;
@@ -39,14 +37,10 @@ public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends Minimum
3937
public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
4038
final DestinationHandler<DestinationState> destinationHandler,
4139
final ParsedCatalog parsedCatalog,
42-
final DestinationV1V2Migrator v1V2Migrator,
43-
final V2TableMigrator v2TableMigrator,
4440
final List<Migration<DestinationState>> migrations) {
4541
this.sqlGenerator = sqlGenerator;
4642
this.destinationHandler = destinationHandler;
4743
this.parsedCatalog = parsedCatalog;
48-
this.v1V2Migrator = v1V2Migrator;
49-
this.v2TableMigrator = v2TableMigrator;
5044
this.migrations = migrations;
5145
this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(),
5246
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
@@ -56,14 +50,6 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
5650
public void prepareSchemasAndRawTables() throws Exception {
5751
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);
5852

59-
TyperDeduperUtil.executeWeirdMigrations(
60-
executorService,
61-
sqlGenerator,
62-
destinationHandler,
63-
v1V2Migrator,
64-
v2TableMigrator,
65-
parsedCatalog);
66-
6753
List<DestinationInitialState<DestinationState>> destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations(
6854
executorService,
6955
destinationHandler,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java

Lines changed: 0 additions & 14 deletions
This file was deleted.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -78,39 +78,6 @@ class TyperDeduperUtil {
7878
return currentStates
7979
}
8080

81-
/**
82-
* The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
83-
* initial state, because they're dumb and weird.
84-
* (specifically: SnowflakeV2TableMigrator inspects the final tables and triggers a soft reset
85-
* directly within the migration).
86-
* TODO: Migrate these migrations to the new migration system.
87-
* This will also reduce the number of times we need to query DB metadata, since (a) we can rely
88-
* on the gatherInitialState values, and (b) we can add a DestinationState field for these migrations.
89-
* It also enables us to not trigger multiple soft resets in a single sync.
90-
*/
91-
@JvmStatic
92-
fun <DestinationState> executeWeirdMigrations(
93-
executorService: ExecutorService,
94-
sqlGenerator: SqlGenerator,
95-
destinationHandler: DestinationHandler<DestinationState>,
96-
v1V2Migrator: DestinationV1V2Migrator,
97-
v2TableMigrator: V2TableMigrator,
98-
parsedCatalog: ParsedCatalog
99-
) {
100-
val futures = parsedCatalog.streams.map {
101-
CompletableFuture.supplyAsync(
102-
{
103-
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
104-
v2TableMigrator.migrateIfNecessary(it)
105-
},
106-
executorService
107-
)
108-
}
109-
getResultsOrLogAndThrowFirst(
110-
"The following exceptions were thrown attempting to run migrations:\n",
111-
CompletableFutures.allOf(futures.toList()).toCompletableFuture().join())
112-
}
113-
11481
/**
11582
* Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
11683
* exist in the Destination Database.

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)