Skip to content

(Incomplete) First Cut Load CDK with E2E Destination #44822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
dependencies {
    // Base module of the bulk CDK core.
    implementation(project(":airbyte-cdk:bulk:core:bulk-cdk-core-base"))
    implementation("org.apache.commons:commons-lang3:3.14.0")

    // Guava supplies the Range / RangeSet types.
    implementation("com.google.guava:guava:33.3.0-jre")

    // Re-export the base module's test fixtures to consumers of this module's fixtures.
    testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-base")))

    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.1")
    implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.0")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

/**
 * Internal representation of the destination catalog: the list of [DestinationStream]s plus a
 * lookup of streams by their [DestinationStream.Descriptor].
 */
data class DestinationCatalog(
    val streams: List<DestinationStream> = emptyList(),
) {
    // Index built once at construction. With associateBy, if several streams share a descriptor
    // the last one in [streams] is the one kept.
    private val byDescriptor: Map<DestinationStream.Descriptor, DestinationStream> =
        streams.associateBy { stream -> stream.descriptor }

    /**
     * Looks up the stream identified by [namespace] and [name].
     *
     * @throws IllegalArgumentException if no stream with that descriptor is in the catalog.
     */
    fun getStream(name: String, namespace: String): DestinationStream {
        val key = DestinationStream.Descriptor(namespace = namespace, name = name)
        return requireNotNull(byDescriptor[key]) {
            "Stream not found: namespace=$namespace, name=$name"
        }
    }
}

/** Micronaut factory producing the [DestinationCatalog] singleton from the configured catalog. */
@Factory
class DestinationCatalogFactory(
    private val catalog: ConfiguredAirbyteCatalog,
    private val streamFactory: DestinationStreamFactory
) {
    /** Converts each configured stream with [streamFactory], keeping the catalog's order. */
    @Singleton
    fun make(): DestinationCatalog {
        val destinationStreams =
            catalog.streams.map { configuredStream -> streamFactory.make(configuredStream) }
        return DestinationCatalog(streams = destinationStreams)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

/** Destination connector configuration, declared under the `destination.config` prefix. */
@ConfigurationProperties("destination.config")
interface DestinationConfiguration : Configuration {
    /**
     * Micronaut factory wiring a [ConfigurationJsonObjectSupplier] to a
     * [DestinationConfigurationFactory] so that a [DestinationConfiguration] singleton is
     * available for injection.
     */
    @Factory
    private class MicronautFactory {
        // NOTE(review): the method is named `sourceConfig` although it builds a destination
        // configuration — looks like a carry-over; confirm before renaming.
        @Singleton
        fun <I : ConfigurationJsonObjectBase> sourceConfig(
            pojoSupplier: ConfigurationJsonObjectSupplier<I>,
            factory: DestinationConfigurationFactory<I, out DestinationConfiguration>,
        ): DestinationConfiguration {
            val pojo = pojoSupplier.get()
            return factory.make(pojo)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.cdk.ConfigErrorException

/**
 * Builds a [DestinationConfiguration] of type [O] from a configuration object of type [I].
 *
 * Implementors supply [makeWithoutExceptionHandling]; callers use [make].
 */
interface DestinationConfigurationFactory<
    I : ConfigurationJsonObjectBase, O : DestinationConfiguration> {
    /** Builds the configuration from [pojo]; [make] takes care of wrapping any exception. */
    fun makeWithoutExceptionHandling(pojo: I): O

    /**
     * Delegates to [makeWithoutExceptionHandling], rethrowing any [Exception] it raises as a
     * [ConfigErrorException] that carries the original as its cause.
     */
    fun make(pojo: I): O {
        return try {
            makeWithoutExceptionHandling(pojo)
        } catch (cause: Exception) {
            // In practice these are mostly NPEs.
            throw ConfigErrorException("Failed to build ConnectorConfiguration.", cause)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import jakarta.inject.Singleton

/**
 * Internal representation of a destination stream, currently identified solely by its
 * [Descriptor]. Equality, hashing and the string form are all derived from the descriptor.
 *
 * TODO: Add missing info like sync type, generation_id, etc.
 *
 * TODO: Add dedicated schema type, converted from json-schema.
 */
class DestinationStream(val descriptor: Descriptor) {
    /** Identifies a stream by [namespace] and [name]. */
    data class Descriptor(val namespace: String, val name: String)

    override fun hashCode(): Int = descriptor.hashCode()

    // Two streams are equal exactly when their descriptors are equal.
    override fun equals(other: Any?): Boolean =
        (other as? DestinationStream)?.descriptor == descriptor

    override fun toString(): String = "DestinationStream(descriptor=$descriptor)"
}

/** Converts protocol-level [ConfiguredAirbyteStream]s into [DestinationStream]s. */
@Singleton
class DestinationStreamFactory {
    /** Builds a [DestinationStream] whose descriptor copies the stream's namespace and name. */
    fun make(stream: ConfiguredAirbyteStream): DestinationStream {
        val airbyteStream = stream.stream
        // NOTE(review): namespace is handed to a non-null String parameter; confirm the protocol
        // model cannot return null here.
        val descriptor =
            DestinationStream.Descriptor(
                namespace = airbyteStream.namespace,
                name = airbyteStream.name
            )
        return DestinationStream(descriptor = descriptor)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* General configuration for the write operation. The implementor can override this to tweak runtime
* behavior.
*
* The properties fall into two groups: batch accumulation settings and memory queue settings.
*/
interface WriteConfiguration {
// --- Batch accumulation settings ---

/** Record batch size, in bytes. */
val recordBatchSizeBytes: Long
/** File-name prefix; presumably for first-stage temporary files — TODO confirm against usage. */
val firstStageTmpFilePrefix: String

// --- Memory queue settings ---

/** Maximum message-queue memory usage, as a fraction of available memory. */
val maxMessageQueueMemoryUsageRatio: Double // as fraction of available memory
/** Estimated per-record memory overhead ratio: 0 means no overhead, 1.0 means 2x overhead. */
val estimatedRecordMemoryOverheadRatio: Double // 0 => No overhead, 1.0 => 2x overhead
}

/**
 * Default [WriteConfiguration] values. Registered as a `@Secondary` singleton and declared
 * `open`.
 */
@Singleton
@Secondary
open class DefaultWriteConfiguration : WriteConfiguration {
    /** 200 MiB. */
    override val recordBatchSizeBytes: Long = 200L * 1024L * 1024L

    override val firstStageTmpFilePrefix: String = "airbyte-cdk-load-staged-raw-records"

    /** 20% of available memory. */
    override val maxMessageQueueMemoryUsageRatio: Double = 0.2

    /** 10% estimated overhead per record. */
    override val estimatedRecordMemoryOverheadRatio: Double = 0.1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.message

import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.StreamDescriptor
import jakarta.inject.Singleton

/**
* Converts the internal [DestinationStateMessage] case class to the Protocol state messages
* required by [io.airbyte.cdk.output.OutputConsumer].
*/
interface AirbyteStateMessageFactory {
/** Returns the protocol [AirbyteStateMessage] corresponding to [message]. */
fun fromDestinationStateMessage(message: DestinationStateMessage): AirbyteStateMessage
}

/**
 * Default [AirbyteStateMessageFactory]: maps stream-level and global destination state messages
 * onto protocol [AirbyteStateMessage]s of type STREAM and GLOBAL respectively.
 */
@Singleton
class DefaultAirbyteStateMessageFactory : AirbyteStateMessageFactory {
    /**
     * Dispatches on the concrete kind of [message].
     *
     * @throws IllegalStateException for a [DestinationStreamState] without destination stats.
     */
    override fun fromDestinationStateMessage(
        message: DestinationStateMessage
    ): AirbyteStateMessage =
        when (message) {
            is DestinationStreamState -> streamStateMessage(message)
            is DestinationGlobalState -> globalStateMessage(message)
        }

    /** Builds a STREAM-typed state message; destination stats are mandatory here. */
    private fun streamStateMessage(message: DestinationStreamState): AirbyteStateMessage {
        val sourceStats = statsWithRecordCount(message.sourceStats.recordCount.toDouble())
        val destinationStats =
            checkNotNull(
                message.destinationStats?.let { statsWithRecordCount(it.recordCount.toDouble()) }
            ) { "Destination stats must be provided for DestinationStreamState" }
        return AirbyteStateMessage()
            .withSourceStats(sourceStats)
            .withDestinationStats(destinationStats)
            .withType(AirbyteStateMessage.AirbyteStateType.STREAM)
            .withStream(fromStreamState(message.streamState))
    }

    /**
     * Builds a GLOBAL-typed state message. Unlike the stream case, missing destination stats are
     * passed through as null rather than rejected.
     */
    private fun globalStateMessage(message: DestinationGlobalState): AirbyteStateMessage {
        val sourceStats = statsWithRecordCount(message.sourceStats.recordCount.toDouble())
        val destinationStats =
            message.destinationStats?.let { statsWithRecordCount(it.recordCount.toDouble()) }
        val globalState =
            AirbyteGlobalState()
                .withSharedState(message.state)
                .withStreamStates(
                    message.streamStates.map { streamState -> fromStreamState(streamState) }
                )
        return AirbyteStateMessage()
            .withSourceStats(sourceStats)
            .withDestinationStats(destinationStats)
            .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
            .withGlobal(globalState)
    }

    /** Wraps a record count in a fresh [AirbyteStateStats]. */
    private fun statsWithRecordCount(recordCount: Double): AirbyteStateStats =
        AirbyteStateStats().withRecordCount(recordCount)

    /** Converts an internal stream state into the protocol's [AirbyteStreamState]. */
    private fun fromStreamState(
        streamState: DestinationStateMessage.StreamState
    ): AirbyteStreamState {
        val source = streamState.stream.descriptor
        val protocolDescriptor =
            StreamDescriptor().withNamespace(source.namespace).withName(source.name)
        return AirbyteStreamState()
            .withStreamDescriptor(protocolDescriptor)
            .withStreamState(streamState.state)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.message

import com.google.common.collect.Range
import com.google.common.collect.RangeSet
import com.google.common.collect.TreeRangeSet
import java.nio.file.Path

/**
* Represents an accumulated batch of records in some stage of processing.
*
* Emitted by [io.airbyte.cdk.write.StreamLoader.processRecords] to describe the batch accumulated.
* Non-[State.COMPLETE] batches are routed to [io.airbyte.cdk.write.StreamLoader.processBatch]
* re-entrantly until completion.
*
* The framework will track the association between the Batch and the range of records it
* represents, by [Batch.State]s. The [State.PERSISTED] state has special meaning: it indicates that
* the associated ranges have been persisted remotely, and that platform checkpoint messages can be
* emitted.
*
* [State.SPOOLED] is used internally to indicate that records have been spooled to disk for
* processing and should not be used by implementors.
*
* When a stream has been read to End-of-stream, and all ranges between 0 and End-of-stream are
* [State.COMPLETE], then all records are considered to have been processed.
*
* The intended usage for implementors is to extend the provided base types ([StagedLocalFile],
* [RemoteObject], ...) in case classes that contain the necessary metadata for processing, using
* them in [io.airbyte.cdk.write.StreamLoader.processBatch] to route to the appropriate handler(s).
*
* For example:
*
* ```kotlin
* sealed interface MyBatch : Batch
* data class MyLocalFile(
*     override val localPath: Path,
*     override val totalSizeBytes: Long
* ) : StagedLocalFile(), MyBatch
* data class MyRemoteObject(
*     override val key: String
* ) : RemoteObject(), MyBatch
* // etc...
* ```
*/
interface Batch {
/** Processing stage of a batch; see the [Batch] documentation for the meaning of each state. */
enum class State {
SPOOLED,
LOCAL,
PERSISTED,
COMPLETE
}

/** The stage this batch is currently in. */
val state: State
}

/** Simple batch carrying only its [state]: use if you need no other metadata for processing. */
data class SimpleBatch(override val state: Batch.State) : Batch

/**
* Represents a file of records locally staged.
*
* The [state] defaults to [Batch.State.LOCAL]; subclasses may override it (as
* [SpooledRawMessagesLocalFile] does).
*/
abstract class StagedLocalFile() : Batch {
override val state: Batch.State = Batch.State.LOCAL
/** Path of the staged file. */
abstract val localPath: Path
/** Total size in bytes. */
abstract val totalSizeBytes: Long
}

/**
* Represents a remote object containing persisted records. The [state] defaults to
* [Batch.State.PERSISTED].
*/
abstract class RemoteObject() : Batch {
override val state: Batch.State = Batch.State.PERSISTED
/** Key identifying the remote object. */
abstract val key: String
}

/**
* Represents a file of raw records staged to disk for pre-processing. Used internally by the
* framework.
*
* Overrides the [StagedLocalFile] default state: unless a caller passes another value, [state] is
* [Batch.State.SPOOLED].
*/
data class SpooledRawMessagesLocalFile(
override val localPath: Path,
override val totalSizeBytes: Long,
override val state: Batch.State = Batch.State.SPOOLED
) : StagedLocalFile()

/**
 * Internally-used wrapper pairing a [batch] with the set of record [ranges] it contains. When no
 * ranges are given, the envelope starts with an empty range set.
 */
data class BatchEnvelope<B : Batch>(
    val batch: B,
    val ranges: RangeSet<Long> = TreeRangeSet.create()
) {
    /** Convenience constructor for an envelope whose range set holds the single [range]. */
    constructor(
        batch: B,
        range: Range<Long>
    ) : this(batch = batch, ranges = TreeRangeSet.create(listOf(range)))

    /**
     * Returns a new envelope around [newBatch] carrying this envelope's [ranges]. The range set
     * instance is handed over as-is, not copied.
     */
    fun <C : Batch> withBatch(newBatch: C): BatchEnvelope<C> = BatchEnvelope(newBatch, ranges)
}
Loading
Loading