Skip to content

Commit de01d68

Browse files
committed
Enable DAT
1 parent d7a1fe8 commit de01d68

File tree

3 files changed

+113
-49
lines changed

3 files changed

+113
-49
lines changed

airbyte-integrations/connectors/destination-bigquery-denormalized/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
2525
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-bigquery-denormalized')
2626
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
27+
integrationTestJavaImplementation project(':airbyte-db:lib')
2728

2829
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
2930
}

airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java

+42-49
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,11 @@
99
import com.fasterxml.jackson.databind.JsonNode;
1010
import com.fasterxml.jackson.databind.node.ObjectNode;
1111
import com.google.auth.oauth2.ServiceAccountCredentials;
12-
import com.google.cloud.bigquery.BigQuery;
13-
import com.google.cloud.bigquery.BigQueryOptions;
14-
import com.google.cloud.bigquery.Dataset;
15-
import com.google.cloud.bigquery.DatasetInfo;
16-
import com.google.cloud.bigquery.Field;
17-
import com.google.cloud.bigquery.FieldList;
18-
import com.google.cloud.bigquery.FieldValue;
19-
import com.google.cloud.bigquery.FieldValueList;
20-
import com.google.cloud.bigquery.Job;
21-
import com.google.cloud.bigquery.JobId;
22-
import com.google.cloud.bigquery.JobInfo;
23-
import com.google.cloud.bigquery.QueryJobConfiguration;
24-
import com.google.cloud.bigquery.TableResult;
12+
import com.google.cloud.bigquery.*;
2513
import com.google.common.collect.ImmutableMap;
26-
import com.google.common.collect.Maps;
14+
import io.airbyte.db.bigquery.BigQueryResultSet;
15+
import io.airbyte.db.bigquery.BigQuerySourceOperations;
16+
import com.google.common.collect.Streams;
2717
import io.airbyte.commons.json.Jsons;
2818
import io.airbyte.commons.resources.MoreResources;
2919
import io.airbyte.commons.string.Strings;
@@ -32,6 +22,7 @@
3222
import io.airbyte.integrations.destination.StandardNameTransformer;
3323
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
3424
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
25+
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
3526
import io.airbyte.protocol.models.AirbyteCatalog;
3627
import io.airbyte.protocol.models.AirbyteMessage;
3728
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -42,13 +33,8 @@
4233
import java.nio.charset.StandardCharsets;
4334
import java.nio.file.Files;
4435
import java.nio.file.Path;
45-
import java.util.ArrayList;
46-
import java.util.List;
47-
import java.util.Map;
48-
import java.util.Optional;
49-
import java.util.UUID;
36+
import java.util.*;
5037
import java.util.stream.Collectors;
51-
import java.util.stream.StreamSupport;
5238
import org.apache.commons.lang3.tuple.ImmutablePair;
5339
import org.junit.jupiter.params.ParameterizedTest;
5440
import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -110,6 +96,26 @@ protected Optional<NamingConventionTransformer> getNameTransformer() {
11096
return Optional.of(NAME_TRANSFORMER);
11197
}
11298

