Skip to content

🐛 Destination snowflake: Create final tables with uppercase naming #30056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
Stream.of("_ab_cdc_deleted_at")).toList();
}

protected static final RecordDiffer DIFFER = new RecordDiffer(
Pair.of("id1", AirbyteProtocolType.INTEGER),
Pair.of("id2", AirbyteProtocolType.INTEGER),
Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
protected RecordDiffer DIFFER;

/**
* Subclasses may use these four StreamConfigs in their tests.
Expand Down Expand Up @@ -118,6 +115,14 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {

protected abstract DestinationHandler<DialectTableDefinition> getDestinationHandler();

/**
 * Builds the {@link StreamId} for a test stream. The default implementation uses the same
 * namespace for the raw and final tables and keeps the final name as the "original" name.
 * Subclasses should override this method if they need to make changes to the stream ID —
 * for example, to upcase the final table name.
 */
protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) {
  // By default the raw/final/original namespaces are all the same test namespace, and the
  // "original" final name is just the final table name unchanged.
  final String rawNamespace = namespace;
  final String originalNamespace = namespace;
  final String originalName = finalTableName;
  return new StreamId(namespace, finalTableName, rawNamespace, rawTableName, originalNamespace, originalName);
}

/**
* Do any setup work to create a namespace for this test run. For example, this might create a
* BigQuery dataset, or a Snowflake schema.
Expand Down Expand Up @@ -198,12 +203,17 @@ public void setup() throws Exception {
final LinkedHashMap<ColumnId, AirbyteType> cdcColumns = new LinkedHashMap<>(COLUMNS);
cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);

DIFFER = new RecordDiffer(
Pair.of(id1, AirbyteProtocolType.INTEGER),
Pair.of(id2, AirbyteProtocolType.INTEGER),
Pair.of(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));

namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5);
// This is not what a typical stream ID would look like, but SqlGenerator isn't allowed to make any
// assumptions about StreamId structure.
// In practice, the final table would be testDataset.users, and the raw table would be
// airbyte_internal.testDataset_raw__stream_users.
streamId = new StreamId(namespace, "users_final", namespace, "users_raw", namespace, "users_final");
streamId = buildStreamId(namespace, "users_final", "users_raw");

incrementalDedupStream = new StreamConfig(
streamId,
Expand Down Expand Up @@ -483,7 +493,7 @@ public void incrementalDedupNoCursor() throws Exception {
actualFinalRecords);
assertAll(
() -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()),
() -> assertEquals("bar", actualFinalRecords.get(0).get("string").asText()));
() -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()));
}

@Test
Expand Down Expand Up @@ -791,15 +801,10 @@ public void weirdColumnNames() throws Exception {
public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {
final String str = namespace + "_" + specialChars;
final StreamId originalStreamId = generator.buildStreamId(str, str, "unused");
final StreamId modifiedStreamId = new StreamId(
final StreamId modifiedStreamId = buildStreamId(
originalStreamId.finalNamespace(),
originalStreamId.finalName(),
// hack for testing simplicity: put the raw tables in the final namespace. This makes cleanup
// easier.
originalStreamId.finalNamespace(),
"raw_table",
null,
null);
"raw_table");
final ColumnId columnId = generator.buildColumnId(str);
try {
createNamespace(modifiedStreamId.finalNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ public abstract class BaseTypingDedupingTest {
throw new RuntimeException(e);
}
}
private static final RecordDiffer DIFFER = new RecordDiffer(
Pair.of("id1", AirbyteProtocolType.INTEGER),
Pair.of("id2", AirbyteProtocolType.INTEGER),
Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
Pair.of("old_cursor", AirbyteProtocolType.INTEGER));
private RecordDiffer DIFFER;

private String randomSuffix;
private JsonNode config;
Expand Down Expand Up @@ -112,7 +108,8 @@ public abstract class BaseTypingDedupingTest {
/**
* For a given stream, return the records that exist in the destination's final table. Each record
* must be in the format {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_meta":
* {...}, "field1": ..., "field2": ..., ...}.
* {...}, "field1": ..., "field2": ..., ...}. If the destination renames (e.g. upcases) the airbyte
* fields, this method must revert that naming to use the exact strings "_airbyte_raw_id", etc.
* <p>
* For JSON-valued columns, there is some nuance: a SQL null should be represented as a missing
* entry, whereas a JSON null should be represented as a
Expand All @@ -137,6 +134,8 @@ public abstract class BaseTypingDedupingTest {
*/
protected abstract void teardownStreamAndNamespace(String streamNamespace, String streamName) throws Exception;

protected abstract SqlGenerator<?> getSqlGenerator();

