
Commit 6e9cdc8

Browse files
jbfbell, bnchrch, bazarnov, alafanechere, and lazebnyi
authored
✨ Destination BigQuery - Add v1v2 Migration (#28962)
* Add everything for BQ but migrate, refactor interface after practical work
* Make new default methods, refactor to single implemented method
* MigrationInterface and BQ impl created
* Trying to integrate with standard inserts
* remove unnecessary NameAndNamespacePair class
* Shimmed in
* Java Docs
* Initial Testing Setup
* Tests!
* Move Migrator into TyperDeduper
* Functional Migration
* Add Integration Test
* Pr updates
* bump version
* bump version
* version bump
* Update to airbyte-ci-internal (#29026)
* 🐛 Source Github, Instagram, Zendesk-support, Zendesk-talk: fix CAT tests fail on `spec` (#28910)
* connectors-ci: better modified connectors detection logic (#28855)
* connectors-ci: report path should always start with `airbyte-ci/` (#29030)
* make report path always start with airbyte-ci
* revert report path in orchestrator
* add more test cases
* bump version
* Updated docs (#29019)
* CDK: Embedded reader utils (#28873)
* relax pydantic dep
* Automated Commit - Format and Process Resources Changes
* wip
* wrap up base integration
* add init file
* introduce CDK runner and improve error message
* make state param optional
* update protocol models
* review comments
* always run incremental if possible
* fix
Co-authored-by: flash1293 <[email protected]>
* 🤖 Bump minor version of Airbyte CDK
* 🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream (#28657)
* fix tests
* format
* review comments
* Automated Commit - Formatting Changes
* review comments
* review comments
* review comments
* log all messages
* log all message
* review comments
* review comments
* Automated Commit - Formatting Changes
* add comment
Co-authored-by: flash1293 <[email protected]>
* 🤖 Bump minor version of Airbyte CDK
* 🐛 Source Github, Instagram, Zendesk Support / Talk - revert `spec` changes and improve (#29031)
* Source oauth0: new streams and fix incremental (#29001)
* Add new streams Organizations, OrganizationMembers, OrganizationMemberRoles
* relax schema definition to allow additional fields
* Bump image tag version
* revert some changes to the old schemas
* Format python so gradle can pass
* update incremental
* remove unused print
* fix unit test
Co-authored-by: Vasilis Gavriilidis <[email protected]>
* 🐛 Source Mongo: Fix failing acceptance tests (#28816)
* Fix failing acceptance tests
* Fix failing strict acceptance tests
* Source-Greenhouse: Fix unit tests for new CDK version (#28969) Fix unit tests
* Add CSV options to the CSV parser (#28491)
* remove invalid legacy option
* remove unused option
* the tests pass but this is quite messy
* very slight clean up
* Add skip options to csv format
* fix some of the typing issues
* fixme comment
* remove extra log message
* fix typing issues
* skip before header
* skip after header
* format
* add another test
* Automated Commit - Formatting Changes
* auto generate column names
* delete dead code
* update title and description
* true and false values
* Update the tests
* Add comment
* missing test
* rename
* update expected spec
* move to method
* Update comment
* fix typo
* remove unused import
* Add a comment
* None records do not pass the WaitForDiscoverPolicy
* format
* remove second branch to ensure we always go through the same processing
* Raise an exception if the record is None
* reset
* Update tests
* handle unquoted newlines
* Automated Commit - Formatting Changes
* Update test case so the quoting is explicit
* Update comment
* Automated Commit - Formatting Changes
* Fail validation if skipping rows before header and header is autogenerated
* always fail if a record cannot be parsed
* format
* set write line_no in error message
* remove none check
* Automated Commit - Formatting Changes
* enable autogenerate test
* remove duplicate test
* missing unit tests
* Update
* remove branching
* remove unused none check
* Update tests
* remove branching
* format
* extract to function
* comment
* missing type
* type annotation
* use set
* Document that the strings are case-sensitive
* public -> private
* add unit test
* newline
Co-authored-by: girarda <[email protected]>
* Dagster: Add sentry logging (#28822)
* Add sentry
* add sentry decorator
* Add traces
* Use sentry trace
* Improve duplicate logging
* Add comments
* DNC
* Fix up issues
* Move to scopes
* Remove breadcrumb
* Update lock
* ✨ Source Shortio: Migrate Python CDK to Low-code CDK (#28950)
* Migrate Shortio to Low-Code
* Update abnormal state
* Format
* Update Docs
* Fix metadata.yaml
* Add pagination
* Add incremental sync
* add incremental parameters
* update metadata
* rollback update version
* release date
Co-authored-by: marcosmarxm <[email protected]>
* Update to new verbiage (#29051)
* [skip ci] Metadata: Remove leading underscore (#29024)
* DNC
* Add test models
* Add model test
* Remove underscore from metadata files
* Regenerate models
* Add test to check for key transformation
* Allow additional fields on metadata
* Delete transform
* Proof of concept parallel source stream reading implementation for MySQL (#26580)
* Proof of concept parallel source stream reading implementation for MySQL
* Automated Change
* Add read method that supports concurrent execution to Source interface
* Remove parallel iterator
* Ensure that executor service is stopped
* Automated Commit - Format and Process Resources Changes
* Expose method to fix compilation issue
* Use concurrent map to avoid access issues
* Automated Commit - Format and Process Resources Changes
* Ensure concurrent streams finish before closing source
* Fix compile issue
* Formatting
* Exclude concurrent stream threads from orphan thread watcher
* Automated Commit - Format and Process Resources Changes
* Refactor orphaned thread logic to account for concurrent execution
* PR feedback
* Implement readStreams in wrapper source
* Automated Commit - Format and Process Resources Changes
* Add readStream override
* Automated Commit - Format and Process Resources Changes
* 🤖 Auto format source-mysql code [skip ci]
* 🤖 Auto format source-mysql code [skip ci]
* 🤖 Auto format source-mysql code [skip ci]
* 🤖 Auto format source-mysql code [skip ci]
* 🤖 Auto format source-mysql code [skip ci]
* Debug logging
* Reduce logging level
* Replace synchronized calls to System.out.println when concurrent
* Close consumer
* Flush before close
* Automated Commit - Format and Process Resources Changes
* Remove charset
* Use ASCII and flush periodically for parallel streams
* Test performance harness patch
* Automated Commit - Format and Process Resources Changes
* Cleanup
* Logging to identify concurrent read enabled
* Mark parameter as final
Co-authored-by: jdpgrailsdev <[email protected]>
Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: Rodi Reich Zilberman <[email protected]>
Co-authored-by: rodireich <[email protected]>
* connectors-ci: disable dependency scanning (#29033)
* updates (#29059)
* Metadata: skip breaking change validation on prerelease (#29017)
* skip breaking change validation
* Move ValidatorOpts higher in call
* Add prerelease test
* Fix test
* ✨ Source MongoDB Internal POC: Generate Test Data (#29049)
* Add script to generate test data
* Fix prose
* Update credentials example
* PR feedback
* Bump Airbyte version from 0.50.12 to 0.50.13
* Bump versions for mssql strict-encrypt (#28964)
* Bump versions for mssql strict-encrypt
* Fix failing test
* Fix failing test
* 🎨 Improve replication method selection UX (#28882)
* update replication method in MySQL source
* bump version
* update expected specs
* update registries
* bump strict encrypt version
* make password always_show
* change url
* update registries
* 🐛 Avoid writing records to log (#29047)
* Avoid writing records to log
* Update version
* Rollout ctid cdc (#28708)
* source-postgres: enable ctid+cdc implementation
* 100% ctid rollout for cdc
* remove CtidFeatureFlags
* fix CdcPostgresSourceAcceptanceTest
* Bump versions and release notes
* Fix compilation error due to previous merge
Co-authored-by: subodh <[email protected]>
* connectors-ci: fix `unhashable type 'set'` (#29064)
* Add Slack Alert lifecycle to Dagster for Metadata publish (#28759)
* DNC
* Add slack lifecycle logging
* Update to use slack
* Update slack to use resource and bot
* Improve markdown
* Improve log
* Add sensor logging
* Extend sensor time
* merge conflict
* PR Refactoring
* Make the tests work
* remove unnecessary classes, pr feedback
* more merging
* Update airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java
Co-authored-by: Edward Gao <[email protected]>
* snowflake updates

Co-authored-by: Ben Church <[email protected]>
Co-authored-by: Baz <[email protected]>
Co-authored-by: Augustin <[email protected]>
Co-authored-by: Serhii Lazebnyi <[email protected]>
Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: flash1293 <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: Vasilis Gavriilidis <[email protected]>
Co-authored-by: Jonathan Pearlin <[email protected]>
Co-authored-by: Alexandre Girard <[email protected]>
Co-authored-by: girarda <[email protected]>
Co-authored-by: btkcodedev <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
Co-authored-by: Natalie Kwong <[email protected]>
Co-authored-by: jdpgrailsdev <[email protected]>
Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: Rodi Reich Zilberman <[email protected]>
Co-authored-by: rodireich <[email protected]>
Co-authored-by: Alexandre Cuoci <[email protected]>
Co-authored-by: terencecho <[email protected]>
Co-authored-by: Lake Mossman <[email protected]>
Co-authored-by: Benoit Moriceau <[email protected]>
Co-authored-by: subodh <[email protected]>
Co-authored-by: Edward Gao <[email protected]>
1 parent cfc6834 commit 6e9cdc8

File tree

35 files changed: +862 −258 lines changed


airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java (+33)

@@ -9,6 +9,7 @@
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -21,6 +22,7 @@
 import io.airbyte.protocol.models.v0.SyncMode;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
@@ -122,6 +124,11 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
    */
   protected abstract void createRawTable(StreamId streamId) throws Exception;
 
+  /**
+   * Creates a raw table in the v1 format
+   */
+  protected abstract void createV1RawTable(StreamId v1RawTable) throws Exception;
+
   /**
    * Create a final table usingi the StreamId's finalTableId. Subclasses are recommended to hardcode
    * the columns from {@link #FINAL_TABLE_COLUMN_NAMES} or {@link #FINAL_TABLE_COLUMN_NAMES_CDC}. The
@@ -132,6 +139,8 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
 
   protected abstract void insertRawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;
 
+  protected abstract void insertV1RawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;
+
   protected abstract void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List<JsonNode> records)
       throws Exception;
 
@@ -709,6 +718,30 @@ public void weirdColumnNames() throws Exception {
         dumpFinalTableRecords(streamId, ""));
   }
 
+  @Test
+  public void testV1V2migration() throws Exception {
+    // This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
+    // for something that is going away
+    StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
+    createV1RawTable(v1RawTableStreamId);
+    insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
+        "_airbyte_ab_id", "v1v2",
+        "_airbyte_emitted_at", "2023-01-01T00:00:00Z",
+        "_airbyte_data", "{\"hello\": \"world\"}"))));
+    final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
+    destinationHandler.execute(migration);
+    List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
+    List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
+    assertAll(
+        () -> assertEquals(1, v1RawRecords.size()),
+        () -> assertEquals(1, v2RawRecords.size()),
+        () -> assertEquals(v1RawRecords.get(0).get("_airbyte_ab_id").asText(), v2RawRecords.get(0).get("_airbyte_raw_id").asText()),
+        () -> assertEquals(Jsons.deserialize(v1RawRecords.get(0).get("_airbyte_data").asText()), v2RawRecords.get(0).get("_airbyte_data")),
+        () -> assertEquals(v1RawRecords.get(0).get("_airbyte_emitted_at").asText(), v2RawRecords.get(0).get("_airbyte_extracted_at").asText()),
+        () -> assertNull(v2RawRecords.get(0).get("_airbyte_loaded_at")));
+  }
+
   private void verifyRecords(final String expectedRawRecordsFile,
                              final List<JsonNode> actualRawRecords,
                              final String expectedFinalRecordsFile,
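The assertions in testV1V2migration encode the v1-to-v2 raw-table column mapping. As an illustrative, standalone sketch of that mapping (the class and method names here are hypothetical, not part of the Airbyte codebase), assuming records are represented as simple string maps:

```java
import java.util.Map;

// Standalone sketch of the v1 -> v2 raw-table column mapping that the
// integration test verifies. Names like mapV1RecordToV2 are hypothetical.
public class V1V2ColumnMapping {

  static Map<String, String> mapV1RecordToV2(Map<String, String> v1Record) {
    // _airbyte_loaded_at is intentionally absent (NULL) so typing/deduping
    // treats migrated rows as not yet loaded into the final table.
    return Map.of(
        // v1 _airbyte_ab_id becomes the v2 _airbyte_raw_id
        "_airbyte_raw_id", v1Record.get("_airbyte_ab_id"),
        // v1 _airbyte_emitted_at becomes the v2 _airbyte_extracted_at
        "_airbyte_extracted_at", v1Record.get("_airbyte_emitted_at"),
        // the JSON payload carries over unchanged
        "_airbyte_data", v1Record.get("_airbyte_data"));
  }

  public static void main(String[] args) {
    Map<String, String> v1 = Map.of(
        "_airbyte_ab_id", "v1v2",
        "_airbyte_emitted_at", "2023-01-01T00:00:00Z",
        "_airbyte_data", "{\"hello\": \"world\"}");
    System.out.println(mapV1RecordToV2(v1));
  }
}
```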

airbyte-integrations/bases/base-typing-deduping/build.gradle (+2 −1)

@@ -3,5 +3,6 @@ plugins {
 }
 
 dependencies {
-    implementation libs.airbyte.protocol
+    implementation libs.airbyte.protocol
+    implementation project(path: ':airbyte-integrations:bases:base-java')
 }
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java (new file, +164)

/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
import static io.airbyte.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.Collection;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {

  Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

  @Override
  public void migrateIfNecessary(
      final SqlGenerator sqlGenerator,
      final DestinationHandler destinationHandler,
      final StreamConfig streamConfig)
      throws TableNotMigratedException, UnexpectedSchemaException {
    if (shouldMigrate(streamConfig)) {
      LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
      migrate(sqlGenerator, destinationHandler, streamConfig);
    }
  }

  /**
   * Determine whether a given stream needs to be migrated from v1 to v2
   *
   * @param streamConfig the stream in question
   * @return whether to migrate the stream
   */
  protected boolean shouldMigrate(final StreamConfig streamConfig) {
    final var v1RawTable = convertToV1RawName(streamConfig);
    return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
        && !doesValidV2RawTableAlreadyExist(streamConfig)
        && doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
  }

  /**
   * Execute sql statements that converts a v1 raw table to a v2 raw table. Leaves the v1 raw table
   * intact
   *
   * @param sqlGenerator the class which generates dialect specific sql statements
   * @param destinationHandler the class which executes the sql statements
   * @param streamConfig the stream to migrate the raw table of
   */
  public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
                      final DestinationHandler<DialectTableDefinition> destinationHandler,
                      final StreamConfig streamConfig)
      throws TableNotMigratedException {
    final var namespacedTableName = convertToV1RawName(streamConfig);
    final var migrateAndReset = String.join("\n",
        sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
            namespacedTableName.tableName()),
        sqlGenerator.softReset(streamConfig));
    try {
      destinationHandler.execute(migrateAndReset);
    } catch (Exception e) {
      final var message = "Attempted and failed to migrate stream %s".formatted(streamConfig.id().finalName());
      throw new TableNotMigratedException(message, e);
    }
  }

  /**
   * Checks the schema of the v1 raw table to ensure it matches the expected format
   *
   * @param existingV2AirbyteRawTable the v1 raw table
   * @return whether the schema is as expected
   */
  private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existingV2AirbyteRawTable) {
    return schemaMatchesExpectation(existingV2AirbyteRawTable, LEGACY_RAW_TABLE_COLUMNS);
  }

  /**
   * Checks the schema of the v2 raw table to ensure it matches the expected format
   *
   * @param existingV2AirbyteRawTable the v2 raw table
   */
  private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(DialectTableDefinition existingV2AirbyteRawTable) {
    if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES)) {
      throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema");
    }
  }

  /**
   * If the sync mode is a full refresh and we overwrite the table then there is no need to migrate
   *
   * @param destinationSyncMode destination sync mode
   * @return whether this is full refresh overwrite
   */
  private boolean isMigrationRequiredForSyncMode(final DestinationSyncMode destinationSyncMode) {
    return !DestinationSyncMode.OVERWRITE.equals(destinationSyncMode);
  }

  /**
   * Checks if a valid destinations v2 raw table already exists
   *
   * @param streamConfig the raw table to check
   * @return whether it exists and is in the correct format
   */
  private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) {
    if (doesAirbyteInternalNamespaceExist(streamConfig)) {
      final var existingV2Table = getTableIfExists(streamConfig.id().rawNamespace(), streamConfig.id().rawName());
      existingV2Table.ifPresent(this::validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema);
      return existingV2Table.isPresent();
    }
    return false;
  }

  /**
   * Checks if a valid v1 raw table already exists
   *
   * @param namespace
   * @param tableName
   * @return whether it exists and is in the correct format
   */
  private boolean doesValidV1RawTableExist(final String namespace, final String tableName) {
    final var existingV1RawTable = getTableIfExists(namespace, tableName);
    return existingV1RawTable.isPresent() && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get());
  }

  /**
   * Checks to see if Airbyte's internal schema for destinations v2 exists
   *
   * @param streamConfig the stream to check
   * @return whether the schema exists
   */
  abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig);

  /**
   * Checks a Table's schema and compares it to an expected schema to make sure it matches
   *
   * @param existingTable the table to check
   * @param columns the expected schema
   * @return whether the existing table schema matches the expectation
   */
  abstract protected boolean schemaMatchesExpectation(DialectTableDefinition existingTable, Collection<String> columns);

  /**
   * Get a reference ta a table if it exists
   *
   * @param namespace
   * @param tableName
   * @return an optional potentially containing a reference to the table
   */
  abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName);

  /**
   * We use different naming conventions for raw table names in destinations v2, we need a way to map
   * that back to v1 names
   *
   * @param streamConfig the stream in question
   * @return the valid v1 name and namespace for the same stream
   */
  abstract protected NamespacedTableName convertToV1RawName(StreamConfig streamConfig);

}
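The shouldMigrate gate in the class above composes three checks: the sync mode requires migration, no valid v2 raw table exists yet, and a valid v1 raw table is present. A minimal standalone sketch of that predicate (using plain booleans in place of the dialect-specific lookups the real class delegates to subclasses):

```java
// Standalone sketch of the BaseDestinationV1V2Migrator#shouldMigrate decision.
// The boolean parameters stand in for the dialect-specific table lookups.
public class ShouldMigrateSketch {

  static boolean shouldMigrate(boolean isOverwriteSync, boolean validV2TableExists, boolean validV1TableExists) {
    // Full-refresh overwrite recreates the raw table anyway, so nothing to migrate.
    boolean migrationRequiredForSyncMode = !isOverwriteSync;
    return migrationRequiredForSyncMode && !validV2TableExists && validV1TableExists;
  }

  public static void main(String[] args) {
    // First v2 sync after a v1 sync: only the v1 table exists -> migrate.
    System.out.println(shouldMigrate(false, false, true));
    // Already migrated: a valid v2 table exists -> no-op.
    System.out.println(shouldMigrate(false, true, true));
    // Overwrite sync: nothing worth carrying over -> no-op.
    System.out.println(shouldMigrate(true, false, true));
  }
}
```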

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CollectionUtils.java (+7)

@@ -34,6 +34,13 @@ public static boolean containsIgnoreCase(final Collection<String> collection, fi
    * @return whether all searchTerms are in the searchCollection
    */
   public static boolean containsAllIgnoreCase(final Collection<String> searchCollection, final Collection<String> searchTerms) {
+    if (searchTerms.isEmpty()) {
+      // There isn't a good behavior for an empty collection. Without this check, an empty collection
+      // would always return true, but it feels misleading to say that the searchCollection does
+      // "contain all" when searchTerms is empty
+      throw new IllegalArgumentException("Search Terms collection may not be empty");
+    }
     return searchTerms.stream().allMatch(term -> containsIgnoreCase(searchCollection, term));
   }
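The guard above rejects an empty searchTerms collection rather than letting allMatch return vacuously true. A standalone re-implementation (mirroring, not importing, the Airbyte helper) shows the behavior:

```java
import java.util.Collection;
import java.util.List;

public class ContainsAllDemo {

  // Case-insensitive membership test, as in CollectionUtils#containsIgnoreCase.
  static boolean containsIgnoreCase(Collection<String> collection, String term) {
    return collection.stream().anyMatch(s -> s.equalsIgnoreCase(term));
  }

  // Patched behavior: an empty searchTerms collection is rejected instead of
  // vacuously returning true.
  static boolean containsAllIgnoreCase(Collection<String> searchCollection, Collection<String> searchTerms) {
    if (searchTerms.isEmpty()) {
      throw new IllegalArgumentException("Search Terms collection may not be empty");
    }
    return searchTerms.stream().allMatch(term -> containsIgnoreCase(searchCollection, term));
  }

  public static void main(String[] args) {
    List<String> columns = List.of("_airbyte_raw_id", "_airbyte_data");
    System.out.println(containsAllIgnoreCase(columns, List.of("_AIRBYTE_DATA"))); // true
    try {
      containsAllIgnoreCase(columns, List.of());
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```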

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java (+9 −3)

@@ -19,7 +19,7 @@
  * <p>
  * In a typical sync, destinations should call the methods:
  * <ol>
- * <li>{@link #prepareFinalTables()} once at the start of the sync</li>
+ * <li>{@link #prepareTables()} once at the start of the sync</li>
  * <li>{@link #typeAndDedupe(String, String)} as needed throughout the sync</li>
  * <li>{@link #commitFinalTables()} once at the end of the sync</li>
  * </ol>
@@ -35,15 +35,19 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
 
   private final SqlGenerator<DialectTableDefinition> sqlGenerator;
   private final DestinationHandler<DialectTableDefinition> destinationHandler;
+
+  private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
   private final ParsedCatalog parsedCatalog;
   private Set<StreamId> overwriteStreamsWithTmpTable;
 
   public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
                              DestinationHandler<DialectTableDefinition> destinationHandler,
-                             ParsedCatalog parsedCatalog) {
+                             ParsedCatalog parsedCatalog,
+                             DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
     this.sqlGenerator = sqlGenerator;
     this.destinationHandler = destinationHandler;
     this.parsedCatalog = parsedCatalog;
+    this.v1V2Migrator = v1V2Migrator;
   }
 
   /**
@@ -52,7 +56,7 @@ public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
    * empty) we write to a temporary final table, and swap it into the true final table at the end of
    * the sync. This is to prevent user downtime during a sync.
    */
-  public void prepareFinalTables() throws Exception {
+  public void prepareTables() throws Exception {
     if (overwriteStreamsWithTmpTable != null) {
       throw new IllegalStateException("Tables were already prepared.");
     }
@@ -63,6 +67,8 @@ public void prepareFinalTables() throws Exception {
     // Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an
     // _airbyte_tmp table.
     for (StreamConfig stream : parsedCatalog.streams()) {
+      // Migrate the Raw Tables if this is the first v2 sync after a v1 sync
+      v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
       final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
       if (existingTable.isPresent()) {
         // The table already exists. Decide whether we're writing to it directly, or using a tmp table.
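The diff above injects the migrator into the typer/deduper so that the renamed prepareTables() migrates each stream before inspecting its final table. A standalone sketch of that injection pattern (hypothetical minimal types, not the real Airbyte interfaces):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the migrator-injection pattern used by DefaultTyperDeduper.
// All types here are simplified stand-ins for the real Airbyte classes.
public class MigratorInjectionSketch {

  interface Migrator {
    void migrateIfNecessary(String stream);
  }

  // Used by destinations with no v1 history to migrate.
  static class NoOpMigrator implements Migrator {
    public void migrateIfNecessary(String stream) {}
  }

  static class TyperDeduper {
    private final Migrator migrator;
    final List<String> prepared = new ArrayList<>();

    TyperDeduper(Migrator migrator) {
      this.migrator = migrator;
    }

    // Mirrors the ordering in DefaultTyperDeduper#prepareTables:
    // migrate the raw table first, then prepare the final table.
    void prepareTables(List<String> streams) {
      for (String stream : streams) {
        migrator.migrateIfNecessary(stream);
        prepared.add(stream);
      }
    }
  }

  public static void main(String[] args) {
    TyperDeduper typerDeduper = new TyperDeduper(new NoOpMigrator());
    typerDeduper.prepareTables(List.of("users", "orders"));
    System.out.println(typerDeduper.prepared);
  }
}
```

Passing a no-op implementation, as the commit's NoOpDestinationV1V2Migrator does, lets destinations that don't need migration reuse the same typer/deduper unchanged.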
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java (new file, +25)

/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.typing_deduping;

public interface DestinationV1V2Migrator<DialectTableDefinition> {

  /**
   * This is the primary entrypoint to this interface
   * <p>
   * Determine whether a migration is necessary for a given stream and if so, migrate the raw table
   * and rebuild the final table with a soft reset
   *
   * @param sqlGenerator the class to use to generate sql
   * @param destinationHandler the handler to execute the sql statements
   * @param streamConfig the stream to assess migration needs
   */
  void migrateIfNecessary(
      final SqlGenerator<DialectTableDefinition> sqlGenerator,
      final DestinationHandler<DialectTableDefinition> destinationHandler,
      final StreamConfig streamConfig)
      throws TableNotMigratedException, UnexpectedSchemaException;

}
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NamespacedTableName.java (new file, +10)

/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.typing_deduping;

// yet another namespace, name combo class
public record NamespacedTableName(String namespace, String tableName) {

}
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java (new file, +17)

/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.base.destination.typing_deduping;

public class NoOpDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator<DialectTableDefinition> {

  @Override
  public void migrateIfNecessary(final SqlGenerator<DialectTableDefinition> sqlGenerator,
                                 final DestinationHandler<DialectTableDefinition> destinationHandler,
                                 final StreamConfig streamConfig)
      throws TableNotMigratedException, UnexpectedSchemaException {
    // Do nothing
  }

}

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopTyperDeduper.java (+1 −1)

@@ -7,7 +7,7 @@
 public class NoopTyperDeduper implements TyperDeduper {
 
   @Override
-  public void prepareFinalTables() throws Exception {
+  public void prepareTables() throws Exception {
 
   }
 
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.java (+11)

@@ -76,4 +76,15 @@ public interface SqlGenerator<DialectTableDefinition> {
    */
   String overwriteFinalTable(StreamId stream, String finalSuffix);
 
+  /**
+   * Creates a sql query which will create a v2 raw table from the v1 raw table, then performs a soft
+   * reset.
+   *
+   * @param streamId the stream to migrate
+   * @param namespace
+   * @param tableName
+   * @return a string containing the necessary sql to migrate
+   */
+  String migrateFromV1toV2(StreamId streamId, String namespace, String tableName);
+
 }

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TableNotMigratedException.java (+4)

@@ -14,4 +14,8 @@ public TableNotMigratedException(String message) {
     super(message);
   }
 
+  public TableNotMigratedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
 }
