Skip to content

Commit 512e493

Browse files
committed
feat: worker for connector progressive rollouts (#13886)
1 parent d74ce0d commit 512e493

31 files changed

+1065
-0
lines changed

airbyte-api/server-api/src/main/kotlin/io/airbyte/api/client/AirbyteApiClient.kt

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import io.airbyte.api.client.generated.ActorDefinitionVersionApi
99
import io.airbyte.api.client.generated.AttemptApi
1010
import io.airbyte.api.client.generated.ConnectionApi
1111
import io.airbyte.api.client.generated.ConnectorBuilderProjectApi
12+
import io.airbyte.api.client.generated.ConnectorRolloutApi
1213
import io.airbyte.api.client.generated.DeploymentMetadataApi
1314
import io.airbyte.api.client.generated.DestinationApi
1415
import io.airbyte.api.client.generated.DestinationDefinitionApi
@@ -67,6 +68,7 @@ class AirbyteApiClient(
6768
val attemptApi = AttemptApi(basePath = basePath, client = httpClient, policy = policy)
6869
val connectionApi = ConnectionApi(basePath = basePath, client = httpClient, policy = policy)
6970
val connectorBuilderProjectApi = ConnectorBuilderProjectApi(basePath = basePath, client = httpClient, policy = policy)
71+
val connectorRolloutApi = ConnectorRolloutApi(basePath = basePath, client = httpClient, policy = policy)
7072
val deploymentMetadataApi = DeploymentMetadataApi(basePath = basePath, client = httpClient, policy = policy)
7173
val destinationApi = DestinationApi(basePath = basePath, client = httpClient, policy = policy)
7274
val destinationDefinitionApi = DestinationDefinitionApi(basePath = basePath, client = httpClient, policy = policy)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
ARG JAVA_WORKER_BASE_IMAGE_VERSION=2.2.2
2+
3+
FROM scratch as builder
4+
WORKDIR /app
5+
ADD airbyte-app.tar /app
6+
7+
FROM airbyte/airbyte-base-java-worker-image:${JAVA_WORKER_BASE_IMAGE_VERSION}
8+
9+
ENV APPLICATION airbyte-connector-rollout-worker
10+
ENV VERSION ${VERSION}
11+
# 5005 is the remote debug port
12+
EXPOSE 5005
13+
14+
WORKDIR /app
15+
COPY --chown=airbyte:airbyte --from=builder /app /app
16+
USER airbyte:airbyte
17+
18+
ENTRYPOINT ["/bin/bash", "-c", "airbyte-app/bin/${APPLICATION}"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Connector Rollout Worker
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
plugins {
2+
id("io.micronaut.application") version "4.4.2"
3+
id("io.airbyte.gradle.jvm.app")
4+
id("io.airbyte.gradle.jvm.lib")
5+
id("io.airbyte.gradle.publish")
6+
id("io.airbyte.gradle.docker")
7+
application
8+
}
9+
10+
group = "io.airbyte.connector.rollout.worker"
11+
12+
java {
13+
sourceCompatibility = JavaVersion.VERSION_21
14+
targetCompatibility = JavaVersion.VERSION_21
15+
}
16+
17+
repositories {
18+
mavenCentral()
19+
}
20+
21+
dependencies {
22+
implementation(project(mapOf("path" to ":oss:airbyte-commons-temporal")))
23+
// TODO: remove the deps not being used
24+
compileOnly(libs.lombok)
25+
annotationProcessor(libs.lombok) // Lombok must be added BEFORE Micronaut
26+
27+
implementation("io.temporal:temporal-sdk:1.25.0")
28+
implementation(project(":oss:airbyte-config:config-models"))
29+
implementation(project(":oss:airbyte-api:server-api"))
30+
implementation(project(":oss:airbyte-commons-temporal"))
31+
implementation(project(":oss:airbyte-commons-temporal-core"))
32+
implementation(libs.airbyte.protocol)
33+
34+
annotationProcessor("io.micronaut:micronaut-http-validation")
35+
annotationProcessor("io.micronaut.serde:micronaut-serde-processor")
36+
implementation("io.micronaut.serde:micronaut-serde-jackson")
37+
compileOnly("io.micronaut:micronaut-http-client")
38+
testImplementation("io.micronaut:micronaut-http-client")
39+
runtimeOnly("org.yaml:snakeyaml:1.33")
40+
41+
}
42+
43+
application {
44+
// Default to running ConnectorRolloutWorker
45+
mainClass.set("io.airbyte.connector.rollout.worker.ConnectorRolloutWorkerApplication")
46+
}
47+
48+
tasks.jar {
49+
manifest {
50+
attributes(
51+
"Main-Class" to "io.airbyte.connector.rollout.worker.ConnectorRolloutWorkerApplication"
52+
)
53+
}
54+
55+
archiveBaseName.set("run-connector-rollout-worker")
56+
archiveVersion.set("") // Remove the version from the JAR file name
57+
}
58+
59+
tasks.withType<JavaCompile> {
60+
options.encoding = "UTF-8"
61+
options.compilerArgs.addAll(listOf("-Xlint:unchecked", "-Xlint:deprecation"))
62+
}
63+
64+
micronaut {
65+
runtime("netty")
66+
testRuntime("junit5")
67+
processing {
68+
incremental(true)
69+
annotations("io.airbyte.connector.rollout.worker.*")
70+
}
71+
}
72+
73+
74+
tasks.withType<Jar> {
75+
duplicatesStrategy = DuplicatesStrategy.INCLUDE
76+
}
77+
78+
airbyte {
79+
application {
80+
mainClass.set("io.airbyte.connector.rollout.worker.ConnectorRolloutWorkerApplication")
81+
defaultJvmArgs = listOf("-XX:+ExitOnOutOfMemoryError", "-XX:MaxRAMPercentage=75.0")
82+
localEnvVars.putAll(
83+
mapOf(
84+
"AIRBYTE_ROLE" to "undefined",
85+
"AIRBYTE_VERSION" to "dev",
86+
"DATA_PLANE_ID" to "local",
87+
"MICRONAUT_ENVIRONMENTS" to "test"
88+
)
89+
)
90+
}
91+
docker {
92+
imageName.set("connector-rollout-worker")
93+
}
94+
}
95+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.connector.rollout.worker
6+
7+
import io.temporal.worker.WorkerFactory
8+
import jakarta.inject.Named
9+
import jakarta.inject.Singleton
10+
import org.slf4j.LoggerFactory
11+
12+
@Singleton
13+
class ConnectorRolloutWorker(
14+
@Named("connectorRolloutWorkerFactory") private val workerFactory: WorkerFactory,
15+
) {
16+
private val log = LoggerFactory.getLogger(ConnectorRolloutWorker::class.java)
17+
18+
fun startWorker() {
19+
workerFactory.start()
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.connector.rollout.worker
6+
7+
import io.micronaut.runtime.Micronaut
8+
9+
object ConnectorRolloutWorkerApplication {
10+
@JvmStatic
11+
fun main(args: Array<String>) {
12+
val context = Micronaut.run(ConnectorRolloutWorkerApplication::class.java)
13+
val rolloutWorker = context.getBean(ConnectorRolloutWorker::class.java)
14+
rolloutWorker.startWorker()
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.connector.rollout.worker
6+
7+
import io.airbyte.connector.rollout.worker.activities.FinalizeRolloutActivityImpl
8+
import io.airbyte.connector.rollout.worker.activities.FindRolloutActivityImpl
9+
import io.airbyte.connector.rollout.worker.activities.GetRolloutActivityImpl
10+
import io.airbyte.connector.rollout.worker.activities.StartRolloutActivityImpl
11+
import io.airbyte.connector.rollout.worker.activities.UpdateRolloutActivityImpl
12+
import io.micronaut.context.annotation.Factory
13+
import io.temporal.client.WorkflowClient
14+
import io.temporal.worker.Worker
15+
import io.temporal.worker.WorkerFactory
16+
import jakarta.inject.Named
17+
import jakarta.inject.Singleton
18+
import org.slf4j.LoggerFactory
19+
20+
@Factory
21+
class ConnectorRolloutWorkerFactory {
22+
private val log = LoggerFactory.getLogger(ConnectorRolloutWorkerFactory::class.java)
23+
24+
@Singleton
25+
@Named("connectorRolloutWorkerFactory")
26+
fun connectorRolloutWorkerFactory(
27+
workflowClient: WorkflowClient,
28+
startRolloutActivityImpl: StartRolloutActivityImpl,
29+
getRolloutActivityImpl: GetRolloutActivityImpl,
30+
findRolloutActivityImpl: FindRolloutActivityImpl,
31+
updateRolloutActivityImpl: UpdateRolloutActivityImpl,
32+
finalizeRolloutActivityImpl: FinalizeRolloutActivityImpl,
33+
): WorkerFactory {
34+
log.info("ConnectorRolloutWorkerFactory registering workflow")
35+
val workerFactory = WorkerFactory.newInstance(workflowClient)
36+
val worker: Worker = workerFactory.newWorker(Constants.TASK_QUEUE)
37+
worker.registerWorkflowImplementationTypes(ConnectorRolloutWorkflowImpl::class.java)
38+
worker.registerActivitiesImplementations(
39+
startRolloutActivityImpl,
40+
getRolloutActivityImpl,
41+
findRolloutActivityImpl,
42+
updateRolloutActivityImpl,
43+
finalizeRolloutActivityImpl,
44+
)
45+
return workerFactory
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.connector.rollout.worker
6+
7+
import io.airbyte.config.ConnectorEnumRolloutState
8+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutActivityInputFinalize
9+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutActivityInputFind
10+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutActivityInputGet
11+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutActivityInputStart
12+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutActivityInputUpdate
13+
import io.airbyte.connector.rollout.worker.models.ConnectorRolloutOutput
14+
import io.temporal.workflow.UpdateMethod
15+
import io.temporal.workflow.UpdateValidatorMethod
16+
import io.temporal.workflow.WorkflowInterface
17+
import io.temporal.workflow.WorkflowMethod
18+
19+
@WorkflowInterface
20+
interface ConnectorRolloutWorkflow {
21+
@WorkflowMethod
22+
fun run(input: ConnectorRolloutActivityInputStart): ConnectorEnumRolloutState
23+
24+
@UpdateMethod
25+
fun startRollout(input: ConnectorRolloutActivityInputStart): ConnectorRolloutOutput
26+
27+
@UpdateValidatorMethod(updateName = "startRollout")
28+
fun startRolloutValidator(input: ConnectorRolloutActivityInputStart)
29+
30+
@UpdateMethod
31+
fun findRollout(input: ConnectorRolloutActivityInputFind): List<ConnectorRolloutOutput>
32+
33+
@UpdateValidatorMethod(updateName = "findRollout")
34+
fun findRolloutValidator(input: ConnectorRolloutActivityInputFind)
35+
36+
@UpdateMethod
37+
fun getRollout(input: ConnectorRolloutActivityInputGet): ConnectorRolloutOutput
38+
39+
@UpdateValidatorMethod(updateName = "getRollout")
40+
fun getRolloutValidator(input: ConnectorRolloutActivityInputGet)
41+
42+
@UpdateMethod
43+
fun updateRollout(input: ConnectorRolloutActivityInputUpdate): ConnectorRolloutOutput
44+
45+
@UpdateValidatorMethod(updateName = "updateRollout")
46+
fun updateRolloutValidator(input: ConnectorRolloutActivityInputUpdate)
47+
48+
@UpdateMethod
49+
fun finalizeRollout(input: ConnectorRolloutActivityInputFinalize): ConnectorRolloutOutput
50+
51+
@UpdateValidatorMethod(updateName = "finalizeRollout")
52+
fun finalizeRolloutValidator(input: ConnectorRolloutActivityInputFinalize)
53+
}

0 commit comments

Comments
 (0)