
Commit e65a97a

Replace constructors in stream transfer manager helper with a builder (#13253)
1 parent: 0562e95 · commit: e65a97a

File tree

16 files changed: +183 −165 lines

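Every hunk below follows the same pattern: the static StreamTransferManagerHelper.getDefault(...) overloads are replaced by a fluent StreamTransferManagerFactory. The new factory class itself is not among the files shown on this page, so the following is only a minimal sketch of what it could look like, inferred from the call sites in this diff (create(bucketName, objectKey, s3Client), setPartSize(...), setUserMetadata(...), get(), and the DEFAULT_UPLOAD_THREADS / DEFAULT_QUEUE_CAPACITY constants). The constant values and the way user metadata is attached are assumptions, not the actual implementation from #13253.

package io.airbyte.integrations.destination.s3.util;

import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import java.util.Map;

public class StreamTransferManagerFactory {

  // Referenced from S3CsvWriter.Builder and S3StorageOperations in this diff; the values here are assumed.
  public static final int DEFAULT_UPLOAD_THREADS = 2;
  public static final int DEFAULT_QUEUE_CAPACITY = 2;
  // Hypothetical fallback used when setPartSize is never called (see testMultipartUpload below).
  public static final long DEFAULT_PART_SIZE_MB = 10;

  private final String bucketName;
  private final String objectKey;
  private final AmazonS3 s3Client;
  private long partSizeMb = DEFAULT_PART_SIZE_MB;
  private Map<String, String> userMetadata;

  private StreamTransferManagerFactory(final String bucketName, final String objectKey, final AmazonS3 s3Client) {
    this.bucketName = bucketName;
    this.objectKey = objectKey;
    this.s3Client = s3Client;
  }

  public static StreamTransferManagerFactory create(final String bucketName, final String objectKey, final AmazonS3 s3Client) {
    return new StreamTransferManagerFactory(bucketName, objectKey, s3Client);
  }

  public StreamTransferManagerFactory setPartSize(final Long partSizeMb) {
    if (partSizeMb != null) {
      this.partSizeMb = partSizeMb;
    }
    return this;
  }

  public StreamTransferManagerFactory setUserMetadata(final Map<String, String> userMetadata) {
    this.userMetadata = userMetadata;
    return this;
  }

  public StreamTransferManager get() {
    // The returned manager comes from the s3-stream-upload library (alex.mojaki.s3upload).
    // How userMetadata is attached to the multipart upload request is not visible in this diff
    // and is deliberately left out of this sketch.
    return new StreamTransferManager(bucketName, objectKey, s3Client)
        .partSize(partSizeMb);
  }

}

Compared with the old overloaded getDefault(...) helpers, the builder lets each call site configure only what it needs: the writers set just the part size, S3StorageOperations additionally passes user metadata, and S3Destination.testMultipartUpload relies entirely on defaults.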

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroWriter.java

Lines changed: 5 additions & 3 deletions
@@ -15,7 +15,7 @@
 import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
 import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
 import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -74,8 +74,10 @@ public GcsAvroWriter(final GcsDestinationConfig config,
         objectKey);
 
     this.avroRecordFactory = new AvroRecordFactory(schema, converter);
-    this.uploadManager = StreamTransferManagerHelper.getDefault(
-        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
+    this.uploadManager = StreamTransferManagerFactory
+        .create(config.getBucketName(), objectKey, s3Client)
+        .setPartSize(config.getFormatConfig().getPartSize())
+        .get();
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvWriter.java

Lines changed: 6 additions & 4 deletions
@@ -13,7 +13,7 @@
 import io.airbyte.integrations.destination.s3.S3Format;
 import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator;
 import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -56,8 +56,10 @@ public GcsCsvWriter(final GcsDestinationConfig config,
     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
         objectKey);
 
-    this.uploadManager = StreamTransferManagerHelper.getDefault(
-        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
+    this.uploadManager = StreamTransferManagerFactory
+        .create(config.getBucketName(), objectKey, s3Client)
+        .setPartSize(config.getFormatConfig().getPartSize())
+        .get();
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
     this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8),
@@ -71,7 +73,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) throw
   }
 
   @Override
-  public void write(JsonNode formattedData) throws IOException {
+  public void write(final JsonNode formattedData) throws IOException {
     csvPrinter.printRecord(csvSheetGenerator.getDataRow(formattedData));
   }

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlWriter.java

Lines changed: 6 additions & 4 deletions
@@ -16,7 +16,7 @@
 import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
 import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
 import io.airbyte.integrations.destination.s3.S3Format;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
@@ -52,8 +52,10 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
     gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
     LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
 
-    this.uploadManager = StreamTransferManagerHelper.getDefault(
-        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
+    this.uploadManager = StreamTransferManagerFactory
+        .create(config.getBucketName(), objectKey, s3Client)
+        .setPartSize(config.getFormatConfig().getPartSize())
+        .get();
 
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
@@ -70,7 +72,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
   }
 
   @Override
-  public void write(JsonNode formattedData) throws IOException {
+  public void write(final JsonNode formattedData) throws IOException {
     printWriter.println(Jsons.serialize(formattedData));
   }

airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java

Lines changed: 9 additions & 7 deletions
@@ -16,7 +16,7 @@
 import io.airbyte.integrations.destination.s3.S3DestinationConstants;
 import io.airbyte.integrations.destination.s3.S3FormatConfig;
 import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import java.util.List;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
@@ -116,9 +116,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
     assertEquals("AVRO", formatConfig.getFormat().name());
     assertEquals(6, formatConfig.getPartSize());
     // Assert that is set properly in config
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * 6, partSizeBytes);
@@ -135,9 +136,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
         .getGcsDestinationConfig(config);
     ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
 
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);

airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java

Lines changed: 9 additions & 7 deletions
@@ -16,7 +16,7 @@
 import io.airbyte.integrations.destination.s3.S3DestinationConstants;
 import io.airbyte.integrations.destination.s3.S3FormatConfig;
 import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -52,9 +52,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
     assertEquals("CSV", formatConfig.getFormat().name());
     assertEquals(6, formatConfig.getPartSize());
     // Assert that is set properly in config
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * 6, partSizeBytes);
@@ -71,9 +72,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
     final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
     ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
 
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);

airbyte-integrations/connectors/destination-gcs/src/test/java/io/airbyte/integrations/destination/gcs/jsonl/GcsJsonlFormatConfigTest.java

Lines changed: 9 additions & 7 deletions
@@ -14,7 +14,7 @@
 import io.airbyte.integrations.destination.gcs.util.ConfigTestUtils;
 import io.airbyte.integrations.destination.s3.S3DestinationConstants;
 import io.airbyte.integrations.destination.s3.S3FormatConfig;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -39,9 +39,10 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
     assertEquals(6, formatConfig.getPartSize());
 
     // Assert that is set properly in config
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * 6, partSizeBytes);
@@ -58,9 +59,10 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
         .getGcsDestinationConfig(config);
     ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
 
-    final StreamTransferManager streamTransferManager = StreamTransferManagerHelper.getDefault(
-        gcsDestinationConfig.getBucketName(), "objectKey", null,
-        gcsDestinationConfig.getFormatConfig().getPartSize());
+    final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
+        .create(gcsDestinationConfig.getBucketName(), "objectKey", null)
+        .setPartSize(gcsDestinationConfig.getFormatConfig().getPartSize())
+        .get();
 
     final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
     assertEquals(MB * S3DestinationConstants.DEFAULT_PART_SIZE_MB, partSizeBytes);

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java

Lines changed: 4 additions & 8 deletions
@@ -17,7 +17,7 @@
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.record_buffer.FileBuffer;
 import io.airbyte.integrations.destination.s3.util.S3NameTransformer;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.protocol.models.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
 import io.airbyte.protocol.models.AirbyteMessage;
@@ -77,7 +77,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
     }
   }
 
-  public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName, String bucketPath) {
+  public static void testSingleUpload(final AmazonS3 s3Client, final String bucketName, final String bucketPath) {
     LOGGER.info("Started testing if all required credentials assigned to user for single file uploading");
     if (bucketPath.endsWith("/")) {
       throw new RuntimeException("Bucket Path should not end with /");
@@ -91,17 +91,13 @@ public static void testSingleUpload(final AmazonS3 s3Client, final String bucket
     LOGGER.info("Finished checking for normal upload mode");
   }
 
-  public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName, String bucketPath) throws IOException {
+  public static void testMultipartUpload(final AmazonS3 s3Client, final String bucketName, final String bucketPath) throws IOException {
     LOGGER.info("Started testing if all required credentials assigned to user for multipart upload");
     if (bucketPath.endsWith("/")) {
      throw new RuntimeException("Bucket Path should not end with /");
    }
    final String testFile = bucketPath + "/" + "test_" + System.currentTimeMillis();
-    final StreamTransferManager manager = StreamTransferManagerHelper.getDefault(
-        bucketName,
-        testFile,
-        s3Client,
-        (long) StreamTransferManagerHelper.DEFAULT_PART_SIZE_MB);
+    final StreamTransferManager manager = StreamTransferManagerFactory.create(bucketName, testFile, s3Client).get();
     boolean success = false;
     try (final MultiPartOutputStream outputStream = manager.getMultiPartOutputStreams().get(0);
         final CSVPrinter csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8), CSVFormat.DEFAULT)) {

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java

Lines changed: 6 additions & 4 deletions
@@ -18,7 +18,7 @@
 import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.destination.NamingConventionTransformer;
 import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -141,8 +141,10 @@ private String loadDataIntoBucket(final String objectPath, final SerializableBuf
     for (final BlobDecorator blobDecorator : blobDecorators) {
       blobDecorator.updateMetadata(metadata, getMetadataMapping());
     }
-    final StreamTransferManager uploadManager = StreamTransferManagerHelper
-        .getDefault(bucket, fullObjectKey, s3Client, partSize, metadata)
+    final StreamTransferManager uploadManager = StreamTransferManagerFactory.create(bucket, fullObjectKey, s3Client)
+        .setPartSize(partSize)
+        .setUserMetadata(metadata)
+        .get()
         .checkIntegrity(true)
         .numUploadThreads(DEFAULT_UPLOAD_THREADS)
         .queueCapacity(DEFAULT_QUEUE_CAPACITY);
@@ -293,7 +295,7 @@ protected Map<String, String> getMetadataMapping() {
         AesCbcEnvelopeEncryptionBlobDecorator.INITIALIZATION_VECTOR, "x-amz-iv");
   }
 
-  public void uploadManifest(String bucketName, String manifestFilePath, String manifestContents) {
+  public void uploadManifest(final String bucketName, final String manifestFilePath, final String manifestContents) {
     s3Client.putObject(s3Config.getBucketName(), manifestFilePath, manifestContents);
   }

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/S3AvroWriter.java

Lines changed: 5 additions & 3 deletions
@@ -10,7 +10,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.S3Format;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -54,8 +54,10 @@ public S3AvroWriter(final S3DestinationConfig config,
     gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
 
     this.avroRecordFactory = new AvroRecordFactory(schema, converter);
-    this.uploadManager = StreamTransferManagerHelper.getDefault(
-        config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize());
+    this.uploadManager = StreamTransferManagerFactory
+        .create(config.getBucketName(), objectKey, s3Client)
+        .setPartSize(config.getFormatConfig().getPartSize())
+        .get();
     // We only need one output stream as we only have one input stream. This is reasonably performant.
     this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/csv/S3CsvWriter.java

Lines changed: 7 additions & 4 deletions
@@ -10,7 +10,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.integrations.destination.s3.S3DestinationConfig;
 import io.airbyte.integrations.destination.s3.S3Format;
-import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
+import io.airbyte.integrations.destination.s3.util.StreamTransferManagerFactory;
 import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
 import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -59,7 +59,10 @@ private S3CsvWriter(final S3DestinationConfig config,
         objectKey);
     gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
 
-    this.uploadManager = StreamTransferManagerHelper.getDefault(config.getBucketName(), objectKey, s3Client, config.getFormatConfig().getPartSize())
+    this.uploadManager = StreamTransferManagerFactory
+        .create(config.getBucketName(), objectKey, s3Client)
+        .setPartSize(config.getFormatConfig().getPartSize())
+        .get()
         .numUploadThreads(uploadThreads)
         .queueCapacity(queueCapacity);
     // We only need one output stream as we only have one input stream. This is reasonably performant.
@@ -76,8 +79,8 @@ public static class Builder {
     private final AmazonS3 s3Client;
     private final ConfiguredAirbyteStream configuredStream;
     private final Timestamp uploadTimestamp;
-    private int uploadThreads = StreamTransferManagerHelper.DEFAULT_UPLOAD_THREADS;
-    private int queueCapacity = StreamTransferManagerHelper.DEFAULT_QUEUE_CAPACITY;
+    private int uploadThreads = StreamTransferManagerFactory.DEFAULT_UPLOAD_THREADS;
+    private int queueCapacity = StreamTransferManagerFactory.DEFAULT_QUEUE_CAPACITY;
     private boolean withHeader = true;
     private CSVFormat csvSettings = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL);
    private CsvSheetGenerator csvSheetGenerator;
