Skip to content

Commit 311b27f

Browse files
convert CDK s3-destinations to kotlin
1 parent 884b216 commit 311b27f

File tree

140 files changed

+3723
-4340
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

140 files changed

+3723
-4340
lines changed

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/BaseGcsDestination.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public AirbyteConnectionStatus check(final JsonNode config) {
4545
final AmazonS3 s3Client = destinationConfig.getS3Client();
4646

4747
// Test single upload (for small files) permissions
48-
S3BaseChecks.testSingleUpload(s3Client, destinationConfig.getBucketName(), destinationConfig.getBucketPath());
48+
S3BaseChecks.testSingleUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath);
4949

5050
// Test multipart upload with stream transfer manager
51-
S3BaseChecks.testMultipartUpload(s3Client, destinationConfig.getBucketName(), destinationConfig.getBucketPath());
51+
S3BaseChecks.testMultipartUpload(s3Client, destinationConfig.bucketName, destinationConfig.bucketPath);
5252

5353
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
5454
} catch (final AmazonS3Exception e) {

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,18 @@ public static GcsDestinationConfig getGcsDestinationConfig(final JsonNode config
6161

6262
@Override
6363
protected AmazonS3 createS3Client() {
64-
switch (credentialConfig.getCredentialType()) {
64+
switch (credentialConfig.credentialType) {
6565
case HMAC_KEY -> {
6666
final GcsHmacKeyCredentialConfig hmacKeyCredential = (GcsHmacKeyCredentialConfig) credentialConfig;
6767
final BasicAWSCredentials awsCreds = new BasicAWSCredentials(hmacKeyCredential.getHmacKeyAccessId(), hmacKeyCredential.getHmacKeySecret());
6868

6969
return AmazonS3ClientBuilder.standard()
7070
.withEndpointConfiguration(
71-
new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, getBucketRegion()))
71+
new AwsClientBuilder.EndpointConfiguration(GCS_ENDPOINT, bucketRegion))
7272
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
7373
.build();
7474
}
75-
default -> throw new IllegalArgumentException("Unsupported credential type: " + credentialConfig.getCredentialType().name());
75+
default -> throw new IllegalArgumentException("Unsupported credential type: " + credentialConfig.credentialType.name());
7676
}
7777
}
7878

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroWriter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,24 +69,24 @@ public GcsAvroWriter(final GcsDestinationConfig config,
6969

7070
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
7171
objectKey = String.join("/", outputPrefix, outputFilename);
72-
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
72+
gcsFileLocation = String.format("gs://%s/%s", config.bucketName, objectKey);
7373

74-
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
74+
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.bucketName,
7575
objectKey);
7676

7777
this.avroRecordFactory = new AvroRecordFactory(schema, converter);
7878
this.uploadManager = StreamTransferManagerFactory
79-
.create(config.getBucketName(), objectKey, s3Client)
79+
.create(config.bucketName, objectKey, s3Client)
8080
.setPartSize((long) DEFAULT_PART_SIZE_MB)
8181
.get();
8282
// We only need one output stream as we only have one input stream. This is reasonably performant.
8383
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
8484

85-
final S3AvroFormatConfig formatConfig = (S3AvroFormatConfig) config.getFormatConfig();
85+
final S3AvroFormatConfig formatConfig = (S3AvroFormatConfig) config.formatConfig;
8686
// The DataFileWriter always uses binary encoding.
8787
// If json encoding is needed in the future, use the GenericDatumWriter directly.
8888
this.dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<Record>())
89-
.setCodec(formatConfig.getCodecFactory())
89+
.setCodec(formatConfig.codecFactory)
9090
.create(schema, outputStream);
9191
}
9292

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvWriter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,25 +48,25 @@ public GcsCsvWriter(final GcsDestinationConfig config,
4848
throws IOException {
4949
super(config, s3Client, configuredStream);
5050

51-
final S3CsvFormatConfig formatConfig = (S3CsvFormatConfig) config.getFormatConfig();
51+
final S3CsvFormatConfig formatConfig = (S3CsvFormatConfig) config.formatConfig;
5252
this.csvSheetGenerator = CsvSheetGenerator.Factory.create(configuredStream.getStream().getJsonSchema(), formatConfig);
5353

5454
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.CSV);
5555
objectKey = String.join("/", outputPrefix, outputFilename);
56-
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
56+
gcsFileLocation = String.format("gs://%s/%s", config.bucketName, objectKey);
5757

58-
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(),
58+
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.bucketName,
5959
objectKey);
6060

