Skip to content

Commit 67e82e1

Browse files
author
Marius Posta
authored
bulk-cdk-toolkit-extract-jdbc: add namespaceKind constant (#44767)
1 parent e911ab8 commit 67e82e1

File tree

10 files changed

+118
-78
lines changed

10 files changed

+118
-78
lines changed

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/command/JdbcSourceConfiguration.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ interface JdbcSourceConfiguration : SourceConfiguration {
1515
/** Properties map (with username, password, etc.) passed along to the JDBC driver. */
1616
val jdbcProperties: Map<String, String>
1717

18-
/** Ordered set of schemas for the connector to consider. */
19-
val schemas: Set<String>
18+
/** Ordered set of namespaces (typically, schemas) for the connector to consider. */
19+
val namespaces: Set<String>
2020

2121
/** When set, each table is queried individually to check for SELECT privileges. */
2222
val checkPrivileges: Boolean

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerier.kt

+27-13
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
package io.airbyte.cdk.discover
33

44
import io.airbyte.cdk.command.JdbcSourceConfiguration
5+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
6+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants.NamespaceKind
57
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
68
import io.airbyte.cdk.jdbc.NullFieldType
79
import io.airbyte.cdk.read.From
@@ -22,6 +24,7 @@ import java.sql.Statement
2224

2325
/** Default implementation of [MetadataQuerier]. */
2426
class JdbcMetadataQuerier(
27+
val constants: DefaultJdbcConstants,
2528
val config: JdbcSourceConfiguration,
2629
val selectQueryGenerator: SelectQueryGenerator,
2730
val fieldTypeMapper: FieldTypeMapper,
@@ -31,11 +34,18 @@ class JdbcMetadataQuerier(
3134

3235
private val log = KotlinLogging.logger {}
3336

37+
fun TableName.namespace(): String? =
38+
when (constants.namespaceKind) {
39+
NamespaceKind.CATALOG_AND_SCHEMA,
40+
NamespaceKind.CATALOG -> catalog
41+
NamespaceKind.SCHEMA -> schema
42+
}
43+
3444
override fun streamNamespaces(): List<String> =
35-
memoizedTableNames.mapNotNull { it.schema ?: it.catalog }.distinct()
45+
memoizedTableNames.mapNotNull { it.namespace() }.distinct()
3646

3747
override fun streamNames(streamNamespace: String?): List<String> =
38-
memoizedTableNames.filter { (it.schema ?: it.catalog) == streamNamespace }.map { it.name }
48+
memoizedTableNames.filter { it.namespace() == streamNamespace }.map { it.name }
3949

4050
fun <T> swallow(supplier: () -> T): T? {
4151
try {
@@ -51,8 +61,14 @@ class JdbcMetadataQuerier(
5161
try {
5262
val allTables = mutableSetOf<TableName>()
5363
val dbmd: DatabaseMetaData = conn.metaData
54-
for (schema in config.schemas + config.schemas.map { it.uppercase() }) {
55-
dbmd.getTables(null, schema, null, null).use { rs: ResultSet ->
64+
for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
65+
val (catalog: String?, schema: String?) =
66+
when (constants.namespaceKind) {
67+
NamespaceKind.CATALOG -> namespace to null
68+
NamespaceKind.SCHEMA -> null to namespace
69+
NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
70+
}
71+
dbmd.getTables(catalog, schema, null, null).use { rs: ResultSet ->
5672
while (rs.next()) {
5773
allTables.add(
5874
TableName(
@@ -65,10 +81,8 @@ class JdbcMetadataQuerier(
6581
}
6682
}
6783
}
68-
log.info { "Discovered ${allTables.size} table(s) in schemas ${config.schemas}." }
69-
return@lazy allTables.toList().sortedBy {
70-
"${it.catalog ?: ""}.${it.schema!!}.${it.name}.${it.type}"
71-
}
84+
log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." }
85+
return@lazy allTables.toList().sortedBy { "${it.namespace()}.${it.name}.${it.type}" }
7286
} catch (e: Exception) {
7387
throw RuntimeException("Table name discovery query failed: ${e.message}", e)
7488
}
@@ -78,9 +92,7 @@ class JdbcMetadataQuerier(
7892
streamName: String,
7993
streamNamespace: String?,
8094
): TableName? =
81-
memoizedTableNames.find {
82-
it.name == streamName && (it.schema ?: it.catalog) == streamNamespace
83-
}
95+
memoizedTableNames.find { it.name == streamName && it.namespace() == streamNamespace }
8496

8597
val memoizedColumnMetadata: Map<TableName, List<ColumnMetadata>> by lazy {
8698
val joinMap: Map<TableName, TableName> =
@@ -90,7 +102,7 @@ class JdbcMetadataQuerier(
90102
try {
91103
val dbmd: DatabaseMetaData = conn.metaData
92104
memoizedTableNames
93-
.filter { it.catalog != null || it.schema != null }
105+
.filter { it.namespace() != null }
94106
.map { it.catalog to it.schema }
95107
.distinct()
96108
.forEach { (catalog: String?, schema: String?) ->
@@ -221,7 +233,7 @@ class JdbcMetadataQuerier(
221233
val querySpec =
222234
SelectQuerySpec(
223235
SelectColumns(columnIDs.map { Field(it, NullFieldType) }),
224-
From(table.name, table.schema ?: table.catalog),
236+
From(table.name, table.namespace()),
225237
limit = Limit(0),
226238
)
227239
return selectQueryGenerator.generate(querySpec.optimize()).sql
@@ -319,11 +331,13 @@ class JdbcMetadataQuerier(
319331
class Factory(
320332
val selectQueryGenerator: SelectQueryGenerator,
321333
val fieldTypeMapper: FieldTypeMapper,
334+
val constants: DefaultJdbcConstants,
322335
) : MetadataQuerier.Factory<JdbcSourceConfiguration> {
323336
/** The [JdbcSourceConfiguration] is deliberately not injected in order to support tests. */
324337
override fun session(config: JdbcSourceConfiguration): MetadataQuerier {
325338
val jdbcConnectionFactory = JdbcConnectionFactory(config)
326339
return JdbcMetadataQuerier(
340+
constants,
327341
config,
328342
selectQueryGenerator,
329343
fieldTypeMapper,
airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/jdbc/DefaultJdbcConstants.kt

+60-0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.jdbc
6+
7+
import io.micronaut.context.annotation.ConfigurationProperties
8+
9+
const val JDBC_PROPERTY_PREFIX = "airbyte.connector.extract.jdbc"
10+
11+
@ConfigurationProperties(JDBC_PROPERTY_PREFIX)
12+
data class DefaultJdbcConstants(
13+
val withSampling: Boolean = WITH_SAMPLING,
14+
val maxSampleSize: Int = TABLE_SAMPLE_SIZE,
15+
/** How many bytes per second we can expect the database to send to the connector. */
16+
val expectedThroughputBytesPerSecond: Long = THROUGHPUT_BYTES_PER_SECOND,
17+
/** Smallest possible fetchSize value. */
18+
val minFetchSize: Int = FETCH_SIZE_LOWER_BOUND,
19+
/** Default fetchSize value, in absence of any other estimate. */
20+
val defaultFetchSize: Int = DEFAULT_FETCH_SIZE,
21+
/** Largest possible fetchSize value. */
22+
val maxFetchSize: Int = FETCH_SIZE_UPPER_BOUND,
23+
/** How much of the JVM heap can we fill up with [java.sql.ResultSet] data. */
24+
val memoryCapacityRatio: Double = MEM_CAPACITY_RATIO,
25+
/** Estimated bytes used as overhead for each row in a [java.sql.ResultSet]. */
26+
val estimatedRecordOverheadBytes: Long = RECORD_OVERHEAD_BYTES,
27+
/** Estimated bytes used as overhead for each column value in a [java.sql.ResultSet]. */
28+
val estimatedFieldOverheadBytes: Long = FIELD_OVERHEAD_BYTES,
29+
/** Overrides the JVM heap capacity to provide determinism in tests. */
30+
val maxMemoryBytesForTesting: Long? = null,
31+
/** Whether the namespace field denotes a JDBC schema or a JDBC catalog. */
32+
val namespaceKind: NamespaceKind = NamespaceKind.SCHEMA,
33+
) {
34+
35+
enum class NamespaceKind {
36+
SCHEMA,
37+
CATALOG,
38+
CATALOG_AND_SCHEMA
39+
}
40+
41+
companion object {
42+
43+
// Sampling defaults.
44+
internal const val WITH_SAMPLING: Boolean = false
45+
internal const val TABLE_SAMPLE_SIZE: Int = 1024
46+
internal const val THROUGHPUT_BYTES_PER_SECOND: Long = 10L shl 20
47+
48+
// fetchSize defaults
49+
internal const val FETCH_SIZE_LOWER_BOUND: Int = 10
50+
internal const val DEFAULT_FETCH_SIZE: Int = 1_000
51+
internal const val FETCH_SIZE_UPPER_BOUND: Int = 10_000_000
52+
53+
// Memory estimate defaults.
54+
internal const val RECORD_OVERHEAD_BYTES = 16L
55+
internal const val FIELD_OVERHEAD_BYTES = 16L
56+
// We're targeting use of 60% of the available memory in order to allow
57+
// for some headroom for other garbage collection.
58+
internal const val MEM_CAPACITY_RATIO: Double = 0.6
59+
}
60+
}

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/DefaultJdbcSharedState.kt

+2-44
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
package io.airbyte.cdk.read
66

77
import io.airbyte.cdk.command.JdbcSourceConfiguration
8+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
89
import io.airbyte.cdk.output.OutputConsumer
9-
import io.micronaut.context.annotation.ConfigurationProperties
1010
import jakarta.inject.Singleton
1111
import kotlinx.coroutines.sync.Semaphore
1212

@@ -16,51 +16,9 @@ class DefaultJdbcSharedState(
1616
override val configuration: JdbcSourceConfiguration,
1717
override val outputConsumer: OutputConsumer,
1818
override val selectQuerier: SelectQuerier,
19-
val constants: Constants,
19+
val constants: DefaultJdbcConstants,
2020
) : JdbcSharedState {
2121

22-
@ConfigurationProperties(JDBC_PROPERTY_PREFIX)
23-
data class Constants(
24-
val withSampling: Boolean = WITH_SAMPLING,
25-
val maxSampleSize: Int = TABLE_SAMPLE_SIZE,
26-
/** How many bytes per second we can expect the database to send to the connector. */
27-
val expectedThroughputBytesPerSecond: Long = THROUGHPUT_BYTES_PER_SECOND,
28-
/** Smallest possible fetchSize value. */
29-
val minFetchSize: Int = FETCH_SIZE_LOWER_BOUND,
30-
/** Default fetchSize value, in absence of any other estimate. */
31-
val defaultFetchSize: Int = DEFAULT_FETCH_SIZE,
32-
/** Largest possible fetchSize value. */
33-
val maxFetchSize: Int = FETCH_SIZE_UPPER_BOUND,
34-
/** How much of the JVM heap can we fill up with [java.sql.ResultSet] data. */
35-
val memoryCapacityRatio: Double = MEM_CAPACITY_RATIO,
36-
/** Estimated bytes used as overhead for each row in a [java.sql.ResultSet]. */
37-
val estimatedRecordOverheadBytes: Long = RECORD_OVERHEAD_BYTES,
38-
/** Estimated bytes used as overhead for each column value in a [java.sql.ResultSet]. */
39-
val estimatedFieldOverheadBytes: Long = FIELD_OVERHEAD_BYTES,
40-
/** Overrides the JVM heap capacity to provide determinism in tests. */
41-
val maxMemoryBytesForTesting: Long? = null
42-
) {
43-
companion object {
44-
45-
// Sampling defaults.
46-
internal const val WITH_SAMPLING: Boolean = false
47-
internal const val TABLE_SAMPLE_SIZE: Int = 1024
48-
internal const val THROUGHPUT_BYTES_PER_SECOND: Long = 10L shl 20
49-
50-
// fetchSize defaults
51-
internal const val FETCH_SIZE_LOWER_BOUND: Int = 10
52-
internal const val DEFAULT_FETCH_SIZE: Int = 1_000
53-
internal const val FETCH_SIZE_UPPER_BOUND: Int = 10_000_000
54-
55-
// Memory estimate defaults.
56-
internal const val RECORD_OVERHEAD_BYTES = 16L
57-
internal const val FIELD_OVERHEAD_BYTES = 16L
58-
// We're targeting use of 60% of the available memory in order to allow
59-
// for some headroom for other garbage collection.
60-
internal const val MEM_CAPACITY_RATIO: Double = 0.6
61-
}
62-
}
63-
6422
override val withSampling: Boolean
6523
get() = constants.withSampling
6624

airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/read/JdbcPartitionsCreatorFactory.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.cdk.read
66

77
import io.airbyte.cdk.command.OpaqueStateValue
8+
import io.airbyte.cdk.jdbc.JDBC_PROPERTY_PREFIX
89
import io.micronaut.context.annotation.Requires
910
import jakarta.inject.Singleton
1011

@@ -68,6 +69,4 @@ class JdbcConcurrentPartitionsCreatorFactory<
6869
JdbcConcurrentPartitionsCreator(partition, partitionFactory)
6970
}
7071

71-
const val JDBC_PROPERTY_PREFIX = "airbyte.connector.extract.jdbc"
72-
73-
private const val MODE_PROPERTY = "$JDBC_PROPERTY_PREFIX.mode"
72+
private const val MODE_PROPERTY = "${JDBC_PROPERTY_PREFIX}.mode"

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/discover/JdbcMetadataQuerierTest.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import io.airbyte.cdk.h2source.H2SourceConfiguration
66
import io.airbyte.cdk.h2source.H2SourceConfigurationFactory
77
import io.airbyte.cdk.h2source.H2SourceConfigurationJsonObject
88
import io.airbyte.cdk.h2source.H2SourceOperations
9+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
910
import java.sql.JDBCType
1011
import org.junit.jupiter.api.Assertions
1112
import org.junit.jupiter.api.Test
@@ -17,7 +18,12 @@ class JdbcMetadataQuerierTest {
1718
h2.execute("CREATE TABLE kv (k INT PRIMARY KEY, v VARCHAR(60))")
1819
}
1920

20-
val factory = JdbcMetadataQuerier.Factory(H2SourceOperations(), H2SourceOperations())
21+
val factory =
22+
JdbcMetadataQuerier.Factory(
23+
selectQueryGenerator = H2SourceOperations(),
24+
fieldTypeMapper = H2SourceOperations(),
25+
constants = DefaultJdbcConstants(),
26+
)
2127

2228
@Test
2329
fun test() {

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/DefaultJdbcPartitionFactoryTest.kt

+8-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.read
66

77
import io.airbyte.cdk.data.IntCodec
88
import io.airbyte.cdk.data.LocalDateCodec
9+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
910
import io.airbyte.cdk.output.InvalidCursor
1011
import io.airbyte.cdk.output.InvalidPrimaryKey
1112
import io.airbyte.cdk.output.ResetStream
@@ -51,7 +52,7 @@ class DefaultJdbcPartitionFactoryTest {
5152
stream.name,
5253
stream.namespace,
5354
sampleRateInvPow2 = 8,
54-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
55+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
5556
),
5657
)
5758
)
@@ -87,7 +88,7 @@ class DefaultJdbcPartitionFactoryTest {
8788
stream.name,
8889
stream.namespace,
8990
sampleRateInvPow2 = 8,
90-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
91+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
9192
),
9293
)
9394
)
@@ -132,7 +133,7 @@ class DefaultJdbcPartitionFactoryTest {
132133
stream.name,
133134
stream.namespace,
134135
sampleRateInvPow2 = 8,
135-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
136+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
136137
),
137138
NoWhere,
138139
OrderBy(id),
@@ -202,7 +203,7 @@ class DefaultJdbcPartitionFactoryTest {
202203
stream.name,
203204
stream.namespace,
204205
sampleRateInvPow2 = 8,
205-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
206+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
206207
),
207208
NoWhere,
208209
OrderBy(id)
@@ -308,7 +309,7 @@ class DefaultJdbcPartitionFactoryTest {
308309
stream.name,
309310
stream.namespace,
310311
sampleRateInvPow2 = 8,
311-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
312+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
312313
),
313314
Where(Greater(id, IntCodec.encode(22))),
314315
OrderBy(id),
@@ -365,7 +366,7 @@ class DefaultJdbcPartitionFactoryTest {
365366
stream.name,
366367
stream.namespace,
367368
sampleRateInvPow2 = 8,
368-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
369+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
369370
),
370371
Where(Greater(id, IntCodec.encode(22))),
371372
OrderBy(id)
@@ -446,7 +447,7 @@ class DefaultJdbcPartitionFactoryTest {
446447
stream.name,
447448
stream.namespace,
448449
sampleRateInvPow2 = 8,
449-
DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
450+
DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
450451
),
451452
Where(
452453
And(

airbyte-cdk/bulk/toolkits/extract-jdbc/src/test/kotlin/io/airbyte/cdk/read/JdbcPartitionsCreatorTest.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package io.airbyte.cdk.read
66

77
import io.airbyte.cdk.data.IntCodec
88
import io.airbyte.cdk.data.LocalDateCodec
9+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
910
import io.airbyte.cdk.read.TestFixtures.assertFailures
1011
import io.airbyte.cdk.read.TestFixtures.factory
1112
import io.airbyte.cdk.read.TestFixtures.id
@@ -28,7 +29,7 @@ class JdbcPartitionsCreatorTest {
2829
val sharedState =
2930
sharedState(
3031
constants =
31-
DefaultJdbcSharedState.Constants(
32+
DefaultJdbcConstants(
3233
withSampling = true,
3334
maxSampleSize = 4,
3435
// absurdly low value to create many partitions
@@ -112,7 +113,7 @@ class JdbcPartitionsCreatorTest {
112113
val sharedState =
113114
sharedState(
114115
constants =
115-
DefaultJdbcSharedState.Constants(
116+
DefaultJdbcConstants(
116117
withSampling = true,
117118
maxSampleSize = 4,
118119
// absurdly low value to create many partitions
@@ -224,7 +225,7 @@ class JdbcPartitionsCreatorTest {
224225
val sharedState =
225226
sharedState(
226227
constants =
227-
DefaultJdbcSharedState.Constants(
228+
DefaultJdbcConstants(
228229
withSampling = true,
229230
maxSampleSize = 4,
230231
),
@@ -327,7 +328,7 @@ class JdbcPartitionsCreatorTest {
327328
val stream = stream()
328329
val sharedState =
329330
sharedState(
330-
constants = DefaultJdbcSharedState.Constants(withSampling = true),
331+
constants = DefaultJdbcConstants(withSampling = true),
331332
// The JdbcSequentialPartitionsCreator is not expected to query anything.
332333
mockedQueries = arrayOf()
333334
)

0 commit comments

Comments (0)