Skip to content

Commit 2351396

Browse files
authored
🐛 Destination BigQuery Denormalized : fix datetime with timezone (#13286)
* Enable DAT
* google format
* handle DateTime with Timezone as Timestamp
* format
* disable "old date" case due to known BigQuery issue
* disable array test case due to known BigQuery issue
1 parent 1dbd7ab commit 2351396

File tree

5 files changed

+178
-43
lines changed

5 files changed

+178
-43
lines changed

airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
2525
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')
2626
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
27+
integrationTestJavaImplementation project(':airbyte-db:lib')
2728

2829
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
2930
}

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/JsonSchemaFormat.java

+37-11
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,58 @@
55
package io.airbyte.integrations.destination.bigquery;
66

77
import com.google.cloud.bigquery.StandardSQLTypeName;
8+
import java.util.Arrays;
9+
import java.util.List;
10+
import javax.annotation.Nonnull;
11+
import javax.annotation.Nullable;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
814

915
/**
1016
* Mapping of JsonSchema formats to BigQuery Standard SQL types.
1117
*/
1218
public enum JsonSchemaFormat {
1319

14-
DATE("date", StandardSQLTypeName.DATE),
15-
DATETIME("date-time", StandardSQLTypeName.DATETIME),
16-
TIME("time", StandardSQLTypeName.TIME),
17-
TIMESTAMP("timestamp-micros", StandardSQLTypeName.TIMESTAMP);
20+
DATE("date", null, StandardSQLTypeName.DATE),
21+
DATETIME("date-time", null, StandardSQLTypeName.DATETIME),
22+
DATETIME_WITH_TZ("date-time", "timestamp_with_timezone", StandardSQLTypeName.TIMESTAMP),
23+
TIME("time", null, StandardSQLTypeName.TIME),
24+
TIMESTAMP("timestamp-micros", null, StandardSQLTypeName.TIMESTAMP);
1825

26+
private static final Logger LOGGER = LoggerFactory.getLogger(JsonSchemaFormat.class);
1927
private final String jsonSchemaFormat;
28+
private final String jsonSchemaAirbyteType;
2029
private final StandardSQLTypeName bigQueryType;
2130

22-
JsonSchemaFormat(final String jsonSchemaFormat, final StandardSQLTypeName bigQueryType) {
31+
JsonSchemaFormat(final String jsonSchemaFormat, final String jsonSchemaAirbyteType, final StandardSQLTypeName bigQueryType) {
32+
this.jsonSchemaAirbyteType = jsonSchemaAirbyteType;
2333
this.jsonSchemaFormat = jsonSchemaFormat;
2434
this.bigQueryType = bigQueryType;
2535
}
2636

27-
public static JsonSchemaFormat fromJsonSchemaFormat(final String value) {
28-
for (final JsonSchemaFormat type : values()) {
29-
if (value.equals(type.jsonSchemaFormat)) {
30-
return type;
31-
}
37+
public static JsonSchemaFormat fromJsonSchemaFormat(final @Nonnull String jsonSchemaFormat, final @Nullable String jsonSchemaAirbyteType) {
38+
List<JsonSchemaFormat> matchFormats = null;
39+
// Match by Format + Type
40+
if (jsonSchemaAirbyteType != null) {
41+
matchFormats = Arrays.stream(values())
42+
.filter(format -> jsonSchemaFormat.equals(format.jsonSchemaFormat) && jsonSchemaAirbyteType.equals(format.jsonSchemaAirbyteType)).toList();
43+
}
44+
45+
// Match by Format only, if there are no results yet
46+
if (matchFormats == null || matchFormats.isEmpty()) {
47+
matchFormats =
48+
Arrays.stream(values()).filter(format -> jsonSchemaFormat.equals(format.jsonSchemaFormat) && format.jsonSchemaAirbyteType == null).toList();
49+
}
50+
51+
if (matchFormats.isEmpty()) {
52+
return null;
53+
} else if (matchFormats.size() > 1) {
54+
throw new RuntimeException(
55+
"Match with more than one json format! Matched formats : " + matchFormats + ", Inputs jsonSchemaFormat : " + jsonSchemaFormat
56+
+ ", jsonSchemaAirbyteType : " + jsonSchemaAirbyteType);
57+
} else {
58+
return matchFormats.get(0);
3259
}
33-
return null;
3460
}
3561

3662
public String getJsonSchemaFormat() {

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR
4949
private static final String ANY_OF_FIELD = "anyOf";
5050
private static final String ARRAY_ITEMS_FIELD = "items";
5151
private static final String FORMAT_FIELD = "format";
52+
private static final String AIRBYTE_TYPE = "airbyte_type";
5253
private static final String REF_DEFINITION_KEY = "$ref";
5354
private static final ObjectMapper mapper = new ObjectMapper();
5455

@@ -84,8 +85,9 @@ private List<JsonNode> findArrays(final JsonNode node) {
8485
if (type.isArray()) {
8586
final ArrayNode typeNode = (ArrayNode) type;
8687
for (final JsonNode arrayTypeNode : typeNode) {
87-
if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array"))
88+
if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) {
8889
return true;
90+
}
8991
}
9092
} else if (type.isTextual()) {
9193
return jsonNode.asText().equals("array");
@@ -236,10 +238,10 @@ private List<Field> getSchemaFields(final StandardNameTransformer namingResolver
236238

237239
/**
238240
* @param properties - JSON schema with properties
239-
*
241+
* <p>
240242
* The method is responsible for population of fieldsContainRefDefinitionValue set with keys
241243
* contain $ref definition
242-
*
244+
* <p>
243245
* Currently, AirByte doesn't support parsing value by $ref key definition. The issue to
244246
* track this <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
245247
*/
@@ -346,8 +348,10 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi
346348

347349
// If a specific format is defined, use their specific type instead of the JSON's one
348350
final JsonNode fieldFormat = updatedFileDefinition.get(FORMAT_FIELD);
351+
final JsonNode airbyteType = updatedFileDefinition.get(AIRBYTE_TYPE);
349352
if (fieldFormat != null) {
350-
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText());
353+
final JsonSchemaFormat schemaFormat = JsonSchemaFormat.fromJsonSchemaFormat(fieldFormat.asText(),
354+
(airbyteType != null ? airbyteType.asText() : null));
351355
if (schemaFormat != null) {
352356
builder.setType(schemaFormat.getBigQueryType());
353357
}

airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java

+39-28
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.google.auth.oauth2.ServiceAccountCredentials;
1212
import com.google.cloud.bigquery.BigQuery;
1313
import com.google.cloud.bigquery.BigQueryOptions;
14+
import com.google.cloud.bigquery.ConnectionProperty;
1415
import com.google.cloud.bigquery.Dataset;
1516
import com.google.cloud.bigquery.DatasetInfo;
1617
import com.google.cloud.bigquery.Field;
@@ -23,15 +24,18 @@
2324
import com.google.cloud.bigquery.QueryJobConfiguration;
2425
import com.google.cloud.bigquery.TableResult;
2526
import com.google.common.collect.ImmutableMap;
26-
import com.google.common.collect.Maps;
27+
import com.google.common.collect.Streams;
2728
import io.airbyte.commons.json.Jsons;
2829
import io.airbyte.commons.resources.MoreResources;
2930
import io.airbyte.commons.string.Strings;
31+
import io.airbyte.db.bigquery.BigQueryResultSet;
32+
import io.airbyte.db.bigquery.BigQuerySourceOperations;
3033
import io.airbyte.integrations.base.JavaBaseConstants;
3134
import io.airbyte.integrations.destination.NamingConventionTransformer;
3235
import io.airbyte.integrations.destination.StandardNameTransformer;
3336
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
3437
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
38+
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
3539
import io.airbyte.protocol.models.AirbyteCatalog;
3640
import io.airbyte.protocol.models.AirbyteMessage;
3741
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -42,13 +46,12 @@
4246
import java.nio.charset.StandardCharsets;
4347
import java.nio.file.Files;
4448
import java.nio.file.Path;
45-
import java.util.ArrayList;
49+
import java.util.Collections;
4650
import java.util.List;
47-
import java.util.Map;
4851
import java.util.Optional;
52+
import java.util.TimeZone;
4953
import java.util.UUID;
5054
import java.util.stream.Collectors;
51-
import java.util.stream.StreamSupport;
5255
import org.apache.commons.lang3.tuple.ImmutablePair;
5356
import org.junit.jupiter.params.ParameterizedTest;
5457
import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -110,6 +113,27 @@ protected Optional<NamingConventionTransformer> getNameTransformer() {
110113
return Optional.of(NAME_TRANSFORMER);
111114
}
112115

116+
@Override
117+
protected TestDataComparator getTestDataComparator() {
118+
return new BigQueryDenormalizedTestDataComparator();
119+
}
120+
121+
@Override
122+
protected boolean supportBasicDataTypeTest() {
123+
return true;
124+
}
125+
126+
// #13154 Normalization issue
127+
@Override
128+
protected boolean supportArrayDataTypeTest() {
129+
return false;
130+
}
131+
132+
@Override
133+
protected boolean supportObjectDataTypeTest() {
134+
return true;
135+
}
136+
113137
@Override
114138
protected void assertNamespaceNormalization(final String testCaseId,
115139
final String expectedNormalizedNamespace,
@@ -143,42 +167,29 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
143167
final String namespace,
144168
final JsonNode streamSchema)
145169
throws Exception {
146-
return new ArrayList<>(retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namingResolver.getIdentifier(namespace)));
147-
}
148-
149-
@Override
150-
protected List<String> resolveIdentifier(final String identifier) {
151-
final List<String> result = new ArrayList<>();
152-
result.add(identifier);
153-
result.add(namingResolver.getIdentifier(identifier));
154-
return result;
170+
final String tableName = namingResolver.getIdentifier(streamName);
171+
final String schema = namingResolver.getIdentifier(namespace);
172+
return retrieveRecordsFromTable(tableName, schema);
155173
}
156174

157175
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
176+
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
177+
158178
final QueryJobConfiguration queryConfig =
159179
QueryJobConfiguration
160180
.newBuilder(
161181
String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
162182
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
163-
.setUseLegacySql(false).build();
183+
// .setUseLegacySql(false)
184+
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
185+
.build();
164186

165187
final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
166188
final FieldList fields = queryResults.getSchema().getFields();
189+
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
167190

168-
return StreamSupport
169-
.stream(queryResults.iterateAll().spliterator(), false)
170-
.map(row -> {
171-
final Map<String, Object> jsonMap = Maps.newHashMap();
172-
for (final Field field : fields) {
173-
final Object value = getTypedFieldValue(row, field);
174-
if (!isAirbyteColumn(field.getName()) && value != null) {
175-
jsonMap.put(field.getName(), value);
176-
}
177-
}
178-
return jsonMap;
179-
})
180-
.map(Jsons::jsonNode)
181-
.collect(Collectors.toList());
191+
return Streams.stream(queryResults.iterateAll())
192+
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
182193
}
183194

184195
private boolean isAirbyteColumn(final String name) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.bigquery;
6+
7+
import io.airbyte.integrations.destination.StandardNameTransformer;
8+
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
9+
import java.time.LocalDate;
10+
import java.time.LocalDateTime;
11+
import java.time.ZoneOffset;
12+
import java.time.ZonedDateTime;
13+
import java.time.format.DateTimeFormatter;
14+
import java.util.ArrayList;
15+
import java.util.List;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
public class BigQueryDenormalizedTestDataComparator extends AdvancedTestDataComparator {
20+
21+
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedTestDataComparator.class);
22+
private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
23+
24+
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
25+
26+
@Override
27+
protected List<String> resolveIdentifier(final String identifier) {
28+
final List<String> result = new ArrayList<>();
29+
result.add(identifier);
30+
result.add(namingResolver.getIdentifier(identifier));
31+
return result;
32+
}
33+
34+
private LocalDate parseDate(String dateValue) {
35+
if (dateValue != null) {
36+
var format = (dateValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATE_FORMAT);
37+
return LocalDate.parse(dateValue, DateTimeFormatter.ofPattern(format));
38+
} else {
39+
return null;
40+
}
41+
}
42+
43+
private LocalDateTime parseDateTime(String dateTimeValue) {
44+
if (dateTimeValue != null) {
45+
var format = (dateTimeValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT);
46+
return LocalDateTime.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
47+
} else {
48+
return null;
49+
}
50+
}
51+
52+
@Override
53+
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
54+
var destinationDate = parseDateTime(actualValue);
55+
var expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
56+
if (expectedDate.isBefore(getBrokenDate().toLocalDateTime())) {
57+
LOGGER
58+
.warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).");
59+
return true;
60+
} else {
61+
return expectedDate.equals(destinationDate);
62+
}
63+
}
64+
65+
@Override
66+
protected boolean compareDateValues(String expectedValue, String actualValue) {
67+
var destinationDate = parseDate(actualValue);
68+
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
69+
return expectedDate.equals(destinationDate);
70+
}
71+
72+
@Override
73+
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
74+
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC);
75+
}
76+
77+
@Override
78+
protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) {
79+
// #13123 Normalization issue
80+
if (parseDestinationDateWithTz(destinationValue).isBefore(getBrokenDate())) {
81+
LOGGER
82+
.warn("Validation is skipped due to known Normalization issue. Values older then 1583 year and with time zone stored wrongly(lose days).");
83+
return true;
84+
} else {
85+
return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue);
86+
}
87+
}
88+
89+
// #13123 Normalization issue
90+
private ZonedDateTime getBrokenDate() {
91+
return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
92+
}
93+
}

0 commit comments

Comments
 (0)