Skip to content

Commit e16b0d2

Browse files
authored
Destination Redshift: Limit Standard insert statement size < 16MB (#36973)
1 parent fda0829 commit e16b0d2

File tree

14 files changed

+294
-211
lines changed

14 files changed

+294
-211
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th
145145
| Version | Date | Pull Request | Subject |
146146
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
147147
| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources: Emit analytics for data type serialization errors. |
148+
| 0.29.12 | 2024-04-10 | [\#36973](https://github.com/airbytehq/airbyte/pull/36973) | Destinations: Make flush batch size configurable for JdbcInsertFlush |
148149
| 0.29.11 | 2024-04-10 | [\#36865](https://github.com/airbytehq/airbyte/pull/36865) | Sources: Remove noisy log line. |
149150
| 0.29.10 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Destinations: Enhance CatalogParser name collision handling; add DV2 tests for long identifiers |
150151
| 0.29.9 | 2024-04-09 | [\#36047](https://github.com/airbytehq/airbyte/pull/36047) | Destinations: CDK updates for raw-only destinations |

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt

+14-1
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,21 @@ import org.slf4j.LoggerFactory
5050

5151
abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationState>(
5252
driverClass: String,
53+
private val optimalBatchSizeBytes: Long,
5354
protected open val namingResolver: NamingConventionTransformer,
54-
protected val sqlOperations: SqlOperations
55+
protected val sqlOperations: SqlOperations,
5556
) : JdbcConnector(driverClass), Destination {
57+
58+
constructor(
59+
driverClass: String,
60+
namingResolver: NamingConventionTransformer,
61+
sqlOperations: SqlOperations,
62+
) : this(
63+
driverClass,
64+
JdbcBufferedConsumerFactory.DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH,
65+
namingResolver,
66+
sqlOperations
67+
)
5668
protected val configSchemaKey: String
5769
get() = "schema"
5870

@@ -293,6 +305,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
293305
defaultNamespace,
294306
typerDeduper,
295307
getDataTransformer(parsedCatalog, defaultNamespace),
308+
optimalBatchSizeBytes,
296309
)
297310
}
298311

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ import org.slf4j.LoggerFactory
5252
object JdbcBufferedConsumerFactory {
5353
private val LOGGER: Logger = LoggerFactory.getLogger(JdbcBufferedConsumerFactory::class.java)
5454

55-
@JvmOverloads
55+
const val DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 25 * 1024 * 1024L
56+
5657
fun createAsync(
5758
outputRecordCollector: Consumer<AirbyteMessage>,
5859
database: JdbcDatabase,
@@ -62,7 +63,8 @@ object JdbcBufferedConsumerFactory {
6263
catalog: ConfiguredAirbyteCatalog,
6364
defaultNamespace: String?,
6465
typerDeduper: TyperDeduper,
65-
dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer()
66+
dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
67+
optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH,
6668
): SerializedAirbyteMessageConsumer {
6769
val writeConfigs =
6870
createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired)
@@ -71,7 +73,8 @@ object JdbcBufferedConsumerFactory {
7173
onStartFunction(database, sqlOperations, writeConfigs, typerDeduper),
7274
onCloseFunction(typerDeduper),
7375
JdbcInsertFlushFunction(
74-
recordWriterFunction(database, sqlOperations, writeConfigs, catalog)
76+
recordWriterFunction(database, sqlOperations, writeConfigs, catalog),
77+
optimalBatchSizeBytes
7578
),
7679
catalog,
7780
BufferManager((Runtime.getRuntime().maxMemory() * 0.2).toLong()),

airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcInsertFlushFunction.kt

+4-10
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,19 @@ package io.airbyte.cdk.integrations.destination.jdbc
66
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
77
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
88
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.RecordWriter
9-
import io.airbyte.cdk.integrations.destination.jdbc.constants.GlobalDataSizeConstants
109
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
1110
import io.airbyte.protocol.models.v0.StreamDescriptor
1211
import java.util.stream.Stream
1312

14-
class JdbcInsertFlushFunction(private val recordWriter: RecordWriter<PartialAirbyteMessage>) :
15-
DestinationFlushFunction {
13+
class JdbcInsertFlushFunction(
14+
private val recordWriter: RecordWriter<PartialAirbyteMessage>,
15+
override val optimalBatchSizeBytes: Long
16+
) : DestinationFlushFunction {
1617
@Throws(Exception::class)
1718
override fun flush(desc: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
1819
recordWriter.accept(
1920
AirbyteStreamNameNamespacePair(desc.name, desc.namespace),
2021
stream.toList()
2122
)
2223
}
23-
24-
override val optimalBatchSizeBytes: Long
25-
get() = // TODO tune this value - currently SqlOperationUtils partitions 10K records per
26-
// insert statement,
27-
// but we'd like to stop doing that and instead control sql insert statement size via
28-
// batch size.
29-
GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong()
3024
}

airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ import org.slf4j.Logger
7575
import org.slf4j.LoggerFactory
7676

7777
abstract class DestinationAcceptanceTest {
78-
protected var TEST_SCHEMAS: HashSet<String> = HashSet()
78+
protected var testSchemas: HashSet<String> = HashSet()
7979

8080
private lateinit var testEnv: TestDestinationEnv
8181

@@ -197,7 +197,7 @@ abstract class DestinationAcceptanceTest {
197197
return null
198198
}
199199
val schema = config["schema"].asText()
200-
TEST_SCHEMAS!!.add(schema)
200+
testSchemas!!.add(schema)
201201
return schema
202202
}
203203

@@ -357,8 +357,8 @@ abstract class DestinationAcceptanceTest {
357357
LOGGER.info("localRoot: {}", localRoot)
358358
testEnv = TestDestinationEnv(localRoot)
359359
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
360-
TEST_SCHEMAS = HashSet()
361-
setup(testEnv, TEST_SCHEMAS)
360+
testSchemas = HashSet()
361+
setup(testEnv, testSchemas)
362362

363363
processFactory =
364364
DockerProcessFactory(
@@ -1201,7 +1201,7 @@ abstract class DestinationAcceptanceTest {
12011201
)
12021202
// A unique namespace is required to avoid test isolation problems.
12031203
val namespace = TestingNamespaces.generate("source_namespace")
1204-
TEST_SCHEMAS!!.add(namespace)
1204+
testSchemas!!.add(namespace)
12051205

12061206
catalog.streams.forEach(Consumer { stream: AirbyteStream -> stream.namespace = namespace })
12071207
val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog)
@@ -1241,12 +1241,12 @@ abstract class DestinationAcceptanceTest {
12411241
AirbyteCatalog::class.java
12421242
)
12431243
val namespace1 = TestingNamespaces.generate("source_namespace")
1244-
TEST_SCHEMAS!!.add(namespace1)
1244+
testSchemas!!.add(namespace1)
12451245
catalog.streams.forEach(Consumer { stream: AirbyteStream -> stream.namespace = namespace1 })
12461246

12471247
val diffNamespaceStreams = ArrayList<AirbyteStream>()
12481248
val namespace2 = TestingNamespaces.generate("diff_source_namespace")
1249-
TEST_SCHEMAS!!.add(namespace2)
1249+
testSchemas!!.add(namespace2)
12501250
val mapper = MoreMappers.initMapper()
12511251
for (stream in catalog.streams) {
12521252
val clonedStream =
@@ -1341,7 +1341,7 @@ abstract class DestinationAcceptanceTest {
13411341
try {
13421342
runSyncAndVerifyStateOutput(config, messagesWithNewNamespace, configuredCatalog, false)
13431343
// Add to the list of schemas to clean up.
1344-
TEST_SCHEMAS!!.add(namespaceInCatalog)
1344+
testSchemas!!.add(namespaceInCatalog)
13451345
} catch (e: Exception) {
13461346
throw IOException(
13471347
String.format(

0 commit comments

Comments (0)