continue workflows on restarts #10294
Conversation
...r-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java (outdated, resolved)
@@ -129,8 +129,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr
         ChildWorkflowOptions.newBuilder()
             .setWorkflowId("sync_" + maybeJobId.get())
             .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name())
-            // This will cancel the child workflow when the parent is terminated
-            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE)
+            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
why?
Added some clarifying comments.
is the idea that sending the cancel signal means that we can retry the launcher activity? terminate means it cannot be retried.
cancelling the launcher activity does not cancel its children, right? because when we resume we are reconnecting to the children. it just kills the launcher activity so it can be retried subsequently.
We do want cancellation to cancel its children. Otherwise, cancellation would not be able to stop a sync in progress. The current `LauncherWorker` implementation kills everything for the connection when cancelled. This flag is necessary for that behavior; otherwise the cancellation doesn't propagate and the async kubernetes process is orphaned forever (see the sketch after this thread).
I'll add info from our last two messages in the comments, but the behavior is correct imo.
cool. based on added info. i agree.
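For context, a minimal sketch (not the Airbyte code itself; `SyncWorkflow`, `ParentWorkflowSketch`, and the ids are stand-ins) of how a request-cancel parent close policy is wired up in the Temporal Java SDK, so that cancelling the parent workflow reaches the child sync instead of hard-terminating it:

```java
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
interface SyncWorkflow {

  @WorkflowMethod
  void run();

}

class ParentWorkflowSketch {

  // Called from inside the parent workflow implementation.
  void launchSync(final String jobId) {
    final ChildWorkflowOptions options = ChildWorkflowOptions.newBuilder()
        .setWorkflowId("sync_" + jobId)
        // REQUEST_CANCEL: when the parent closes, the child receives a cancellation
        // request it can react to (clean up pods, mark the attempt as cancelled).
        // TERMINATE would kill the child outright, so the cancellation would never
        // reach the async kube process and it would be orphaned.
        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
        .build();

    final SyncWorkflow child = Workflow.newChildWorkflowStub(SyncWorkflow.class, options);
    child.run(); // cancelling the parent now propagates here as a request
  }

}
```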
@@ -24,12 +26,20 @@
   private static final int MAX_SYNC_TIMEOUT_DAYS = configs.getSyncJobMaxTimeoutDays();
   private static final Duration DB_INTERACTION_TIMEOUT = Duration.ofSeconds(configs.getMaxActivityTimeoutSecond());

+  // retry infinitely if the worker is killed without exceptions and dies due to timeouts
what are examples of when this happens? do we risk getting stuck forever?
We rethrow everything as `WorkerExceptions` into `RuntimeExceptions`, so anything we do and catch should not be retried by this policy. Anything heartbeat related, however, should retry, which is what this allows (see the sketch after this thread).
Specifically we want to retry timeouts. The only risk of getting stuck forever would be if there were some way to get stuck in a heartbeat failure state consistently forever, which should not be possible. @benmoriceau any thoughts on this?
okay. can you adjust the comment to explain this?
can you explain why it is not possible to get stuck in a heartbeat failure state forever?
Talked with Benoit about this in person. The activity has the heartbeat timeout configured, so it's not possible to be stuck waiting for a heartbeat. However, it's worth adding a test explicitly to make sure that all other exceptions we may throw for the `TemporalUtils` wrapper call are properly handled by this setting. I'm going to add a couple of unit tests for this to make sure we don't violate that expectation for this constant.
73b4668 has the tests for this. @benmoriceau and @cgardens is this sufficient? I did have to adjust the type of exception.
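For readers skimming the thread, a hedged sketch of the kind of setup being discussed; the class name, constant names, and timeout values here are illustrative, not the ones in this PR. The idea is an activity with a heartbeat timeout plus a retry policy that retries timeouts indefinitely while refusing to retry the exceptions our own code throws:

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

final class OrchestratorActivityOptionsSketch {

  // No maximum attempts, so an activity whose worker is killed or misses heartbeats
  // keeps getting rescheduled. Exceptions thrown by our own code are wrapped in
  // RuntimeException and listed as non-retryable, so real failures still fail the
  // attempt instead of looping forever.
  static final RetryOptions INFINITE_TIMEOUT_RETRY_POLICY = RetryOptions.newBuilder()
      .setDoNotRetry(RuntimeException.class.getName())
      .build();

  // Illustrative timeouts; the real values come from configuration.
  static final ActivityOptions LAUNCHER_OPTIONS = ActivityOptions.newBuilder()
      .setScheduleToCloseTimeout(Duration.ofDays(3))
      // If the activity stops heartbeating (e.g. the worker pod died during a deploy),
      // Temporal times it out and the retry policy above reschedules it.
      .setHeartbeatTimeout(Duration.ofSeconds(30))
      .setRetryOptions(INFINITE_TIMEOUT_RETRY_POLICY)
      .build();

}
```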
airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java (outdated, resolved)
airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java (resolved)
airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java (outdated, resolved)
            fileMap,
            portMap);
        log.info("Creating " + podName + " for attempt number: " + jobRunConfig.getAttemptId());
        killRunningPodsForConnection(podName);
my understanding from our conversation about state machine yesterday was that we
- determine what pod we want to run
- kill any running pod (for the connection) that doesn't match it
- then if no running pod matches the pod we want to run, start a pod
my understanding from this code change is that the killing now only happens in this conditional statement, so in the case where something is running that doesn't match what we are looking for, it won't get killed? is that what we want? am i misunderstanding?
It's more:
- Determine what pod we want to run
- If we haven't successfully initialized this pod in the past, kill any pod for that connection and create it. If we can't create it, fail the attempt.
- Then attach to the pod (whether or not it was initialized this run)

Since we always run the deletion when creating, it eliminates the risk of our filtering down to a specific podName having an error in some way; it just axes everything for that connection id (see the sketch at the end of this thread).
Do you think that's sufficiently close to the state machine diagram or should we change it on one side?
yup. i'm convinced.
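To make the flow above concrete, here is a rough sketch under the assumption that creation and attachment are separate steps; every name in it is an illustrative stand-in, not the actual LauncherWorker code:

```java
// Sketch of the create-or-attach flow discussed in the thread above.
public class PodLaunchFlowSketch {

  public void runOrAttach(final String podName, final boolean podInitializedInPreviousRun) {
    if (!podInitializedInPreviousRun) {
      // Creating from scratch: clear out anything still running for this connection,
      // regardless of pod name, so a stale pod from an earlier attempt can't linger.
      killRunningPodsForConnection();
      try {
        createPod(podName);
      } catch (final Exception e) {
        // Nothing to attach to if creation fails: fail the attempt.
        throw new RuntimeException("Failed to create pod " + podName, e);
      }
    }
    // Whether the pod was created in this run or before a restart/deploy,
    // always (re)attach to it and wait for it to finish.
    attachToPod(podName);
  }

  // Stubs standing in for the real Kubernetes interactions.
  private void killRunningPodsForConnection() {}

  private void createPod(final String podName) {}

  private void attachToPod(final String podName) {}

}
```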
…ner_orchestrator/ContainerOrchestratorApp.java Co-authored-by: Charles <[email protected]>
…/LauncherWorker.java Co-authored-by: Charles <[email protected]>
It actually looks like
Please first take a look at the test cases to confirm that they're testing the correct behavior, then look at the settings changes for the sync workflow, and finally at the handling for a deploy that happens while files are being copied onto the pod.
Going to split the timing change (30s -> 1s) into a separate PR so it can be made configurable via an env var.