Skip to content

Commit 1cdfdbe

Browse files
colesnodgrasssophia-wileylmossmansupertopheralovew
authored
convert container-orchestrator to micronaut (#19396)
* wip; add micronaut * add additional json deserializer methods * wip; converting to micronaut * misc cleanup * wip; broken * wip; still broken * wip * formatting * minor code cleanup; no actual changes * wip; still broken * removed commented out code; no longer broken * wip; clean-up micronaut code * cleanup; format * fix pmd issues * remove unused file * init ApplicationTest * edited link (#19444) * move 'Example values' into intl (#19446) * Revert "Update action.yml (#19416)" (#19450) This reverts commit 78fb528. * Notifications Workflow (#18735) * notification workflow * Bmoric/remove unused code (#19188) * Tmp * Move when the deletion is performed * Re-enable disable test * PR comments * Use cancel * rename * Fix test and version check position * remove unused temporal deletion code * Remove false todo * Rm repeated test * Rm unused import * Make sure that long running activity are not retried (#19452) * Parse list of dicts in json_schema_helper.find_nodes() (#19386) * Get test on nested list/dict passing - use index to query next object for list * Fix flakecheck * Test that get_node provides correct value * Improve test and test cases * Rewrite method for better comprehension * Add test for base-level key. Rewrite method for comprehension and handling this case * adding tests * fix test * formatting * remove unused dependencies * add missing test resource * format * add missing test resource (real) * format * add back protocol-models dep * format * pr feedback; log stacktrace Co-authored-by: Sophia Wiley <[email protected]> Co-authored-by: Lake Mossman <[email protected]> Co-authored-by: Topher Lubaway <[email protected]> Co-authored-by: Anne <[email protected]> Co-authored-by: Benoit Moriceau <[email protected]> Co-authored-by: Ella Rohm-Ensing <[email protected]>
1 parent 77112b0 commit 1cdfdbe

35 files changed

+1258
-628
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public void create(final Map<String, String> allLabels,
393393
copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap);
394394
}
395395

396-
public static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
396+
private static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
397397
final List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());
398398

399399
// copy this file last to indicate that the copy has completed

airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java

+24
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,30 @@ public static <T> T deserialize(final String jsonString, final Class<T> klass) {
6161
}
6262
}
6363

