Skip to content

Commit 91f1862

Browse files
destination-s3: assume role auth (#38204)
1 parent 5a510dc commit 91f1862

File tree

24 files changed

+554
-129
lines changed

24 files changed

+554
-129
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
177178
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
178179
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
179180
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.4
1+
version=0.35.5

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -367,10 +367,14 @@ abstract class DestinationAcceptanceTest {
367367
workspaceRoot.toString(),
368368
localRoot.toString(),
369369
"host",
370-
emptyMap()
370+
getConnectorEnv()
371371
)
372372
}
373373

374+
open fun getConnectorEnv(): Map<String, String> {
375+
return emptyMap()
376+
}
377+
374378
@AfterEach
375379
@Throws(Exception::class)
376380
fun tearDownInternal() {

airbyte-cdk/java/airbyte-cdk/s3-destinations/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121

2222
// Re-export dependencies for gcs-destinations.
2323
api 'com.amazonaws:aws-java-sdk-s3:1.12.647'
24+
api 'com.amazonaws:aws-java-sdk-sts:1.12.647'
2425
api ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
2526
api 'com.github.alexmojaki:s3-stream-upload:2.2.4'
2627
api 'org.apache.avro:avro:1.11.3'

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/BaseS3Destination.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import org.slf4j.LoggerFactory
2222

2323
abstract class BaseS3Destination
2424
protected constructor(
25-
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory()
25+
protected val configFactory: S3DestinationConfigFactory = S3DestinationConfigFactory(),
26+
protected val environment: Map<String, String> = System.getenv()
2627
) : BaseConnector(), Destination {
2728
private val nameTransformer: NamingConventionTransformer = S3NameTransformer()
2829

2930
override fun check(config: JsonNode): AirbyteConnectionStatus? {
3031
try {
31-
val destinationConfig = configFactory.getS3DestinationConfig(config, storageProvider())
32+
val destinationConfig =
33+
configFactory.getS3DestinationConfig(config, storageProvider(), environment)
3234
val s3Client = destinationConfig.getS3Client()
3335

3436
S3BaseChecks.testIAMUserHasListObjectPermission(s3Client, destinationConfig.bucketName)
@@ -60,7 +62,7 @@ protected constructor(
6062
catalog: ConfiguredAirbyteCatalog,
6163
outputRecordCollector: Consumer<AirbyteMessage>
6264
): AirbyteMessageConsumer? {
63-
val s3Config = configFactory.getS3DestinationConfig(config, storageProvider())
65+
val s3Config = configFactory.getS3DestinationConfig(config, storageProvider(), environment)
6466
return S3ConsumerFactory()
6567
.create(
6668
outputRecordCollector,

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt

+58-39
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ import com.amazonaws.services.s3.AmazonS3
1111
import com.amazonaws.services.s3.AmazonS3ClientBuilder
1212
import com.fasterxml.jackson.databind.JsonNode
1313
import io.airbyte.cdk.integrations.destination.s3.constant.S3Constants
14-
import io.airbyte.cdk.integrations.destination.s3.credential.S3AWSDefaultProfileCredentialConfig
15-
import io.airbyte.cdk.integrations.destination.s3.credential.S3AccessKeyCredentialConfig
16-
import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig
17-
import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialType
14+
import io.airbyte.cdk.integrations.destination.s3.credential.*
1815
import java.util.*
1916
import javax.annotation.Nonnull
2017
import org.slf4j.Logger
@@ -34,6 +31,7 @@ open class S3DestinationConfig {
3431
val formatConfig: UploadFormatConfig?
3532
var fileNamePattern: String? = null
3633
private set
34+
var environment: Map<String, String>
3735

3836
private val lock = Any()
3937
private var s3Client: AmazonS3?
@@ -70,6 +68,7 @@ open class S3DestinationConfig {
7068
this.s3CredentialConfig = credentialConfig
7169
this.formatConfig = formatConfig
7270
this.s3Client = s3Client
71+
this.environment = System.getenv()
7372
}
7473

7574
constructor(
@@ -83,7 +82,8 @@ open class S3DestinationConfig {
8382
s3Client: AmazonS3?,
8483
fileNamePattern: String?,
8584
checkIntegrity: Boolean,
86-
uploadThreadsCount: Int
85+
uploadThreadsCount: Int,
86+
environment: Map<String, String> = System.getenv()
8787
) {
8888
this.endpoint = endpoint
8989
this.bucketName = bucketName
@@ -96,6 +96,7 @@ open class S3DestinationConfig {
9696
this.fileNamePattern = fileNamePattern
9797
this.isCheckIntegrity = checkIntegrity
9898
this.uploadThreadsCount = uploadThreadsCount
99+
this.environment = environment
99100
}
100101

101102
fun resetS3Client(): AmazonS3 {
@@ -113,36 +114,38 @@ open class S3DestinationConfig {
113114
val credentialsProvider = s3CredentialConfig!!.s3CredentialsProvider
114115
val credentialType = s3CredentialConfig.credentialType
115116

116-
if (S3CredentialType.DEFAULT_PROFILE == credentialType) {
117-
return AmazonS3ClientBuilder.standard()
118-
.withRegion(bucketRegion)
119-
.withCredentials(credentialsProvider) // the SDK defaults to RetryMode.LEGACY
120-
// (https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html)
121-
// this _can_ be configured via environment variable, but it seems more reliable to
122-
// configure it
123-
// programmatically
124-
.withClientConfiguration(ClientConfiguration().withRetryMode(RetryMode.STANDARD))
125-
.build()
126-
}
127-
128-
if (null == endpoint || endpoint.isEmpty()) {
129-
return AmazonS3ClientBuilder.standard()
130-
.withCredentials(credentialsProvider)
131-
.withRegion(bucketRegion)
132-
.build()
117+
val clientBuilder = AmazonS3ClientBuilder.standard().withCredentials(credentialsProvider)
118+
when (credentialType) {
119+
S3CredentialType.DEFAULT_PROFILE,
120+
S3CredentialType.ASSUME_ROLE ->
121+
clientBuilder
122+
.withRegion(bucketRegion)
123+
// the SDK defaults to RetryMode.LEGACY
124+
// (https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html)
125+
// this _can_ be configured via environment variable, but it seems more reliable
126+
// to
127+
// configure it
128+
// programmatically
129+
.withClientConfiguration(
130+
ClientConfiguration().withRetryMode(RetryMode.STANDARD)
131+
)
132+
S3CredentialType.ACCESS_KEY -> {
133+
if (null == endpoint || endpoint.isEmpty()) {
134+
clientBuilder.withRegion(bucketRegion)
135+
} else {
136+
val clientConfiguration = ClientConfiguration().withProtocol(Protocol.HTTPS)
137+
clientConfiguration.signerOverride = "AWSS3V4SignerType"
138+
139+
clientBuilder
140+
.withEndpointConfiguration(
141+
AwsClientBuilder.EndpointConfiguration(endpoint, bucketRegion)
142+
)
143+
.withPathStyleAccessEnabled(true)
144+
.withClientConfiguration(clientConfiguration)
145+
}
146+
}
133147
}
134-
135-
val clientConfiguration = ClientConfiguration().withProtocol(Protocol.HTTPS)
136-
clientConfiguration.signerOverride = "AWSS3V4SignerType"
137-
138-
return AmazonS3ClientBuilder.standard()
139-
.withEndpointConfiguration(
140-
AwsClientBuilder.EndpointConfiguration(endpoint, bucketRegion)
141-
)
142-
.withPathStyleAccessEnabled(true)
143-
.withClientConfiguration(clientConfiguration)
144-
.withCredentials(credentialsProvider)
145-
.build()
148+
return clientBuilder.build()
146149
}
147150

148151
override fun equals(other: Any?): Boolean {
@@ -188,6 +191,7 @@ open class S3DestinationConfig {
188191
private var checkIntegrity = true
189192

190193
private var uploadThreadsCount = S3StorageOperations.DEFAULT_UPLOAD_THREADS
194+
private var environment: Map<String, String> = System.getenv()
191195

192196
fun withBucketName(bucketName: String): Builder {
193197
this.bucketName = bucketName
@@ -249,6 +253,11 @@ open class S3DestinationConfig {
249253
return this
250254
}
251255

256+
fun withEnvironment(environment: Map<String, String>): Builder {
257+
this.environment = environment
258+
return this
259+
}
260+
252261
fun get(): S3DestinationConfig {
253262
return S3DestinationConfig(
254263
endpoint,
@@ -261,7 +270,8 @@ open class S3DestinationConfig {
261270
s3Client,
262271
fileNamePattern,
263272
checkIntegrity,
264-
uploadThreadsCount
273+
uploadThreadsCount,
274+
environment
265275
)
266276
}
267277
}
@@ -284,14 +294,18 @@ open class S3DestinationConfig {
284294
}
285295

286296
@JvmStatic
287-
fun getS3DestinationConfig(@Nonnull config: JsonNode): S3DestinationConfig {
288-
return getS3DestinationConfig(config, StorageProvider.AWS_S3)
297+
fun getS3DestinationConfig(
298+
@Nonnull config: JsonNode,
299+
environment: Map<String, String> = System.getenv()
300+
): S3DestinationConfig {
301+
return getS3DestinationConfig(config, StorageProvider.AWS_S3, environment)
289302
}
290303

291304
@JvmStatic
292305
fun getS3DestinationConfig(
293306
@Nonnull config: JsonNode,
294-
@Nonnull storageProvider: StorageProvider
307+
@Nonnull storageProvider: StorageProvider = StorageProvider.AWS_S3,
308+
environment: Map<String, String> = System.getenv()
295309
): S3DestinationConfig {
296310
var builder =
297311
create(
@@ -343,6 +357,11 @@ open class S3DestinationConfig {
343357
getProperty(config, S3Constants.ACCESS_KEY_ID),
344358
getProperty(config, S3Constants.SECRET_ACCESS_KEY)
345359
)
360+
} else if (config.has(S3Constants.ROLE_ARN)) {
361+
S3AssumeRoleCredentialConfig(
362+
getProperty(config, S3Constants.ROLE_ARN)!!,
363+
environment
364+
)
346365
} else {
347366
S3AWSDefaultProfileCredentialConfig()
348367
}
@@ -357,7 +376,7 @@ open class S3DestinationConfig {
357376
UploadFormatConfigFactory.getUploadFormatConfig(config)
358377
)
359378
}
360-
379+
builder.withEnvironment(environment)
361380
return builder.get()
362381
}
363382

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfigFactory.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,13 @@ import javax.annotation.Nonnull
99
open class S3DestinationConfigFactory {
1010
open fun getS3DestinationConfig(
1111
config: JsonNode,
12-
@Nonnull storageProvider: StorageProvider
12+
@Nonnull storageProvider: StorageProvider,
13+
environment: Map<String, String>
1314
): S3DestinationConfig {
14-
return S3DestinationConfig.Companion.getS3DestinationConfig(config, storageProvider)
15+
return S3DestinationConfig.Companion.getS3DestinationConfig(
16+
config = config,
17+
storageProvider = storageProvider,
18+
environment = environment
19+
)
1520
}
1621
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/constant/S3Constants.kt

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class S3Constants {
1616
const val SECRET_ACCESS_KEY: String = "secret_access_key"
1717
const val S_3_BUCKET_NAME: String = "s3_bucket_name"
1818
const val S_3_BUCKET_REGION: String = "s3_bucket_region"
19+
const val ROLE_ARN: String = "role_arn"
1920

2021
// r2 requires account_id
2122
const val ACCOUNT_ID: String = "account_id"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
package io.airbyte.cdk.integrations.destination.s3.credential
5+
6+
import com.amazonaws.auth.AWSCredentialsProvider
7+
import com.amazonaws.auth.AWSStaticCredentialsProvider
8+
import com.amazonaws.auth.BasicAWSCredentials
9+
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider
10+
import com.amazonaws.regions.Regions
11+
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient
12+
import java.util.concurrent.Executors
13+
import java.util.concurrent.ThreadFactory
14+
15+
private const val AIRBYTE_STS_SESSION_NAME = "airbyte-sts-session"
16+
17+
internal class DaemonThreadFactory : ThreadFactory {
18+
override fun newThread(runnable: Runnable): Thread {
19+
val thread = Executors.defaultThreadFactory().newThread(runnable)
20+
thread.isDaemon = true
21+
return thread
22+
}
23+
}
24+
25+
/**
26+
* The S3AssumeRoleCredentialConfig implementation of the S3CredentialConfig returns an
27+
* STSAssumeRoleSessionCredentialsProvider. The STSAssumeRoleSessionCredentialsProvider
28+
* automatically refreshes assumed role credentials on a background thread. The roleArn comes from
29+
* the spec and the externalId, which is used to protect against confused deputy problems, and also
30+
* is provided through the orchestrator via an environment variable. As of 5/2024, the externalId is
31+
* set to the workspaceId.
32+
*
33+
* @param roleArn The Amazon Resource Name (ARN) of the role to assume.
34+
*/
35+
class S3AssumeRoleCredentialConfig(private val roleArn: String, environment: Map<String, String>) :
36+
S3CredentialConfig {
37+
private val externalId: String = environment.getValue("AWS_ASSUME_ROLE_EXTERNAL_ID")
38+
39+
override val credentialType: S3CredentialType = S3CredentialType.ASSUME_ROLE
40+
41+
/**
42+
* AWSCredentialsProvider implementation that uses the AWS Security Token Service to assume a
43+
* Role and create temporary, short-lived sessions to use for authentication. This credentials
44+
* provider uses a background thread to refresh credentials. This background thread can be shut
45+
* down via the close() method when the credentials provider is no longer used.
46+
*/
47+
override val s3CredentialsProvider: AWSCredentialsProvider by lazy {
48+
STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, AIRBYTE_STS_SESSION_NAME)
49+
.withExternalId(externalId)
50+
.withStsClient(
51+
AWSSecurityTokenServiceClient.builder()
52+
.withRegion(Regions.DEFAULT_REGION)
53+
.withCredentials(getCredentialProvider(environment))
54+
.build()
55+
)
56+
.withAsyncRefreshExecutor(Executors.newSingleThreadExecutor(DaemonThreadFactory()))
57+
.build()
58+
}
59+
60+
companion object {
61+
@JvmStatic
62+
fun getCredentialProvider(environment: Map<String, String>): AWSStaticCredentialsProvider {
63+
return AWSStaticCredentialsProvider(
64+
BasicAWSCredentials(
65+
environment.getValue("AWS_ACCESS_KEY_ID"),
66+
environment.getValue("AWS_SECRET_ACCESS_KEY")
67+
)
68+
)
69+
}
70+
}
71+
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3CredentialConfig.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.s3.credential
55

66
import com.amazonaws.auth.AWSCredentialsProvider
77

8-
interface S3CredentialConfig : BlobStorageCredentialConfig<S3CredentialType> {
8+
interface S3CredentialConfig {
99
val s3CredentialsProvider: AWSCredentialsProvider
10+
val credentialType: S3CredentialType
1011
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3CredentialType.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ package io.airbyte.cdk.integrations.destination.s3.credential
55

66
enum class S3CredentialType {
77
ACCESS_KEY,
8-
DEFAULT_PROFILE
8+
DEFAULT_PROFILE,
9+
ASSUME_ROLE
910
}

airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,11 @@ protected constructor(protected val outputFormat: FileUploadFormat) : Destinatio
127127
.set<JsonNode>("format", formatConfig)
128128
this.configJson = configJson
129129
this.s3DestinationConfig =
130-
S3DestinationConfig.getS3DestinationConfig(configJson, storageProvider())
130+
S3DestinationConfig.getS3DestinationConfig(
131+
configJson,
132+
storageProvider(),
133+
getConnectorEnv()
134+
)
131135
LOGGER.info(
132136
"Test full path: {}/{}",
133137
s3DestinationConfig.bucketName,

airbyte-integrations/connectors/destination-s3/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteJavaConnector {
77
cdkVersionRequired = '0.30.8'
88
features = ['db-destinations', 's3-destinations']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
airbyteJavaConnector.addCdkDependencies()

0 commit comments

Comments
 (0)