Skip to content

Commit 4625d90

Browse files
authored
🐛 BigQuery source: Fix nested structs (#6135)
* Fixed parsing nested Structs * Update bigquery.md * Clean up * Update Dockerfile
1 parent 8094a1d commit 4625d90

File tree

6 files changed

+143
-8
lines changed

6 files changed

+143
-8
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"sourceDefinitionId": "bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c",
33
"name": "BigQuery",
44
"dockerRepository": "airbyte/source-bigquery",
5-
"dockerImageTag": "0.1.1",
5+
"dockerImageTag": "0.1.2",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/bigquery"
77
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@
410410
- sourceDefinitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
411411
name: BigQuery
412412
dockerRepository: airbyte/source-bigquery
413-
dockerImageTag: 0.1.1
413+
dockerImageTag: 0.1.2
414414
documentationUrl: https://docs.airbyte.io/integrations/sources/bigquery
415415
- sourceDefinitionId: 90916976-a132-4ce9-8bce-82a03dd58788
416416
name: BambooHR

airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,18 @@ private void setJsonField(Field field, FieldValue fieldValue, ObjectNode node) {
108108
}
109109
} else if (fieldValue.getAttribute().equals(Attribute.RECORD)) {
110110
ObjectNode newNode = node.putObject(fieldName);
111-
field.getSubFields().forEach(recordField -> {
112-
setJsonField(recordField, fieldValue.getRecordValue().get(recordField.getName()), newNode);
113-
});
111+
FieldList subFields = field.getSubFields();
112+
try {
113+
// named get doesn't work here with nested arrays and objects; index is the only correlation between
114+
// field and field value
115+
if (subFields != null && !subFields.isEmpty()) {
116+
for (int i = 0; i < subFields.size(); i++) {
117+
setJsonField(field.getSubFields().get(i), fieldValue.getRecordValue().get(i), newNode);
118+
}
119+
}
120+
} catch (UnsupportedOperationException e) {
121+
LOGGER.error("Failed to parse Object field with name: ", fieldName, e.getMessage());
122+
}
114123
}
115124
}
116125

airbyte-integrations/connectors/source-bigquery/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

