Skip to content

Java based S3 Multipart Client #4254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public class CustomizationConfig {
*/
private String asyncClientDecorator;

/**
* Only for s3. A set of customization to related to multipart operations.
*/
private MultipartCustomization multipartCustomization;

/**
* Whether to skip generating endpoint tests from endpoint-tests.json
*/
Expand Down Expand Up @@ -665,4 +670,12 @@ public Map<String, ClientContextParam> getCustomClientContextParams() {
public void setCustomClientContextParams(Map<String, ClientContextParam> customClientContextParams) {
this.customClientContextParams = customClientContextParams;
}

public MultipartCustomization getMultipartCustomization() {
return this.multipartCustomization;
}

public void setMultipartCustomization(MultipartCustomization multipartCustomization) {
this.multipartCustomization = multipartCustomization;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.codegen.model.config.customization;

public class MultipartCustomization {
private String multipartConfigurationClass;
private String multipartConfigMethodDoc;
private String multipartEnableMethodDoc;
private String contextParamEnabledKey;
private String contextParamConfigKey;

public String getMultipartConfigurationClass() {
return multipartConfigurationClass;
}

public void setMultipartConfigurationClass(String multipartConfigurationClass) {
this.multipartConfigurationClass = multipartConfigurationClass;
}

public String getMultipartConfigMethodDoc() {
return multipartConfigMethodDoc;
}

public void setMultipartConfigMethodDoc(String multipartMethodDoc) {
this.multipartConfigMethodDoc = multipartMethodDoc;
}

public String getMultipartEnableMethodDoc() {
return multipartEnableMethodDoc;
}

public void setMultipartEnableMethodDoc(String multipartEnableMethodDoc) {
this.multipartEnableMethodDoc = multipartEnableMethodDoc;
}

public String getContextParamEnabledKey() {
return contextParamEnabledKey;
}

public void setContextParamEnabledKey(String contextParamEnabledKey) {
this.contextParamEnabledKey = contextParamEnabledKey;
}

public String getContextParamConfigKey() {
return contextParamConfigKey;
}

public void setContextParamConfigKey(String contextParamConfigKey) {
this.contextParamConfigKey = contextParamConfigKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Collections;

/**
* Represents the a Poet generated class
* Represents a Poet generated class
*/
public interface ClassSpec {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.ParameterSpec;
import com.squareup.javapoet.ParameterizedTypeName;
import com.squareup.javapoet.TypeSpec;
import java.net.URI;
import javax.lang.model.element.Modifier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.auth.token.credentials.SdkTokenProvider;
import software.amazon.awssdk.awscore.client.config.AwsClientOption;
import software.amazon.awssdk.codegen.model.config.customization.MultipartCustomization;
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
import software.amazon.awssdk.codegen.poet.ClassSpec;
import software.amazon.awssdk.codegen.poet.PoetExtension;
Expand Down Expand Up @@ -59,12 +61,12 @@ public AsyncClientBuilderClass(IntermediateModel model) {
@Override
public TypeSpec poetSpec() {
TypeSpec.Builder builder =
PoetUtils.createClassBuilder(builderClassName)
.addAnnotation(SdkInternalApi.class)
.addModifiers(Modifier.FINAL)
.superclass(ParameterizedTypeName.get(builderBaseClassName, builderInterfaceName, clientInterfaceName))
.addSuperinterface(builderInterfaceName)
.addJavadoc("Internal implementation of {@link $T}.", builderInterfaceName);
PoetUtils.createClassBuilder(builderClassName)
.addAnnotation(SdkInternalApi.class)
.addModifiers(Modifier.FINAL)
.superclass(ParameterizedTypeName.get(builderBaseClassName, builderInterfaceName, clientInterfaceName))
.addSuperinterface(builderInterfaceName)
.addJavadoc("Internal implementation of {@link $T}.", builderInterfaceName);

if (model.getEndpointOperation().isPresent()) {
builder.addMethod(endpointDiscoveryEnabled());
Expand All @@ -80,6 +82,12 @@ public TypeSpec poetSpec() {
builder.addMethod(bearerTokenProviderMethod());
}

MultipartCustomization multipartCustomization = model.getCustomizationConfig().getMultipartCustomization();
if (multipartCustomization != null) {
builder.addMethod(multipartEnabledMethod(multipartCustomization));
builder.addMethod(multipartConfigMethods(multipartCustomization));
}

builder.addMethod(buildClientMethod());
builder.addMethod(initializeServiceClientConfigMethod());

Expand Down Expand Up @@ -124,15 +132,15 @@ private MethodSpec endpointProviderMethod() {

private MethodSpec buildClientMethod() {
MethodSpec.Builder builder = MethodSpec.methodBuilder("buildClient")
.addAnnotation(Override.class)
.addModifiers(Modifier.PROTECTED, Modifier.FINAL)
.returns(clientInterfaceName)
.addStatement("$T clientConfiguration = super.asyncClientConfiguration()",
SdkClientConfiguration.class).addStatement("this.validateClientOptions"
+ "(clientConfiguration)")
.addStatement("$T serviceClientConfiguration = initializeServiceClientConfig"
+ "(clientConfiguration)",
serviceConfigClassName);
.addAnnotation(Override.class)
.addModifiers(Modifier.PROTECTED, Modifier.FINAL)
.returns(clientInterfaceName)
.addStatement("$T clientConfiguration = super.asyncClientConfiguration()",
SdkClientConfiguration.class)
.addStatement("this.validateClientOptions(clientConfiguration)")
.addStatement("$T serviceClientConfiguration = initializeServiceClientConfig"
+ "(clientConfiguration)",
serviceConfigClassName);

builder.addStatement("$1T client = new $2T(serviceClientConfiguration, clientConfiguration)",
clientInterfaceName, clientClassName);
Expand All @@ -156,6 +164,32 @@ private MethodSpec bearerTokenProviderMethod() {
.build();
}

private MethodSpec multipartEnabledMethod(MultipartCustomization multipartCustomization) {
return MethodSpec.methodBuilder("multipartEnabled")
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.returns(builderInterfaceName)
.addParameter(Boolean.class, "enabled")
.addStatement("clientContextParams.put($N, enabled)",
multipartCustomization.getContextParamEnabledKey())
.addStatement("return this")
.build();
}

private MethodSpec multipartConfigMethods(MultipartCustomization multipartCustomization) {
ClassName mulitpartConfigClassName =
PoetUtils.classNameFromFqcn(multipartCustomization.getMultipartConfigurationClass());
return MethodSpec.methodBuilder("multipartConfiguration")
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.addParameter(ParameterSpec.builder(mulitpartConfigClassName, "multipartConfig").build())
.returns(builderInterfaceName)
.addStatement("clientContextParams.put($N, multipartConfig)",
multipartCustomization.getContextParamConfigKey())
.addStatement("return this")
.build();
}

private MethodSpec initializeServiceClientConfigMethod() {
return MethodSpec.methodBuilder("initializeServiceClientConfig").addModifiers(Modifier.PRIVATE)
.addParameter(SdkClientConfiguration.class, "clientConfig")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,97 @@

import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.ParameterSpec;
import com.squareup.javapoet.ParameterizedTypeName;
import com.squareup.javapoet.TypeSpec;
import java.util.function.Consumer;
import javax.lang.model.element.Modifier;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.codegen.model.config.customization.MultipartCustomization;
import software.amazon.awssdk.codegen.model.intermediate.IntermediateModel;
import software.amazon.awssdk.codegen.poet.ClassSpec;
import software.amazon.awssdk.codegen.poet.PoetUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

public class AsyncClientBuilderInterface implements ClassSpec {
private static final Logger log = Logger.loggerFor(AsyncClientBuilderInterface.class);

private final ClassName builderInterfaceName;
private final ClassName clientInterfaceName;
private final ClassName baseBuilderInterfaceName;
private final IntermediateModel model;

public AsyncClientBuilderInterface(IntermediateModel model) {
String basePackage = model.getMetadata().getFullClientPackageName();
this.clientInterfaceName = ClassName.get(basePackage, model.getMetadata().getAsyncInterface());
this.builderInterfaceName = ClassName.get(basePackage, model.getMetadata().getAsyncBuilderInterface());
this.baseBuilderInterfaceName = ClassName.get(basePackage, model.getMetadata().getBaseBuilderInterface());
this.model = model;
}

@Override
public TypeSpec poetSpec() {
return PoetUtils.createInterfaceBuilder(builderInterfaceName)
.addSuperinterface(ParameterizedTypeName.get(ClassName.get(AwsAsyncClientBuilder.class),
builderInterfaceName, clientInterfaceName))
.addSuperinterface(ParameterizedTypeName.get(baseBuilderInterfaceName,
builderInterfaceName, clientInterfaceName))
.addJavadoc(getJavadoc())
.build();
TypeSpec.Builder builder = PoetUtils
.createInterfaceBuilder(builderInterfaceName)
.addSuperinterface(ParameterizedTypeName.get(ClassName.get(AwsAsyncClientBuilder.class),
builderInterfaceName, clientInterfaceName))
.addSuperinterface(ParameterizedTypeName.get(baseBuilderInterfaceName,
builderInterfaceName, clientInterfaceName))
.addJavadoc(getJavadoc());

MultipartCustomization multipartCustomization = model.getCustomizationConfig().getMultipartCustomization();
if (multipartCustomization != null) {
includeMultipartMethod(builder, multipartCustomization);
}
return builder.build();
}

private void includeMultipartMethod(TypeSpec.Builder builder, MultipartCustomization multipartCustomization) {
log.debug(() -> String.format("Adding multipart config methods to builder interface for service '%s'",
model.getMetadata().getServiceId()));

// .multipartEnabled(Boolean)
builder.addMethod(
MethodSpec.methodBuilder("multipartEnabled")
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
.returns(builderInterfaceName)
.addParameter(Boolean.class, "enabled")
.addCode("throw new $T();", UnsupportedOperationException.class)
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartEnableMethodDoc()))
.build());

// .multipartConfiguration(MultipartConfiguration)
String multiPartConfigMethodName = "multipartConfiguration";
String multipartConfigClass = Validate.notNull(multipartCustomization.getMultipartConfigurationClass(),
"'multipartConfigurationClass' must be defined");
ClassName mulitpartConfigClassName = PoetUtils.classNameFromFqcn(multipartConfigClass);
builder.addMethod(
MethodSpec.methodBuilder(multiPartConfigMethodName)
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
.returns(builderInterfaceName)
.addParameter(ParameterSpec.builder(mulitpartConfigClassName, "multipartConfiguration").build())
.addCode("throw new $T();", UnsupportedOperationException.class)
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartConfigMethodDoc()))
.build());

// .multipartConfiguration(Consumer<MultipartConfiguration>)
ClassName mulitpartConfigBuilderClassName = PoetUtils.classNameFromFqcn(multipartConfigClass + ".Builder");
ParameterizedTypeName consumerBuilderType = ParameterizedTypeName.get(ClassName.get(Consumer.class),
mulitpartConfigBuilderClassName);
builder.addMethod(
MethodSpec.methodBuilder(multiPartConfigMethodName)
.addModifiers(Modifier.DEFAULT, Modifier.PUBLIC)
.returns(builderInterfaceName)
.addParameter(ParameterSpec.builder(consumerBuilderType, "multipartConfiguration").build())
.addStatement("$T builder = $T.builder()",
mulitpartConfigBuilderClassName,
mulitpartConfigClassName)
.addStatement("multipartConfiguration.accept(builder)")
.addStatement("return multipartConfiguration(builder.build())")
.addJavadoc(CodeBlock.of(multipartCustomization.getMultipartConfigMethodDoc()))
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -48,7 +49,7 @@ protected AwsResponseMetadata(Map<String, String> metadata) {
}

protected AwsResponseMetadata(AwsResponseMetadata responseMetadata) {
this(responseMetadata.metadata);
this(responseMetadata == null ? new HashMap<>() : responseMetadata.metadata);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Validate;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
Expand Down Expand Up @@ -399,4 +402,40 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
static AsyncRequestBody empty() {
return fromBytes(new byte[0]);
}


/**
* Converts this {@link AsyncRequestBody} to a publisher of {@link AsyncRequestBody}s, each of which publishes a specific
* portion of the original data, based on the provided {@link AsyncRequestBodySplitConfiguration}. The default chunk size
* is 2MB and the default buffer size is 8MB.
*
* <p>
* If content length of this {@link AsyncRequestBody} is present, each divided {@link AsyncRequestBody} is delivered to the
* subscriber right after it's initialized.
* <p>
* If content length is null, it is sent after the entire content for that chunk is buffered.
* In this case, the configured {@code maxMemoryUsageInBytes} must be larger than or equal to {@code chunkSizeInBytes}.
*
* @see AsyncRequestBodySplitConfiguration
*/
default SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");

return SplittingPublisher.builder()
.asyncRequestBody(this)
.chunkSizeInBytes(splitConfiguration.chunkSizeInBytes())
.bufferSizeInBytes(splitConfiguration.bufferSizeInBytes())
.build();
}

/**
* This is a convenience method that passes an instance of the {@link AsyncRequestBodySplitConfiguration} builder,
* avoiding the need to create one manually via {@link AsyncRequestBodySplitConfiguration#builder()}.
*
* @see #split(AsyncRequestBodySplitConfiguration)
*/
default SdkPublisher<AsyncRequestBody> split(Consumer<AsyncRequestBodySplitConfiguration.Builder> splitConfiguration) {
Validate.notNull(splitConfiguration, "splitConfiguration");
return split(AsyncRequestBodySplitConfiguration.builder().applyMutation(splitConfiguration).build());
}
}
Loading