Skip to content

🐛 Destination BigQuery Denormalized : fix datetime with timezone #13286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
integrationTestJavaImplementation project(':airbyte-db:lib')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,58 @@
package io.airbyte.integrations.destination.bigquery;

import com.google.cloud.bigquery.StandardSQLTypeName;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Mapping of JsonSchema formats to BigQuery Standard SQL types.
*/
public enum JsonSchemaFormat {

DATE("date", StandardSQLTypeName.DATE),
DATETIME("date-time", StandardSQLTypeName.DATETIME),
TIME("time", StandardSQLTypeName.TIME),
TIMESTAMP("timestamp-micros", StandardSQLTypeName.TIMESTAMP);
DATE("date", null, StandardSQLTypeName.DATE),
DATETIME("date-time", null, StandardSQLTypeName.DATETIME),
DATETIME_WITH_TZ("date-time", "timestamp_with_timezone", StandardSQLTypeName.TIMESTAMP),
TIME("time", null, StandardSQLTypeName.TIME),
TIMESTAMP("timestamp-micros", null, StandardSQLTypeName.TIMESTAMP);

private static final Logger LOGGER = LoggerFactory.getLogger(JsonSchemaFormat.class);
private final String jsonSchemaFormat;
private final String jsonSchemaAirbyteType;
private final StandardSQLTypeName bigQueryType;

JsonSchemaFormat(final String jsonSchemaFormat, final StandardSQLTypeName bigQueryType) {
JsonSchemaFormat(final String jsonSchemaFormat, final String jsonSchemaAirbyteType, final StandardSQLTypeName bigQueryType) {
this.jsonSchemaAirbyteType = jsonSchemaAirbyteType;
this.jsonSchemaFormat = jsonSchemaFormat;
this.bigQueryType = bigQueryType;
}

public static JsonSchemaFormat fromJsonSchemaFormat(final String value) {
for (final JsonSchemaFormat type : values()) {
if (value.equals(type.jsonSchemaFormat)) {
return type;
}
public static JsonSchemaFormat fromJsonSchemaFormat(final @Nonnull String jsonSchemaFormat, final @Nullable String jsonSchemaAirbyteType) {
List<JsonSchemaFormat> matchFormats = null;
// Match by Format + Type
if (jsonSchemaAirbyteType != null) {
matchFormats = Arrays.stream(values())
.filter(format -> jsonSchemaFormat.equals(format.jsonSchemaFormat) && jsonSchemaAirbyteType.equals(format.jsonSchemaAirbyteType)).toList();
}

// Match by Format are no results already
if (matchFormats == null || matchFormats.isEmpty()) {
matchFormats =
Arrays.stream(values()).filter(format -> jsonSchemaFormat.equals(format.jsonSchemaFormat) && format.jsonSchemaAirbyteType == null).toList();
}

if (matchFormats.isEmpty()) {
return null;
} else if (matchFormats.size() > 1) {
throw new RuntimeException(
"Match with more than one json format! Matched formats : " + matchFormats + ", Inputs jsonSchemaFormat : " + jsonSchemaFormat
+ ", jsonSchemaAirbyteType : " + jsonSchemaAirbyteType);
} else {
return matchFormats.get(0);
}
return null;
}

public String getJsonSchemaFormat() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR
private static final String ANY_OF_FIELD = "anyOf";
private static final String ARRAY_ITEMS_FIELD = "items";
private static final String FORMAT_FIELD = "format";
private static final String AIRBYTE_TYPE = "airbyte_type";
private static final String REF_DEFINITION_KEY = "$ref";
private static final ObjectMapper mapper = new ObjectMapper();

Expand Down Expand Up @@ -84,8 +85,9 @@ private List<JsonNode> findArrays(final JsonNode node) {
if (type.isArray()) {
final ArrayNode typeNode = (ArrayNode) type;
for (final JsonNode arrayTypeNode : typeNode) {
if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array"))
if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) {
return true;
}
}
} else if (type.isTextual()) {
return jsonNode.asText().equals("array");
Expand Down Expand Up @@ -236,10 +238,10 @@ private List<Field> getSchemaFields(final StandardNameTransformer namingResolver

/**
* @param properties - JSON schema with properties
*
* <p>
* The method is responsible for population of fieldsContainRefDefinitionValue set with keys
* contain $ref definition
*
* <p>
* Currently, AirByte doesn't support parsing value by $ref key definition. The issue to
* track this <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
*/
Expand Down Expand Up @@ -346,8 +348,10 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi

// If a specific format is defined, use their specific type instead of the JSON's one
final JsonNode fieldFormat = updatedFileDefinition.get(FORMAT_FIELD);
final JsonNode airbyteType = updatedFileDefinition.get(AIRBYTE_TYPE);
if (fieldFormat != null) {
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText(),
(airbyteType != null ? airbyteType.asText() : null));
if (schemaFormat != null) {
builder.setType(schemaFormat.getBigQueryType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
Expand All @@ -23,15 +24,18 @@
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.bigquery.BigQueryResultSet;
import io.airbyte.db.bigquery.BigQuerySourceOperations;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand All @@ -42,13 +46,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
Expand Down Expand Up @@ -110,6 +113,27 @@ protected Optional<NamingConventionTransformer> getNameTransformer() {
return Optional.of(NAME_TRANSFORMER);
}

@Override
protected TestDataComparator getTestDataComparator() {
return new BigQueryDenormalizedTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

// #13154 Normalization issue
@Override
protected boolean supportArrayDataTypeTest() {
return false;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected void assertNamespaceNormalization(final String testCaseId,
final String expectedNormalizedNamespace,
Expand Down Expand Up @@ -143,42 +167,29 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return new ArrayList<>(retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namingResolver.getIdentifier(namespace)));
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
result.add(identifier);
result.add(namingResolver.getIdentifier(identifier));
return result;
final String tableName = namingResolver.getIdentifier(streamName);
final String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));

final QueryJobConfiguration queryConfig =
QueryJobConfiguration
.newBuilder(
String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.setUseLegacySql(false).build();
// .setUseLegacySql(false)
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
.build();

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return StreamSupport
.stream(queryResults.iterateAll().spliterator(), false)
.map(row -> {
final Map<String, Object> jsonMap = Maps.newHashMap();
for (final Field field : fields) {
final Object value = getTypedFieldValue(row, field);
if (!isAirbyteColumn(field.getName()) && value != null) {
jsonMap.put(field.getName(), value);
}
}
return jsonMap;
})
.map(Jsons::jsonNode)
.collect(Collectors.toList());
return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

private boolean isAirbyteColumn(final String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery;

import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDenormalizedTestDataComparator extends AdvancedTestDataComparator {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedTestDataComparator.class);
private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

private final StandardNameTransformer namingResolver = new StandardNameTransformer();

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
result.add(identifier);
result.add(namingResolver.getIdentifier(identifier));
return result;
}

private LocalDate parseDate(String dateValue) {
if (dateValue != null) {
var format = (dateValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATE_FORMAT);
return LocalDate.parse(dateValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

private LocalDateTime parseDateTime(String dateTimeValue) {
if (dateTimeValue != null) {
var format = (dateTimeValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT);
return LocalDateTime.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

@Override
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
var destinationDate = parseDateTime(actualValue);
var expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
if (expectedDate.isBefore(getBrokenDate().toLocalDateTime())) {
LOGGER
.warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).");
return true;
} else {
return expectedDate.equals(destinationDate);
}
}

@Override
protected boolean compareDateValues(String expectedValue, String actualValue) {
var destinationDate = parseDate(actualValue);
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
return expectedDate.equals(destinationDate);
}

@Override
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC);
}

@Override
protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) {
// #13123 Normalization issue
if (parseDestinationDateWithTz(destinationValue).isBefore(getBrokenDate())) {
LOGGER
.warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).");
return true;
} else {
return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue);
}
}

// #13123 Normalization issue
private ZonedDateTime getBrokenDate() {
return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
}
}