Build failure reasons for synchronous jobs (check/spec/discover) #14715

Merged
pedroslopez merged 22 commits into master from pedroslopez/synchr-job-reporter on Jul 19, 2022

@pedroslopez pedroslopez commented Jul 14, 2022

What

As described in Spec: Failure Reasons for Synchronous Jobs, to report connector failures from synchronous jobs to Sentry (#13857) we need to build and surface failure reasons from AirbyteTraceMessages for these jobs.

This PR focuses on building and surfacing these FailureReasons; a follow-up PR will focus on using them to actually send the failures to Sentry.

How

  • Check/Spec/Discover workers now look for AirbyteTraceMessages and build a FailureReason from them when encountering a non-zero exit code.
  • Check/Spec/Discover workers now output a ConnectorJobOutput that contains a failureReason field.
  • TemporalResponse is now marked as succeeded=false if the output is a ConnectorJobOutput with a FailureReason.

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

This change should have no user impact.

@github-actions github-actions bot added the area/platform, area/scheduler, area/server, and area/worker labels Jul 14, 2022
@pedroslopez pedroslopez temporarily deployed to more-secrets July 14, 2022 15:37 Inactive
Comment on lines +116 to +120
<T, U> SynchronousResponse<T> execute(final ConfigType configType,
                                      @Nullable final UUID connectorDefinitionId,
                                      final Function<UUID, TemporalResponse<U>> executor,
                                      final Function<U, T> outputMapper,
                                      final UUID workspaceId) {
Contributor Author

This method mostly remains the same, but to keep the SynchronousResponse types unchanged for their consumers, I introduced an outputMapper function that converts the common ConnectorJobOutput into the specific response we want.
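A minimal sketch of the output-mapping idea described in this comment. The types `JobOutput`, `TemporalResult`, and `SyncResponse` are hypothetical, simplified stand-ins invented for illustration; they are not the real Airbyte classes, and the real `execute` takes more parameters than shown here.

```java
import java.util.function.Function;

final class OutputMapperSketch {

  // Hypothetical stand-in for the common ConnectorJobOutput POJO.
  record JobOutput(String catalog, String failureReason) {}

  // Hypothetical stand-in for TemporalResponse<U>.
  record TemporalResult<U>(U output, boolean succeeded) {}

  // Hypothetical stand-in for SynchronousResponse<T>.
  record SyncResponse<T>(T output, boolean succeeded) {}

  // The executor yields the common output type U; the mapper converts it to the
  // job-specific type T, so callers keep the response type they used before.
  static <T, U> SyncResponse<T> execute(final Function<String, TemporalResult<U>> executor,
                                        final Function<U, T> outputMapper,
                                        final String jobId) {
    final TemporalResult<U> result = executor.apply(jobId);
    final U raw = result.output();
    // A failed job may carry no output at all, so only map when there is something to map.
    final T mapped = raw == null ? null : outputMapper.apply(raw);
    return new SyncResponse<>(mapped, result.succeeded());
  }

  public static void main(final String[] args) {
    final Function<String, TemporalResult<JobOutput>> executor =
        id -> new TemporalResult<>(new JobOutput("catalog-for-" + id, null), true);
    final SyncResponse<String> response = execute(executor, JobOutput::catalog, "job-1");
    System.out.println(response);
  }
}
```

The design choice this illustrates is that the generic parameters `T` and `U` are independent: the Temporal side can standardize on one output type while each caller still receives its own specific type.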

Comment on lines +130 to +131
track(jobId, configType, connectorDefinitionId, workspaceId, outputState, mappedOutput);
// TODO(pedro): report ConnectorJobOutput's failureReason to the JobErrorReporter, like the above
Contributor Author

To keep this PR somewhat manageable, I've limited it to building/surfacing the failure reasons for these jobs, without actually reporting them to the JobErrorReporter (Sentry). This is where that would happen, following the same pattern that's used for the JobTracker (Segment). This will come in a subsequent PR.

import io.airbyte.workers.Worker;

-public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {}
+public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, ConnectorJobOutput> {}
Contributor Author

One thing I don't love about using a common ConnectorJobOutput is that we lose some type safety in what these workers return, but that seems OK in order to stick with our existing convention of using POJOs/JSON schema for defining these outputs.

Comment on lines +71 to +72
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
    .collect(Collectors.groupingBy(AirbyteMessage::getType));
Contributor Author

This is how we build failure reasons from trace messages for each of these jobs:

  • Instead of filtering for only the desired output when initially reading the stream, collect the messages by type.
  • Filter for our desired output.
  • If needed, filter for a trace message and use the existing FailureHelper to produce a FailureReason from it.
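The three steps above can be sketched roughly as follows. `Message`, `Type`, and `JobResult` are hypothetical, simplified stand-ins for the real protocol and config classes, and a plain string stands in for the FailureReason that FailureHelper would build. Taking the first error trace message is also an assumption made for the sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

final class TraceMessageSketch {

  // Hypothetical, heavily simplified stand-ins for AirbyteMessage and its type enum.
  enum Type { SPEC, CONNECTION_STATUS, CATALOG, TRACE, LOG }

  record Message(Type type, String payload, boolean isError) {}

  // Hypothetical result holder: the desired output and/or a failure reason.
  record JobResult(Optional<String> output, Optional<String> failureReason) {}

  static JobResult process(final List<Message> messages, final Type wanted, final int exitCode) {
    // Step 1: collect once by type instead of filtering the stream for a single type.
    final Map<Type, List<Message>> messagesByType =
        messages.stream().collect(Collectors.groupingBy(Message::type));

    // Step 2: pick out the output this job is interested in.
    final Optional<String> output = messagesByType.getOrDefault(wanted, List.of()).stream()
        .map(Message::payload)
        .findFirst();

    // Step 3: only on a non-zero exit code, look for an error-type trace message.
    final Optional<String> failureReason = exitCode == 0
        ? Optional.empty()
        : messagesByType.getOrDefault(Type.TRACE, List.of()).stream()
            .filter(Message::isError)
            .map(Message::payload)
            .findFirst();

    return new JobResult(output, failureReason);
  }

  public static void main(final String[] args) {
    final List<Message> messages = List.of(
        new Message(Type.LOG, "starting discover", false),
        new Message(Type.TRACE, "invalid credentials", true));
    System.out.println(process(messages, Type.CATALOG, 1));
  }
}
```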

Comment on lines +475 to +480
boolean succeeded = exception == null;
if (succeeded && operationOutput instanceof ConnectorJobOutput) {
  succeeded = getConnectorJobSucceeded((ConnectorJobOutput) operationOutput);
}

final JobMetadata metadata = new JobMetadata(succeeded, logPath);
Contributor Author

This is the piece that determines whether the TemporalResponse was successful. Previously, only exceptions were considered unsuccessful; now ConnectorJobOutputs with a failure reason are also considered unsuccessful.
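A small self-contained sketch of that success rule. The `ConnectorJobOutput` record here is a hypothetical stand-in, and the body of `getConnectorJobSucceeded` is an assumption (it is not shown in this thread); it simply treats a missing failure reason as success, in line with the PR description.

```java
final class JobSuccessSketch {

  // Hypothetical stand-in for the generated ConnectorJobOutput POJO.
  record ConnectorJobOutput(String failureReason) {}

  // Assumed implementation: a job output succeeded when it carries no failure reason.
  static boolean getConnectorJobSucceeded(final ConnectorJobOutput output) {
    return output.failureReason() == null;
  }

  // Mirrors the snippet under review: an exception always means failure, and a
  // ConnectorJobOutput with a failure reason now also means failure.
  static boolean succeeded(final Object operationOutput, final Exception exception) {
    boolean succeeded = exception == null;
    if (succeeded && operationOutput instanceof ConnectorJobOutput) {
      succeeded = getConnectorJobSucceeded((ConnectorJobOutput) operationOutput);
    }
    return succeeded;
  }

  public static void main(final String[] args) {
    System.out.println(succeeded(new ConnectorJobOutput(null), null));
    System.out.println(succeeded(new ConnectorJobOutput("config error"), null));
    System.out.println(succeeded("some other output", new RuntimeException("boom")));
  }
}
```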

Comment on lines +74 to +81
if (failureOutput.getFailureReason() != null) {
  syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin)));
} else {
  final StandardCheckConnectionOutput checkOutput = failureOutput.getCheckConnection();
  final Exception ex = new IllegalArgumentException(checkOutput.getMessage());
  final FailureReason checkFailureReason = FailureHelper.checkFailure(ex, jobId, attemptId, origin);
  syncOutput.setFailures(List.of(checkFailureReason));
}
Contributor Author

When building the failure for the checks that run before syncs, if the check failed with a FailureReason from the connector, just use that. Otherwise, build a FailureReason as before.
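The fallback described here can be sketched in isolation as follows. All types are hypothetical, simplified stand-ins (the real code uses the generated POJOs and FailureHelper.checkFailure, which also takes the job and attempt IDs); only the branching is meant to match.

```java
import java.util.List;

final class CheckFailureSketch {

  // Hypothetical stand-in for FailureReason, reduced to an origin and a message.
  record FailureReason(String origin, String message) {
    FailureReason withFailureOrigin(final String newOrigin) {
      return new FailureReason(newOrigin, message);
    }
  }

  // Hypothetical stand-in for StandardCheckConnectionOutput.
  record CheckOutput(String message) {}

  // Hypothetical stand-in for ConnectorJobOutput.
  record JobOutput(FailureReason failureReason, CheckOutput checkConnection) {}

  static List<FailureReason> failuresFor(final JobOutput failureOutput, final String origin) {
    if (failureOutput.failureReason() != null) {
      // The connector reported its own failure reason: reuse it, tagging the origin.
      return List.of(failureOutput.failureReason().withFailureOrigin(origin));
    }
    // Otherwise fall back to building a reason from the check's status message.
    return List.of(new FailureReason(origin, failureOutput.checkConnection().message()));
  }

  public static void main(final String[] args) {
    final JobOutput fromConnector = new JobOutput(new FailureReason(null, "trace error"), null);
    final JobOutput fromCheck = new JobOutput(null, new CheckOutput("connection refused"));
    System.out.println(failuresFor(fromConnector, "source"));
    System.out.println(failuresFor(fromCheck, "destination"));
  }
}
```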

@@ -350,8 +349,8 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
   log.info("SOURCE CHECK: Skipped");
 } else {
   log.info("SOURCE CHECK: Starting");
-  final StandardCheckConnectionOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput);
-  if (sourceCheckResponse.getStatus() == Status.FAILED) {
+  final ConnectorJobOutput sourceCheckResponse = runMandatoryActivityWithOutput(checkActivity::run, checkSourceInput);
Contributor Author

This activity, which is called from the ConnectionManagerWorkflow, now has a different output type. I know that when changing the inputs we have to do some version checks. Is there anything special to do here because of this output change?

Contributor

@benmoriceau is the expert here, but since these are short-lived activities (vs syncs which can take days and span multiple deployments), maybe we don't need to do a version check?

Contributor

Unfortunately, we need a new version here. You can check that by replacing the workflowHistory in WorkflowReplayingTest with one generated from master. Here the test doesn't fail because it is already controlled by a version. The time an activity takes to run doesn't influence how likely it is to have a versioning issue; the main factor is where in the workflow the activity runs. For example, the last activity is less likely to have a version issue because the workflow will terminate/continue as new after it, whereas after the first activity it is more likely that the workflow will be unloaded from memory and potentially replayed.

Contributor Author

@benmoriceau Ah! Thanks for pointing out the WorkflowReplayingTest - it did indeed fail and was super helpful for getting this working. I've updated this to consider a version in 6305fe4

Contributor

Thanks for that, @pedroslopez. We have one last issue to resolve to ensure that workflows are not blocked because of a version. After that, we will clear all the versions. In the future we will still add versions, but they will be removed in a timely manner.
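For readers unfamiliar with the version checks discussed in this thread: in Temporal's Java SDK such a gate is typically written with `Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, maxSupported)`. The sketch below does not use the SDK. It only simulates the branching with a hypothetical `recordedVersion` argument and a hypothetical version number, and it returns the name of the output type rather than calling any activity.

```java
final class VersionGateSketch {

  // Stand-in for Temporal's Workflow.DEFAULT_VERSION, which marks histories
  // recorded before the change was introduced.
  static final int DEFAULT_VERSION = -1;

  // Hypothetical version number assigned to the new activity output type.
  static final int JOB_OUTPUT_VERSION = 1;

  // Histories recorded before the change contain the old activity result type,
  // so replaying them must follow the old code path to stay deterministic.
  static String outputTypeFor(final int recordedVersion) {
    if (recordedVersion < JOB_OUTPUT_VERSION) {
      return "StandardCheckConnectionOutput"; // legacy path, e.g. activity.run(...)
    }
    return "ConnectorJobOutput"; // new path, e.g. activity.runWithJobOutput(...)
  }

  public static void main(final String[] args) {
    System.out.println(outputTypeFor(DEFAULT_VERSION));
    System.out.println(outputTypeFor(JOB_OUTPUT_VERSION));
  }
}
```

A replay test against a history generated from master, as suggested above for WorkflowReplayingTest, is what exercises the legacy branch.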

@pedroslopez pedroslopez marked this pull request as ready for review July 14, 2022 16:10
Contributor

@evantahler evantahler left a comment

Really nice work @pedroslopez! I defer to the reviews of the other members of the team, but 👍 from me!

"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ConnectorJobOutput.yaml
title: ConnectorJobOutput
description: connector command job output
Contributor

Suggested change
-description: connector command job output
+description: connector command job output for all connector commands other than READ and WRITE

final UUID configId,
final long createdAt,
final long endedAt) {
public static <T, U> SynchronousResponse<T> fromTemporalResponse(final TemporalResponse<U> temporalResponse,
Contributor

Shouldn't it be SynchronousResponse<U> ?

Contributor Author

Because I wanted to keep the SynchronousResponse the same instead of the more generic type, the output is mapped and provided directly as an argument, so the TemporalResponse type and SynchronousResponse types can be different.

For example, for the discover job we have TemporalResponse<ConnectorJobOutput> and SynchronousResponse<AirbyteCatalog>

@pedroslopez pedroslopez temporarily deployed to more-secrets July 17, 2022 01:31 Inactive
@pedroslopez pedroslopez temporarily deployed to more-secrets July 17, 2022 03:47 Inactive

-return activity.run(new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration));
+return activity.runWithJobOutput(new CheckConnectionInput(jobRunConfig, launcherConfig, connectionConfiguration));
Contributor

It is less likely to happen because of how little time this activity takes to run, but we will need a version here.

Contributor Author

@pedroslopez pedroslopez Jul 19, 2022

added in aefc013 - as part of this I also added a test with a previous workflow history to make sure it passes (it was indeed failing before adding the version checks)

@pedroslopez pedroslopez temporarily deployed to more-secrets July 19, 2022 20:38 Inactive
@pedroslopez pedroslopez requested a review from benmoriceau July 19, 2022 20:39
@pedroslopez pedroslopez temporarily deployed to more-secrets July 19, 2022 20:40 Inactive
@pedroslopez pedroslopez merged commit 198e580 into master Jul 19, 2022
@pedroslopez pedroslopez deleted the pedroslopez/synchr-job-reporter branch July 19, 2022 21:19
@edgao edgao mentioned this pull request Jul 19, 2022
mfsiega-airbyte pushed a commit that referenced this pull request Jul 21, 2022
)

* demo for surfacing synchronous job failures

* add missing changes for StandardDiscoverCatalogOutput impl

* extract trace message failure reason for discover job

* move to using a single pojo to represent synchronous job outputs

* format

* handle new output type in check before sync

* re-genericize DefaultSynchronousSchedulerClient.execute

* fix failing tests

* fix failing scheduler client tests

* get spec returns failure reason from trace message

* build failure reason from trace message for check job

* type safety

* only consider error-type trace messages

* add more tests

* just use nulls

* this was removed but incorrectly re-added when merging master into the branch

* check output version for workflow replay support

* refactor trace message finding to util method

* additionalProperties: true

* add versioning for CheckConnectionWorkflow

* update comment