Skip to content

Rename File format related classes to be agnostic of S3 #37442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import io.airbyte.cdk.integrations.destination.gcs.credential.GcsCredentialType
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
import io.airbyte.cdk.integrations.destination.s3.S3FormatConfig
import io.airbyte.cdk.integrations.destination.s3.S3FormatConfigs.getS3FormatConfig
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfig
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfigFactory.getUploadFormatConfig

/**
* Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config.
Expand All @@ -28,7 +28,7 @@ class GcsDestinationConfig(
bucketPath: String,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig,
formatConfig: S3FormatConfig
formatConfig: UploadFormatConfig
) :
S3DestinationConfig(
GCS_ENDPOINT,
Expand Down Expand Up @@ -76,7 +76,7 @@ class GcsDestinationConfig(
config["gcs_bucket_path"].asText(),
config["gcs_bucket_region"].asText(),
GcsCredentialConfigs.getCredentialConfig(config),
getS3FormatConfig(config)
getUploadFormatConfig(config)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsUtils
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
Expand Down Expand Up @@ -63,8 +63,7 @@ constructor(
)
LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false))

val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
val outputFilename: String = getOutputFilename(uploadTimestamp, FileUploadFormat.AVRO)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

Expand All @@ -84,7 +83,7 @@ constructor(
// performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]

val formatConfig = config.formatConfig as S3AvroFormatConfig
val formatConfig = config.formatConfig as UploadAvroFormatConfig
// The DataFileWriter always uses binary encoding.
// If json encoding is needed in the future, use the GenericDatumWriter directly.
this.dataFileWriter =
Expand Down Expand Up @@ -118,8 +117,8 @@ constructor(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.AVRO
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.AVRO

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsAvroWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import com.amazonaws.services.s3.AmazonS3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create
import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.csv.UploadCsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
Expand Down Expand Up @@ -43,11 +43,11 @@ class GcsCsvWriter(
override val outputPath: String

init {
val formatConfig = config.formatConfig as S3CsvFormatConfig
val formatConfig = config.formatConfig as UploadCsvFormatConfig
this.csvSheetGenerator = create(configuredStream.stream.jsonSchema, formatConfig)

val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.CSV)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.CSV)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

Expand Down Expand Up @@ -97,8 +97,8 @@ class GcsCsvWriter(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.CSV
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.CSV

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsCsvWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.commons.jackson.MoreMappers
Expand Down Expand Up @@ -40,7 +40,7 @@ class GcsJsonlWriter(

init {
val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.JSONL)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.JSONL)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)

fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)
Expand Down Expand Up @@ -84,8 +84,8 @@ class GcsJsonlWriter(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.JSONL
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.JSONL

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsJsonlWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsS3FileSystem
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetFormatConfig
import io.airbyte.cdk.integrations.destination.s3.parquet.UploadParquetFormatConfig
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand Down Expand Up @@ -46,7 +46,7 @@ class GcsParquetWriter(

init {
val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.PARQUET)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.PARQUET)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
LOGGER.info(
"Storage path for stream '{}': {}/{}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole class and the one above look suspicious. This init seems to be doing the same thing except for odd differences, like using an s3:// address here.... I'm definitely in need of understanding those things better...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(just to be clear, I understand that this came from the autoformatting, so I'm not expecting any changes from you)

Expand All @@ -62,7 +62,7 @@ class GcsParquetWriter(

LOGGER.info("Full GCS path for stream '{}': {}", stream.name, path)

val formatConfig = config.formatConfig as S3ParquetFormatConfig
val formatConfig = config.formatConfig as UploadParquetFormatConfig
val hadoopConfig = getHadoopConfig(config)
this.parquetWriter =
AvroParquetWriter.builder<GenericData.Record>(
Expand Down Expand Up @@ -102,8 +102,8 @@ class GcsParquetWriter(
}
}

override val fileFormat: S3Format
get() = S3Format.PARQUET
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.PARQUET

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsParquetWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.HeadBucketRequest
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.util.S3OutputPathHelper.getOutputPrefix
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteStream
Expand Down Expand Up @@ -128,7 +128,7 @@ protected constructor(
private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsWriter::class.java)

// Filename: <upload-date>_<upload-millis>_0.<format-extension>
fun getOutputFilename(timestamp: Timestamp, format: S3Format): String {
fun getOutputFilename(timestamp: Timestamp, format: FileUploadFormat): String {
val formatter: DateFormat =
SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING)
formatter.timeZone = TimeZone.getTimeZone("UTC")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.airbyte.cdk.integrations.destination.gcs

import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import java.io.IOException
Expand All @@ -30,9 +30,9 @@ internal class GcsDestinationConfigTest {
Assertions.assertEquals("test_secret", hmacKeyConfig.hmacKeySecret)

val formatConfig = config.formatConfig
Assertions.assertTrue(formatConfig is S3AvroFormatConfig)
Assertions.assertTrue(formatConfig is UploadAvroFormatConfig)

val avroFormatConfig = formatConfig as S3AvroFormatConfig
val avroFormatConfig = formatConfig as UploadAvroFormatConfig
Assertions.assertEquals("deflate-5", avroFormatConfig.codecFactory.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.amazonaws.services.s3.internal.Constants
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.base.DestinationConfig.Companion.initialize
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand All @@ -34,7 +34,7 @@ internal class GcsAvroWriterTest {
"fake-bucketPath",
"fake-bucketRegion",
GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"),
S3AvroFormatConfig(ObjectMapper().createObjectNode())
UploadAvroFormatConfig(ObjectMapper().createObjectNode())
),
Mockito.mock(AmazonS3::class.java, Mockito.RETURNS_DEEP_STUBS),
ConfiguredAirbyteStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.airbyte.cdk.integrations.destination.gcs

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider
Expand All @@ -25,8 +25,8 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ArgumentsSource

abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :
GcsDestinationAcceptanceTest(s3Format) {
abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUploadFormat) :
GcsDestinationAcceptanceTest(fileUploadFormat) {
override fun getProtocolVersion() = ProtocolVersion.V1

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package io.airbyte.cdk.integrations.destination.gcs

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectReader
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
Expand All @@ -20,7 +20,7 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader

abstract class GcsBaseAvroDestinationAcceptanceTest :
GcsAvroParquetDestinationAcceptanceTest(S3Format.AVRO) {
GcsAvroParquetDestinationAcceptanceTest(FileUploadFormat.AVRO) {
override val formatConfig: JsonNode?
get() =
Jsons.deserialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
Expand All @@ -21,7 +21,8 @@ import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVRecord
import org.apache.commons.csv.QuoteMode

abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTest(S3Format.CSV) {
abstract class GcsBaseCsvDestinationAcceptanceTest :
GcsDestinationAcceptanceTest(FileUploadFormat.CSV) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import java.io.BufferedReader
Expand All @@ -19,7 +19,7 @@ import kotlin.collections.List
import kotlin.collections.MutableList

abstract class GcsBaseJsonlDestinationAcceptanceTest :
GcsDestinationAcceptanceTest(S3Format.JSONL) {
GcsDestinationAcceptanceTest(FileUploadFormat.JSONL) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectReader
import io.airbyte.cdk.integrations.destination.gcs.parquet.GcsParquetWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
Expand All @@ -25,7 +25,7 @@ import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.ParquetReader

abstract class GcsBaseParquetDestinationAcceptanceTest :
GcsAvroParquetDestinationAcceptanceTest(S3Format.PARQUET) {
GcsAvroParquetDestinationAcceptanceTest(FileUploadFormat.PARQUET) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
Expand Down Expand Up @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory
* * Get the GCS bucket path from the constructor
* * Get the format config from [.getFormatConfig]
*/
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) :
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: FileUploadFormat) :
DestinationAcceptanceTest() {
protected var configJson: JsonNode? = null
// Not a big fan of those mocks(). Here to make spotbugs happy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.cdk.integrations.destination.s3

enum class S3Format(val fileExtension: String) {
enum class FileUploadFormat(val fileExtension: String) {
AVRO("avro"),
CSV("csv"),
JSONL("jsonl"),
Expand Down
Loading
Loading