/**
* Destinations which need to clean up resources after an entire test finishes should override this
* method. For example, if you want to gracefully close a database connection, you should do that
Expand Down Expand Up @@ -164,6 +163,14 @@ public void setup() throws Exception {
streamNamespace = "typing_deduping_test" + getUniqueSuffix();
streamName = "test_stream" + getUniqueSuffix();
streamsToTearDown = new ArrayList<>();

final SqlGenerator<?> generator = getSqlGenerator();
DIFFER = new RecordDiffer(
Pair.of(generator.buildColumnId("id1"), AirbyteProtocolType.INTEGER),
Pair.of(generator.buildColumnId("id2"), AirbyteProtocolType.INTEGER),
Pair.of(generator.buildColumnId("updated_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
Pair.of(generator.buildColumnId("old_cursor"), AirbyteProtocolType.INTEGER));

LOGGER.info("Using stream namespace {} and name {}", streamNamespace, streamName);
}

Expand Down Expand Up @@ -409,7 +416,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
// The raw data is unaffected by the schema, but the final table should not have a `name` column.
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
.peek(record -> ((ObjectNode) record).remove("name"))
.peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
.toList();
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,39 @@
*/
public class RecordDiffer {

private final Comparator<JsonNode> recordIdentityComparator;
private final Comparator<JsonNode> recordSortComparator;
private final Function<JsonNode, String> recordIdentityExtractor;
private final Comparator<JsonNode> rawRecordIdentityComparator;
private final Comparator<JsonNode> rawRecordSortComparator;
private final Function<JsonNode, String> rawRecordIdentityExtractor;

private final Comparator<JsonNode> finalRecordIdentityComparator;
private final Comparator<JsonNode> finalRecordSortComparator;
private final Function<JsonNode, String> finalRecordIdentityExtractor;

/**
 * @param identifyingColumns Which fields constitute a unique record (typically PK+cursor). Do _not_
 *        include extracted_at; it is handled automatically.
 */
@SafeVarargs
public RecordDiffer(final Pair<ColumnId, AirbyteType>... identifyingColumns) {
  // Raw tables always retain the original column names, so key raw-record identity on
  // the original (pre-destination-renaming) names.
  final Pair<String, AirbyteType>[] rawTableIdentifyingColumns =
      mapToColumnNames(identifyingColumns, ColumnId::originalName);
  this.rawRecordIdentityComparator = buildIdentityComparator(rawTableIdentifyingColumns);
  this.rawRecordSortComparator = rawRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
  this.rawRecordIdentityExtractor = buildIdentityExtractor(rawTableIdentifyingColumns);

  // Final tables may have modified the column names (e.g. upcased them), so key final-record
  // identity on the destination-specific final names.
  final Pair<String, AirbyteType>[] finalTableIdentifyingColumns =
      mapToColumnNames(identifyingColumns, ColumnId::name);
  this.finalRecordIdentityComparator = buildIdentityComparator(finalTableIdentifyingColumns);
  this.finalRecordSortComparator = finalRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
  this.finalRecordIdentityExtractor = buildIdentityExtractor(finalTableIdentifyingColumns);
}

/**
 * Projects each (ColumnId, type) pair to a (columnName, type) pair, using the given extractor
 * to pick which of the ColumnId's names (original vs. final) to use.
 */
private static Pair<String, AirbyteType>[] mapToColumnNames(final Pair<ColumnId, AirbyteType>[] identifyingColumns,
                                                            final Function<ColumnId, String> nameExtractor) {
  return Arrays.stream(identifyingColumns)
      .map(p -> Pair.of(nameExtractor.apply(p.getLeft()), p.getRight()))
      .toArray(Pair[]::new);
}

/**
Expand All @@ -70,9 +90,9 @@ public void diffRawTableRecords(final List<JsonNode> expectedRecords, final List
final String diff = diffRecords(
expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
recordIdentityComparator,
recordSortComparator,
recordIdentityExtractor);
rawRecordIdentityComparator,
rawRecordSortComparator,
rawRecordIdentityExtractor);

if (!diff.isEmpty()) {
fail("Raw table was incorrect.\n" + diff);
Expand All @@ -83,9 +103,9 @@ public void diffFinalTableRecords(final List<JsonNode> expectedRecords, final Li
final String diff = diffRecords(
expectedRecords,
actualRecords,
recordIdentityComparator,
recordSortComparator,
recordIdentityExtractor);
finalRecordIdentityComparator,
finalRecordSortComparator,
finalRecordIdentityExtractor);

if (!diff.isEmpty()) {
fail("Final table was incorrect.\n" + diff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.cloud.bigquery.TableResult;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
Expand Down Expand Up @@ -79,6 +80,11 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s
bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents());
}

@Override
protected SqlGenerator<?> getSqlGenerator() {
  // NOTE(review): the constructor argument is passed as null here; presumably it is only needed
  // for operations the base test never performs (the test appears to use the generator only for
  // identifier/naming methods such as buildColumnId) — confirm against BigQuerySqlGenerator.
  return new BigQuerySqlGenerator(null);
}

/**
* Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON
* _airbyte_data). Then run a sync using our current version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public Optional<SnowflakeTableDefinition> findExistingTable(final StreamId id) t
ORDER BY ordinal_position;
""",
databaseName.toUpperCase(),
id.finalNamespace(),
id.finalName()).stream()
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase()).stream()
.collect(LinkedHashMap::new,
(map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()),
LinkedHashMap::putAll);
Expand All @@ -65,8 +65,8 @@ public boolean isFinalTableEmpty(final StreamId id) throws SQLException {
AND table_name = ?
""",
databaseName.toUpperCase(),
id.finalNamespace(),
id.finalName());
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase());
return rowCount == 0;
}

Expand Down
Loading