99+
@Override
100+
protected TestDataComparator getTestDataComparator() {
101+
return new BigQueryDenormalizedTestDataComparator();
102+
}
103+
104+
@Override
105+
protected boolean supportBasicDataTypeTest() {
106+
return true;
107+
}
108+
109+
@Override
110+
protected boolean supportArrayDataTypeTest() {
111+
return true;
112+
}
113+
114+
@Override
115+
protected boolean supportObjectDataTypeTest() {
116+
return true;
117+
}
118+
113119
@Override
114120
protected void assertNamespaceNormalization(final String testCaseId,
115121
final String expectedNormalizedNamespace,
@@ -142,43 +148,30 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
142148
final String streamName,
143149
final String namespace,
144150
final JsonNode streamSchema)
145-
throws Exception {
146-
return new ArrayList<>(retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namingResolver.getIdentifier(namespace)));
147-
}
148-
149-
@Override
150-
protected List<String> resolveIdentifier(final String identifier) {
151-
final List<String> result = new ArrayList<>();
152-
result.add(identifier);
153-
result.add(namingResolver.getIdentifier(identifier));
154-
return result;
151+
throws Exception {
152+
final String tableName = namingResolver.getIdentifier(streamName);
153+
final String schema = namingResolver.getIdentifier(namespace);
154+
return retrieveRecordsFromTable(tableName, schema);
155155
}
156156

157157
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws InterruptedException {
158+
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
159+
158160
final QueryJobConfiguration queryConfig =
159-
QueryJobConfiguration
160-
.newBuilder(
161-
String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
162-
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
163-
.setUseLegacySql(false).build();
161+
QueryJobConfiguration
162+
.newBuilder(
163+
String.format("SELECT * FROM `%s`.`%s` order by %s asc;", schema, tableName,
164+
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
165+
// .setUseLegacySql(false)
166+
.setConnectionProperties(Collections.singletonList(ConnectionProperty.of("time_zone", "UTC")))
167+
.build();
164168

165169
final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
166170
final FieldList fields = queryResults.getSchema().getFields();
171+
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
167172

168-
return StreamSupport
169-
.stream(queryResults.iterateAll().spliterator(), false)
170-
.map(row -> {
171-
final Map<String, Object> jsonMap = Maps.newHashMap();
172-
for (final Field field : fields) {
173-
final Object value = getTypedFieldValue(row, field);
174-
if (!isAirbyteColumn(field.getName()) && value != null) {
175-
jsonMap.put(field.getName(), value);
176-
}
177-
}
178-
return jsonMap;
179-
})
180-
.map(Jsons::jsonNode)
181-
.collect(Collectors.toList());
173+
return Streams.stream(queryResults.iterateAll())
174+
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
182175
}
183176

184177
private boolean isAirbyteColumn(final String name) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.airbyte.integrations.destination.bigquery;
2+
3+
import io.airbyte.integrations.destination.StandardNameTransformer;
4+
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
5+
6+
import java.time.LocalDate;
7+
import java.time.LocalDateTime;
8+
import java.time.ZoneOffset;
9+
import java.time.ZonedDateTime;
10+
import java.time.format.DateTimeFormatter;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class BigQueryDenormalizedTestDataComparator extends AdvancedTestDataComparator {
15+
16+
private static final String BIGQUERY_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
17+
18+
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
19+
20+
@Override
21+
protected List<String> resolveIdentifier(final String identifier) {
22+
final List<String> result = new ArrayList<>();
23+
result.add(identifier);
24+
result.add(namingResolver.getIdentifier(identifier));
25+
return result;
26+
}
27+
28+
private LocalDate parseDate(String dateValue) {
29+
if (dateValue != null) {
30+
var format = (dateValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATE_FORMAT);
31+
return LocalDate.parse(dateValue, DateTimeFormatter.ofPattern(format));
32+
} else {
33+
return null;
34+
}
35+
}
36+
37+
private LocalDateTime parseDateTime(String dateTimeValue) {
38+
if (dateTimeValue != null) {
39+
var format = (dateTimeValue.matches(".+Z") ? BIGQUERY_DATETIME_FORMAT : AIRBYTE_DATETIME_FORMAT);
40+
return LocalDateTime.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
41+
} else {
42+
return null;
43+
}
44+
}
45+
46+
@Override
47+
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
48+
var destinationDate = parseDateTime(actualValue);
49+
var expectedDate = LocalDateTime.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
50+
return expectedDate.equals(destinationDate);
51+
}
52+
53+
@Override
54+
protected boolean compareDateValues(String expectedValue, String actualValue) {
55+
var destinationDate = parseDate(actualValue);
56+
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
57+
return expectedDate.equals(destinationDate);
58+
}
59+
60+
@Override
61+
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
62+
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC);
63+
}
64+
65+
@Override
66+
protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) {
67+
return super.compareDateTimeWithTzValues(airbyteMessageValue, destinationValue);
68+
}
69+
70+
}

0 commit comments

Comments
 (0)