
Commit 3a47e8b

Destination S3 Data Lake: extract AWS-specific pieces; move generic stuff to toolkit (#53697)
1 parent d642703 commit 3a47e8b

File tree

31 files changed, +444 −352 lines


airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ dependencies {
     implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
     api "org.apache.iceberg:iceberg-core:${project.ext.apacheIcebergVersion}"
     api "org.apache.iceberg:iceberg-api:${project.ext.apacheIcebergVersion}"
+    api("org.apache.iceberg:iceberg-data:${project.ext.apacheIcebergVersion}")
     api "org.apache.iceberg:iceberg-parquet:${project.ext.apacheIcebergVersion}"
     api "org.apache.iceberg:iceberg-nessie:${project.ext.apacheIcebergVersion}"

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
 
 import io.airbyte.cdk.ConfigErrorException
 import java.io.IOException
Lines changed: 6 additions & 6 deletions
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import io.airbyte.cdk.ConfigErrorException
 import jakarta.inject.Singleton
@@ -19,22 +19,22 @@ import org.apache.iceberg.types.Types.*
  * The "supertype" is a type to which both input types can safely be promoted without data loss. For
  * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc.
  *
- * @property S3DataLakeTypesComparator comparator used to verify deep type equality.
+ * @property IcebergTypesComparator comparator used to verify deep type equality.
  */
 @Singleton
-class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) {
+class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {
     private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)
 
     /**
      * Returns a supertype for [existingType] and [incomingType] if one exists.
-     * - If they are deeply equal (according to [S3DataLakeTypesComparator.typesAreEqual]), returns
-     * the [existingType] as-is.
+     * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the
+     * [existingType] as-is.
      * - Otherwise, attempts to combine them into a valid supertype.
      * - Throws [ConfigErrorException] if no valid supertype can be found.
      */
     fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type {
         // If the two types are already deeply equal, return one of them (arbitrary).
-        if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) {
+        if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) {
             return existingType
         }
         // Otherwise, attempt to combine them into a valid supertype.
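
The rename is mechanical, but the behavior is worth pinning down. Below is a minimal sketch (not part of this commit) of how the relocated finder resolves a widening type change, using only the signatures visible in this diff; the zero-argument IcebergTypesComparator construction mirrors its @Singleton wiring.

```kotlin
import org.apache.iceberg.types.Types

val finder = IcebergSuperTypeFinder(IcebergTypesComparator())

// INT can be promoted to LONG without data loss, so the finder returns LONG.
val superType =
    finder.findSuperType(
        existingType = Types.IntegerType.get(),
        incomingType = Types.LongType.get(),
        columnName = "user_id",
    )

// Incompatible pairs (or unsupported type IDs such as BINARY or UUID) instead
// surface as a ConfigErrorException, per the KDoc above.
```
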
Lines changed: 8 additions & 8 deletions
@@ -2,19 +2,19 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import io.airbyte.cdk.ConfigErrorException
-import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.PARENT_CHILD_SEPARATOR
-import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
 import jakarta.inject.Singleton
 import org.apache.iceberg.Schema
 import org.apache.iceberg.Table
 import org.apache.iceberg.UpdateSchema
 import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Type.PrimitiveType
 
-/** Describes how the [S3DataLakeTableSynchronizer] handles column type changes. */
+/** Describes how the [IcebergTableSynchronizer] handles column type changes. */
 enum class ColumnTypeChangeBehavior {
     /**
      * Find the supertype between the old and new types, throwing an error if Iceberg does not
@@ -30,7 +30,7 @@ enum class ColumnTypeChangeBehavior {
     };
 
     /**
-     * If true, [S3DataLakeTableSynchronizer.maybeApplySchemaChanges] will commit the schema update
+     * If true, [IcebergTableSynchronizer.maybeApplySchemaChanges] will commit the schema update
      * itself. If false, the caller is responsible for calling
      * `schemaUpdateResult.pendingUpdate?.commit()`.
      */
@@ -50,9 +50,9 @@
  * @property superTypeFinder Used to find a common supertype when data types differ.
  */
 @Singleton
-class S3DataLakeTableSynchronizer(
-    private val comparator: S3DataLakeTypesComparator,
-    private val superTypeFinder: S3DataLakeSuperTypeFinder,
+class IcebergTableSynchronizer(
+    private val comparator: IcebergTypesComparator,
+    private val superTypeFinder: IcebergSuperTypeFinder,
 ) {
     /**
      * Compare [table]'s current schema with [incomingSchema] and apply changes as needed:
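
The commit contract in ColumnTypeChangeBehavior is easy to misread, so here is a hedged sketch of the caller's side. The synchronizer's method signature is not shown in this hunk, so the maybeApplySchemaChanges arguments and the result's shape are assumptions; only the `pendingUpdate?.commit()` contract comes from the KDoc above.

```kotlin
// Assumed call shape, for illustration only.
val synchronizer = IcebergTableSynchronizer(IcebergTypesComparator(), superTypeFinder)
val schemaUpdateResult = synchronizer.maybeApplySchemaChanges(table, incomingSchema)

// When the configured ColumnTypeChangeBehavior does not commit the update itself,
// the caller is responsible for finishing the job:
schemaUpdateResult.pendingUpdate?.commit()
```
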
Lines changed: 2 additions & 2 deletions
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
 
 import jakarta.inject.Singleton
 import org.apache.iceberg.Schema
@@ -17,7 +17,7 @@ import org.apache.iceberg.types.Types
  * - Columns that changed from required to optional.
  */
 @Singleton
-class S3DataLakeTypesComparator {
+class IcebergTypesComparator {
 
     companion object {
         /** Separator used to represent nested field paths: parent~child. */
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.toolkits.iceberg.parquet
+
+import io.airbyte.cdk.load.command.DestinationStream
+import org.apache.iceberg.catalog.Namespace
+import org.apache.iceberg.catalog.TableIdentifier
+
+/**
+ * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should
+ * handle catalog-specific naming restrictions.
+ */
+// TODO accept default namespace in config as a val here
+interface TableIdGenerator {
+    fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier
+}
+
+class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator {
+    override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier {
+        val namespace = stream.namespace ?: configNamespace
+        return tableIdOf(namespace!!, stream.name)
+    }
+}
+
+// iceberg namespace+name must both be nonnull.
+fun tableIdOf(namespace: String, name: String): TableIdentifier =
+    TableIdentifier.of(Namespace.of(namespace), name)
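
A short sketch of the fallback behavior in SimpleTableIdGenerator. The Descriptor construction below is an assumption (only its namespace and name properties appear in this file); the namespace resolution itself is exactly the `stream.namespace ?: configNamespace` line above.

```kotlin
val generator = SimpleTableIdGenerator(configNamespace = "warehouse")

// Stream with its own namespace: the configured namespace is ignored.
// (Descriptor construction is assumed, not shown in this diff.)
val explicit = generator.toTableIdentifier(DestinationStream.Descriptor("sales", "orders"))
// -> TableIdentifier.of(Namespace.of("sales"), "orders")

// Stream without a namespace: falls back to configNamespace.
val fallback = generator.toTableIdentifier(DestinationStream.Descriptor(null, "orders"))
// -> TableIdentifier.of(Namespace.of("warehouse"), "orders")

// If both are null, the `namespace!!` assertion fails with a NullPointerException.
```
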
Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
 
 import jakarta.inject.Singleton
 import org.apache.iceberg.Table
@@ -16,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations
  * catalog implementations do not clear the underlying files written to table storage.
  */
 @Singleton
-class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) {
+class IcebergTableCleaner(private val icebergUtil: IcebergUtil) {
 
     /**
      * Clears the table identified by the provided [TableIdentifier]. This removes all data and
@@ -49,7 +49,7 @@ class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) {
         val genIdsToDelete =
             generationIdSuffix
                 .filter {
-                    s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(it)
+                    icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it)
                     true
                 }
                 .toSet()
Lines changed: 3 additions & 3 deletions
@@ -2,7 +2,7 @@
  * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
 
-package io.airbyte.integrations.destination.s3_data_lake.io
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
 
 import io.airbyte.cdk.load.command.Append
 import io.airbyte.cdk.load.command.Dedupe
@@ -30,7 +30,7 @@ import org.apache.iceberg.util.PropertyUtil
  * and whether primary keys are configured on the destination table's schema.
  */
 @Singleton
-class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) {
+class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
     /**
      * Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
     *
@@ -45,7 +45,7 @@ class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) {
         importType: ImportType,
         schema: Schema
     ): BaseTaskWriter<Record> {
-        s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
+        icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
         val format =
             FileFormat.valueOf(
                 table
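
A hedged usage sketch of the factory after the rename. Only the tail of the parameter list is visible in this hunk, so the method name and leading parameters here are assumptions; what the diff does show is that the generation-id suffix is validated before any writer is built.

```kotlin
// Assumed call shape (method and parameter names are not fully shown in the diff).
val writer: BaseTaskWriter<Record> =
    writerFactory.create(
        table = table,
        generationId = icebergUtil.constructGenerationIdSuffix(stream),
        importType = stream.importType,
        schema = table.schema(),
    )
```
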
Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
+
+import io.airbyte.cdk.load.command.Dedupe
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.ImportType
+import io.airbyte.cdk.load.data.MapperPipeline
+import io.airbyte.cdk.load.data.NullValue
+import io.airbyte.cdk.load.data.ObjectValue
+import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
+import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
+import io.airbyte.cdk.load.data.withAirbyteMeta
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator
+import io.github.oshai.kotlinlogging.KotlinLogging
+import javax.inject.Singleton
+import org.apache.hadoop.conf.Configuration
+import org.apache.iceberg.CatalogUtil
+import org.apache.iceberg.FileFormat
+import org.apache.iceberg.Schema
+import org.apache.iceberg.SortOrder
+import org.apache.iceberg.Table
+import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT
+import org.apache.iceberg.catalog.Catalog
+import org.apache.iceberg.catalog.SupportsNamespaces
+import org.apache.iceberg.data.Record
+import org.apache.iceberg.exceptions.AlreadyExistsException
+
+private val logger = KotlinLogging.logger {}
+
+const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at"
+
+@Singleton
+class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
+    class InvalidFormatException(message: String) : Exception(message)
+
+    private val generationIdRegex = Regex("""ab-generation-id-\d+-e""")
+
+    fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) {
+        if (!generationIdRegex.matches(generationId)) {
+            throw InvalidFormatException(
+                "Invalid format: $generationId. Expected format is 'ab-generation-id-<number>-e'",
+            )
+        }
+    }
+
+    fun constructGenerationIdSuffix(stream: DestinationStream): String {
+        return constructGenerationIdSuffix(stream.generationId)
+    }
+
+    fun constructGenerationIdSuffix(generationId: Long): String {
+        if (generationId < 0) {
+            throw IllegalArgumentException(
+                "GenerationId must be non-negative. Provided: $generationId",
+            )
+        }
+        return "ab-generation-id-${generationId}-e"
+    }
+    /**
+     * Builds an Iceberg [Catalog].
+     *
+     * @param catalogName The name of the catalog.
+     * @param properties The map of catalog configuration properties.
+     * @return The configured Iceberg [Catalog].
+     */
+    fun createCatalog(catalogName: String, properties: Map<String, String>): Catalog {
+        return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration())
+    }
+
+    /** Create the namespace if it doesn't already exist. */
+    fun createNamespace(streamDescriptor: DestinationStream.Descriptor, catalog: Catalog) {
+        val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
+        synchronized(tableIdentifier.namespace()) {
+            if (
+                catalog is SupportsNamespaces &&
+                    !catalog.namespaceExists(tableIdentifier.namespace())
+            ) {
+                try {
+                    catalog.createNamespace(tableIdentifier.namespace())
+                    logger.info { "Created namespace '${tableIdentifier.namespace()}'." }
+                } catch (e: AlreadyExistsException) {
+                    // This exception occurs when multiple threads attempt to write to the same
+                    // namespace in parallel.
+                    // One thread may create the namespace successfully, causing the other threads
+                    // to encounter this exception
+                    // when they also try to create the namespace.
+                    logger.info {
+                        "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it
+     * does not already exist. If the [Table] already exists, it is loaded from the [Catalog].
+     *
+     * @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's
+     * namespace and name.
+     * @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once
+     * created.
+     * @param schema The Iceberg [Schema] associated with the [Table].
+     * @param properties The [Table] configuration properties derived from the [Catalog].
+     * @return The Iceberg [Table], created if it does not yet exist.
+     */
+    fun createTable(
+        streamDescriptor: DestinationStream.Descriptor,
+        catalog: Catalog,
+        schema: Schema,
+        properties: Map<String, String>
+    ): Table {
+        val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor)
+        return if (!catalog.tableExists(tableIdentifier)) {
+            logger.info { "Creating Iceberg table '$tableIdentifier'...." }
+            catalog
+                .buildTable(tableIdentifier, schema)
+                .withProperties(properties)
+                .withProperty(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name.lowercase())
+                .withSortOrder(getSortOrder(schema = schema))
+                .create()
+        } else {
+            logger.info { "Loading Iceberg table $tableIdentifier ..." }
+            catalog.loadTable(tableIdentifier)
+        }
+    }
+
+    /**
+     * Converts an Airbyte [DestinationRecordAirbyteValue] into an Iceberg [Record]. The converted
+     * record will be wrapped to include [Operation] information, which is used by the writer to
+     * determine how to write the data to the underlying Iceberg files.
+     *
+     * @param record The Airbyte [DestinationRecordAirbyteValue] record to be converted for writing
+     * by Iceberg.
+     * @param stream The Airbyte [DestinationStream] that contains information about the stream.
+     * @param tableSchema The Iceberg [Table] [Schema].
+     * @param pipeline The [MapperPipeline] used to convert the Airbyte record to an Iceberg record.
+     * @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue].
+     */
+    fun toRecord(
+        record: DestinationRecordAirbyteValue,
+        stream: DestinationStream,
+        tableSchema: Schema,
+        pipeline: MapperPipeline
+    ): Record {
+        val dataMapped =
+            pipeline
+                .map(record.data, record.meta?.changes)
+                .withAirbyteMeta(stream, record.emittedAtMs, true)
+        // TODO figure out how to detect the actual operation value
+        return RecordWrapper(
+            delegate = dataMapped.toIcebergRecord(tableSchema),
+            operation = getOperation(record = record, importType = stream.importType)
+        )
+    }
+
+    fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
+        val primaryKeys =
+            when (stream.importType) {
+                is Dedupe -> (stream.importType as Dedupe).primaryKey
+                else -> emptyList()
+            }
+        return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys)
+    }
+
+    private fun getSortOrder(schema: Schema): SortOrder {
+        val builder = SortOrder.builderFor(schema)
+        schema.identifierFieldNames().forEach { builder.asc(it) }
+        return builder.build()
+    }
+
+    private fun getOperation(
+        record: DestinationRecordAirbyteValue,
+        importType: ImportType,
+    ): Operation =
+        if (
+            record.data is ObjectValue &&
+                (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null &&
+                (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] !is NullValue
+        ) {
+            Operation.DELETE
+        } else if (importType is Dedupe) {
+            Operation.UPDATE
+        } else {
+            Operation.INSERT
+        }
+}
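
To make the generation-id and operation rules concrete, here is a small sketch (not in the commit) that follows directly from the regex, the negative-id guard, and getOperation above.

```kotlin
val icebergUtil = IcebergUtil(SimpleTableIdGenerator("warehouse"))

// Suffix construction and validation round-trip.
val suffix = icebergUtil.constructGenerationIdSuffix(42L) // "ab-generation-id-42-e"
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(suffix) // matches the regex; no throw

// icebergUtil.constructGenerationIdSuffix(-1L) // would throw IllegalArgumentException

// Operation resolution, per getOperation: a record whose _ab_cdc_deleted_at value is
// present and non-null maps to DELETE; otherwise Dedupe streams map to UPDATE, and
// everything else maps to INSERT.
```
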
