File tree 4 files changed +8
-16
lines changed
airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk
4 files changed +8
-16
lines changed Original file line number Diff line number Diff line change 1
1
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2
2
package io.airbyte.cdk.command
3
3
4
- import io.airbyte.cdk.read.LimitState
5
4
import io.micronaut.context.annotation.Factory
6
5
import jakarta.inject.Singleton
7
6
@@ -19,10 +18,7 @@ interface JdbcSourceConfiguration : SourceConfiguration {
19
18
/* * Ordered set of schemas for the connector to consider. */
20
19
val schemas: Set <String >
21
20
22
- /* * How many rows to query in the first batch. */
23
- val initialLimit: LimitState
24
- get() = LimitState .minimum
25
-
21
+ /* * When set, each table is queried individually to check for SELECT privileges. */
26
22
val checkPrivileges: Boolean
27
23
get() = true
28
24
Original file line number Diff line number Diff line change @@ -12,10 +12,10 @@ import kotlin.math.min
12
12
class MemoryFetchSizeEstimator (
13
13
val maxMemoryBytes : Long ,
14
14
val maxConcurrency : Int ,
15
- ) : StreamPartitionsCreatorUtils.FetchSizeEstimator {
15
+ ) {
16
16
private val log = KotlinLogging .logger {}
17
17
18
- override fun apply (rowByteSizeSample : Sample <Long >): Int {
18
+ fun apply (rowByteSizeSample : Sample <Long >): Int {
19
19
val maxRowBytes: Long = rowByteSizeSample.sampledValues.maxOrNull() ? : 0L
20
20
log.info {
21
21
" maximum row size in ${rowByteSizeSample.kind.name} table is $maxRowBytes bytes"
Original file line number Diff line number Diff line change @@ -8,7 +8,6 @@ import io.airbyte.cdk.discover.Field
8
8
import io.airbyte.cdk.util.Jsons
9
9
import io.github.oshai.kotlinlogging.KotlinLogging
10
10
import java.io.OutputStream
11
- import java.util.function.Function
12
11
import kotlin.random.Random
13
12
14
13
/* * Utilities for [StreamPartitionsCreator] that don't rely directly on its input state. */
@@ -73,8 +72,6 @@ class StreamPartitionsCreatorUtils(
73
72
return lbs.zip(ubs)
74
73
}
75
74
76
- fun interface FetchSizeEstimator : Function <Sample <Long >, Int >
77
-
78
75
fun rowByteSizeEstimator (): (ObjectNode ) -> Long {
79
76
val countingOutputStream =
80
77
object : OutputStream () {
Original file line number Diff line number Diff line change @@ -14,11 +14,10 @@ import java.util.concurrent.atomic.AtomicReference
14
14
import kotlinx.coroutines.sync.Semaphore
15
15
16
16
/* *
17
- * A [StreamReadContextManager] may be injected in a
18
- * [io.airbyte.cdk.source.PartitionsCreatorFactory] to provide it, and the
19
- * [io.airbyte.cdk.source.PartitionsCreator] and [io.airbyte.cdk.source.PartitionReader] instances
20
- * it creates, with a set of global singletons useful for implementing stream READs for a JDBC
21
- * source.
17
+ * A [StreamReadContextManager] may be injected in a [io.airbyte.cdk.read.PartitionsCreatorFactory]
18
+ * to provide it, and the[io.airbyte.cdk.read.PartitionsCreator] and
19
+ * [io.airbyte.cdk.read.PartitionReader] instances it creates, with a set of global singletons
20
+ * useful for implementing stream READs for a JDBC source.
22
21
*
23
22
* For each stream in the configured catalog, these global singletons are packaged in a
24
23
* [StreamReadContext] which bundles them with the corresponding [Stream] as well as a couple
@@ -61,7 +60,7 @@ class StreamReadContext(
61
60
val outputConsumer : OutputConsumer ,
62
61
val stream : Stream ,
63
62
) {
64
- val transientLimitState: TransientState <LimitState > = TransientState (configuration.initialLimit )
63
+ val transientLimitState: TransientState <LimitState > = TransientState (LimitState .minimum )
65
64
66
65
val transientCursorUpperBoundState: TransientState <JsonNode ?> = TransientState (null )
67
66
You can’t perform that action at this time.
0 commit comments