Skip to content

Commit b6825ee

Browse files
Bulk Load CDK: Simply Interface & Add Check (#45369)
1 parent 41a242b commit b6825ee

File tree

10 files changed

+96
-76
lines changed

10 files changed

+96
-76
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.check
6+
7+
import io.airbyte.cdk.Operation
8+
import io.airbyte.cdk.output.ExceptionHandler
9+
import io.micronaut.context.annotation.Requires
10+
import jakarta.inject.Singleton
11+
12+
@Singleton
13+
@Requires(property = Operation.PROPERTY, value = "check")
14+
@Requires(env = ["destination"])
15+
class CheckOperation(
16+
private val destination: DestinationCheck,
17+
private val exceptionHandler: ExceptionHandler,
18+
) : Operation {
19+
override fun execute() {
20+
try {
21+
destination.check()
22+
} catch (e: Exception) {
23+
exceptionHandler.handle(e)
24+
} finally {
25+
destination.cleanup()
26+
}
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.check
6+
7+
interface DestinationCheck {
8+
fun check()
9+
fun cleanup() {}
10+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlinx.coroutines.sync.withLock
1919
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
2020
*/
2121
@Singleton
22-
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
22+
class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) {
2323
private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
2424
private var usedMemoryBytes = AtomicLong(0L)
2525
private val mutex = Mutex()

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
package io.airbyte.cdk.task
66

77
import io.airbyte.cdk.command.DestinationStream
8-
import io.airbyte.cdk.write.Destination
8+
import io.airbyte.cdk.write.DestinationWrite
99
import io.airbyte.cdk.write.StreamLoader
1010
import io.micronaut.context.annotation.Secondary
1111
import jakarta.inject.Singleton
1212

1313
/**
14-
* Wraps @[StreamLoader.open] and starts the spill-to-disk tasks.
14+
* Wraps @[StreamLoader.start] and starts the spill-to-disk tasks.
1515
*
1616
* TODO: There's no reason to wait on initialization to start spilling to disk.
1717
*/
@@ -20,15 +20,15 @@ class OpenStreamTask(
2020
private val taskLauncher: DestinationTaskLauncher
2121
) : Task {
2222
override suspend fun execute() {
23-
streamLoader.open()
23+
streamLoader.start()
2424
taskLauncher.startSpillToDiskTasks(streamLoader)
2525
}
2626
}
2727

2828
@Singleton
2929
@Secondary
3030
class OpenStreamTaskFactory(
31-
private val destination: Destination,
31+
private val destination: DestinationWrite,
3232
) {
3333
fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask {
3434
return OpenStreamTask(destination.getStreamLoader(stream), taskLauncher)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@
44

55
package io.airbyte.cdk.task
66

7-
import io.airbyte.cdk.write.Destination
7+
import io.airbyte.cdk.write.DestinationWrite
88
import io.micronaut.context.annotation.Secondary
99
import jakarta.inject.Singleton
1010

1111
/**
12-
* Wraps @[Destination.setup] and starts the open stream tasks.
12+
* Wraps @[DestinationWrite.setup] and starts the open stream tasks.
1313
*
1414
* TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do
1515
* next.
1616
*/
1717
class SetupTask(
18-
private val destination: Destination,
18+
private val destination: DestinationWrite,
1919
private val taskLauncher: DestinationTaskLauncher
2020
) : Task {
2121
override suspend fun execute() {
@@ -27,7 +27,7 @@ class SetupTask(
2727
@Singleton
2828
@Secondary
2929
class SetupTaskFactory(
30-
private val destination: Destination,
30+
private val destination: DestinationWrite,
3131
) {
3232
fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
3333
return SetupTask(destination, taskLauncher)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@
55
package io.airbyte.cdk.task
66

77
import io.airbyte.cdk.state.StreamsManager
8-
import io.airbyte.cdk.write.Destination
8+
import io.airbyte.cdk.write.DestinationWrite
99
import io.github.oshai.kotlinlogging.KotlinLogging
1010
import io.micronaut.context.annotation.Secondary
1111
import jakarta.inject.Singleton
1212
import java.util.concurrent.atomic.AtomicBoolean
1313

1414
/**
15-
* Wraps @[Destination.teardown] and stops the task launcher.
15+
* Wraps @[DestinationWrite.teardown] and stops the task launcher.
1616
*
1717
* TODO: Report teardown-complete and let the task launcher decide what to do next.
1818
*/
1919
class TeardownTask(
20-
private val destination: Destination,
20+
private val destination: DestinationWrite,
2121
private val streamsManager: StreamsManager,
2222
private val taskLauncher: DestinationTaskLauncher
2323
) : Task {
@@ -44,7 +44,7 @@ class TeardownTask(
4444
@Singleton
4545
@Secondary
4646
class TeardownTaskFactory(
47-
private val destination: Destination,
47+
private val destination: DestinationWrite,
4848
private val streamsManager: StreamsManager,
4949
) {
5050
fun make(taskLauncher: DestinationTaskLauncher): TeardownTask {

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/DestinationWrite.kt

+12-8
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,30 @@ import io.micronaut.context.annotation.Secondary
99
import jakarta.inject.Singleton
1010

1111
/**
12-
* Implementor interface. Extended this only if you need to perform initialization and teardown
13-
* *across all streams*, or if your per-stream operations need shared global state.
14-
*
15-
* If initialization can be done on a per-stream basis, implement @[StreamLoaderFactory] instead.
12+
* Implementor interface. Every Destination must extend this and at least provide an implementation
13+
* of [getStreamLoader].
1614
*/
17-
interface Destination {
15+
interface DestinationWrite {
1816
// Called once before anything else
1917
suspend fun setup() {}
2018

2119
// Return a StreamLoader for the given stream
2220
fun getStreamLoader(stream: DestinationStream): StreamLoader
2321

24-
// Called once at the end of the job
22+
// Called once at the end of the job, unconditionally.
2523
suspend fun teardown(succeeded: Boolean = true) {}
2624
}
2725

2826
@Singleton
2927
@Secondary
30-
class DefaultDestination(private val streamLoaderFactory: StreamLoaderFactory) : Destination {
28+
class DefaultDestinationWrite : DestinationWrite {
29+
init {
30+
throw NotImplementedError(
31+
"DestinationWrite not implemented. Please create a custom @Singleton implementation."
32+
)
33+
}
34+
3135
override fun getStreamLoader(stream: DestinationStream): StreamLoader {
32-
return streamLoaderFactory.make(stream)
36+
throw NotImplementedError()
3337
}
3438
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt

-10
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.message.DestinationMessage
99
import io.airbyte.cdk.message.MessageQueueWriter
1010
import io.github.oshai.kotlinlogging.KLogger
1111
import io.github.oshai.kotlinlogging.KotlinLogging
12-
import io.micronaut.context.annotation.Factory
1312
import jakarta.inject.Singleton
1413
import java.io.InputStream
1514
import java.nio.charset.StandardCharsets
@@ -63,12 +62,3 @@ class DefaultInputConsumer(
6362
) : DeserializingInputStreamConsumer<DestinationMessage> {
6463
override val log = KotlinLogging.logger {}
6564
}
66-
67-
/** Override to provide a custom input stream. */
68-
@Factory
69-
class InputStreamFactory {
70-
@Singleton
71-
fun make(): InputStream {
72-
return System.`in`
73-
}
74-
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt

+19-45
Original file line numberDiff line numberDiff line change
@@ -8,61 +8,35 @@ import io.airbyte.cdk.command.DestinationStream
88
import io.airbyte.cdk.message.Batch
99
import io.airbyte.cdk.message.DestinationRecord
1010
import io.airbyte.cdk.message.SimpleBatch
11-
import io.github.oshai.kotlinlogging.KotlinLogging
12-
import io.micronaut.context.annotation.Secondary
13-
import jakarta.inject.Singleton
1411

1512
/**
1613
* Implementor interface. The framework calls open and close once per stream at the beginning and
1714
* end of processing. The framework calls processRecords once per batch of records as batches of the
1815
* configured size become available. (Specified in @
19-
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes]
16+
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes])
2017
*
21-
* processBatch is called once per incomplete batch returned by either processRecords or
22-
* processBatch itself. See @[io.airbyte.cdk.message.Batch] for more details.
18+
* [start] is called once before any records are processed.
19+
*
20+
* [processRecords] is called whenever a batch of records is available for processing, and only
21+
* after [start] has returned successfully. The return value is a client-defined implementation of @
22+
* [Batch] that the framework may pass to [processBatch] and/or [finalize]. (See @[Batch] for more
23+
* details.)
24+
*
25+
* [processBatch] is called once per incomplete batch returned by either [processRecords] or
26+
* [processBatch] itself.
27+
*
28+
* [finalize] is called once after all records and batches have been processed successfully.
29+
*
30+
* [close] is called once after all records have been processed, regardless of success or failure.
31+
* If there are failed batches, they are passed in as an argument.
2332
*/
2433
interface StreamLoader {
2534
val stream: DestinationStream
2635

27-
suspend fun open() {}
36+
suspend fun start() {}
2837
suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
29-
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(state = Batch.State.COMPLETE)
30-
suspend fun close() {}
31-
}
32-
33-
/**
34-
* Default stream loader (Not yet implemented) will process the records into a locally staged file
35-
* of a format specified in the configuration.
36-
*/
37-
class DefaultStreamLoader(
38-
override val stream: DestinationStream,
39-
) : StreamLoader {
40-
val log = KotlinLogging.logger {}
41-
42-
override suspend fun processRecords(
43-
records: Iterator<DestinationRecord>,
44-
totalSizeBytes: Long
45-
): Batch {
46-
TODO(
47-
"Default implementation adds airbyte metadata, maybe flattens, no-op maps, and converts to destination format"
48-
)
49-
}
50-
}
51-
52-
/**
53-
* If you do not need to perform initialization and teardown across all streams, or if your
54-
* per-stream operations do not need shared global state, implement this interface instead of @
55-
* [Destination]. The framework will call it exactly once per stream to create instances that will
56-
* be used for the life cycle of the stream.
57-
*/
58-
interface StreamLoaderFactory {
59-
fun make(stream: DestinationStream): StreamLoader
60-
}
38+
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)
39+
suspend fun finalize() {}
6140

62-
@Singleton
63-
@Secondary
64-
class DefaultStreamLoaderFactory() : StreamLoaderFactory {
65-
override fun make(stream: DestinationStream): StreamLoader {
66-
TODO("See above")
67-
}
41+
suspend fun close(failedBatches: List<Batch> = emptyList()) {}
6842
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt

+14
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import io.airbyte.cdk.Operation
88
import io.airbyte.cdk.message.DestinationMessage
99
import io.airbyte.cdk.task.TaskLauncher
1010
import io.airbyte.cdk.task.TaskRunner
11+
import io.micronaut.context.annotation.Factory
1112
import io.micronaut.context.annotation.Requires
13+
import io.micronaut.context.annotation.Secondary
14+
import java.io.InputStream
1215
import javax.inject.Singleton
1316
import kotlinx.coroutines.launch
1417
import kotlinx.coroutines.runBlocking
@@ -34,3 +37,14 @@ class WriteOperation(
3437
}
3538
}
3639
}
40+
41+
/** Override to provide a custom input stream. */
42+
@Factory
43+
class InputStreamFactory {
44+
@Singleton
45+
@Secondary
46+
@Requires(property = Operation.PROPERTY, value = "write")
47+
fun make(): InputStream {
48+
return System.`in`
49+
}
50+
}

0 commit comments

Comments
 (0)