64+
public static <T> T deserialize(final String jsonString, final TypeReference<T> valueTypeRef) {
65+
try {
66+
return OBJECT_MAPPER.readValue(jsonString, valueTypeRef);
67+
} catch (final IOException e) {
68+
throw new RuntimeException(e);
69+
}
70+
}
71+
72+
public static <T> T deserialize(final File file, final Class<T> klass) {
73+
try {
74+
return OBJECT_MAPPER.readValue(file, klass);
75+
} catch (final IOException e) {
76+
throw new RuntimeException(e);
77+
}
78+
}
79+
80+
public static <T> T deserialize(final File file, final TypeReference<T> valueTypeRef) {
81+
try {
82+
return OBJECT_MAPPER.readValue(file, valueTypeRef);
83+
} catch (final IOException e) {
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
6488
public static <T> T convertValue(final Object object, final Class<T> klass) {
6589
return OBJECT_MAPPER.convertValue(object, klass);
6690
}

airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ public class MdcScope implements AutoCloseable {
3535
public MdcScope(final Map<String, String> keyValuesToAdd) {
3636
originalContextMap = MDC.getCopyOfContextMap();
3737

38-
keyValuesToAdd.forEach(
39-
(key, value) -> MDC.put(key, value));
38+
keyValuesToAdd.forEach(MDC::put);
4039
}
4140

4241
@Override

airbyte-container-orchestrator/build.gradle

+12-11
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,30 @@ plugins {
33
}
44

55
dependencies {
6+
annotationProcessor platform(libs.micronaut.bom)
7+
annotationProcessor libs.bundles.micronaut.annotation.processor
8+
9+
implementation platform(libs.micronaut.bom)
10+
implementation libs.bundles.micronaut
11+
612
implementation 'io.fabric8:kubernetes-client:5.12.2'
7-
implementation 'org.apache.commons:commons-lang3:3.11'
8-
implementation 'org.apache.commons:commons-text:1.10.0'
9-
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'
10-
implementation 'org.eclipse.jetty:jetty-servlet:9.4.31.v20200723'
1113
implementation libs.bundles.datadog
1214

1315
implementation project(':airbyte-api')
1416
implementation project(':airbyte-config:config-models')
15-
implementation project(':airbyte-config:config-persistence')
1617
implementation project(':airbyte-commons-protocol')
1718
implementation project(':airbyte-commons-temporal')
1819
implementation project(':airbyte-commons-worker')
1920
implementation project(':airbyte-config:init')
20-
implementation project(':airbyte-db:db-lib')
2121
implementation project(':airbyte-json-validation')
2222
implementation project(':airbyte-protocol:protocol-models')
23-
implementation project(':airbyte-persistence:job-persistence')
2423
implementation project(':airbyte-metrics:metrics-lib')
2524
implementation project(':airbyte-worker-models')
2625

26+
testAnnotationProcessor platform(libs.micronaut.bom)
27+
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor
28+
29+
testImplementation libs.bundles.micronaut.test
2730
testImplementation 'org.mockito:mockito-inline:2.13.0'
2831
testImplementation libs.postgresql
2932
testImplementation libs.platform.testcontainers
@@ -32,11 +35,9 @@ dependencies {
3235
testImplementation project(':airbyte-commons-docker')
3336
}
3437

35-
36-
mainClassName = 'io.airbyte.container_orchestrator.ContainerOrchestratorApp'
37-
3838
application {
39-
mainClass = mainClassName
39+
applicationName = "airbyte-container-orchestrator"
40+
mainClass = "io.airbyte.container_orchestrator.Application"
4041
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
4142
}
4243

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.container_orchestrator;
6+
7+
import com.google.common.annotations.VisibleForTesting;
8+
import io.airbyte.commons.logging.LoggingHelper;
9+
import io.airbyte.commons.logging.MdcScope;
10+
import io.airbyte.container_orchestrator.orchestrator.JobOrchestrator;
11+
import io.airbyte.workers.process.AsyncKubePodStatus;
12+
import io.micronaut.runtime.Micronaut;
13+
import jakarta.inject.Named;
14+
import jakarta.inject.Singleton;
15+
import java.lang.invoke.MethodHandles;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
/**
20+
* Entrypoint for the application responsible for launching containers and handling all message
21+
* passing for replication, normalization, and dbt. Also, the current version relies on a heartbeat
22+
* from a Temporal worker. This will also be removed in the future so this can run fully async.
23+
* <p>
24+
* This application retrieves most of its configuration from copied files from the calling Temporal
25+
* worker.
26+
* <p>
27+
* This app uses default logging which is directly captured by the calling Temporal worker. In the
28+
* future this will need to independently interact with cloud storage.
29+
*/
30+
@SuppressWarnings({"PMD.AvoidCatchingThrowable", "PMD.DoNotTerminateVM", "PMD.AvoidFieldNameMatchingTypeName"})
31+
@Singleton
32+
public class Application {
33+
34+
public static void main(final String[] args) {
35+
// To mimic previous behavior, assume an exit code of 1 unless Application.run returns otherwise.
36+
var exitCode = 1;
37+
try (final var ctx = Micronaut.run(Application.class, args)) {
38+
exitCode = ctx.getBean(Application.class).run();
39+
} catch (final Throwable t) {
40+
log.error("could not run {}", t.getMessage(), t);
41+
} finally {
42+
// this mimics the pre-micronaut code, unsure if there is a better way in micronaut to ensure a
43+
// non-zero exit code
44+
System.exit(exitCode);
45+
}
46+
}
47+
48+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
49+
50+
private final String application;
51+
private final JobOrchestrator<?> jobOrchestrator;
52+
private final AsyncStateManager asyncStateManager;
53+
54+
public Application(@Named("application") final String application,
55+
final JobOrchestrator<?> jobOrchestrator,
56+
final AsyncStateManager asyncStateManager) {
57+
this.application = application;
58+
this.jobOrchestrator = jobOrchestrator;
59+
this.asyncStateManager = asyncStateManager;
60+
}
61+
62+
/**
63+
* Configures the logging/MDC scope; the objects needed to handle state updates are injected via the constructor.
64+
* <p>
65+
* Handles state updates (including writing failures) and running the job orchestrator. As much of
66+
* the initialization as possible should go in here, so it's logged properly and the state storage
67+
* is updated appropriately.
68+
*/
69+
@VisibleForTesting
70+
int run() {
71+
// set mdc scope for the remaining execution
72+
try (final var mdcScope = new MdcScope.Builder()
73+
.setLogPrefix(application)
74+
.setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND)
75+
.build()) {
76+
77+
asyncStateManager.write(AsyncKubePodStatus.INITIALIZING);
78+
asyncStateManager.write(AsyncKubePodStatus.RUNNING);
79+
asyncStateManager.write(AsyncKubePodStatus.SUCCEEDED, jobOrchestrator.runJob().orElse(""));
80+
} catch (final Throwable t) {
81+
log.error("Killing orchestrator because of an Exception", t);
82+
asyncStateManager.write(AsyncKubePodStatus.FAILED);
83+
return 1;
84+
}
85+
86+
return 0;
87+
}
88+
89+
}

airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java

+69-8
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,98 @@
44

55
package io.airbyte.container_orchestrator;
66

7+
import com.google.common.annotations.VisibleForTesting;
78
import io.airbyte.workers.process.AsyncKubePodStatus;
89
import io.airbyte.workers.process.KubePodInfo;
10+
import io.airbyte.workers.storage.DocumentStoreClient;
11+
import jakarta.inject.Singleton;
12+
import java.lang.invoke.MethodHandles;
13+
import java.util.List;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
916

1017
/**
1118
* The state manager writes the "truth" for states of the async pod process. If the store isn't
1219
* updated by the underlying pod, it will appear as failed.
13-
*
20+
* <p>
1421
* It doesn't have a single value for a state. Instead, in a location on cloud storage or disk, it
1522
* writes every state it's encountered.
1623
*/
17-
public interface AsyncStateManager {
24+
@Singleton
25+
public class AsyncStateManager {
26+
27+
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
28+
private static final List<AsyncKubePodStatus> STATUS_CHECK_ORDER = List.of(
29+
// terminal states first
30+
AsyncKubePodStatus.FAILED,
31+
AsyncKubePodStatus.SUCCEEDED,
32+
// then check in progress state
33+
AsyncKubePodStatus.RUNNING,
34+
// then check for initialization state
35+
AsyncKubePodStatus.INITIALIZING);
36+
37+
private final DocumentStoreClient documentStoreClient;
38+
private final KubePodInfo kubePodInfo;
39+
40+
public AsyncStateManager(final DocumentStoreClient documentStoreClient, final KubePodInfo kubePodInfo) {
41+
this.documentStoreClient = documentStoreClient;
42+
this.kubePodInfo = kubePodInfo;
43+
}
1844

1945
/**
20-
* Writes a file containing a string value to a location designated by the input status.
46+
* Writes a file containing the given string value to a location designated by the input status.
2147
*/
22-
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status, final String value);
48+
public void write(final AsyncKubePodStatus status, final String value) {
49+
final var key = getDocumentStoreKey(status);
50+
log.info("Writing async status {} for {}...", status, kubePodInfo);
51+
documentStoreClient.write(key, value);
52+
}
2353

2454
/**
25-
* Writes an empty file to a location designated by the input status.
55+
* Writes an empty file to a location designated by the input status.
2656
*/
27-
void write(final KubePodInfo kubePodInfo, final AsyncKubePodStatus status);
57+
public void write(final AsyncKubePodStatus status) {
58+
write(status, "");
59+
}
2860

2961
/**
3062
* Interprets the state given all written state messages for the pod.
63+
* <p>
64+
* Checks terminal states first, then running, then initializing. Defaults to not started.
65+
* <p>
66+
* The order matters here!
3167
*/
32-
AsyncKubePodStatus getStatus(final KubePodInfo kubePodInfo);
68+
public AsyncKubePodStatus getStatus() {
69+
return STATUS_CHECK_ORDER.stream()
70+
.filter(this::statusFileExists)
71+
.findFirst()
72+
.orElse(AsyncKubePodStatus.NOT_STARTED);
73+
}
3374

3475
/**
3576
* @return the output stored in the success file. This can be an empty string.
3677
* @throws IllegalArgumentException if no success file exists
3778
*/
38-
String getOutput(final KubePodInfo kubePodInfo) throws IllegalArgumentException;
79+
public String getOutput() throws IllegalArgumentException {
80+
final var key = getDocumentStoreKey(AsyncKubePodStatus.SUCCEEDED);
81+
final var output = documentStoreClient.read(key);
82+
83+
return output.orElseThrow(() -> new IllegalArgumentException("Expected to retrieve output from a successfully completed pod!"));
84+
85+
}
86+
87+
/**
88+
* IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
89+
* version is deployed!
90+
*/
91+
@VisibleForTesting
92+
String getDocumentStoreKey(final AsyncKubePodStatus status) {
93+
return kubePodInfo.namespace() + "/" + kubePodInfo.name() + "/" + status.name();
94+
}
95+
96+
private boolean statusFileExists(final AsyncKubePodStatus status) {
97+
final var key = getDocumentStoreKey(status);
98+
return documentStoreClient.read(key).isPresent();
99+
}
39100

40101
}

0 commit comments

Comments
 (0)