Skip to content

🐛 Fix bucket path for destination s3 #11496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,19 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final AirbyteStream abStream = stream.getStream();
final String namespace = abStream.getNamespace();
final String streamName = abStream.getName();
final String outputNamespace = getOutputNamespace(abStream, config.get(BUCKET_PATH_FIELD).asText(), namingResolver);
final String customOutputFormat =
config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ? config.get(PATH_FORMAT_FIELD).asText()
: S3DestinationConstants.DEFAULT_PATH_FORMAT;
final String outputBucketPath = storageOperations.getBucketObjectPath(outputNamespace, streamName, SYNC_DATETIME, customOutputFormat);
final String bucketPath = config.get(BUCKET_PATH_FIELD).asText();
final String customOutputFormat = String.join("/",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does customOutputFormat still make sense as the name here when we're including bucketPath?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can still customize the end part of the bucket path

it just always starts with the bucketPath now

bucketPath,
config.has(PATH_FORMAT_FIELD) && !config.get(PATH_FORMAT_FIELD).asText().isBlank() ?
config.get(PATH_FORMAT_FIELD).asText() : S3DestinationConstants.DEFAULT_PATH_FORMAT);
final String outputBucketPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputNamespace, outputBucketPath, syncMode);
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, outputBucketPath, syncMode);
LOGGER.info("Write config: {}", writeConfig);
return writeConfig;
};
}

private static String getOutputNamespace(final AirbyteStream stream,
final String defaultDestNamespace,
final NamingConventionTransformer namingResolver) {
return stream.getNamespace() != null
? namingResolver.getNamespace(stream.getNamespace())
: namingResolver.getNamespace(defaultDestNamespace);
}

private OnStartFunction onStartFunction(final BlobStorageOperations storageOperations, final List<WriteConfig> writeConfigs) {
return () -> {
LOGGER.info("Preparing bucket in destination started for {} streams", writeConfigs.size());
Expand Down Expand Up @@ -144,7 +137,7 @@ private CheckedBiConsumer<AirbyteStreamNameNamespacePair, SerializableBuffer, Ex
writer.flush();
writeConfig.addStoredFile(storageOperations.uploadRecordsToBucket(
writer,
writeConfig.getOutputNamespace(),
writeConfig.getNamespace(),
writeConfig.getStreamName(),
writeConfig.getOutputBucketPath()));
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@ public class WriteConfig {

private final String namespace;
private final String streamName;
private final String outputNamespace;
private final String outputBucketPath;
private final DestinationSyncMode syncMode;
private final List<String> storedFiles;

public WriteConfig(final String namespace,
final String streamName,
final String outputNamespace,
final String outputBucketPath,
final DestinationSyncMode syncMode) {
this.namespace = namespace;
this.streamName = streamName;
this.outputNamespace = outputNamespace;
this.outputBucketPath = outputBucketPath;
this.syncMode = syncMode;
this.storedFiles = new ArrayList<>();
Expand All @@ -41,10 +38,6 @@ public String getStreamName() {
return streamName;
}

public String getOutputNamespace() {
return outputNamespace;
}

public String getOutputBucketPath() {
return outputBucketPath;
}
Expand All @@ -70,7 +63,6 @@ public String toString() {
return "WriteConfig{" +
"streamName=" + streamName +
", namespace=" + namespace +
", outputNamespace=" + outputNamespace +
", outputBucketPath=" + outputBucketPath +
", syncMode=" + syncMode +
'}';
Expand Down