-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Build failure reasons for synchronous jobs (check/spec/discover) #14715
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
<T, U> SynchronousResponse<T> execute(final ConfigType configType, | ||
@Nullable final UUID connectorDefinitionId, | ||
final Function<UUID, TemporalResponse<U>> executor, | ||
final Function<U, T> outputMapper, | ||
final UUID workspaceId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method mainly remains the same, but to keep consumers of these SynchronousResponse
s the same, I introduced an outputMapper
function for going from the common ConnectorJobOutput
-> the specific response we want.
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput); | ||
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To keep this PR somewhat manageable I've limited it to only building/surfacing the failure reasons for these jobs, but not actually reporting it to the JobErrorReporter (sentry). This is where that would happen, following the same pattern that's being used for the JobTracker (segment). This will come in a subsequent PR.
import io.airbyte.workers.Worker; | ||
|
||
public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {} | ||
public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, ConnectorJobOutput> {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I don't love about using a common ConnectorJobOutput
is we lose some type-safety in what these workers are returning, but seems ok as to stick with our existing convention of using POJOs/json schema for defining these outputs.
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) | ||
.collect(Collectors.groupingBy(AirbyteMessage::getType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is how we are building failure reasons from trace messages for each of these jobs:
- Instead of just filtering for the desired output when reading the stream initially, collect the messages by type
- filter for our desired output
- filter for a trace message if needed and use the existing FailureHelper to produce a FailureReason from it
boolean succeeded = exception == null; | ||
if (succeeded && operationOutput instanceof ConnectorJobOutput) { | ||
succeeded = getConnectorJobSucceeded((ConnectorJobOutput) operationOutput); | ||
} | ||
|
||
final JobMetadata metadata = new JobMetadata(succeeded, logPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the piece that determines whether the TemporalResponse was successful. Before it only considered exceptions as unsuccessful, and now we are also considering ConnectorJobOutputs with a failure reason as unsuccessful.
if (failureOutput.getFailureReason() != null) { | ||
syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin))); | ||
} else { | ||
final StandardCheckConnectionOutput checkOutput = failureOutput.getCheckConnection(); | ||
final Exception ex = new IllegalArgumentException(checkOutput.getMessage()); | ||
final FailureReason checkFailureReason = FailureHelper.checkFailure(ex, jobId, attemptId, origin); | ||
syncOutput.setFailures(List.of(checkFailureReason)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When building the failure for the checks that are executed before syncs, if it failed with a FailureReason from the connector just use that. Otherwise, build a FailureReason just as before.
@@ -350,8 +349,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity. | |||
log.info("SOURCE CHECK: Skipped"); | |||
} else { | |||
log.info("SOURCE CHECK: Starting"); | |||
final StandardCheckConnectionOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); | |||
if (sourceCheckResponse.getStatus() == Status.FAILED) { | |||
final ConnectorJobOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This activity now has a different output type, being called from the ConnectionManagerWorkflow. I know when it comes to changing the inputs we have to do some version checks - is there anything special to do here because of this output change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau is the expert here, but since these are short-lived activities (vs syncs which can take days and span multiple deployments), maybe we don't need to do a version check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunatelly we need a new version here. You can check that by updating the workflowHistory in WorkflowReplayingTest
by something generated from master. Here the test doesn't fail because it is already controlled by a version. The time that an activity takes to run doesn't influence how likely it can have a versioning issue. The main factor is where in the workflow the activity is run. For example the last activity is less likely to have a version issue because the workflow will terminate/continue as new after it while for the first activity it is more likely that the workflow will be unloaded from memory and potentially replayed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau Ah! Thanks for pointing out the WorkflowReplayingTest
- it did indeed fail and was super helpful for getting this working. I've updated this to consider a version in 6305fe4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pedroslopez, about that. We have one last issue to ensure that the workflow are not block because of a version. After that we will clear all the version. For the future we will still keep the version but it will be removal in a timely manner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice work @pedroslopez! I defer to the reviews of the other members of the team, but 👍 from me!
"$schema": http://json-schema.org/draft-07/schema# | ||
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ConnectorJobOutput.yaml | ||
title: ConnectorJobOutput | ||
description: connector command job output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
description: connector command job output | |
description: connector command job output for all connector commands other than READ and WRITE |
airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java
Outdated
Show resolved
Hide resolved
@@ -350,8 +349,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity. | |||
log.info("SOURCE CHECK: Skipped"); | |||
} else { | |||
log.info("SOURCE CHECK: Starting"); | |||
final StandardCheckConnectionOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); | |||
if (sourceCheckResponse.getStatus() == Status.FAILED) { | |||
final ConnectorJobOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau is the expert here, but since these are short-lived activities (vs syncs which can take days and span multiple deployments), maybe we don't need to do a version check?
airbyte-config/config-models/src/main/resources/types/ConnectorJobOutput.yaml
Show resolved
Hide resolved
final UUID configId, | ||
final long createdAt, | ||
final long endedAt) { | ||
public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be SynchronousResponse<U>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I wanted to keep the SynchronousResponse the same instead of the more generic type, the output is mapped and provided directly as an argument, so the TemporalResponse type and SynchronousResponse types can be different.
For example, for the discover job we have TemporalResponse<ConnectorJobOutput>
and SynchronousResponse<AirbyteCatalog>
@@ -350,8 +349,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity. | |||
log.info("SOURCE CHECK: Skipped"); | |||
} else { | |||
log.info("SOURCE CHECK: Starting"); | |||
final StandardCheckConnectionOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); | |||
if (sourceCheckResponse.getStatus() == Status.FAILED) { | |||
final ConnectorJobOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunatelly we need a new version here. You can check that by updating the workflowHistory in WorkflowReplayingTest
by something generated from master. Here the test doesn't fail because it is already controlled by a version. The time that an activity takes to run doesn't influence how likely it can have a versioning issue. The main factor is where in the workflow the activity is run. For example the last activity is less likely to have a version issue because the workflow will terminate/continue as new after it while for the first activity it is more likely that the workflow will be unloaded from memory and potentially replayed.
|
||
return activity.run(new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration)); | ||
return activity.runWithJobOutput(new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is less likely to happen because of the time that it takes to run this activity but we will need a version here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added in aefc013 - as part of this I also added a test with a previous workflow history to make sure it passes (it was indeed failing before adding the version checks)
) * demo for surfacing synchronous job failures * add missing changes for StandardDiscoverCatalogOutput impl * extract trace message failure reason for discover job * move to using a single pojo to represent synchronous job outputs * format * handle new output type in check before sync * re-genericize DefaultSynchronousSchedulerClient.execute * fix failing tests * fix failing scheduler client tests * get spec returns failure reason from trace message * build failure reason from trace message for check job * type safety * only consider error-type trace messages * add more tests * just use nulls * this was removed but incorrectly re-added when merging master into the branch * check output version for workflow replay support * refactor trace message finding to util method * additionalProperties: true * add versioning for CheckConnectionWorkflow * update comment
What
As described in Spec: Failure Reasons for Synchronous Jobs to report connector failures from synchronous jobs to sentry (#13857) we need to build and surface failure reasons from
AirbyteTraceMessage
s for these jobs.This PR focuses on building and surfacing these FailureReasons, while a follow up PR will focus on using this to actually send them to sentry.
How
AirbyteTraceMessage
s and build a FailureReason for it when encountering a non-zero exit code.ConnectorJobOutput
that contains afailureReason
fieldsucceeded=false
if the output is aConnectorJobOutput
with aFailureReason
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
This change should have no user impact.