Skip to content

Commit 58f18c4

Browse files
authored
BigQuery Denormalized : Cover arrays only if they are nested (#14023)
* stop covering any array. cover only if we have array of arrays (restriction of BigQuery) * add test with nested arrays and update existing tests * [14058] fix datetime arrays * [11109] cover only array of arrays by object instead of any array * [14058] fix datetime format fail when we have an array of objects with datetime * enable Array and Array+Object DATs * reopen Issue #11166 and disable functionality * Improve the tests by moving common part to Utils * Add tests to check `Array of arrays` cases * Increase version * Doc * format * review update: - update comment about reopened issue - added test case with multiple array sub-values - fix nested arrays with datetime - add test case for nested arrays with datetime * fix date formatting * disable testAnyOf test and update comments * remove some code duplication in the tests * [14668] cover the BigQuery inheritance limitation with tests * Make GCS implementation run the same tests as the standard impl * Make common format for returning date values to cover DateTime and Timestamp columns by one test * [15363] add backward compatibility for existing connections. * Populate stream config and messages by tablespace. Now it's required inside processing. * Compare only fields from the stream config * Rework BigQueryUploaderFactory and UploaderConfig to make it possible to decide about the array formatter before we create the temporary table * Compare fields * remove extra logging * fix project:dataset format of the datasetId * missing import * remove debug logging * fix log messages * format * 4 > 3
1 parent 81e8a51 commit 58f18c4

File tree

33 files changed

+1370
-811
lines changed

33 files changed

+1370
-811
lines changed

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,37 @@
44

55
package io.airbyte.integrations.destination.bigquery;
66

7+
import static com.google.cloud.bigquery.Field.Mode.REPEATED;
8+
79
import com.fasterxml.jackson.databind.JsonNode;
10+
import com.google.cloud.bigquery.Field;
11+
import com.google.cloud.bigquery.Table;
812
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
913
import io.airbyte.integrations.base.Destination;
1014
import io.airbyte.integrations.base.IntegrationRunner;
1115
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
1216
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter;
1317
import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
18+
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.LegacyArrayFormatter;
19+
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
20+
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
1421
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
22+
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
1523
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
24+
import io.airbyte.protocol.models.AirbyteStream;
25+
import java.io.IOException;
1626
import java.util.Map;
1727
import java.util.function.BiFunction;
1828
import java.util.function.Function;
29+
import javax.annotation.Nullable;
1930
import org.apache.avro.Schema;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
2033

2134
public class BigQueryDenormalizedDestination extends BigQueryDestination {
2235

36+
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);
37+
2338
@Override
2439
protected String getTargetTableName(final String streamName) {
2540
// This BigQuery destination does not write to a staging "raw" table but directly to a normalized
@@ -67,6 +82,119 @@ protected Function<String, String> getTargetTableNameTransformer(final BigQueryS
6782
return namingResolver::getIdentifier;
6883
}
6984

85+
@Override
86+
protected void putStreamIntoUploaderMap(AirbyteStream stream,
87+
UploaderConfig uploaderConfig,
88+
Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
89+
throws IOException {
90+
Table existingTable =
91+
uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName());
92+
BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();
93+
94+
if (existingTable != null) {
95+
LOGGER.info("Target table already exists. Checking could we use the default destination processing.");
96+
if (!compareSchemas((formatter.getBigQuerySchema()), existingTable.getDefinition().getSchema())) {
97+
((DefaultBigQueryDenormalizedRecordFormatter) formatter).setArrayFormatter(new LegacyArrayFormatter());
98+
LOGGER.warn("Existing target table has different structure with the new destination processing. Trying legacy implementation.");
99+
} else {
100+
LOGGER.info("Existing target table {} has equal structure with the destination schema. Using the default array processing.",
101+
stream.getName());
102+
}
103+
} else {
104+
LOGGER.info("Target table is not created yet. The default destination processing will be used.");
105+
}
106+
107+
AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
108+
uploaderMap.put(
109+
AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream),
110+
uploader);
111+
}
112+
113+
/**
114+
* Compare calculated bigquery schema and existing schema of the table. Note! We compare only fields
115+
* from the calculated schema to avoid manually created fields in the table.
116+
*
117+
* @param expectedSchema BigQuery schema of the table which we calculated using the stream schema
118+
* config
119+
* @param existingSchema BigQuery schema of the existing table (created by previous run)
120+
* @return Are calculated fields same as we have in the existing table
121+
*/
122+
private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema, @Nullable com.google.cloud.bigquery.Schema existingSchema) {
123+
if (expectedSchema != null && existingSchema == null) {
124+
LOGGER.warn("Existing schema is null when we expect {}", expectedSchema);
125+
return false;
126+
} else if (expectedSchema == null && existingSchema == null) {
127+
LOGGER.info("Existing and expected schemas are null.");
128+
return true;
129+
} else if (expectedSchema == null) {
130+
LOGGER.warn("Expected schema is null when we have existing schema {}", existingSchema);
131+
return false;
132+
}
133+
134+
var expectedFields = expectedSchema.getFields();
135+
var existingFields = existingSchema.getFields();
136+
137+
for (Field expectedField : expectedFields) {
138+
var existingField = existingFields.get(expectedField.getName());
139+
if (isDifferenceBetweenFields(expectedField, existingField)) {
140+
LOGGER.warn("Expected field {} is different from existing field {}", expectedField, existingField);
141+
return false;
142+
}
143+
}
144+
145+
LOGGER.info("Existing and expected schemas are equal.");
146+
return true;
147+
}
148+
149+
private boolean isDifferenceBetweenFields(Field expectedField, Field existingField) {
150+
if (existingField == null) {
151+
return true;
152+
} else {
153+
return !expectedField.getType().equals(existingField.getType())
154+
|| !compareRepeatedMode(expectedField, existingField)
155+
|| !compareSubFields(expectedField, existingField);
156+
}
157+
}
158+
159+
/**
160+
* Compare field modes. Field can have on of four modes: NULLABLE, REQUIRED, REPEATED, null. Only
161+
* the REPEATED mode difference is critical. The method fails only if at least one is REPEATED and
162+
* the second one is not.
163+
*
164+
* @param expectedField expected field structure
165+
* @param existingField existing field structure
166+
* @return is critical difference in the field modes
167+
*/
168+
private boolean compareRepeatedMode(Field expectedField, Field existingField) {
169+
var expectedMode = expectedField.getMode();
170+
var existingMode = existingField.getMode();
171+
172+
if (expectedMode != null && expectedMode.equals(REPEATED) || existingMode != null && existingMode.equals(REPEATED)) {
173+
return expectedMode != null && expectedMode.equals(existingMode);
174+
} else {
175+
return true;
176+
}
177+
}
178+
179+
private boolean compareSubFields(Field expectedField, Field existingField) {
180+
var expectedSubFields = expectedField.getSubFields();
181+
var existingSubFields = existingField.getSubFields();
182+
183+
if (expectedSubFields == null || expectedSubFields.isEmpty()) {
184+
return true;
185+
} else if (existingSubFields == null || existingSubFields.isEmpty()) {
186+
return false;
187+
} else {
188+
for (Field expectedSubField : expectedSubFields) {
189+
var existingSubField = existingSubFields.get(expectedSubField.getName());
190+
if (isDifferenceBetweenFields(expectedSubField, existingSubField)) {
191+
return false;
192+
}
193+
}
194+
return true;
195+
}
196+
}
197+
70198
public static void main(final String[] args) throws Exception {
71199
final Destination destination = new BigQueryDenormalizedDestination();
72200
new IntegrationRunner(destination).run(args);

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java

Lines changed: 38 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.fasterxml.jackson.databind.JsonNode;
99
import com.fasterxml.jackson.databind.ObjectMapper;
1010
import com.fasterxml.jackson.databind.ObjectReader;
11-
import com.fasterxml.jackson.databind.node.ArrayNode;
1211
import com.fasterxml.jackson.databind.node.ObjectNode;
1312
import com.google.cloud.bigquery.Field;
1413
import com.google.cloud.bigquery.Field.Builder;
@@ -26,6 +25,8 @@
2625
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
2726
import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat;
2827
import io.airbyte.integrations.destination.bigquery.JsonSchemaType;
28+
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.ArrayFormatter;
29+
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.DefaultArrayFormatter;
2930
import io.airbyte.protocol.models.AirbyteRecordMessage;
3031
import java.io.IOException;
3132
import java.util.Collections;
@@ -42,26 +43,39 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR
4243

4344
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBigQueryDenormalizedRecordFormatter.class);
4445

