🐛 Destination snowflake: Create final tables with uppercase naming #30056

Merged Sep 5, 2023 (22 commits). Diff shows changes from all commits.
File: BaseSqlGeneratorIntegrationTest.java
@@ -84,10 +84,7 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
Stream.of("_ab_cdc_deleted_at")).toList();
}

-  protected static final RecordDiffer DIFFER = new RecordDiffer(
-      Pair.of("id1", AirbyteProtocolType.INTEGER),
-      Pair.of("id2", AirbyteProtocolType.INTEGER),
-      Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
+  protected RecordDiffer DIFFER;

/**
* Subclasses may use these four StreamConfigs in their tests.
@@ -118,6 +115,14 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {

protected abstract DestinationHandler<DialectTableDefinition> getDestinationHandler();

+  /**
+   * Subclasses should override this method if they need to make changes to the stream ID. For
+   * example, you could upcase the final table name here.
+   */
+  protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) {
+    return new StreamId(namespace, finalTableName, namespace, rawTableName, namespace, finalTableName);
+  }
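A sketch of how a destination-specific subclass might use this hook, assuming an identifier-upcasing destination such as Snowflake (the override below is hypothetical, not part of this diff):

```java
// Hypothetical override in a destination test subclass: upcase only the
// final-table side; raw tables keep their original casing.
@Override
protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) {
  return new StreamId(
      namespace.toUpperCase(),       // final namespace
      finalTableName.toUpperCase(),  // final table name
      namespace,                     // raw namespace
      rawTableName,                  // raw table name
      namespace,                     // original namespace
      finalTableName);               // original name
}
```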

/**
* Do any setup work to create a namespace for this test run. For example, this might create a
* BigQuery dataset, or a Snowflake schema.
@@ -198,12 +203,17 @@ public void setup() throws Exception {
final LinkedHashMap<ColumnId, AirbyteType> cdcColumns = new LinkedHashMap<>(COLUMNS);
cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);

+    DIFFER = new RecordDiffer(
+        Pair.of(id1, AirbyteProtocolType.INTEGER),
+        Pair.of(id2, AirbyteProtocolType.INTEGER),
+        Pair.of(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
+
namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5);
    // This is not what a typical stream ID would look like, but SqlGenerator isn't allowed to make any
// assumptions about StreamId structure.
// In practice, the final table would be testDataset.users, and the raw table would be
// airbyte_internal.testDataset_raw__stream_users.
-    streamId = new StreamId(namespace, "users_final", namespace, "users_raw", namespace, "users_final");
+    streamId = buildStreamId(namespace, "users_final", "users_raw");

incrementalDedupStream = new StreamConfig(
streamId,
@@ -483,7 +493,7 @@ public void incrementalDedupNoCursor() throws Exception {
actualFinalRecords);
assertAll(
() -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()),
() -> assertEquals("bar", actualFinalRecords.get(0).get("string").asText()));
() -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()));
}

@Test
@@ -791,15 +801,10 @@ public void weirdColumnNames() throws Exception {
public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {
final String str = namespace + "_" + specialChars;
final StreamId originalStreamId = generator.buildStreamId(str, str, "unused");
-    final StreamId modifiedStreamId = new StreamId(
+    final StreamId modifiedStreamId = buildStreamId(
         originalStreamId.finalNamespace(),
         originalStreamId.finalName(),
-        // hack for testing simplicity: put the raw tables in the final namespace. This makes cleanup
-        // easier.
-        originalStreamId.finalNamespace(),
-        "raw_table",
-        null,
-        null);
+        "raw_table");
final ColumnId columnId = generator.buildColumnId(str);
try {
createNamespace(modifiedStreamId.finalNamespace());
File: BaseTypingDedupingTest.java
@@ -68,11 +68,7 @@ public abstract class BaseTypingDedupingTest {
throw new RuntimeException(e);
}
}
-  private static final RecordDiffer DIFFER = new RecordDiffer(
-      Pair.of("id1", AirbyteProtocolType.INTEGER),
-      Pair.of("id2", AirbyteProtocolType.INTEGER),
-      Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
-      Pair.of("old_cursor", AirbyteProtocolType.INTEGER));
+  private RecordDiffer DIFFER;

private String randomSuffix;
private JsonNode config;
@@ -112,7 +108,8 @@ public abstract class BaseTypingDedupingTest {
/**
* For a given stream, return the records that exist in the destination's final table. Each record
* must be in the format {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_meta":
* {...}, "field1": ..., "field2": ..., ...}.
* {...}, "field1": ..., "field2": ..., ...}. If the destination renames (e.g. upcases) the airbyte
* fields, this method must revert that naming to use the exact strings "_airbyte_raw_id", etc.
* <p>
* For JSON-valued columns, there is some nuance: a SQL null should be represented as a missing
* entry, whereas a JSON null should be represented as a
@@ -137,6 +134,8 @@
*/
protected abstract void teardownStreamAndNamespace(String streamNamespace, String streamName) throws Exception;

+  protected abstract SqlGenerator<?> getSqlGenerator();
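Given the contract above that final-table dumps must revert any renamed airbyte columns, an upcasing destination's test class might normalize dumped records with a helper along these lines (a sketch assuming Jackson's ObjectNode; the helper name is hypothetical):

```java
// Hypothetical helper: rename upcased airbyte columns back to the exact
// lowercase strings the base test expects.
private static ObjectNode downcaseAirbyteMetaColumns(final ObjectNode record) {
  for (final String field : List.of("_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta")) {
    // ObjectNode.remove returns the removed value, or null if the field was absent.
    final JsonNode value = record.remove(field.toUpperCase());
    if (value != null) {
      record.set(field, value);
    }
  }
  return record;
}
```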

/**
* Destinations which need to clean up resources after an entire test finishes should override this
* method. For example, if you want to gracefully close a database connection, you should do that
@@ -164,6 +163,14 @@ public void setup() throws Exception {
streamNamespace = "typing_deduping_test" + getUniqueSuffix();
streamName = "test_stream" + getUniqueSuffix();
streamsToTearDown = new ArrayList<>();

+    final SqlGenerator<?> generator = getSqlGenerator();
+    DIFFER = new RecordDiffer(
+        Pair.of(generator.buildColumnId("id1"), AirbyteProtocolType.INTEGER),
+        Pair.of(generator.buildColumnId("id2"), AirbyteProtocolType.INTEGER),
+        Pair.of(generator.buildColumnId("updated_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
+        Pair.of(generator.buildColumnId("old_cursor"), AirbyteProtocolType.INTEGER));
+
LOGGER.info("Using stream namespace {} and name {}", streamNamespace, streamName);
}
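Illustrative expectation for an upcasing destination (not shown in this diff): buildColumnId keeps both the destination-mangled name and the original name, which is what lets one differ serve both the raw and final tables:

```java
// Illustrative only: a generator for an upcasing destination would keep both names.
final ColumnId updatedAt = generator.buildColumnId("updated_at");
// updatedAt.originalName() -> "updated_at"   (matches raw-table records)
// updatedAt.name()         -> "UPDATED_AT"   (matches final-table records)
```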

@@ -409,7 +416,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
// The raw data is unaffected by the schema, but the final table should not have a `name` column.
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
.peek(record -> ((ObjectNode) record).remove("name"))
.peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
.toList();
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
File: RecordDiffer.java
@@ -37,19 +37,39 @@
*/
public class RecordDiffer {

-  private final Comparator<JsonNode> recordIdentityComparator;
-  private final Comparator<JsonNode> recordSortComparator;
-  private final Function<JsonNode, String> recordIdentityExtractor;
+  private final Comparator<JsonNode> rawRecordIdentityComparator;
+  private final Comparator<JsonNode> rawRecordSortComparator;
+  private final Function<JsonNode, String> rawRecordIdentityExtractor;
+
+  private final Comparator<JsonNode> finalRecordIdentityComparator;
+  private final Comparator<JsonNode> finalRecordSortComparator;
+  private final Function<JsonNode, String> finalRecordIdentityExtractor;

/**
* @param identifyingColumns Which fields constitute a unique record (typically PK+cursor). Do _not_
* include extracted_at; it is handled automatically.
*/
@SafeVarargs
-  public RecordDiffer(final Pair<String, AirbyteType>... identifyingColumns) {
-    this.recordIdentityComparator = buildIdentityComparator(identifyingColumns);
-    this.recordSortComparator = recordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
-    this.recordIdentityExtractor = buildIdentityExtractor(identifyingColumns);
+  public RecordDiffer(final Pair<ColumnId, AirbyteType>... identifyingColumns) {
+    final Pair<String, AirbyteType>[] rawTableIdentifyingColumns = Arrays.stream(identifyingColumns)
+        .map(p -> Pair.of(
+            // Raw tables always retain the original column names
+            p.getLeft().originalName(),
+            p.getRight()))
+        .toArray(Pair[]::new);
+    this.rawRecordIdentityComparator = buildIdentityComparator(rawTableIdentifyingColumns);
+    this.rawRecordSortComparator = rawRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
+    this.rawRecordIdentityExtractor = buildIdentityExtractor(rawTableIdentifyingColumns);
+
+    final Pair<String, AirbyteType>[] finalTableIdentifyingColumns = Arrays.stream(identifyingColumns)
+        .map(p -> Pair.of(
+            // Final tables may have modified the column names, so use the final name here.
+            p.getLeft().name(),
+            p.getRight()))
+        .toArray(Pair[]::new);
+    this.finalRecordIdentityComparator = buildIdentityComparator(finalTableIdentifyingColumns);
+    this.finalRecordSortComparator = finalRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
+    this.finalRecordIdentityExtractor = buildIdentityExtractor(finalTableIdentifyingColumns);
}
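A minimal usage sketch of the new constructor, with the same identifying columns these tests use (generator and the expected/actual record lists are assumed to be in scope):

```java
// Sketch: key the differ on PK + cursor. Internally it derives raw-table keys
// from ColumnId.originalName() and final-table keys from ColumnId.name().
final RecordDiffer differ = new RecordDiffer(
    Pair.of(generator.buildColumnId("id1"), AirbyteProtocolType.INTEGER),
    Pair.of(generator.buildColumnId("id2"), AirbyteProtocolType.INTEGER),
    Pair.of(generator.buildColumnId("updated_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
differ.diffRawTableRecords(expectedRawRecords, actualRawRecords);
differ.diffFinalTableRecords(expectedFinalRecords, actualFinalRecords);
```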

/**
@@ -70,9 +90,9 @@ public void diffRawTableRecords(final List<JsonNode> expectedRecords, final List
final String diff = diffRecords(
expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
-        recordIdentityComparator,
-        recordSortComparator,
-        recordIdentityExtractor);
+        rawRecordIdentityComparator,
+        rawRecordSortComparator,
+        rawRecordIdentityExtractor);

if (!diff.isEmpty()) {
fail("Raw table was incorrect.\n" + diff);
@@ -83,9 +103,9 @@ public void diffFinalTableRecords(final List<JsonNode> expectedRecords, final Li
final String diff = diffRecords(
expectedRecords,
actualRecords,
-        recordIdentityComparator,
-        recordSortComparator,
-        recordIdentityExtractor);
+        finalRecordIdentityComparator,
+        finalRecordSortComparator,
+        finalRecordIdentityExtractor);

if (!diff.isEmpty()) {
fail("Final table was incorrect.\n" + diff);
File: DefaultTyperDeduper.java
@@ -37,7 +37,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private final DestinationHandler<DialectTableDefinition> destinationHandler;

private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
-  private final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator;
+  private final V2TableMigrator<DialectTableDefinition> v2TableMigrator;
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<StreamId> streamsWithSuccesfulSetup;
@@ -46,12 +46,12 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
final DestinationHandler<DialectTableDefinition> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
-                             final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator) {
+                             final V2TableMigrator<DialectTableDefinition> v2TableMigrator) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
-    this.v2RawTableMigrator = v2RawTableMigrator;
+    this.v2TableMigrator = v2TableMigrator;
this.streamsWithSuccesfulSetup = new HashSet<>();
}

@@ -60,7 +60,7 @@ public DefaultTyperDeduper(
final DestinationHandler<DialectTableDefinition> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
-    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>());
+    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>());
}

/**
@@ -82,7 +82,7 @@ public void prepareTables() throws Exception {
for (final StreamConfig stream : parsedCatalog.streams()) {
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
-      v2RawTableMigrator.migrateIfNecessary(stream);
+      v2TableMigrator.migrateIfNecessary(stream);

final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
if (existingTable.isPresent()) {
File: NoopV2TableMigrator.java (renamed from NoopV2RawTableMigrator.java)
@@ -4,7 +4,7 @@

package io.airbyte.integrations.base.destination.typing_deduping;

-public class NoopV2RawTableMigrator<DialectTableDefinition> implements V2RawTableMigrator<DialectTableDefinition> {
+public class NoopV2TableMigrator<DialectTableDefinition> implements V2TableMigrator<DialectTableDefinition> {

@Override
public void migrateIfNecessary(final StreamConfig streamConfig) {
File: V2TableMigrator.java (renamed from V2RawTableMigrator.java)
@@ -4,8 +4,8 @@

package io.airbyte.integrations.base.destination.typing_deduping;

-public interface V2RawTableMigrator<DialectTableDefinition> {
+public interface V2TableMigrator<DialectTableDefinition> {

-  void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException;
+  void migrateIfNecessary(final StreamConfig streamConfig) throws Exception;

}
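The SnowflakeV2TableMigrator wired in further down is not shown in this view; a hedged skeleton of what an implementation of this interface can look like (class name and comments hypothetical):

```java
// Hypothetical skeleton only; the real SnowflakeV2TableMigrator is not in this diff.
public class ExampleV2TableMigrator implements V2TableMigrator<TableDefinition> {

  @Override
  public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception {
    // Detect a final table created under the old naming convention and, if found,
    // reset it so the next sync rebuilds it under the new naming. The broadened
    // `throws Exception` lets implementations surface SQL errors directly.
  }

}
```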
File: destination-bigquery Dockerfile
@@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=2.0.2
+LABEL io.airbyte.version=2.0.3
LABEL io.airbyte.name=airbyte/destination-bigquery
File: destination-bigquery metadata.yaml
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.0.2
+  dockerImageTag: 2.0.3
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
File: BigQueryDestination.java
@@ -36,7 +36,7 @@
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
-import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2RawTableMigrator;
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
@@ -242,7 +242,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
final TyperDeduper typerDeduper;
parsedCatalog = catalogParser.parseCatalog(catalog);
final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
-      final BigQueryV2RawTableMigrator v2RawTableMigrator = new BigQueryV2RawTableMigrator(bigquery);
+      final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
typerDeduper = new DefaultTyperDeduper<>(
sqlGenerator,
new BigQueryDestinationHandler(bigquery, datasetLocation),
File: BigQueryV2TableMigrator.java (renamed from BigQueryV2RawTableMigrator.java)
@@ -15,19 +15,19 @@
import com.google.cloud.bigquery.TableId;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
-import io.airbyte.integrations.base.destination.typing_deduping.V2RawTableMigrator;
+import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class BigQueryV2RawTableMigrator implements V2RawTableMigrator<TableDefinition> {
+public class BigQueryV2TableMigrator implements V2TableMigrator<TableDefinition> {

-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2RawTableMigrator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class);

private final BigQuery bq;

-  public BigQueryV2RawTableMigrator(final BigQuery bq) {
+  public BigQueryV2TableMigrator(final BigQuery bq) {
this.bq = bq;
}

File: BigQuery typing/deduping test (subclass of BaseTypingDedupingTest)
@@ -13,6 +13,7 @@
import com.google.cloud.bigquery.TableResult;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
+import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
@@ -79,6 +80,11 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s
bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents());
}

+  @Override
+  protected SqlGenerator<?> getSqlGenerator() {
+    return new BigQuerySqlGenerator(null);
+  }

/**
* Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON
* _airbyte_data). Then run a sync using our current version.
File: destination-snowflake Dockerfile
@@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

-LABEL io.airbyte.version=3.0.2
+LABEL io.airbyte.version=3.1.0
LABEL io.airbyte.name=airbyte/destination-snowflake
File: destination-snowflake metadata.yaml
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.0.2
+  dockerImageTag: 3.1.0
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
File: SnowflakeInternalStagingDestination.java
@@ -22,6 +22,7 @@
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteMessage;
@@ -39,7 +40,7 @@
public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
-  private static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
+  public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
private final String airbyteEnvironment;

public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
@@ -143,7 +144,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
}
parsedCatalog = catalogParser.parseCatalog(catalog);
final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
-    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator);
+    final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
+    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);

return new StagingConsumerFactory().createAsync(
outputRecordCollector,