6161
this.uploadManager = StreamTransferManagerFactory
62-
.create(config.getBucketName(), objectKey, s3Client)
62+
.create(config.bucketName, objectKey, s3Client)
6363
.setPartSize((long) DEFAULT_PART_SIZE_MB)
6464
.get();
6565
// We only need one output stream as we only have one input stream. This is reasonably performant.
6666
this.outputStream = uploadManager.getMultiPartOutputStreams().get(0);
6767
this.csvPrinter = new CSVPrinter(new PrintWriter(outputStream, true, StandardCharsets.UTF_8),
6868
CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL)
69-
.withHeader(csvSheetGenerator.getHeaderRow().toArray(new String[0])));
69+
.withHeader(csvSheetGenerator.headerRow.toArray(new String[0])));
7070
}
7171

7272
@Override

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/jsonl/GcsJsonlWriter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ public GcsJsonlWriter(final GcsDestinationConfig config,
4949
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.JSONL);
5050
objectKey = String.join("/", outputPrefix, outputFilename);
5151

52-
gcsFileLocation = String.format("gs://%s/%s", config.getBucketName(), objectKey);
53-
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
52+
gcsFileLocation = String.format("gs://%s/%s", config.bucketName, objectKey);
53+
LOGGER.info("Full GCS path for stream '{}': {}/{}", stream.getName(), config.bucketName, objectKey);
5454

5555
this.uploadManager = StreamTransferManagerFactory
56-
.create(config.getBucketName(), objectKey, s3Client)
56+
.create(config.bucketName, objectKey, s3Client)
5757
.get();
5858

5959
// We only need one output stream as we only have one input stream. This is reasonably performant.

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/parquet/GcsParquetWriter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,23 @@ public GcsParquetWriter(final GcsDestinationConfig config,
5454

5555
final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.PARQUET);
5656
objectKey = String.join("/", outputPrefix, outputFilename);
57-
LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.getBucketName(), objectKey);
57+
LOGGER.info("Storage path for stream '{}': {}/{}", stream.getName(), config.bucketName, objectKey);
5858

59-
gcsFileLocation = String.format("s3a://%s/%s/%s", config.getBucketName(), outputPrefix, outputFilename);
59+
gcsFileLocation = String.format("s3a://%s/%s/%s", config.bucketName, outputPrefix, outputFilename);
6060
final URI uri = new URI(gcsFileLocation);
6161
final Path path = new Path(uri);
6262

6363
LOGGER.info("Full GCS path for stream '{}': {}", stream.getName(), path);
6464

65-
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
65+
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.formatConfig;
6666
final Configuration hadoopConfig = getHadoopConfig(config);
6767
this.parquetWriter = AvroParquetWriter.<Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
6868
.withSchema(schema)
69-
.withCompressionCodec(formatConfig.getCompressionCodec())
70-
.withRowGroupSize(formatConfig.getBlockSize())
71-
.withMaxPaddingSize(formatConfig.getMaxPaddingSize())
72-
.withPageSize(formatConfig.getPageSize())
73-
.withDictionaryPageSize(formatConfig.getDictionaryPageSize())
69+
.withCompressionCodec(formatConfig.compressionCodec)
70+
.withRowGroupSize(formatConfig.blockSize)
71+
.withMaxPaddingSize(formatConfig.maxPaddingSize)
72+
.withPageSize(formatConfig.pageSize)
73+
.withDictionaryPageSize(formatConfig.dictionaryPageSize)
7474
.withDictionaryEncoding(formatConfig.isDictionaryEncoding())
7575
.build();
7676
this.avroRecordFactory = new AvroRecordFactory(schema, converter);

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/java/io/airbyte/cdk/integrations/destination/gcs/writer/BaseGcsWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ protected BaseGcsWriter(final GcsDestinationConfig config,
5050
this.s3Client = s3Client;
5151
this.stream = configuredStream.getStream();
5252
this.syncMode = configuredStream.getDestinationSyncMode();
53-
this.outputPrefix = S3OutputPathHelper.getOutputPrefix(config.getBucketPath(), stream);
53+
this.outputPrefix = S3OutputPathHelper.getOutputPrefix(config.bucketPath, stream);
5454
}
5555