45-
public static final String NESTED_ARRAY_FIELD = "big_query_array";
46-
protected static final String PROPERTIES_FIELD = "properties";
47-
private static final String TYPE_FIELD = "type";
46+
public static final String PROPERTIES_FIELD = "properties";
47+
public static final String TYPE_FIELD = "type";
4848
private static final String ALL_OF_FIELD = "allOf";
4949
private static final String ANY_OF_FIELD = "anyOf";
50-
private static final String ARRAY_ITEMS_FIELD = "items";
5150
private static final String FORMAT_FIELD = "format";
5251
private static final String AIRBYTE_TYPE = "airbyte_type";
5352
private static final String REF_DEFINITION_KEY = "$ref";
5453
private static final ObjectMapper mapper = new ObjectMapper();
5554

55+
protected ArrayFormatter arrayFormatter;
56+
5657
public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
5758
super(jsonSchema, namingResolver);
5859
}
5960

61+
private ArrayFormatter getArrayFormatter() {
62+
if (arrayFormatter == null) {
63+
arrayFormatter = new DefaultArrayFormatter();
64+
}
65+
return arrayFormatter;
66+
}
67+
68+
public void setArrayFormatter(ArrayFormatter arrayFormatter) {
69+
this.arrayFormatter = arrayFormatter;
70+
this.jsonSchema = formatJsonSchema(this.originalJsonSchema.deepCopy());
71+
this.bigQuerySchema = getBigQuerySchema(jsonSchema);
72+
}
73+
6074
@Override
6175
protected JsonNode formatJsonSchema(final JsonNode jsonSchema) {
62-
var modifiedJsonSchema = formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
63-
populateEmptyArrays(modifiedJsonSchema);
64-
surroundArraysByObjects(modifiedJsonSchema);
76+
var modifiedJsonSchema = jsonSchema.deepCopy(); // Issue #5912 is reopened (PR #11166) formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
77+
getArrayFormatter().populateEmptyArrays(modifiedJsonSchema);
78+
getArrayFormatter().surroundArraysByObjects(modifiedJsonSchema);
6579
return modifiedJsonSchema;
6680
}
6781

