convert container-orchestrator to micronaut #19396
@@ -61,6 +61,30 @@ public static <T> T deserialize(final String jsonString, final Class<T> klass) {
    }
  }

  public static <T> T deserialize(final String jsonString, final TypeReference<T> valueTypeRef) {
+1
/**
 * Entrypoint for the application responsible for launching containers and handling all message
 * passing for replication, normalization, and dbt. Also, the current version relies on a heartbeat
 * from a Temporal worker. This will also be removed in the future so this can run fully async.
note: I don't think this is right. The heartbeat is what tells the orchestrator to kill itself if Temporal goes down. @benmoriceau @gosusnp what will we replace this with?
...ontainer-orchestrator/src/main/java/io/airbyte/container_orchestrator/AsyncStateManager.java
    }
  }

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
fancy!
AKA copy/paste friendly 😄
Indeed! Wish I had known this earlier.
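For readers unfamiliar with the idiom praised above: `MethodHandles.lookup().lookupClass()` returns the class in which the call appears, so the logger declaration needs no hard-coded class literal. A minimal sketch, assuming `java.util.logging` in place of the SLF4J `LoggerFactory` used in the PR so that it runs on the JDK alone; `LoggerIdiomDemo` and `loggerName` are hypothetical names.

```java
import java.lang.invoke.MethodHandles;
import java.util.logging.Logger;

// Hypothetical class, used only to illustrate the idiom.
class LoggerIdiomDemo {

  // lookupClass() resolves to the class containing this line, so the declaration
  // can be pasted into any class without editing a class literal.
  private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass().getName());

  static String loggerName() {
    return log.getName();
  }

  public static void main(final String[] args) {
    log.info("logger name: " + loggerName());
  }
}
```

The same line copied into another class would name that class's logger instead, which is the copy/paste benefit mentioned above.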
asyncStateManager.write(AsyncKubePodStatus.INITIALIZING);
asyncStateManager.write(AsyncKubePodStatus.RUNNING);
asyncStateManager.write(AsyncKubePodStatus.SUCCEEDED, jobOrchestrator.runJob().orElse(""));
to confirm: this is only for tests, yes?
caught my eye because the logic here seems too simple
No. This is the business logic of this code.
How it used to work:
write(INITIALIZING)
- block until the configuration files have been copied over via kubectl cp
write(RUNNING)
- run job
write(SUCCEEDED)
How it works now:
- init-container blocks until configuration files have been copied over via kubectl cp
write(INITIALIZING) // not really necessary any more, as it immediately changes
write(RUNNING)
- run job
write(SUCCEEDED)
All the setup and configuration code that used to exist and run between INITIALIZING and RUNNING no longer exists, as that is now handled by the initContainer and Micronaut.
I see, thanks for explaining.
Feels like INITIALIZING should be moved to the async orchestrator pod process, or removed entirely if we don't think it's useful to track that an attempt was even made.
nit: at least for me, writing asyncStateManager.write(AsyncKubePodStatus.SUCCEEDED, jobOrchestrator.runJob().orElse("")); is somewhat confusing since it's not immediately clear that runJob is blocking.
Would have preferred a more linear style to clarify that.
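One way the more linear style suggested above might look, as a sketch only. `Status`, `StateWriter`, `Job`, and `run` are hypothetical stand-ins for `AsyncKubePodStatus`, `AsyncStateManager`, `JobOrchestrator`, and the application's `run()`; the `FAILED` branch and the exit codes are assumptions, not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// All names below are hypothetical stand-ins for the types discussed in this thread.
class LinearRunDemo {

  enum Status { INITIALIZING, RUNNING, SUCCEEDED, FAILED }

  interface StateWriter {
    void write(Status status, String output);
  }

  interface Job {
    Optional<String> runJob() throws Exception;
  }

  static int run(final StateWriter state, final Job job) {
    state.write(Status.INITIALIZING, "");
    state.write(Status.RUNNING, "");
    try {
      // The blocking call sits on its own line: nothing below runs until the job finishes.
      final Optional<String> output = job.runJob();
      state.write(Status.SUCCEEDED, output.orElse(""));
      return 0;
    } catch (final Exception e) {
      // Assumed failure handling; the PR's actual behavior is not shown in this thread.
      state.write(Status.FAILED, "");
      return 1;
    }
  }

  public static void main(final String[] args) {
    final List<String> written = new ArrayList<>();
    final int exitCode = run((status, output) -> written.add(status + ":" + output), () -> Optional.of("done"));
    System.out.println(exitCode + " " + written);
  }
}
```

Pulling `runJob()` out of the argument list makes the ordering explicit: RUNNING is recorded, the job blocks, and only then is SUCCEEDED written.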
Agreed, seems like we could remove it entirely.
thinking more, I feel like it's useful to keep for debugging. would make it obvious if anything crashed between the pod spinning up and actually running. don't feel strongly though
airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/Application.java
@@ -2,7 +2,7 @@
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

-package io.airbyte.container_orchestrator;
+package io.airbyte.container_orchestrator.orchestrator;
I appreciate cleaning up the packages!
...strator/src/main/java/io/airbyte/container_orchestrator/orchestrator/DbtJobOrchestrator.java
    "image": "airbyte/container-orchestrator:dev-f0bb7a0ba3",
    "pullPolicy": "IfNotPresent"
  }
}
nit: newline
port: 9000

airbyte:
  config-dir: src/test/resources/files
nit: newline
@@ -0,0 +1,31 @@
{
+1 for the env testing
maybe very nit: these should be sorted for easier reading/future proofing.
  @Test
  void processFactory() {
    assertInstanceOf(KubeProcessFactory.class, processFactory);
interesting - so this is testing that micronaut injects the right class based on env vars?
Yeah, the idea is to catch an injection issue during tests. This kind of test won't be necessary if we remove all the @Factory classes and make Micronaut manage everything.
I kind of like it even if it's fully micronaut managed. We've had several incidents where runtime injection wasn't correct due to either bad runtime dependencies or wrong configuration. Something like this would have helped catch it.
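The idea of a wiring test can be illustrated without Micronaut. The sketch below is a plain-Java analogue, not the PR's test: `ProcessFactoryWiringDemo`, `DockerProcessFactory`, the `processFactory(Map)` method, and the `WORKER_ENVIRONMENT` variable name are all assumptions made for illustration. It selects an implementation from environment-like settings and fails fast if the wrong one comes back.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins; the PR wires the real types through Micronaut.
class ProcessFactoryWiringDemo {

  interface ProcessFactory {}

  static final class KubeProcessFactory implements ProcessFactory {}

  static final class DockerProcessFactory implements ProcessFactory {}

  // Plays the role of a factory method: picks an implementation from the environment.
  // WORKER_ENVIRONMENT is an assumed variable name for this sketch.
  static ProcessFactory processFactory(final Map<String, String> env) {
    final String workerEnv = env.getOrDefault("WORKER_ENVIRONMENT", "DOCKER");
    return "KUBERNETES".equalsIgnoreCase(workerEnv) ? new KubeProcessFactory() : new DockerProcessFactory();
  }

  public static void main(final String[] args) {
    final ProcessFactory factory = processFactory(Map.of("WORKER_ENVIRONMENT", "KUBERNETES"));
    // The wiring check: fail fast if configuration selects the wrong implementation.
    if (!(factory instanceof KubeProcessFactory)) {
      throw new IllegalStateException("unexpected factory: " + factory.getClass());
    }
    System.out.println("wired " + factory.getClass().getSimpleName());
  }
}
```

A check like this costs little and catches the misconfiguration class of incident described above before it reaches runtime.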
@@ -0,0 +1 @@
normalization-orchestrator
should this say container-orchestrator instead?
no, this file tells the container-orchestrator what kind of orchestrator to spin-up
because we spin up the normalization orchestrator in tests?
@@ -52,15 +54,17 @@ public Class<NormalizationInput> getInputClass() {
  @Trace(operationName = JOB_ORCHESTRATOR_OPERATION_NAME)
  @Override
  public Optional<String> runJob() throws Exception {
-    final JobRunConfig jobRunConfig = JobOrchestrator.readJobRunConfig();
+    // final JobRunConfig jobRunConfig = readJobRunConfig();
nit: remove the commented-out line
-  public NormalizationJobOrchestrator(final Configs configs, final ProcessFactory processFactory) {
+  public NormalizationJobOrchestrator(final Configs configs, final ProcessFactory processFactory, final JobRunConfig jobRunConfig) {
+1 to this change
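The benefit of the constructor change can be seen in a pared-down sketch. The classes below are hypothetical simplifications that reuse the names from the diff (`JobRunConfig`, `NormalizationJobOrchestrator`); their fields and the string returned by `runJob()` are invented for illustration.

```java
import java.util.Optional;

// Hypothetical, pared-down versions of the types in the diff above.
class ConstructorInjectionDemo {

  static final class JobRunConfig {
    final String jobId;
    final long attemptId;

    JobRunConfig(final String jobId, final long attemptId) {
      this.jobId = jobId;
      this.attemptId = attemptId;
    }
  }

  static final class NormalizationJobOrchestrator {
    private final JobRunConfig jobRunConfig;

    // The config is handed in by the caller (or a DI container) instead of
    // being read through a static helper inside runJob().
    NormalizationJobOrchestrator(final JobRunConfig jobRunConfig) {
      this.jobRunConfig = jobRunConfig;
    }

    Optional<String> runJob() {
      return Optional.of("job " + jobRunConfig.jobId + " attempt " + jobRunConfig.attemptId);
    }
  }

  public static void main(final String[] args) {
    // A test can pass any config it likes; no files or environment are needed.
    final NormalizationJobOrchestrator orchestrator =
        new NormalizationJobOrchestrator(new JobRunConfig("42", 0L));
    System.out.println(orchestrator.runJob().orElse(""));
  }
}
```

With the dependency passed in, `runJob()` no longer needs the file system to be in a particular state, which makes the class easier to test in isolation.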
...e-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/EventListeners.java
  @VisibleForTesting
  int run() {
    // set mdc scope for the remaining execution
    try (final var mdcScope = new MdcScope.Builder()
again for understanding, how does this interact with the logging set in EventListeners?
This maintains the existing logging workflow from the pre-micronaut version:
    public void run() {
      // this is found in the configureLogging() method, inlined here for ease of reading
      final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
      ctx.reconfigure();
      logClient.setJobMdc(/*...*/);
      // set mdc scope for the remaining execution
      try (final var mdcScope = new MdcScope.Builder()
          .setLogPrefix(application)
          .setPrefixColor(LoggingHelper.Color.CYAN_BACKGROUND)
          .build()) {
        // ...
      }
    }
So if the previous version was working as expected, this one will as well. The difference is that the configuration of the logger was moved to the EventListener to help keep the main Application class as thin and single-purposed as possible.
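On how a scope opened in `run()` can coexist with logger configuration done elsewhere: a try-with-resources scope of this kind typically sets per-thread context on entry and restores it on exit, independently of how the logger itself was configured. A minimal sketch of that pattern, assuming a hypothetical `Scope` class backed by a `ThreadLocal` map; this is not Airbyte's `MdcScope`, whose internals are not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a logging-context scope; not Airbyte's MdcScope.
class ScopedContextDemo {

  private static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

  static final class Scope implements AutoCloseable {
    private final Map<String, String> previous;

    Scope(final Map<String, String> entries) {
      // Snapshot the current context so close() can restore it.
      previous = new HashMap<>(CONTEXT.get());
      CONTEXT.get().putAll(entries);
    }

    @Override
    public void close() {
      // Restore the snapshot taken when the scope was opened.
      CONTEXT.set(new HashMap<>(previous));
    }
  }

  static String get(final String key) {
    return CONTEXT.get().get(key);
  }

  public static void main(final String[] args) {
    try (Scope scope = new Scope(Map.of("log_prefix", "container-orchestrator"))) {
      System.out.println("inside: " + get("log_prefix"));
    }
    System.out.println("after: " + get("log_prefix"));
  }
}
```

Under this reading, one-time logger setup and the per-execution scope address different concerns, which would explain why both exist.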
Ok. I'm missing something here then. What is the purpose of configuring the logger in the event listener if the configuration that matters is happening here?
Apologies for the late review @colesnodgrass
Nicely done! Appreciate the general cleanup alongside the micronaut migration. All-round better interface and readability.
Most of my comments are for understanding. I have a few suggestions here and there to improve readability.
What
How
- deserialize methods to the Jsons helper class
- ContainerOrchestratorApp to Application
- SUCCESS_FILE_NAME
- WorkerHeartbeatServer with its HttpServlet
- HeartbeatController
- AsyncStateManager interface with DefaultAsyncStateManager class into AsyncStateManager class
- JobOrchestrator classes to a new package
- JobOrchestrator interface with injected dependencies to the implementations
- application.yml
- micronaut-banner.txt
- final _type_ -> final var
Oddities
- Application class does a little too much currently and should probably be split into two different classes
- EventListeners.setEnvVars method should probably be replaced with the env-vars being set on the container itself (in the worker when it creates this pod and containers)
- @Secured annotation on the HeartbeatController could (should?) be replaced with something else