Skip to content

Commit 780c98c

Browse files
authored
Add test that check that we continue a reset as a reset if it failed (#10806)
This is adding tests to make sure that a reset is continued as a reset after an attempt or as a job when the maximum amount of attempt is reach. It also fixes the workflow to continue as a reset in a new job if it fails more than the maximum number of attempt. Open question: - Is it what we want for the job (continue as a reset if the job failed)? - Do we need to respect the schedule if the reset failed more than the maximum attempts?
1 parent 7fb0993 commit 780c98c

10 files changed

+610
-206
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,17 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
219219
final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class,
220220
TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId));
221221
final BatchRequest signalRequest = client.newSignalWithStartRequest();
222-
final ConnectionUpdaterInput input = new ConnectionUpdaterInput(connectionId, null, null, false, 1, null, false);
222+
final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
223+
.connectionId(connectionId)
224+
.jobId(null)
225+
.attemptId(null)
226+
.fromFailure(false)
227+
.attemptNumber(1)
228+
.workflowState(null)
229+
.resetConnection(false)
230+
.fromJobResetFailure(false)
231+
.build();
232+
223233
signalRequest.add(connectionManagerWorkflow::run, input);
224234

225235
WorkflowClient.start(connectionManagerWorkflow::run, input);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java

-6
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,6 @@ public interface ConnectionManagerWorkflow {
6363
@SignalMethod
6464
void retryFailedActivity();
6565

66-
/**
67-
* Use for testing in order to simulate a job failure.
68-
*/
69-
@SignalMethod
70-
void simulateFailure();
71-
7266
/**
7367
* Return the current state of the workflow.
7468
*/

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
122122
// resetConnection flag to the next run so that that run can execute the actual reset
123123
workflowState.setResetConnection(connectionUpdaterInput.isResetConnection());
124124

125+
workflowState.setResetWithScheduling(connectionUpdaterInput.isFromJobResetFailure());
126+
125127
final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());
126128

127129
Workflow.await(timeToWait,
@@ -209,10 +211,11 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
209211
final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
210212
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
211213

214+
if (workflowState.isResetConnection()) {
215+
workflowState.setContinueAsReset(true);
216+
}
217+
212218
if (maxAttempt > attemptNumber) {
213-
if (workflowState.isResetConnection()) {
214-
workflowState.setContinueAsReset(true);
215-
}
216219
// restart from failure
217220
connectionUpdaterInput.setAttemptNumber(attemptNumber + 1);
218221
connectionUpdaterInput.setFromFailure(true);
@@ -222,6 +225,9 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
222225
"Job failed after too many retries for connection " + connectionId));
223226

224227
resetNewConnectionInput(connectionUpdaterInput);
228+
if (workflowState.isResetConnection()) {
229+
connectionUpdaterInput.setFromJobResetFailure(true);
230+
}
225231
}
226232
}
227233

@@ -265,6 +271,7 @@ public void connectionUpdated() {
265271
@Override
266272
public void resetConnection() {
267273
workflowState.setResetConnection(true);
274+
workflowState.setResetWithScheduling(false);
268275
if (workflowState.isRunning()) {
269276
workflowState.setCancelledForReset(true);
270277
cancellableSyncWorkflow.cancel();
@@ -276,11 +283,6 @@ public void retryFailedActivity() {
276283
workflowState.setRetryFailedActivity(true);
277284
}
278285

279-
@Override
280-
public void simulateFailure() {
281-
workflowState.setFailed(true);
282-
}
283-
284286
@Override
285287
public WorkflowState getState() {
286288
return workflowState;
@@ -313,7 +315,8 @@ public QuarantinedInformation getQuarantinedInformation() {
313315
* delete
314316
*/
315317
private Boolean skipScheduling() {
316-
return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() || workflowState.isResetConnection();
318+
return workflowState.isSkipScheduling() || workflowState.isDeleted() || workflowState.isUpdated() ||
319+
(!workflowState.isResetWithScheduling() && workflowState.isResetConnection());
317320
}
318321

319322
private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput connectionUpdaterInput) {
@@ -495,10 +498,7 @@ private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) {
495498
return true;
496499
}
497500

498-
// For testing purpose we simulate a failure using a signal method to avoid having to do a static
499-
// mock.
500-
// We do override failure reason in this case.
501-
return false || workflowState.isFailed();
501+
return false;
502502
}
503503

504504
/**

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java

+3
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@
88
import java.util.UUID;
99
import javax.annotation.Nullable;
1010
import lombok.AllArgsConstructor;
11+
import lombok.Builder;
1112
import lombok.Data;
1213
import lombok.NoArgsConstructor;
1314
import lombok.NonNull;
1415

1516
@Data
1617
@AllArgsConstructor
1718
@NoArgsConstructor
19+
@Builder
1820
public class ConnectionUpdaterInput {
1921

2022
@NonNull
@@ -36,5 +38,6 @@ public class ConnectionUpdaterInput {
3638
@Nullable
3739
private WorkflowState workflowState;
3840
private boolean resetConnection;
41+
private boolean fromJobResetFailure = false;
3942

4043
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java

+11
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
3434
private boolean quarantined = false;
3535
private boolean success = true;
3636
private boolean cancelledForReset = false;
37+
private boolean resetWithScheduling = false;
3738

3839
public void setRunning(final boolean running) {
3940
final ChangedStateEvent event = new ChangedStateEvent(
@@ -131,6 +132,16 @@ public void setCancelledForReset(final boolean cancelledForReset) {
131132
this.cancelledForReset = cancelledForReset;
132133
}
133134

135+
public void setResetWithScheduling(final boolean resetWithScheduling) {
136+
final ChangedStateEvent event = new ChangedStateEvent(
137+
StateField.RESET_WITH_SCHEDULING,
138+
resetWithScheduling);
139+
stateChangedListener.addEvent(id, event);
140+
this.resetWithScheduling = resetWithScheduling;
141+
}
142+
143+
// TODO: bmoric -> This is noisy when inpecting the list of event, it should be just a single reset
144+
// event.
134145
public void reset() {
135146
this.setRunning(false);
136147
this.setDeleted(false);

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ enum StateField {
3535
QUARANTINED,
3636
SUCCESS,
3737
CANCELLED_FOR_RESET,
38+
RESET_WITH_SCHEDULING,
3839
}
3940

4041
@Value

0 commit comments

Comments
 (0)