Skip to content

Commit 665e6db

Browse files
Split into bucketPath and fullOutputPath
1 parent 34363d1 commit 665e6db

File tree

7 files changed

+42
-38
lines changed

7 files changed

+42
-38
lines changed

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/record_buffer/FileBuffer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.InputStream;
1313
import java.io.OutputStream;
1414
import java.nio.file.Files;
15+
import java.util.UUID;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718

@@ -46,7 +47,7 @@ public FileBuffer(final String fileExtension) {
4647
@Override
4748
public OutputStream getOutputStream() throws IOException {
4849
if (outputStream == null || tempFile == null) {
49-
tempFile = Files.createTempFile("", fileExtension).toFile();
50+
tempFile = Files.createTempFile(UUID.randomUUID().toString(), fileExtension).toFile();
5051
outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
5152
}
5253
return outputStream;

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
8080
final AirbyteStream abStream = stream.getStream();
8181
final String namespace = abStream.getNamespace();
8282
final String streamName = abStream.getName();
83-
final String bucketPath = config.get(BUCKET_PATH_FIELD).asText();
83+
final String outputBucketPath = config.get(BUCKET_PATH_FIELD).asText();
8484
final String customOutputFormat = String.join("/",
85-
bucketPath,
85+
outputBucketPath,
8686
config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? config.get(PATH_FORMAT_FIELD).asText()
8787
: S3DestinationConstants.DEFAULT_PATH_FORMAT);
88-
final String outputBucketPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
88+
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
8989
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
90-
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, syncMode);
90+
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, fullOutputPath, syncMode);
9191
LOGGER.info("Write config: {}", writeConfig);
9292
return writeConfig;
9393
};
@@ -100,13 +100,13 @@ private OnStartFunction onStartFunction(final BlobStorageOperations storageOpera
100100
if (writeConfig.getSyncMode().equals(DestinationSyncMode.OVERWRITE)) {
101101
final String namespace = writeConfig.getNamespace();
102102
final String stream = writeConfig.getStreamName();
103-
final String outputBucketPath = writeConfig.getOutputBucketPath();
104-
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {}", namespace, stream, outputBucketPath);
103+
final String bucketPath = writeConfig.getOutputBucketPath();
104+
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {}", namespace, stream, bucketPath);
105105
AirbyteSentry.executeWithTracing("PrepareStreamStorage",
106-
() -> storageOperations.dropBucketObject(outputBucketPath),
107-
Map.of("namespace", Objects.requireNonNullElse(namespace, "null"), "stream", stream, "storage", outputBucketPath));
106+
() -> storageOperations.dropBucketObject(bucketPath),
107+
Map.of("namespace", Objects.requireNonNullElse(namespace, "null"), "stream", stream, "storage", bucketPath));
108108
LOGGER.info("Clearing storage area in destination completed for namespace {} stream {} bucketObject {}", namespace, stream,
109-
outputBucketPath);
109+
bucketPath);
110110
}
111111
}
112112
LOGGER.info("Preparing storage area in destination completed.");
@@ -139,7 +139,7 @@ private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Ex
139139
writer,
140140
writeConfig.getNamespace(),
141141
writeConfig.getStreamName(),
142-
writeConfig.getOutputBucketPath()));
142+
writeConfig.getFullOutputPath()));
143143
} catch (final Exception e) {
144144
LOGGER.error("Failed to flush and upload buffer to storage:", e);
145145
throw new RuntimeException("Failed to upload buffer to storage", e);
@@ -153,7 +153,7 @@ private OnCloseFunction onCloseFunction(final BlobStorageOperations storageOpera
153153
if (hasFailed) {
154154
LOGGER.info("Cleaning up destination started for {} streams", writeConfigs.size());
155155
for (final WriteConfig writeConfig : writeConfigs) {
156-
storageOperations.cleanUpBucketObject(writeConfig.getOutputBucketPath(), writeConfig.getStoredFiles());
156+
storageOperations.cleanUpBucketObject(writeConfig.getFullOutputPath(), writeConfig.getStoredFiles());
157157
writeConfig.clearStoredFiles();
158158
}
159159
LOGGER.info("Cleaning up destination completed.");

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public final class S3DestinationConstants {
1212
public static final S3NameTransformer NAME_TRANSFORMER = new S3NameTransformer();
1313
public static final String PART_SIZE_MB_ARG_NAME = "part_size_mb";
1414
public static final int DEFAULT_PART_SIZE_MB = 10;
15-
public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_${PART_ID}";
15+
public static final String DEFAULT_PATH_FORMAT = "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_";
1616

1717
private S3DestinationConstants() {}
1818

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3StorageOperations.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.airbyte.integrations.destination.NamingConventionTransformer;
1818
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
1919
import io.airbyte.integrations.destination.s3.util.StreamTransferManagerHelper;
20-
import java.io.IOException;
2120
import java.io.InputStream;
2221
import java.util.ArrayList;
2322
import java.util.List;
@@ -50,7 +49,7 @@ public S3StorageOperations(final NamingConventionTransformer nameTransformer, fi
5049
public String getBucketObjectPath(final String namespace, final String streamName, final DateTime writeDatetime, final String customPathFormat) {
5150
final String namespaceStr = nameTransformer.getNamespace(isNotBlank(namespace) ? namespace : "");
5251
final String streamNameStr = nameTransformer.getIdentifier(streamName);
53-
final String objectPath = nameTransformer.applyDefaultCase(
52+
return nameTransformer.applyDefaultCase(
5453
customPathFormat
5554
.replaceAll(Pattern.quote("${NAMESPACE}"), namespaceStr)
5655
.replaceAll(Pattern.quote("${STREAM_NAME}"), streamNameStr)
@@ -64,22 +63,6 @@ public String getBucketObjectPath(final String namespace, final String streamNam
6463
.replaceAll(Pattern.quote("${EPOCH}"), String.format("%d", writeDatetime.getMillis()))
6564
.replaceAll(Pattern.quote("${UUID}"), String.format("%s", UUID.randomUUID()))
6665
.replaceAll("/+", "/"));
67-
if (customPathFormat.contains("${PART_ID}")) {
68-
return objectPath.replaceAll(Pattern.quote("${PART_ID}"), String.format("%s", getPartId(objectPath)));
69-
} else {
70-
return objectPath;
71-
}
72-
}
73-
74-
private String getPartId(final String objectPath) {
75-
final String bucket = s3Config.getBucketName();
76-
final ObjectListing objects = s3Client.listObjects(bucket, objectPath);
77-
if (objects.isTruncated()) {
78-
// bucket contains too many objects, use an uuid instead
79-
return UUID.randomUUID().toString();
80-
} else {
81-
return Integer.toString(objects.getObjectSummaries().size());
82-
}
8366
}
8467

8568
@Override
@@ -122,12 +105,12 @@ public String uploadRecordsToBucket(final SerializableBuffer recordsData, final
122105
return recordsData.getFilename();
123106
}
124107

125-
private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) throws IOException {
108+
private void loadDataIntoBucket(final String objectPath, final SerializableBuffer recordsData) {
126109
final long partSize = s3Config.getFormatConfig() != null ? s3Config.getFormatConfig().getPartSize() : DEFAULT_PART_SIZE;
127110
final String bucket = s3Config.getBucketName();
128-
final String objectKey = String.format("%s%s", objectPath, recordsData.getFilename());
111+
final String objectKeyWithPartId = String.format("%s%s", objectPath, getPartId(objectPath));
129112
final StreamTransferManager uploadManager = StreamTransferManagerHelper
130-
.getDefault(bucket, objectKey, s3Client, partSize)
113+
.getDefault(bucket, objectKeyWithPartId, s3Client, partSize)
131114
.checkIntegrity(true)
132115
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
133116
.queueCapacity(DEFAULT_QUEUE_CAPACITY);
@@ -146,12 +129,23 @@ private void loadDataIntoBucket(final String objectPath, final SerializableBuffe
146129
uploadManager.complete();
147130
}
148131
}
149-
if (!s3Client.doesObjectExist(bucket, objectKey)) {
150-
LOGGER.error("Failed to upload data into storage, object {} not found", objectKey);
132+
if (!s3Client.doesObjectExist(bucket, objectKeyWithPartId)) {
133+
LOGGER.error("Failed to upload data into storage, object {} not found", objectKeyWithPartId);
151134
throw new RuntimeException("Upload failed");
152135
}
153136
}
154137

138+
private String getPartId(final String objectPath) {
139+
final String bucket = s3Config.getBucketName();
140+
final ObjectListing objects = s3Client.listObjects(bucket, objectPath);
141+
if (objects.isTruncated()) {
142+
// bucket contains too many objects, use an uuid instead
143+
return UUID.randomUUID().toString();
144+
} else {
145+
return Integer.toString(objects.getObjectSummaries().size());
146+
}
147+
}
148+
155149
@Override
156150
public void dropBucketObject(final String objectPath) {
157151
cleanUpBucketObject(objectPath, List.of());

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/WriteConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@ public class WriteConfig {
1616
private final String namespace;
1717
private final String streamName;
1818
private final String outputBucketPath;
19+
private final String fullOutputPath;
1920
private final DestinationSyncMode syncMode;
2021
private final List<String> storedFiles;
2122

2223
public WriteConfig(final String namespace,
2324
final String streamName,
2425
final String outputBucketPath,
26+
final String fullOutputPath,
2527
final DestinationSyncMode syncMode) {
2628
this.namespace = namespace;
2729
this.streamName = streamName;
2830
this.outputBucketPath = outputBucketPath;
31+
this.fullOutputPath = fullOutputPath;
2932
this.syncMode = syncMode;
3033
this.storedFiles = new ArrayList<>();
3134
}
@@ -42,6 +45,10 @@ public String getOutputBucketPath() {
4245
return outputBucketPath;
4346
}
4447

48+
public String getFullOutputPath() {
49+
return fullOutputPath;
50+
}
51+
4552
public DestinationSyncMode getSyncMode() {
4653
return syncMode;
4754
}
@@ -64,6 +71,7 @@ public String toString() {
6471
"streamName=" + streamName +
6572
", namespace=" + namespace +
6673
", outputBucketPath=" + outputBucketPath +
74+
", fullOutputPath=" + fullOutputPath +
6775
", syncMode=" + syncMode +
6876
'}';
6977
}

airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,9 @@ protected List<S3ObjectSummary> getAllSyncedObjects(final String streamName, fin
100100
streamNameStr,
101101
DateTime.now(DateTimeZone.UTC),
102102
S3DestinationConstants.DEFAULT_PATH_FORMAT);
103+
final String parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1);
103104
final List<S3ObjectSummary> objectSummaries = s3Client
104-
.listObjects(config.getBucketName(), outputPrefix)
105+
.listObjects(config.getBucketName(), parentFolder)
105106
.getObjectSummaries()
106107
.stream()
107108
.filter(o -> o.getKey().contains(streamNameStr + "/"))

airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/avro/AvroSerializedBufferTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class AvroSerializedBufferTest {
5555
public void testSnappyAvroWriter() throws Exception {
5656
final S3AvroFormatConfig config = new S3AvroFormatConfig(Jsons.jsonNode(Map.of("compression_codec", Map.of(
5757
"codec", "snappy"))));
58-
runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 970L, 980L, config, getExpectedString());
58+
runTest(new InMemoryBuffer(AVRO_FILE_EXTENSION), 965L, 980L, config, getExpectedString());
5959
}
6060

6161
@Test

0 commit comments

Comments
 (0)