1111
# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
12-
LABEL io.airbyte.version=0.1.1
13-
LABEL io.airbyte.name=airbyte/source-bigquery
12+
LABEL io.airbyte.version=0.1.2
13+
LABEL io.airbyte.name=airbyte/source-bigquery
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2020 Airbyte
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
package io.airbyte.integrations.source.bigquery;
26+
27+
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_CREDS;
28+
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
29+
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_PROJECT_ID;
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
32+
33+
import com.fasterxml.jackson.databind.JsonNode;
34+
import com.google.cloud.bigquery.Dataset;
35+
import com.google.cloud.bigquery.DatasetInfo;
36+
import com.google.common.collect.ImmutableMap;
37+
import io.airbyte.commons.json.Jsons;
38+
import io.airbyte.commons.string.Strings;
39+
import io.airbyte.commons.util.MoreIterators;
40+
import io.airbyte.db.bigquery.BigQueryDatabase;
41+
import io.airbyte.protocol.models.AirbyteMessage;
42+
import io.airbyte.protocol.models.CatalogHelpers;
43+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
44+
import io.airbyte.protocol.models.Field;
45+
import io.airbyte.protocol.models.JsonSchemaPrimitive;
46+
import java.io.IOException;
47+
import java.nio.file.Files;
48+
import java.nio.file.Path;
49+
import java.sql.SQLException;
50+
import java.util.List;
51+
import org.junit.jupiter.api.AfterEach;
52+
import org.junit.jupiter.api.BeforeEach;
53+
import org.junit.jupiter.api.Test;
54+
55+
class BigQuerySourceTest {
56+
57+
private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
58+
private static final String STREAM_NAME = "id_and_name";
59+
60+
private BigQueryDatabase database;
61+
private Dataset dataset;
62+
private JsonNode config;
63+
64+
@BeforeEach
65+
void setUp() throws IOException, SQLException {
66+
if (!Files.exists(CREDENTIALS_PATH)) {
67+
throw new IllegalStateException(
68+
"Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH
69+
+ ". Override by setting the path with the CREDENTIALS_PATH constant.");
70+
}
71+
72+
// Files.readString always decodes as UTF-8; new String(byte[]) would use the platform default charset.
final String credentialsJsonString = Files.readString(CREDENTIALS_PATH);
73+
74+
final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString);
75+
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
76+
final String datasetLocation = "US";
77+
78+
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
79+
80+
config = Jsons.jsonNode(ImmutableMap.builder()
81+
.put(CONFIG_PROJECT_ID, projectId)
82+
.put(CONFIG_CREDS, credentialsJsonString)
83+
.put(CONFIG_DATASET_ID, datasetId)
84+
.build());
85+
86+
database = new BigQueryDatabase(config.get(CONFIG_PROJECT_ID).asText(), credentialsJsonString);
87+
88+
final DatasetInfo datasetInfo =
89+
DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(datasetLocation).build();
90+
dataset = database.getBigQuery().create(datasetInfo);
91+
92+
database.execute(
93+
"CREATE TABLE " + datasetId
94+
+ ".id_and_name(id INT64, array_val ARRAY<STRUCT<key string, value STRUCT<string_val string>>>, object_val STRUCT<val_array ARRAY<STRUCT<value_str1 string>>, value_str2 string>);");
95+
database.execute(
96+
"INSERT INTO " + datasetId
97+
+ ".id_and_name (id, array_val, object_val) VALUES "
98+
+ "(1, [STRUCT('test1_1', STRUCT('struct1_1')), STRUCT('test1_2', STRUCT('struct1_2'))], STRUCT([STRUCT('value1_1'), STRUCT('value1_2')], 'test1_1')), "
99+
+ "(2, [STRUCT('test2_1', STRUCT('struct2_1')), STRUCT('test2_2', STRUCT('struct2_2'))], STRUCT([STRUCT('value2_1'), STRUCT('value2_2')], 'test2_1')), "
100+
+ "(3, [STRUCT('test3_1', STRUCT('struct3_1')), STRUCT('test3_2', STRUCT('struct3_2'))], STRUCT([STRUCT('value3_1'), STRUCT('value3_2')], 'test3_1'));");
101+
}
102+
103+
@AfterEach
104+
void tearDown() {
105+
// setUp can fail before either field is assigned (e.g. missing credentials file); skip cleanup in that
// case so the original failure is not obscured by a NullPointerException thrown from here.
if (database != null && dataset != null) {
database.cleanDataSet(dataset.getDatasetId().getDataset());
}
106+
}
107+
108+
@Test
109+
public void testReadSuccess() throws Exception {
110+
final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));
111+
112+
assertNotNull(actualMessages);
113+
assertEquals(3, actualMessages.size());
114+
}
115+
116+
private ConfiguredAirbyteCatalog getConfiguredCatalog() {
117+
return CatalogHelpers.createConfiguredAirbyteCatalog(
118+
STREAM_NAME,
119+
config.get(CONFIG_DATASET_ID).asText(),
120+
Field.of("id", JsonSchemaPrimitive.NUMBER),
121+
Field.of("array_val", JsonSchemaPrimitive.ARRAY),
122+
Field.of("object_val", JsonSchemaPrimitive.OBJECT));
123+
}
124+
125+
}

docs/integrations/sources/bigquery.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,6 @@ Once you've configured BigQuery as a source, delete the Service Account Key from
8888

8989
| Version | Date | Pull Request | Subject |
9090
| :--- | :--- | :--- | :--- |
91+
| 0.1.2 | 2021-09-15 | [#6135](https://github.com/airbytehq/airbyte/pull/6135) | 🐛 BigQuery source: Fix nested structs |
9192
| 0.1.1 | 2021-07-28 | [#4981](https://github.com/airbytehq/airbyte/pull/4981) | 🐛 BigQuery source: Fix nested arrays |
92-
| 0.1.0 | 2021-07-22 | [#4457](https://github.com/airbytehq/airbyte/pull/4457) | 🎉 New Source: Big Query. |
93+
| 0.1.0 | 2021-07-22 | [#4457](https://github.com/airbytehq/airbyte/pull/4457) | 🎉 New Source: Big Query. |

0 commit comments

Comments
 (0)