Skip to content

Commit 1188221

Browse files
authored
Destination BigQuery: Nuking old remnants (#38111)
1 parent 1685e4c commit 1188221

17 files changed

+124
-843
lines changed

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
8-
dockerImageTag: 2.4.17
8+
dockerImageTag: 2.4.18
99
dockerRepository: airbyte/destination-bigquery
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
1111
githubIssueLabel: destination-bigquery

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryAsyncStandardFlush.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.google.common.util.concurrent.RateLimiter;
99
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
1010
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage;
11-
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
11+
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
1212
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
1313
import io.airbyte.protocol.models.v0.StreamDescriptor;
1414
import java.util.concurrent.ConcurrentMap;
@@ -24,18 +24,18 @@ public class BigQueryAsyncStandardFlush implements DestinationFlushFunction {
2424
private static final RateLimiter rateLimiter = RateLimiter.create(0.07);
2525

2626
private final BigQuery bigQuery;
27-
private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap;
27+
private final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap;
2828

2929
public BigQueryAsyncStandardFlush(final BigQuery bigQuery,
30-
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
30+
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
3131
this.bigQuery = bigQuery;
3232
this.uploaderMap = uploaderMap;
3333
}
3434

3535
@Override
3636
public void flush(final StreamDescriptor decs, final Stream<PartialAirbyteMessage> stream) throws Exception {
3737
rateLimiter.acquire();
38-
final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMapSupplied = uploaderMap.get();
38+
final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMapSupplied = uploaderMap.get();
3939
final AtomicInteger recordCount = new AtomicInteger();
4040
stream.forEach(aibyteMessage -> {
4141
try {

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java

+15-17
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,11 @@
3838
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
3939
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
4040
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
41-
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryRecordFormatter;
42-
import io.airbyte.integrations.destination.bigquery.formatter.GcsCsvBigQueryRecordFormatter;
4341
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
4442
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
4543
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
4644
import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
47-
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
45+
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
4846
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
4947
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
5048
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
@@ -292,14 +290,14 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
292290
BigQueryUtils.getDatasetId(config));
293291
}
294292

295-
protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> getUploaderMap(
296-
final BigQuery bigquery,
297-
final JsonNode config,
298-
final ConfiguredAirbyteCatalog catalog,
299-
final ParsedCatalog parsedCatalog)
293+
protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> getUploaderMap(
294+
final BigQuery bigquery,
295+
final JsonNode config,
296+
final ConfiguredAirbyteCatalog catalog,
297+
final ParsedCatalog parsedCatalog)
300298
throws IOException {
301299
return () -> {
302-
final ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new ConcurrentHashMap<>();
300+
final ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap = new ConcurrentHashMap<>();
303301
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
304302
final AirbyteStream stream = configStream.getStream();
305303
final StreamConfig parsedStream;
@@ -315,7 +313,7 @@ protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQuer
315313
.configStream(configStream)
316314
.parsedStream(parsedStream)
317315
.config(config)
318-
.formatterMap(getFormatterMap(stream.getJsonSchema()))
316+
.formatterMap(getFormatterMap())
319317
.targetTableName(targetTableName)
320318
// This refers to whether this is BQ denormalized or not
321319
.isDefaultAirbyteTmpSchema(isDefaultAirbyteTmpTableSchema())
@@ -333,7 +331,7 @@ protected Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQuer
333331

334332
protected void putStreamIntoUploaderMap(final AirbyteStream stream,
335333
final UploaderConfig uploaderConfig,
336-
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
334+
final Map<AirbyteStreamNameNamespacePair, BigQueryDirectUploader> uploaderMap)
337335
throws IOException {
338336
uploaderMap.put(
339337
AirbyteStreamNameNamespacePair.fromAirbyteStream(stream),
@@ -351,10 +349,10 @@ protected boolean isDefaultAirbyteTmpTableSchema() {
351349
return true;
352350
}
353351

354-
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap(final JsonNode jsonSchema) {
352+
protected Map<UploaderType, BigQueryRecordFormatter> getFormatterMap() {
355353
return Map.of(
356-
UploaderType.STANDARD, new DefaultBigQueryRecordFormatter(jsonSchema, namingResolver),
357-
UploaderType.CSV, new GcsCsvBigQueryRecordFormatter(jsonSchema, namingResolver));
354+
UploaderType.STANDARD, new BigQueryRecordFormatter(namingResolver),
355+
UploaderType.CSV, new BigQueryRecordFormatter(namingResolver));
358356
}
359357

360358
private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuery bigquery,
@@ -364,7 +362,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
364362
final Consumer<AirbyteMessage> outputRecordCollector,
365363
final TyperDeduper typerDeduper)
366364
throws Exception {
367-
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> writeConfigs = getUploaderMap(
365+
final Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> writeConfigs = getUploaderMap(
368366
bigquery,
369367
config,
370368
catalog,
@@ -390,7 +388,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
390388
LOGGER.info("Raw table {} not found, continuing with creation", rawTableId);
391389
}
392390
LOGGER.info("Creating table {}", rawTableId);
393-
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, DefaultBigQueryRecordFormatter.SCHEMA_V2);
391+
BigQueryUtils.createPartitionedTableIfNotExists(bigquery, rawTableId, BigQueryRecordFormatter.SCHEMA_V2);
394392
} else {
395393
uploader.createRawTable();
396394
}
@@ -415,7 +413,7 @@ private SerializedAirbyteMessageConsumer getStandardRecordConsumer(final BigQuer
415413
}
416414

417415
protected Function<JsonNode, BigQueryRecordFormatter> getCsvRecordFormatterCreator(final BigQuerySQLNameTransformer namingResolver) {
418-
return streamSchema -> new GcsCsvBigQueryRecordFormatter(streamSchema, namingResolver);
416+
return streamSchema -> new BigQueryRecordFormatter(namingResolver);
419417
}
420418

421419
private void setDefaultStreamNamespace(final ConfiguredAirbyteCatalog catalog, final String namespace) {

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java

-138
This file was deleted.

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordStandardConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
1111
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
1212
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
13-
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
13+
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryDirectUploader;
1414
import io.airbyte.protocol.models.v0.AirbyteMessage;
1515
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
1616
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
@@ -31,7 +31,7 @@ public BigQueryRecordStandardConsumer(Consumer<AirbyteMessage> outputRecordColle
3131
BigQuery bigQuery,
3232
ConfiguredAirbyteCatalog catalog,
3333
Optional<String> defaultNamespace,
34-
Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>>> uploaderMap) {
34+
Supplier<ConcurrentMap<AirbyteStreamNameNamespacePair, BigQueryDirectUploader>> uploaderMap) {
3535
super(outputRecordCollector,
3636
onStart,
3737
onClose,

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryStagingConsumerFactory.java

-7
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
1414
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
1515
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction;
16-
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
1716
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
1817
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
1918
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
@@ -117,12 +116,6 @@ private Map<StreamDescriptor, BigQueryWriteConfig> createWriteConfigs(final Json
117116
}
118117

119118
/**
120-
* Sets up {@link BufferedStreamConsumer} with creation of the destination's raw tables
121-
*
122-
* <p>
123-
* Note: targetTableId is synonymous with airbyte_raw table
124-
* </p>
125-
*
126119
* @param bigQueryGcsOperations collection of Google Cloud Storage Operations
127120
* @param writeConfigs configuration settings used to describe how to write data and where it exists
128121
*/

0 commit comments

Comments
 (0)