diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle b/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle
index b75143d4763a0..2e18cc08f484e 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle
@@ -24,6 +24,7 @@ dependencies {
     integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
     integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')
     integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
+    integrationTestJavaImplementation project(':airbyte-db:lib')
 
     implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
 }
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java
index 24a02ae541261..c8b8402ef062b 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java
@@ -5,32 +5,77 @@
 package io.airbyte.integrations.destination.bigquery;
 
 import com.google.cloud.bigquery.StandardSQLTypeName;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Mapping of JsonSchema formats to BigQuery Standard SQL types.
  */
 public enum JsonSchemaFormat {
 
-  DATE("date", StandardSQLTypeName.DATE),
-  DATETIME("date-time", StandardSQLTypeName.DATETIME),
-  TIME("time", StandardSQLTypeName.TIME),
-  TIMESTAMP("timestamp-micros", StandardSQLTypeName.TIMESTAMP);
+  DATE("date", null, StandardSQLTypeName.DATE),
+  DATETIME("date-time", null, StandardSQLTypeName.DATETIME),
+  DATETIME_WITH_TZ("date-time", "timestamp_with_timezone", StandardSQLTypeName.TIMESTAMP),
+  TIME("time", null, StandardSQLTypeName.TIME),
+  TIMESTAMP("timestamp-micros", null, StandardSQLTypeName.TIMESTAMP);
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(JsonSchemaFormat.class);
   private final String jsonSchemaFormat;
+  // Optional airbyte type qualifier; null when the format alone selects the BigQuery type.
+  private final String jsonSchemaAirbyteType;
   private final StandardSQLTypeName bigQueryType;
 
-  JsonSchemaFormat(final String jsonSchemaFormat, final StandardSQLTypeName bigQueryType) {
+  JsonSchemaFormat(final String jsonSchemaFormat, final String jsonSchemaAirbyteType, final StandardSQLTypeName bigQueryType) {
+    this.jsonSchemaAirbyteType = jsonSchemaAirbyteType;
     this.jsonSchemaFormat = jsonSchemaFormat;
     this.bigQueryType = bigQueryType;
   }
 
-  public static JsonSchemaFormat fromJsonSchemaFormat(final String value) {
-    for (final JsonSchemaFormat type : values()) {
-      if (value.equals(type.jsonSchemaFormat)) {
-        return type;
-      }
+  /**
+   * Looks up the mapping for a JSON schema format, optionally refined by an airbyte type.
+   * <p>
+   * The combination of format and airbyte type is tried first. If no airbyte type is given, or the
+   * combination is not declared, the lookup falls back to the constant declared for the format alone.
+   *
+   * @param jsonSchemaFormat JSON schema format to look up, must not be null
+   * @param jsonSchemaAirbyteType airbyte type refining the format, may be null
+   * @return the matching constant, or {@code null} if the format is not mapped
+   * @throws IllegalStateException if more than one constant matches the inputs
+   */
+  public static JsonSchemaFormat fromJsonSchemaFormat(final @Nonnull String jsonSchemaFormat, final @Nullable String jsonSchemaAirbyteType) {
+    List<JsonSchemaFormat> matchFormats = List.of();
+    // Match by format + airbyte type first, as it is the more specific mapping.
+    if (jsonSchemaAirbyteType != null) {
+      matchFormats = findMatches(jsonSchemaFormat, jsonSchemaAirbyteType);
+    }
+
+    // Fall back to matching by format only if there are no results yet.
+    if (matchFormats.isEmpty()) {
+      matchFormats = findMatches(jsonSchemaFormat, null);
+    }
+
+    if (matchFormats.size() > 1) {
+      throw new IllegalStateException(
+          "Match with more than one json format! Matched formats : " + matchFormats + ", Inputs jsonSchemaFormat : " + jsonSchemaFormat
+              + ", jsonSchemaAirbyteType : " + jsonSchemaAirbyteType);
     }
-    return null;
+    return matchFormats.isEmpty() ? null : matchFormats.get(0);
   }
+
+  /**
+   * Collects the constants declared for the given format and airbyte type; a {@code null} airbyte type
+   * selects only the constants that declare no airbyte type.
+   */
+  private static List<JsonSchemaFormat> findMatches(final String jsonSchemaFormat, final String jsonSchemaAirbyteType) {
+    return Arrays.stream(values())
+        .filter(format -> jsonSchemaFormat.equals(format.jsonSchemaFormat)
+            && Objects.equals(jsonSchemaAirbyteType, format.jsonSchemaAirbyteType))
+        .toList();
+  }
 
   public String getJsonSchemaFormat() {
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java
index 72f32b1cbbbc9..9f68c6e17f5f9 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java
@@ -49,6 +49,7 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR
   private static final String ANY_OF_FIELD = "anyOf";
   private static final String ARRAY_ITEMS_FIELD = "items";
   private static final String FORMAT_FIELD = "format";
+  private static final String AIRBYTE_TYPE = "airbyte_type";
   private static final String REF_DEFINITION_KEY = "$ref";
 
   private static final ObjectMapper mapper = new ObjectMapper();
@@ -84,8 +85,9 @@ private List findArrays(final JsonNode node) {
         if (type.isArray()) {
           final ArrayNode typeNode = (ArrayNode) type;
           for (final JsonNode arrayTypeNode : typeNode) {
-            if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array"))
+            if (arrayTypeNode.isTextual() && 
arrayTypeNode.textValue().equals("array")) { return true; + } } } else if (type.isTextual()) { return jsonNode.asText().equals("array"); @@ -236,10 +238,10 @@ private List getSchemaFields(final StandardNameTransformer namingResolver /** * @param properties - JSON schema with properties - * + *

* The method is responsible for population of fieldsContainRefDefinitionValue set with keys * contain $ref definition - * + *

* Currently, AirByte doesn't support parsing value by $ref key definition. The issue to * track this 7725 */ @@ -346,8 +348,10 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi // If a specific format is defined, use their specific type instead of the JSON's one final JsonNode fieldFormat = updatedFileDefinition.get(FORMAT_FIELD); + final JsonNode airbyteType = updatedFileDefinition.get(AIRBYTE_TYPE); if (fieldFormat != null) { - final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText()); + final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText(), + (airbyteType != null ? airbyteType.asText() : null)); if (schemaFormat != null) { builder.setType(schemaFormat.getBigQueryType()); } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java index b80a1cdebf9c7..a40f7cf0dd180 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java @@ -11,6 +11,7 @@ import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ConnectionProperty; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; @@ -23,15 +24,18 @@ import 
com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableResult;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Streams;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.string.Strings;
+import io.airbyte.db.bigquery.BigQueryResultSet;
+import io.airbyte.db.bigquery.BigQuerySourceOperations;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.StandardNameTransformer;
 import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
 import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
+import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
 import io.airbyte.protocol.models.AirbyteCatalog;
 import io.airbyte.protocol.models.AirbyteMessage;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -42,13 +46,12 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -110,6 +113,30 @@ protected Optional getNameTransformer() {
     return Optional.of(NAME_TRANSFORMER);
   }
 
+  // Uses the BigQuery-denormalized specific comparator for this acceptance test.
+  @Override
+  protected TestDataComparator getTestDataComparator() {
+    return new BigQueryDenormalizedTestDataComparator();
+  }
+
+  // Reports the basic data type test as supported (NOTE(review): presumably consulted by DestinationAcceptanceTest - confirm).
+  @Override
+  protected boolean supportBasicDataTypeTest() {
+    return true;
+  }
+
+  // Reports the array data type test as unsupported because of a known normalization issue (#13154).
+  @Override
+  protected boolean supportArrayDataTypeTest() {
+    return false;
+  }
+
+  // Reports the object data type test as supported.
+  @Override
+  protected boolean supportObjectDataTypeTest() {
+    return true;
+  }
+
   @Override
   protected void assertNamespaceNormalization(final String testCaseId,
                                               final String expectedNormalizedNamespace,
@@ -143,42 +170,30 @@ protected List retrieveRecords(final TestDestinationEnv env,
                                            final String namespace,
                                            final JsonNode streamSchema)
       throws Exception {
-    return new ArrayList<>(retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namingResolver.getIdentifier(namespace)));
-  }
-
-  @Override
-  protected List resolveIdentifier(final String identifier) {
-    final List result = new ArrayList<>();
-    result.add(identifier);
-    result.add(namingResolver.getIdentifier(identifier));
-    return result;
+    final String tableName = namingResolver.getIdentifier(streamName);
+    final String schema = namingResolver.getIdentifier(namespace);
+    return retrieveRecordsFromTable(tableName, schema);
   }
 
   private List retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
+    // NOTE(review): this changes the JVM-wide default time zone; presumably needed so the row-to-JSON conversion renders values in UTC - confirm.
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
     final QueryJobConfiguration queryConfig =
         QueryJobConfiguration
             .newBuilder(
                 String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
                     JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
-            .setUseLegacySql(false).build();
+            // NOTE(review): setUseLegacySql(false) is no longer set explicitly; presumably the builder already defaults to standard SQL - confirm.
+            .setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
+            .build();
 
     final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
     final FieldList fields = queryResults.getSchema().getFields();
+    BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
 
-    return StreamSupport
-        .stream(queryResults.iterateAll().spliterator(), false)
-        .map(row -> {
-          final Map jsonMap = Maps.newHashMap();
-          for (final Field field : fields) {
-            final Object value = getTypedFieldValue(row, field);
-            if 
(!isAirbyteColumn(field.getName()) && value != null) {
-              jsonMap.put(field.getName(), value);
-            }
-          }
-          return jsonMap;
-        })
-        .map(Jsons::jsonNode)
-        .collect(Collectors.toList());
+    return Streams.stream(queryResults.iterateAll())
+        .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
   }
 
   private boolean isAirbyteColumn(final String name) {
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestDataComparator.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestDataComparator.java
new file mode 100644
index 0000000000000..6b12c746268d5
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedTestDataComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery;
+
+import io.airbyte.integrations.destination.StandardNameTransformer;
+import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test data comparator for the BigQuery denormalized destination.
+ * <p>
+ * Destination values ending in {@code Z} are parsed with the BigQuery pattern
+ * {@code yyyy-MM-dd'T'HH:mm:ss'Z'}; every other value is parsed with the Airbyte date and date-time
+ * patterns ({@code AIRBYTE_DATE_FORMAT}, {@code AIRBYTE_DATETIME_FORMAT}).
+ */
+public class BigQueryDenormalizedTestDataComparator extends AdvancedTestDataComparator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedTestDataComparator.class);
+  private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+  // DateTimeFormatter is immutable and thread-safe, so the BigQuery formatter is built once.
+  private static final DateTimeFormatter BIGQUERY_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT);
+  // Precompiled equivalent of String.matches(".+Z"): a non-empty prefix followed by a trailing 'Z'.
+  private static final Pattern BIGQUERY_VALUE_PATTERN = Pattern.compile(".+Z");
+  // #13123 Normalization issue: validation is skipped for values before this point in time.
+  private static final ZonedDateTime BROKEN_DATE = ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
+  private static final String BROKEN_DATE_WARNING =
+      "Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).";
+
+  private final StandardNameTransformer namingResolver = new StandardNameTransformer();
+
+  /**
+   * Returns the identifier as given, followed by its form produced by {@link StandardNameTransformer#getIdentifier}.
+   */
+  @Override
+  protected List<String> resolveIdentifier(final String identifier) {
+    final List<String> result = new ArrayList<>();
+    result.add(identifier);
+    result.add(namingResolver.getIdentifier(identifier));
+    return result;
+  }
+
+  /**
+   * Picks the formatter for a destination value: the BigQuery formatter when the value ends in {@code Z},
+   * otherwise a formatter for the supplied Airbyte pattern.
+   */
+  private static DateTimeFormatter formatterFor(final String destinationValue, final String airbyteFormat) {
+    return BIGQUERY_VALUE_PATTERN.matcher(destinationValue).matches()
+        ? BIGQUERY_DATETIME_FORMATTER
+        : DateTimeFormatter.ofPattern(airbyteFormat);
+  }
+
+  /** Parses a destination date value; {@code null} is passed through. */
+  private LocalDate parseDate(final String dateValue) {
+    return dateValue == null ? null : LocalDate.parse(dateValue, formatterFor(dateValue, AIRBYTE_DATE_FORMAT));
+  }
+
+  /** Parses a destination date-time value; {@code null} is passed through. */
+  private LocalDateTime parseDateTime(final String dateTimeValue) {
+    return dateTimeValue == null ? null : LocalDateTime.parse(dateTimeValue, formatterFor(dateTimeValue, AIRBYTE_DATETIME_FORMAT));
+  }
+
+  /**
+   * Compares date-time values without time zone. Expected values before {@link #BROKEN_DATE} are accepted
+   * without comparison (with a warning); a {@code null} actual value never equals the expected one.
+   */
+  @Override
+  protected boolean compareDateTimeValues(final String expectedValue, final String actualValue) {
+    final LocalDateTime destinationDate = parseDateTime(actualValue);
+    final LocalDateTime expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
+    if (expectedDate.isBefore(BROKEN_DATE.toLocalDateTime())) {
+      LOGGER.warn(BROKEN_DATE_WARNING);
+      return true;
+    }
+    return expectedDate.equals(destinationDate);
+  }
+
+  /** Compares date values; a {@code null} actual value never equals the expected one. */
+  @Override
+  protected boolean compareDateValues(final String expectedValue, final String actualValue) {
+    final LocalDate destinationDate = parseDate(actualValue);
+    final LocalDate expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
+    return expectedDate.equals(destinationDate);
+  }
+
+  /** Parses a destination value in the BigQuery date-time pattern and interprets it as UTC. */
+  @Override
+  protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
+    return ZonedDateTime.of(LocalDateTime.parse(destinationValue, BIGQUERY_DATETIME_FORMATTER), ZoneOffset.UTC);
+  }
+
+  /**
+   * Compares date-time values with time zone, delegating to the superclass except for destination values
+   * before {@link #BROKEN_DATE}, which are accepted with a warning.
+   */
+  @Override
+  protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue, final String destinationValue) {
+    // #13123 Normalization issue
+    if (parseDestinationDateWithTz(destinationValue).isBefore(BROKEN_DATE)) {
+      LOGGER.warn(BROKEN_DATE_WARNING);
+      return true;
+    }
+    return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue);
+  }
+
+}