@@ -76,53 +90,6 @@ private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingR
7690
return modifiedSchema;
7791
}
7892

79-
private List<JsonNode> findArrays(final JsonNode node) {
80-
if (node != null) {
81-
return node.findParents(TYPE_FIELD).stream()
82-
.filter(
83-
jsonNode -> {
84-
final JsonNode type = jsonNode.get(TYPE_FIELD);
85-
if (type.isArray()) {
86-
final ArrayNode typeNode = (ArrayNode) type;
87-
for (final JsonNode arrayTypeNode : typeNode) {
88-
if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) {
89-
return true;
90-
}
91-
}
92-
} else if (type.isTextual()) {
93-
return jsonNode.asText().equals("array");
94-
}
95-
return false;
96-
})
97-
.collect(Collectors.toList());
98-
} else {
99-
return Collections.emptyList();
100-
}
101-
}
102-
103-
private void populateEmptyArrays(final JsonNode node) {
104-
findArrays(node).forEach(jsonNode -> {
105-
if (!jsonNode.has(ARRAY_ITEMS_FIELD)) {
106-
final ObjectNode nodeToChange = (ObjectNode) jsonNode;
107-
nodeToChange.putObject(ARRAY_ITEMS_FIELD).putArray(TYPE_FIELD).add("string");
108-
}
109-
});
110-
}
111-
112-
private void surroundArraysByObjects(final JsonNode node) {
113-
findArrays(node).forEach(
114-
jsonNode -> {
115-
final JsonNode arrayNode = jsonNode.deepCopy();
116-
117-
final ObjectNode newNode = (ObjectNode) jsonNode;
118-
newNode.removeAll();
119-
newNode.putArray(TYPE_FIELD).add("object");
120-
newNode.putObject(PROPERTIES_FIELD).set(NESTED_ARRAY_FIELD, arrayNode);
121-
122-
surroundArraysByObjects(arrayNode.get(ARRAY_ITEMS_FIELD));
123-
});
124-
}
125-
12693
@Override
12794
public JsonNode formatRecord(final AirbyteRecordMessage recordMessage) {
12895
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
@@ -153,25 +120,32 @@ protected void addAirbyteColumns(final ObjectNode data, final AirbyteRecordMessa
153120
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);
154121
}
155122