5656
/**
@@ -62,7 +62,7 @@ protected BaseGcsWriter(final GcsDestinationConfig config,
6262
@Override
6363
public void initialize() throws IOException {
6464
try {
65-
final String bucket = config.getBucketName();
65+
final String bucket = config.bucketName;
6666
if (!gcsBucketExist(s3Client, bucket)) {
6767
LOGGER.info("Bucket {} does not exist; creating...", bucket);
6868
s3Client.createBucket(bucket);

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfigTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ public void testGetGcsDestinationConfig() throws IOException {
2424
final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config.json"));
2525

2626
final GcsDestinationConfig config = GcsDestinationConfig.getGcsDestinationConfig(configJson);
27-
assertEquals("test_bucket", config.getBucketName());
28-
assertEquals("test_path", config.getBucketPath());
29-
assertEquals("us-west1", config.getBucketRegion());
27+
assertEquals("test_bucket", config.bucketName);
28+
assertEquals("test_path", config.bucketPath);
29+
assertEquals("us-west1", config.bucketRegion);
3030

3131
final GcsCredentialConfig credentialConfig = config.getGcsCredentialConfig();
3232
assertTrue(credentialConfig instanceof GcsHmacKeyCredentialConfig);
@@ -35,11 +35,11 @@ public void testGetGcsDestinationConfig() throws IOException {
3535
assertEquals("test_access_id", hmacKeyConfig.getHmacKeyAccessId());
3636
assertEquals("test_secret", hmacKeyConfig.getHmacKeySecret());
3737

38-
final S3FormatConfig formatConfig = config.getFormatConfig();
38+
final S3FormatConfig formatConfig = config.formatConfig;
3939
assertTrue(formatConfig instanceof S3AvroFormatConfig);
4040

4141
final S3AvroFormatConfig avroFormatConfig = (S3AvroFormatConfig) formatConfig;
42-
assertEquals("deflate-5", avroFormatConfig.getCodecFactory().toString());
42+
assertEquals("deflate-5", avroFormatConfig.codecFactory.toString());
4343
}
4444

4545
}

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/avro/GcsAvroFormatConfigTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,11 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
111111
.getGcsDestinationConfig(config);
112112
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
113113

114-
final S3FormatConfig formatConfig = gcsDestinationConfig.getFormatConfig();
115-
assertEquals("AVRO", formatConfig.getFormat().name());
114+
final S3FormatConfig formatConfig = gcsDestinationConfig.formatConfig;
115+
assertEquals("AVRO", formatConfig.format.name());
116116
// Assert that is set properly in config
117117
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
118-
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
118+
.create(gcsDestinationConfig.bucketName, "objectKey", null)
119119
.get();
120120

121121
final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
@@ -134,7 +134,7 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
134134
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
135135

136136
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
137-
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
137+
.create(gcsDestinationConfig.bucketName, "objectKey", null)
138138
.get();
139139

140140
final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);

airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/test/java/io/airbyte/cdk/integrations/destination/gcs/csv/GcsCsvFormatConfigTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ public void testHandlePartSizeConfig() throws IllegalAccessException {
4646
final GcsDestinationConfig gcsDestinationConfig = GcsDestinationConfig.getGcsDestinationConfig(config);
4747
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
4848

49-
final S3FormatConfig formatConfig = gcsDestinationConfig.getFormatConfig();
50-
assertEquals("CSV", formatConfig.getFormat().name());
49+
final S3FormatConfig formatConfig = gcsDestinationConfig.formatConfig;
50+
assertEquals("CSV", formatConfig.format.name());
5151
// Assert that is set properly in config
5252
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
53-
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
53+
.create(gcsDestinationConfig.bucketName, "objectKey", null)
5454
.get();
5555

5656
final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);
@@ -69,7 +69,7 @@ public void testHandleAbsenceOfPartSizeConfig() throws IllegalAccessException {
6969
ConfigTestUtils.assertBaseConfig(gcsDestinationConfig);
7070

7171
final StreamTransferManager streamTransferManager = StreamTransferManagerFactory
72-
.create(gcsDestinationConfig.getBucketName(), "objectKey", null)
72+
.create(gcsDestinationConfig.bucketName, "objectKey", null)
7373
.get();
7474

7575
final Integer partSizeBytes = (Integer) FieldUtils.readField(streamTransferManager, "partSize", true);

0 commit comments

Comments
 (0)