bulk-cdk-toolkit-extract-jdbc: add namespaceKind constant #44767

Merged with 3 commits on Aug 27, 2024
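Summary: this change renames JdbcSourceConfiguration.schemas to namespaces and introduces a namespaceKind constant, so a JDBC source can declare whether a stream namespace denotes a JDBC schema (as in Postgres or H2), a JDBC catalog (as in MySQL), or both at once. To host the new knob, the tuning constants previously nested in DefaultJdbcSharedState.Constants move out to a new top-level DefaultJdbcConstants bean in io.airbyte.cdk.jdbc, which is now injected into JdbcMetadataQuerier and its Factory; table discovery, stream-name lookup, and SELECT query generation all resolve namespaces through a single new TableName.namespace() helper.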
@@ -15,8 +15,8 @@ interface JdbcSourceConfiguration : SourceConfiguration {
     /** Properties map (with username, password, etc.) passed along to the JDBC driver. */
     val jdbcProperties: Map<String, String>
 
-    /** Ordered set of schemas for the connector to consider. */
-    val schemas: Set<String>
+    /** Ordered set of namespaces (typically, schemas) for the connector to consider. */
+    val namespaces: Set<String>
 
     /** When set, each table is queried individually to check for SELECT privileges. */
     val checkPrivileges: Boolean
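Every JdbcSourceConfiguration implementation picks up this rename. Below is a minimal sketch of the renamed member in use, with stub interfaces standing in for the real configuration hierarchy; everything except namespaces and its doc comment is hypothetical:

    // Stub types for illustration; the real interfaces carry many more members.
    interface StubSourceConfiguration
    interface StubJdbcSourceConfiguration : StubSourceConfiguration {
        /** Ordered set of namespaces (typically, schemas) for the connector to consider. */
        val namespaces: Set<String>
    }

    data class MyH2Configuration(
        override val namespaces: Set<String> = setOf("public"),
    ) : StubJdbcSourceConfiguration

    fun main() {
        val config = MyH2Configuration()
        // Discovery iterates the declared namespaces plus their upper-cased forms,
        // mirroring the getTables() loop in the JdbcMetadataQuerier hunks below.
        println(config.namespaces + config.namespaces.map { it.uppercase() }) // [public, PUBLIC]
    }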
@@ -2,6 +2,8 @@
 package io.airbyte.cdk.discover
 
 import io.airbyte.cdk.command.JdbcSourceConfiguration
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants.NamespaceKind
 import io.airbyte.cdk.jdbc.JdbcConnectionFactory
 import io.airbyte.cdk.jdbc.NullFieldType
 import io.airbyte.cdk.read.From
@@ -22,6 +24,7 @@ import java.sql.Statement
 
 /** Default implementation of [MetadataQuerier]. */
 class JdbcMetadataQuerier(
+    val constants: DefaultJdbcConstants,
     val config: JdbcSourceConfiguration,
     val selectQueryGenerator: SelectQueryGenerator,
     val fieldTypeMapper: FieldTypeMapper,
@@ -31,11 +34,18 @@ class JdbcMetadataQuerier(
 
     private val log = KotlinLogging.logger {}
 
+    fun TableName.namespace(): String? =
+        when (constants.namespaceKind) {
+            NamespaceKind.CATALOG_AND_SCHEMA,
+            NamespaceKind.CATALOG -> catalog
+            NamespaceKind.SCHEMA -> schema
+        }
+
     override fun streamNamespaces(): List<String> =
-        memoizedTableNames.mapNotNull { it.schema ?: it.catalog }.distinct()
+        memoizedTableNames.mapNotNull { it.namespace() }.distinct()
 
     override fun streamNames(streamNamespace: String?): List<String> =
-        memoizedTableNames.filter { (it.schema ?: it.catalog) == streamNamespace }.map { it.name }
+        memoizedTableNames.filter { it.namespace() == streamNamespace }.map { it.name }
 
     fun <T> swallow(supplier: () -> T): T? {
         try {
@@ -51,8 +61,14 @@
         try {
             val allTables = mutableSetOf<TableName>()
             val dbmd: DatabaseMetaData = conn.metaData
-            for (schema in config.schemas + config.schemas.map { it.uppercase() }) {
-                dbmd.getTables(null, schema, null, null).use { rs: ResultSet ->
+            for (namespace in config.namespaces + config.namespaces.map { it.uppercase() }) {
+                val (catalog: String?, schema: String?) =
+                    when (constants.namespaceKind) {
+                        NamespaceKind.CATALOG -> namespace to null
+                        NamespaceKind.SCHEMA -> null to namespace
+                        NamespaceKind.CATALOG_AND_SCHEMA -> namespace to namespace
+                    }
+                dbmd.getTables(catalog, schema, null, null).use { rs: ResultSet ->
                     while (rs.next()) {
                         allTables.add(
                             TableName(
@@ -65,10 +81,8 @@
                     }
                 }
             }
-            log.info { "Discovered ${allTables.size} table(s) in schemas ${config.schemas}." }
-            return@lazy allTables.toList().sortedBy {
-                "${it.catalog ?: ""}.${it.schema!!}.${it.name}.${it.type}"
-            }
+            log.info { "Discovered ${allTables.size} table(s) in namespaces ${config.namespaces}." }
+            return@lazy allTables.toList().sortedBy { "${it.namespace()}.${it.name}.${it.type}" }
         } catch (e: Exception) {
             throw RuntimeException("Table name discovery query failed: ${e.message}", e)
         }
@@ -78,9 +92,7 @@
         streamName: String,
         streamNamespace: String?,
     ): TableName? =
-        memoizedTableNames.find {
-            it.name == streamName && (it.schema ?: it.catalog) == streamNamespace
-        }
+        memoizedTableNames.find { it.name == streamName && it.namespace() == streamNamespace }
 
     val memoizedColumnMetadata: Map<TableName, List<ColumnMetadata>> by lazy {
         val joinMap: Map<TableName, TableName> =
@@ -90,7 +102,7 @@
         try {
             val dbmd: DatabaseMetaData = conn.metaData
             memoizedTableNames
-                .filter { it.catalog != null || it.schema != null }
+                .filter { it.namespace() != null }
                .map { it.catalog to it.schema }
                .distinct()
                .forEach { (catalog: String?, schema: String?) ->
@@ -221,7 +233,7 @@ class JdbcMetadataQuerier(
         val querySpec =
             SelectQuerySpec(
                 SelectColumns(columnIDs.map { Field(it, NullFieldType) }),
-                From(table.name, table.schema ?: table.catalog),
+                From(table.name, table.namespace()),
                 limit = Limit(0),
             )
         return selectQueryGenerator.generate(querySpec.optimize()).sql
@@ -319,11 +331,13 @@ class JdbcMetadataQuerier(
     class Factory(
         val selectQueryGenerator: SelectQueryGenerator,
         val fieldTypeMapper: FieldTypeMapper,
+        val constants: DefaultJdbcConstants,
     ) : MetadataQuerier.Factory<JdbcSourceConfiguration> {
         /** The [JdbcSourceConfiguration] is deliberately not injected in order to support tests. */
         override fun session(config: JdbcSourceConfiguration): MetadataQuerier {
            val jdbcConnectionFactory = JdbcConnectionFactory(config)
            return JdbcMetadataQuerier(
+                constants,
                config,
                selectQueryGenerator,
                fieldTypeMapper,
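The new TableName.namespace() extension replaces the scattered `it.schema ?: it.catalog` fallbacks above with a single dispatch on NamespaceKind. A self-contained sketch of that dispatch follows; the simplified TableName shape is an assumption based on the fields the diff uses, not the CDK's exact declaration:

    // Simplified stand-ins for the CDK types, for illustration only.
    data class TableName(
        val catalog: String?,
        val schema: String?,
        val name: String,
        val type: String,
    )

    enum class NamespaceKind { SCHEMA, CATALOG, CATALOG_AND_SCHEMA }

    fun TableName.namespace(kind: NamespaceKind): String? =
        when (kind) {
            NamespaceKind.CATALOG_AND_SCHEMA, NamespaceKind.CATALOG -> catalog
            NamespaceKind.SCHEMA -> schema
        }

    fun main() {
        // A MySQL-style table surfaces its database as the JDBC catalog.
        val t = TableName(catalog = "mydb", schema = null, name = "kv", type = "TABLE")
        println(t.namespace(NamespaceKind.CATALOG)) // mydb
        println(t.namespace(NamespaceKind.SCHEMA))  // null
    }

The same constant also picks the arguments passed to DatabaseMetaData.getTables(), so schema-based and catalog-based databases share one discovery path.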
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.jdbc
+
+import io.micronaut.context.annotation.ConfigurationProperties
+
+const val JDBC_PROPERTY_PREFIX = "airbyte.connector.extract.jdbc"
+
+@ConfigurationProperties(JDBC_PROPERTY_PREFIX)
+data class DefaultJdbcConstants(
+    val withSampling: Boolean = WITH_SAMPLING,
+    val maxSampleSize: Int = TABLE_SAMPLE_SIZE,
+    /** How many bytes per second we can expect the database to send to the connector. */
+    val expectedThroughputBytesPerSecond: Long = THROUGHPUT_BYTES_PER_SECOND,
+    /** Smallest possible fetchSize value. */
+    val minFetchSize: Int = FETCH_SIZE_LOWER_BOUND,
+    /** Default fetchSize value, in absence of any other estimate. */
+    val defaultFetchSize: Int = DEFAULT_FETCH_SIZE,
+    /** Largest possible fetchSize value. */
+    val maxFetchSize: Int = FETCH_SIZE_UPPER_BOUND,
+    /** How much of the JVM heap can we fill up with [java.sql.ResultSet] data. */
+    val memoryCapacityRatio: Double = MEM_CAPACITY_RATIO,
+    /** Estimated bytes used as overhead for each row in a [java.sql.ResultSet]. */
+    val estimatedRecordOverheadBytes: Long = RECORD_OVERHEAD_BYTES,
+    /** Estimated bytes used as overhead for each column value in a [java.sql.ResultSet]. */
+    val estimatedFieldOverheadBytes: Long = FIELD_OVERHEAD_BYTES,
+    /** Overrides the JVM heap capacity to provide determinism in tests. */
+    val maxMemoryBytesForTesting: Long? = null,
+    /** Whether the namespace field denotes a JDBC schema or a JDBC catalog. */
+    val namespaceKind: NamespaceKind = NamespaceKind.SCHEMA,
+) {
+
+    enum class NamespaceKind {
+        SCHEMA,
+        CATALOG,
+        CATALOG_AND_SCHEMA
+    }
+
+    companion object {
+
+        // Sampling defaults.
+        internal const val WITH_SAMPLING: Boolean = false
+        internal const val TABLE_SAMPLE_SIZE: Int = 1024
+        internal const val THROUGHPUT_BYTES_PER_SECOND: Long = 10L shl 20
+
+        // fetchSize defaults
+        internal const val FETCH_SIZE_LOWER_BOUND: Int = 10
+        internal const val DEFAULT_FETCH_SIZE: Int = 1_000
+        internal const val FETCH_SIZE_UPPER_BOUND: Int = 10_000_000
+
+        // Memory estimate defaults.
+        internal const val RECORD_OVERHEAD_BYTES = 16L
+        internal const val FIELD_OVERHEAD_BYTES = 16L
+        // We're targeting use of 60% of the available memory in order to allow
+        // for some headroom for other garbage collection.
+        internal const val MEM_CAPACITY_RATIO: Double = 0.6
+    }
+}
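Since DefaultJdbcConstants is a data class with defaults, callers can override just what they need; the tests further down do exactly that. A short illustration (the override value is illustrative, not a recommendation):

    import io.airbyte.cdk.jdbc.DefaultJdbcConstants
    import io.airbyte.cdk.jdbc.DefaultJdbcConstants.NamespaceKind

    // A catalog-based source (MySQL-style); every other knob keeps its default.
    val catalogConstants = DefaultJdbcConstants(namespaceKind = NamespaceKind.CATALOG)

    // Defaults carried over unchanged from the old nested Constants class:
    // maxSampleSize = 1024, defaultFetchSize = 1_000, memoryCapacityRatio = 0.6,
    // and the new namespaceKind defaults to SCHEMA.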
@@ -5,8 +5,8 @@
 package io.airbyte.cdk.read
 
 import io.airbyte.cdk.command.JdbcSourceConfiguration
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants
 import io.airbyte.cdk.output.OutputConsumer
-import io.micronaut.context.annotation.ConfigurationProperties
 import jakarta.inject.Singleton
 import kotlinx.coroutines.sync.Semaphore
 
@@ -16,51 +16,9 @@ class DefaultJdbcSharedState(
     override val configuration: JdbcSourceConfiguration,
     override val outputConsumer: OutputConsumer,
     override val selectQuerier: SelectQuerier,
-    val constants: Constants,
+    val constants: DefaultJdbcConstants,
 ) : JdbcSharedState {
 
-    @ConfigurationProperties(JDBC_PROPERTY_PREFIX)
-    data class Constants(
-        val withSampling: Boolean = WITH_SAMPLING,
-        val maxSampleSize: Int = TABLE_SAMPLE_SIZE,
-        /** How many bytes per second we can expect the database to send to the connector. */
-        val expectedThroughputBytesPerSecond: Long = THROUGHPUT_BYTES_PER_SECOND,
-        /** Smallest possible fetchSize value. */
-        val minFetchSize: Int = FETCH_SIZE_LOWER_BOUND,
-        /** Default fetchSize value, in absence of any other estimate. */
-        val defaultFetchSize: Int = DEFAULT_FETCH_SIZE,
-        /** Largest possible fetchSize value. */
-        val maxFetchSize: Int = FETCH_SIZE_UPPER_BOUND,
-        /** How much of the JVM heap can we fill up with [java.sql.ResultSet] data. */
-        val memoryCapacityRatio: Double = MEM_CAPACITY_RATIO,
-        /** Estimated bytes used as overhead for each row in a [java.sql.ResultSet]. */
-        val estimatedRecordOverheadBytes: Long = RECORD_OVERHEAD_BYTES,
-        /** Estimated bytes used as overhead for each column value in a [java.sql.ResultSet]. */
-        val estimatedFieldOverheadBytes: Long = FIELD_OVERHEAD_BYTES,
-        /** Overrides the JVM heap capacity to provide determinism in tests. */
-        val maxMemoryBytesForTesting: Long? = null
-    ) {
-        companion object {
-
-            // Sampling defaults.
-            internal const val WITH_SAMPLING: Boolean = false
-            internal const val TABLE_SAMPLE_SIZE: Int = 1024
-            internal const val THROUGHPUT_BYTES_PER_SECOND: Long = 10L shl 20
-
-            // fetchSize defaults
-            internal const val FETCH_SIZE_LOWER_BOUND: Int = 10
-            internal const val DEFAULT_FETCH_SIZE: Int = 1_000
-            internal const val FETCH_SIZE_UPPER_BOUND: Int = 10_000_000
-
-            // Memory estimate defaults.
-            internal const val RECORD_OVERHEAD_BYTES = 16L
-            internal const val FIELD_OVERHEAD_BYTES = 16L
-            // We're targeting use of 60% of the available memory in order to allow
-            // for some headroom for other garbage collection.
-            internal const val MEM_CAPACITY_RATIO: Double = 0.6
-        }
-    }
-
     override val withSampling: Boolean
         get() = constants.withSampling
 
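Call sites that used the nested DefaultJdbcSharedState.Constants now reference the top-level DefaultJdbcConstants, and the shared state receives it as an ordinary constructor argument. A hypothetical manual wiring sketch; in production Micronaut injects all four parameters, and the my* names are stand-ins:

    // Hypothetical wiring sketch, not code from this PR.
    val sharedState = DefaultJdbcSharedState(
        configuration = myConfig,          // some JdbcSourceConfiguration
        outputConsumer = myOutputConsumer, // some OutputConsumer
        selectQuerier = mySelectQuerier,   // some SelectQuerier
        constants = DefaultJdbcConstants(),
    )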
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.read
 
 import io.airbyte.cdk.command.OpaqueStateValue
+import io.airbyte.cdk.jdbc.JDBC_PROPERTY_PREFIX
 import io.micronaut.context.annotation.Requires
 import jakarta.inject.Singleton
 
@@ -68,6 +69,4 @@ class JdbcConcurrentPartitionsCreatorFactory<
         JdbcConcurrentPartitionsCreator(partition, partitionFactory)
     }
 
-const val JDBC_PROPERTY_PREFIX = "airbyte.connector.extract.jdbc"
-
-private const val MODE_PROPERTY = "$JDBC_PROPERTY_PREFIX.mode"
+private const val MODE_PROPERTY = "${JDBC_PROPERTY_PREFIX}.mode"
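JDBC_PROPERTY_PREFIX itself now lives in the io.airbyte.cdk.jdbc package alongside DefaultJdbcConstants (see the new file above), so this file imports it rather than declaring it. The added braces are purely stylistic; a simple-name Kotlin string template already stops at the dot, as this standalone check shows:

    const val PREFIX = "airbyte.connector.extract.jdbc" // same value as JDBC_PROPERTY_PREFIX

    fun main() {
        // The '.' cannot be part of a simple template name, so both spellings match.
        check("$PREFIX.mode" == "${PREFIX}.mode")
        println("$PREFIX.mode") // airbyte.connector.extract.jdbc.mode
    }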
@@ -6,6 +6,7 @@ import io.airbyte.cdk.h2source.H2SourceConfiguration
 import io.airbyte.cdk.h2source.H2SourceConfigurationFactory
 import io.airbyte.cdk.h2source.H2SourceConfigurationJsonObject
 import io.airbyte.cdk.h2source.H2SourceOperations
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants
 import java.sql.JDBCType
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Test
@@ -17,7 +18,12 @@ class JdbcMetadataQuerierTest {
         h2.execute("CREATE TABLE kv (k INT PRIMARY KEY, v VARCHAR(60))")
     }
 
-    val factory = JdbcMetadataQuerier.Factory(H2SourceOperations(), H2SourceOperations())
+    val factory =
+        JdbcMetadataQuerier.Factory(
+            selectQueryGenerator = H2SourceOperations(),
+            fieldTypeMapper = H2SourceOperations(),
+            constants = DefaultJdbcConstants(),
+        )
 
     @Test
     fun test() {
@@ -6,6 +6,7 @@ package io.airbyte.cdk.read
 
 import io.airbyte.cdk.data.IntCodec
 import io.airbyte.cdk.data.LocalDateCodec
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants
 import io.airbyte.cdk.output.InvalidCursor
 import io.airbyte.cdk.output.InvalidPrimaryKey
 import io.airbyte.cdk.output.ResetStream
@@ -51,7 +52,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
             ),
         )
     )
@@ -87,7 +88,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
             ),
         )
     )
@@ -132,7 +133,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
             ),
             NoWhere,
             OrderBy(id),
@@ -202,7 +203,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
            ),
            NoWhere,
            OrderBy(id)
@@ -308,7 +309,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
             ),
             Where(Greater(id, IntCodec.encode(22))),
             OrderBy(id),
@@ -365,7 +366,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
            ),
            Where(Greater(id, IntCodec.encode(22))),
            OrderBy(id)
@@ -446,7 +447,7 @@ class DefaultJdbcPartitionFactoryTest {
                 stream.name,
                 stream.namespace,
                 sampleRateInvPow2 = 8,
-                DefaultJdbcSharedState.Constants.TABLE_SAMPLE_SIZE,
+                DefaultJdbcConstants.TABLE_SAMPLE_SIZE,
             ),
             Where(
                 And(
@@ -6,6 +6,7 @@ package io.airbyte.cdk.read
 
 import io.airbyte.cdk.data.IntCodec
 import io.airbyte.cdk.data.LocalDateCodec
+import io.airbyte.cdk.jdbc.DefaultJdbcConstants
 import io.airbyte.cdk.read.TestFixtures.assertFailures
 import io.airbyte.cdk.read.TestFixtures.factory
 import io.airbyte.cdk.read.TestFixtures.id
@@ -28,7 +29,7 @@ class JdbcPartitionsCreatorTest {
         val sharedState =
             sharedState(
                 constants =
-                    DefaultJdbcSharedState.Constants(
+                    DefaultJdbcConstants(
                         withSampling = true,
                         maxSampleSize = 4,
                         // absurdly low value to create many partitions
@@ -112,7 +113,7 @@ class JdbcPartitionsCreatorTest {
         val sharedState =
             sharedState(
                 constants =
-                    DefaultJdbcSharedState.Constants(
+                    DefaultJdbcConstants(
                         withSampling = true,
                         maxSampleSize = 4,
                         // absurdly low value to create many partitions
@@ -224,7 +225,7 @@ class JdbcPartitionsCreatorTest {
         val sharedState =
             sharedState(
                 constants =
-                    DefaultJdbcSharedState.Constants(
+                    DefaultJdbcConstants(
                         withSampling = true,
                         maxSampleSize = 4,
                     ),
@@ -327,7 +328,7 @@ class JdbcPartitionsCreatorTest {
         val stream = stream()
         val sharedState =
             sharedState(
-                constants = DefaultJdbcSharedState.Constants(withSampling = true),
+                constants = DefaultJdbcConstants(withSampling = true),
                 // The JdbcSequentialPartitionsCreator is not expected to query anything.
                 mockedQueries = arrayOf()
             )