Skip to content

Commit edc58a7

Browse files
🐛 Source BigQuery: fix error with RECORD REPEATED fields (#35503)
Co-authored-by: Marcos Marx <[email protected]> Co-authored-by: marcosmarxm <[email protected]>
1 parent e671aa3 commit edc58a7

File tree

4 files changed

+65
-2
lines changed

4 files changed

+65
-2
lines changed

airbyte-integrations/connectors/source-bigquery/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: source
77
definitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
8-
dockerImageTag: 0.4.1
8+
dockerImageTag: 0.4.2
99
dockerRepository: airbyte/source-bigquery
1010
documentationUrl: https://docs.airbyte.com/integrations/sources/bigquery
1111
githubIssueLabel: source-bigquery

airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;
1010

1111
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.google.cloud.bigquery.Field;
1213
import com.google.cloud.bigquery.QueryParameterValue;
1314
import com.google.cloud.bigquery.StandardSQLTypeName;
1415
import com.google.cloud.bigquery.Table;
@@ -128,7 +129,12 @@ protected List<TableInfo<CommonField<StandardSQLTypeName>>> discoverInternal(fin
128129
.name(table.getTableId().getTable())
129130
.fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream()
130131
.map(f -> {
131-
final StandardSQLTypeName standardType = f.getType().getStandardType();
132+
final StandardSQLTypeName standardType;
133+
if (f.getType().getStandardType() == StandardSQLTypeName.STRUCT && f.getMode() == Field.Mode.REPEATED) {
134+
standardType = StandardSQLTypeName.ARRAY;
135+
} else
136+
standardType = f.getType().getStandardType();
137+
132138
return new CommonField<>(f.getName(), standardType);
133139
})
134140
.collect(Collectors.toList()))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.bigquery;
6+
7+
import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID;
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.junit.jupiter.api.Assertions.assertNotNull;
10+
11+
import com.fasterxml.jackson.databind.JsonNode;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import io.airbyte.commons.util.MoreIterators;
14+
import io.airbyte.protocol.models.Field;
15+
import io.airbyte.protocol.models.JsonSchemaType;
16+
import io.airbyte.protocol.models.v0.AirbyteMessage;
17+
import io.airbyte.protocol.models.v0.CatalogHelpers;
18+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
19+
import java.sql.SQLException;
20+
import java.util.List;
21+
import org.junit.jupiter.api.Test;
22+
23+
public class BigQuerySourceStructRepeatedTest extends AbstractBigQuerySourceTest {
24+
25+
@Override
26+
public void createTable(String datasetId) throws SQLException {
27+
// create column name interval which should be escaped
28+
database.execute("CREATE TABLE " + datasetId + ".struct_repeated(id int64, key_value_pairs ARRAY<STRUCT<key STRING, value FLOAT64>>);");
29+
database.execute("INSERT INTO " + datasetId + ".struct_repeated (id, key_value_pairs) VALUES (1, [('a', 0.7), ('b', 0.8), ('c', 1.2)]);");
30+
}
31+
32+
@Test
33+
public void testReadSuccess() throws Exception {
34+
final List<AirbyteMessage> actualMessages = MoreIterators.toList(new BigQuerySource().read(config, getConfiguredCatalog(), null));
35+
36+
ObjectMapper mapper = new ObjectMapper();
37+
// JsonNode actualObj = mapper.readTree("{\"key_value_pairs\":[{ \"key\": \"a\",\"value\": \"0.7\"},
38+
// {\"key\": \"b\",\"value\": \"0.8\"}, {\"key\": \"c\",\"value\": \"1.2\"}]}");
39+
JsonNode actualObj = mapper.readTree("[{ \"key\": \"a\",\"value\": 0.7}, {\"key\": \"b\",\"value\": 0.8}, {\"key\": \"c\",\"value\": 1.2}]");
40+
41+
assertNotNull(actualMessages);
42+
assertEquals(1, actualMessages.size());
43+
44+
assertNotNull(actualMessages.get(0).getRecord().getData().get("id"));
45+
assertEquals(actualObj, actualMessages.get(0).getRecord().getData().get("key_value_pairs"));
46+
}
47+
48+
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
49+
return CatalogHelpers.createConfiguredAirbyteCatalog(
50+
"struct_repeated",
51+
config.get(CONFIG_DATASET_ID).asText(),
52+
Field.of("id", JsonSchemaType.NUMBER),
53+
Field.of("key_value_pairs", JsonSchemaType.ARRAY));
54+
}
55+
56+
}

docs/integrations/sources/bigquery.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ Once you've configured BigQuery as a source, delete the Service Account Key from
8888

8989
| Version | Date | Pull Request | Subject |
9090
|:--------|:-----------| :------------------------------------------------------- |:------------------------------------------------------------------------------------------------------------------------------------------|
91+
| 0.4.2 | 2024-02-22 | [35503](https://github.com/airbytehq/airbyte/pull/35503) | Source BigQuery: fix error when replicating RECORD REPEATED fields |
9192
| 0.4.1 | 2024-01-24 | [34453](https://github.com/airbytehq/airbyte/pull/34453) | bump CDK version |
9293
| 0.4.0 | 2023-12-18 | [33484](https://github.com/airbytehq/airbyte/pull/33484) | Remove LEGACY state |
9394
| 0.3.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |

0 commit comments

Comments
 (0)