
Commit e0225c1

Destination BigQuery: Consolidation of objects to StreamConfig, cleanup (#38131)
## What

Removes redundant references and the duplicate information passed around in `WriteConfig` objects. There are no functional changes: everything previously carried by `WriteConfig` is now resolved from the already-built `StreamConfig`, and the surrounding code is adapted accordingly. This PR should be in a mergeable state, with no functional changes, once the PRs further down the stack are published.

## Review guide

* Removed references to `BigQueryWriteConfig` and reused the already-built `StreamConfig`.
* Removed the unnecessary `StagingOperations` interface, leaving `BigQueryGcsOperations` as a concrete class; this will make it easier to add a shim on top of it later and refactor without large changes.
* Removed other unnecessary leftovers such as fetching a dynamic schema, `WriteDisposition`, etc.; these are probably remnants of the bespoke bigquery-denormalized connector.

## User Impact

No user-facing changes; this is an internal refactor.

## Can this PR be safely reverted and rolled back?

- [x] YES 💚
- [ ] NO ❌
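As an illustration of the consolidation, here is a minimal sketch (the helper class and method are hypothetical, not part of this change) of how the values that `BigQueryWriteConfig` used to carry are derived from the `StreamConfig`/`StreamId` accessors that the updated `BigQueryAsyncFlush` uses. It assumes `BigQueryRecordFormatter.SCHEMA_V2` is the raw-table `Schema` constant, as the new call site suggests:

```java
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;

// Hypothetical helper, for illustration only.
final class StreamConfigMappingSketch {

  static void printRawTableCoordinates(final StreamConfig streamConfig) {
    final StreamId streamId = streamConfig.getId();

    // Formerly writeConfig.datasetId() and writeConfig.streamName():
    final String datasetId = streamId.getRawNamespace();
    final String streamName = streamId.getOriginalName();

    // Formerly writeConfig.targetTableId():
    final TableId rawTableId = TableId.of(streamId.getRawNamespace(), streamId.getRawName());

    // Formerly writeConfig.tableSchema(); the raw-table schema is a fixed constant now:
    final Schema rawTableSchema = BigQueryRecordFormatter.SCHEMA_V2;

    System.out.printf("dataset=%s stream=%s rawTable=%s fields=%d%n",
        datasetId, streamName, rawTableId, rawTableSchema.getFields().size());
  }

}
```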
1 parent 13e16de commit e0225c1

23 files changed: +103 -325 lines

airbyte-integrations/connectors/destination-bigquery/build.gradle

+1 -1

@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.34.4'
+    cdkVersionRequired = '0.35.0'
     features = [
         'db-destinations',
         'datastore-bigquery',

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

+1 -1

@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.4.19
+  dockerImageTag: 2.4.20
   dockerRepository: airbyte/destination-bigquery
   documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
   githubIssueLabel: destination-bigquery

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncFlush.java

+16 -12

@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
+import com.google.cloud.bigquery.TableId;
 import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
@@ -12,6 +13,9 @@
 import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer;
 import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.v0.StreamDescriptor;
 import java.util.Map;
@@ -25,15 +29,15 @@
 @Slf4j
 class BigQueryAsyncFlush implements DestinationFlushFunction {
 
-  private final Map<StreamDescriptor, BigQueryWriteConfig> streamDescToWriteConfig;
-  private final BigQueryStagingOperations stagingOperations;
+  private final Map<StreamDescriptor, StreamConfig> streamConfigMap;
+  private final BigQueryGcsOperations stagingOperations;
   private final ConfiguredAirbyteCatalog catalog;
 
   public BigQueryAsyncFlush(
-      final Map<StreamDescriptor, BigQueryWriteConfig> streamDescToWriteConfig,
-      final BigQueryStagingOperations stagingOperations,
+      final Map<StreamDescriptor, StreamConfig> streamConfigMap,
+      final BigQueryGcsOperations stagingOperations,
      final ConfiguredAirbyteCatalog catalog) {
-    this.streamDescToWriteConfig = streamDescToWriteConfig;
+    this.streamConfigMap = streamConfigMap;
    this.stagingOperations = stagingOperations;
    this.catalog = catalog;
  }
@@ -60,20 +64,20 @@ public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessag
 
    writer.flush();
    log.info("Flushing CSV buffer for stream {} ({}) to staging", decs.getName(), FileUtils.byteCountToDisplaySize(writer.getByteCount()));
-    if (!streamDescToWriteConfig.containsKey(decs)) {
+    if (!streamConfigMap.containsKey(decs)) {
      throw new IllegalArgumentException(
          String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s", Jsons.serialize(catalog)));
    }
 
-    final BigQueryWriteConfig writeConfig = streamDescToWriteConfig.get(decs);
+    final StreamId streamId = streamConfigMap.get(decs).getId();
    try {
-      final String stagedFileName = stagingOperations.uploadRecordsToStage(writeConfig.datasetId(), writeConfig.streamName(), writer);
+      final String stagedFileName = stagingOperations.uploadRecordsToStage(streamId.getRawNamespace(), streamId.getOriginalName(), writer);
 
      stagingOperations.copyIntoTableFromStage(
-          writeConfig.datasetId(),
-          writeConfig.streamName(),
-          writeConfig.targetTableId(),
-          writeConfig.tableSchema(),
+          streamId.getRawNamespace(),
+          streamId.getOriginalName(),
+          TableId.of(streamId.getRawNamespace(), streamId.getRawName()),
+          BigQueryRecordFormatter.SCHEMA_V2,
          stagedFileName);
    } catch (final Exception e) {
      log.error("Failed to flush and commit buffer data into destination's raw table", e);

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java

+1 -6

@@ -4,7 +4,6 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
-import com.google.cloud.bigquery.BigQuery;
 import com.google.common.util.concurrent.RateLimiter;
 import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
@@ -22,13 +21,9 @@ public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {
 
   // TODO remove this once the async framework supports rate-limiting/backpressuring
   private static final RateLimiter rateLimiter = RateLimiter.create(0.07);
-
-  private final BigQuery bigQuery;
   private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;
 
-  public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
-                                    final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
-    this.bigQuery = bigQuery;
+  public BigQueryAsyncStandardFlush(final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
    this.uploaderMap = uploaderMap;
  }
 

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java

+17 -23

@@ -22,6 +22,7 @@
 import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer;
 import io.airbyte.cdk.integrations.base.Destination;
 import io.airbyte.cdk.integrations.base.IntegrationRunner;
+import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
 import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
 import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
@@ -64,10 +65,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.jetbrains.annotations.NotNull;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -223,9 +224,9 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
 
   @Override
   @SuppressWarnings("deprecation")
-  public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
-                                                                       final ConfiguredAirbyteCatalog catalog,
-                                                                       final Consumer<AirbyteMessage> outputRecordCollector)
+  public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final @NotNull JsonNode config,
+                                                                       final @NotNull ConfiguredAirbyteCatalog catalog,
+                                                                       final @NotNull Consumer<AirbyteMessage> outputRecordCollector)
      throws Exception {
    final UploadingMethod uploadingMethod = BigQueryUtils.getLoadingMethod(config);
    final String defaultNamespace = BigQueryUtils.getDatasetId(config);
@@ -234,7 +235,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
    final String datasetLocation = BigQueryUtils.getDatasetLocation(config);
    final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
    final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
-    final ParsedCatalog parsedCatalog = parseCatalog(config, catalog, datasetLocation, rawNamespaceOverride);
+    final ParsedCatalog parsedCatalog = parseCatalog(sqlGenerator, defaultNamespace,
+        rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE), catalog);
    final BigQuery bigquery = getBigQuery(config);
    final TyperDeduper typerDeduper =
        buildTyperDeduper(sqlGenerator, parsedCatalog, bigquery, datasetLocation, disableTypeDedupe);
@@ -269,22 +271,20 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
    final DateTime syncDatetime = DateTime.now(DateTimeZone.UTC);
    final boolean keepStagingFiles = BigQueryUtils.isKeepFilesInGcs(config);
    final GcsStorageOperations gcsOperations = new GcsStorageOperations(gcsNameTransformer, gcsConfig.getS3Client(), gcsConfig);
-    final BigQueryStagingOperations bigQueryGcsOperations = new BigQueryGcsOperations(
+    final BigQueryGcsOperations bigQueryGcsOperations = new BigQueryGcsOperations(
        bigquery,
        gcsNameTransformer,
        gcsConfig,
        gcsOperations,
+        datasetLocation,
        stagingId,
        syncDatetime,
        keepStagingFiles);
 
    return new BigQueryStagingConsumerFactory().createAsync(
-        config,
        catalog,
        outputRecordCollector,
        bigQueryGcsOperations,
-        getCsvRecordFormatterCreator(namingResolver),
-        namingResolver::getTmpTableName,
        typerDeduper,
        parsedCatalog,
        BigQueryUtils.getDatasetId(config));
@@ -368,7 +368,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
        catalog,
        parsedCatalog);
 
-    final Optional<String> bqNamespace = Optional.ofNullable(BigQueryUtils.getDatasetId(config));
+    final String bqNamespace = BigQueryUtils.getDatasetId(config);
 
    return new BigQueryRecordStandardConsumer(
        outputRecordCollector,
@@ -406,18 +406,14 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
            throw new RuntimeException(e);
          }
        },
-        bigquery,
        catalog,
        bqNamespace,
        writeConfigs);
  }
 
-  protected Function<JsonNode, BigQueryRecordFormatter> getCsvRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
-    return streamSchema -> new BigQueryRecordFormatter(namingResolver);
-  }
-
  private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {
-    // Set the default namespace on streams with null namespace. This means we don't need to repeat this
+    // Set the default originalNamespace on streams with null originalNamespace. This means we don't
+    // need to repeat this
    // logic in the rest of the connector.
    // (record messages still need to handle null namespaces though, which currently happens in e.g.
    // AsyncStreamConsumer#accept)
@@ -429,13 +425,11 @@ private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, f
    }
  }
 
-  private ParsedCatalog parseCatalog(final JsonNode config,
-                                     final ConfiguredAirbyteCatalog catalog,
-                                     final String datasetLocation,
-                                     final Optional<String> rawNamespaceOverride) {
-    final BigQuerySqlGenerator sqlGenerator = new BigQuerySqlGenerator(config.get(BigQueryConsts.CONFIG_PROJECT_ID).asText(), datasetLocation);
-    final CatalogParser catalogParser = rawNamespaceOverride.map(s -> new CatalogParser(sqlGenerator, s))
-        .orElseGet(() -> new CatalogParser(sqlGenerator));
+  private ParsedCatalog parseCatalog(final BigQuerySqlGenerator sqlGenerator,
+                                     final String defaultNamespace,
+                                     final String rawNamespaceOverride,
+                                     final ConfiguredAirbyteCatalog catalog) {
+    final CatalogParser catalogParser = new CatalogParser(sqlGenerator, rawNamespaceOverride);
 
    return catalogParser.parseCatalog(catalog);
  }
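For orientation, a hedged sketch of the new call chain, reconstructed from the hunks above (the variables `sqlGenerator`, `defaultNamespace`, and `catalog` are assumed to be in scope as in `getSerializedMessageConsumer`): instead of optionally branching on the raw-namespace override inside `parseCatalog`, the caller now resolves it once, falling back to the CDK's default internal namespace, and passes the already-built `BigQuerySqlGenerator` through:

```java
// Sketch only; mirrors the updated getSerializedMessageConsumer / parseCatalog hunks above.
final Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_DATA_DATASET);
final ParsedCatalog parsedCatalog = parseCatalog(
    sqlGenerator,
    defaultNamespace,
    rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
    catalog);
// parseCatalog itself now reduces to:
//   new CatalogParser(sqlGenerator, rawNamespaceOverride).parseCatalog(catalog)
```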

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryGcsOperations.java

+6 -12

@@ -27,14 +27,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BigQueryGcsOperations implements BigQueryStagingOperations {
+public class BigQueryGcsOperations {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryGcsOperations.class);
 
   private final BigQuery bigQuery;
   private final StandardNameTransformer gcsNameTransformer;
   private final GcsDestinationConfig gcsConfig;
   private final GcsStorageOperations gcsStorageOperations;
+
+  private final String datasetLocation;
   private final UUID randomStagingId;
   private final DateTime syncDatetime;
   private final boolean keepStagingFiles;
@@ -44,13 +46,15 @@ public BigQueryGcsOperations(final BigQuery bigQuery,
                               final StandardNameTransformer gcsNameTransformer,
                               final GcsDestinationConfig gcsConfig,
                               final GcsStorageOperations gcsStorageOperations,
+                               final String datasetLocation,
                               final UUID randomStagingId,
                               final DateTime syncDatetime,
                               final boolean keepStagingFiles) {
    this.bigQuery = bigQuery;
    this.gcsNameTransformer = gcsNameTransformer;
    this.gcsConfig = gcsConfig;
    this.gcsStorageOperations = gcsStorageOperations;
+    this.datasetLocation = datasetLocation;
    this.randomStagingId = randomStagingId;
    this.syncDatetime = syncDatetime;
    this.keepStagingFiles = keepStagingFiles;
@@ -69,7 +73,6 @@ private String getStagingRootPath(final String datasetId, final String stream) {
  /**
   * @return {@code <bucket-path>/<dataset-id>_<stream-name>/<year>/<month>/<day>/<hour>/<uuid>/}
   */
-  @Override
  public String getStagingFullPath(final String datasetId, final String stream) {
    return gcsNameTransformer.applyDefaultCase(String.format("%s/%s/%02d/%02d/%02d/%s/",
        getStagingRootPath(datasetId, stream),
@@ -80,8 +83,7 @@ public String getStagingFullPath(final String datasetId, final String stream) {
        randomStagingId));
  }
 
-  @Override
-  public void createSchemaIfNotExists(final String datasetId, final String datasetLocation) {
+  public void createSchemaIfNotExists(final String datasetId) {
    if (!existingSchemas.contains(datasetId)) {
      LOGGER.info("Creating dataset {}", datasetId);
      try {
@@ -97,20 +99,17 @@ public void createSchemaIfNotExists(final String datasetId, final String dataset
    }
  }
 
-  @Override
  public void createTableIfNotExists(final TableId tableId, final Schema tableSchema) {
    LOGGER.info("Creating target table {}", tableId);
    BigQueryUtils.createPartitionedTableIfNotExists(bigQuery, tableId, tableSchema);
  }
 
-  @Override
  public void createStageIfNotExists(final String datasetId, final String stream) {
    final String objectPath = getStagingFullPath(datasetId, stream);
    LOGGER.info("Creating staging path for stream {} (dataset {}): {}", stream, datasetId, objectPath);
    gcsStorageOperations.createBucketIfNotExists();
  }
 
-  @Override
  public String uploadRecordsToStage(final String datasetId, final String stream, final SerializableBuffer writer) {
    final String objectPath = getStagingFullPath(datasetId, stream);
    LOGGER.info("Uploading records to staging for stream {} (dataset {}): {}", stream, datasetId, objectPath);
@@ -125,7 +124,6 @@ public String uploadRecordsToStage(final String datasetId, final String stream,
   * Reference
   * https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
   */
-  @Override
  public void copyIntoTableFromStage(final String datasetId,
                                     final String stream,
                                     final TableId tableId,
@@ -159,7 +157,6 @@ public void copyIntoTableFromStage(final String datasetId,
    }
  }
 
-  @Override
  @Deprecated
  public void cleanUpStage(final String datasetId, final String stream, final List<String> stagedFiles) {
    if (keepStagingFiles) {
@@ -170,13 +167,11 @@ public void cleanUpStage(final String datasetId, final String stream, final List
    gcsStorageOperations.cleanUpBucketObject(getStagingRootPath(datasetId, stream), stagedFiles);
  }
 
-  @Override
  public void dropTableIfExists(final String datasetId, final TableId tableId) {
    LOGGER.info("Deleting target table {} (dataset {})", tableId, datasetId);
    bigQuery.delete(tableId);
  }
 
-  @Override
  public void dropStageIfExists(final String datasetId, final String stream) {
    if (keepStagingFiles) {
      return;
@@ -200,7 +195,6 @@ public void dropStageIfExists(final String datasetId, final String stream) {
   * @param tableId table name
   * @param schema schema of the table to be deleted/created
   */
-  @Override
  public void truncateTableIfExists(final String datasetId,
                                    final TableId tableId,
                                    final Schema schema) {
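A short usage sketch of the reshaped class (a hedged reconstruction reusing the variable names from the BigQueryDestination hunk above, which are assumed to be in scope; `rawDatasetId` is a placeholder): the dataset location is now fixed at construction time, so `createSchemaIfNotExists` takes only the dataset id:

```java
// Sketch only; wiring mirrors the BigQueryDestination hunk above.
final BigQueryGcsOperations bigQueryGcsOperations = new BigQueryGcsOperations(
    bigquery,            // BigQuery client
    gcsNameTransformer,  // GCS name transformer
    gcsConfig,           // GCS destination config
    gcsOperations,       // GcsStorageOperations wrapper
    datasetLocation,     // new constructor argument, e.g. "US"
    stagingId,           // random UUID scoping this sync's staging paths
    syncDatetime,        // sync start time used in the staging path layout
    keepStagingFiles);

// Before: bigQueryGcsOperations.createSchemaIfNotExists(datasetId, datasetLocation);
// After:  the per-call location parameter is gone.
bigQueryGcsOperations.createSchemaIfNotExists(rawDatasetId); // rawDatasetId: hypothetical dataset name
```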

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java

+3 -5

@@ -4,7 +4,6 @@
 
 package io.airbyte.integrations.destination.bigquery;
 
-import com.google.cloud.bigquery.BigQuery;
 import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
 import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
 import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
@@ -28,17 +27,16 @@ public class BigQueryRecordStandardConsumer extends AsyncStreamConsumer {
   public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordCollector,
                                         OnStartFunction onStart,
                                         OnCloseFunction onClose,
-                                        BigQuery bigQuery,
                                        ConfiguredAirbyteCatalog catalog,
-                                        Optional<String> defaultNamespace,
+                                        String defaultNamespace,
                                        Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
    super(outputRecordCollector,
        onStart,
        onClose,
-        new BigQueryAsyncStandardFlush(bigQuery, uploaderMap),
+        new BigQueryAsyncStandardFlush(uploaderMap),
        catalog,
        new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.5)),
-        defaultNamespace,
+        Optional.ofNullable(defaultNamespace),
        new FlushFailure(),
        Executors.newFixedThreadPool(2));
  }

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQuerySQLNameTransformer.java

+3 -3

@@ -24,9 +24,9 @@ public String convertStreamName(final String input) {
  }
 
  /**
-   * BigQuery allows a number to be the first character of a namespace. Datasets that begin with an
-   * underscore are hidden databases, and we cannot query <hidden-dataset>.INFORMATION_SCHEMA. So we
-   * append a letter instead of underscore for normalization. Reference:
+   * BigQuery allows a number to be the first character of a originalNamespace. Datasets that begin
+   * with an underscore are hidden databases, and we cannot query <hidden-dataset>.INFORMATION_SCHEMA.
+   * So we append a letter instead of underscore for normalization. Reference:
   * https://cloud.google.com/bigquery/docs/datasets#dataset-naming
   */
  @Override

0 commit comments