156-
protected JsonNode formatData(final FieldList fields, final JsonNode root) {
123+
private JsonNode formatData(final FieldList fields, final JsonNode root) {
157124
// handles empty objects and arrays
158125
if (fields == null) {
159126
return root;
160127
}
161-
formatDateTimeFields(fields, root);
128+
JsonNode formattedData;
162129
if (root.isObject()) {
163-
return getObjectNode(fields, root);
130+
formattedData = getObjectNode(fields, root);
164131
} else if (root.isArray()) {
165-
return getArrayNode(fields, root);
132+
formattedData = getArrayNode(fields, root);
166133
} else {
167-
return root;
134+
formattedData = root;
168135
}
136+
formatDateTimeFields(fields, formattedData);
137+
138+
return formattedData;
169139
}
170140

171141
protected void formatDateTimeFields(final FieldList fields, final JsonNode root) {
172142
final List<String> dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields);
173143
if (!dateTimeFields.isEmpty() && !root.isNull()) {
174-
BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root);
144+
if (root.isArray()) {
145+
root.forEach(jsonNode -> BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, jsonNode));
146+
} else {
147+
BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, root);
148+
}
175149
}
176150
}
177151

@@ -185,11 +159,11 @@ private JsonNode getArrayNode(final FieldList fields, final JsonNode root) {
185159
} else {
186160
subFields = arrayField.getSubFields();
187161
}
188-
final JsonNode items = Jsons.jsonNode(MoreIterators.toList(root.elements()).stream()
162+
List<JsonNode> arrayItems = MoreIterators.toList(root.elements()).stream()
189163
.map(p -> formatData(subFields, p))
190-
.collect(Collectors.toList()));
164+
.toList();
191165

192-
return Jsons.jsonNode(ImmutableMap.of(NESTED_ARRAY_FIELD, items));
166+
return getArrayFormatter().formatArrayItems(arrayItems);
193167
}
194168

195169
private JsonNode getObjectNode(final FieldList fields, final JsonNode root) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.bigquery.formatter.arrayformater;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;

/**
 * Strategy for how a denormalized record formatter prepares JSON arrays for BigQuery.
 * Implementations decide how arrays are represented in the JSON schema and how array items are
 * rendered at write time (e.g. the legacy layout wraps arrays in objects).
 */
public interface ArrayFormatter {

  /** Ensures every array node in the given JSON schema declares an items definition. */
  void populateEmptyArrays(final JsonNode node);

  /** Rewrites array nodes in the given JSON schema, wrapping them in objects where required. */
  void surroundArraysByObjects(final JsonNode node);

  /** Converts already-formatted array items into the JSON node that is written to BigQuery. */
  JsonNode formatArrayItems(final List<JsonNode> arrayItems);

}

0 commit comments

Comments
 (0)