Skip to content

Commit 73b4668

Browse files
committed
test for retry logic
1 parent ea5f565 commit 73b4668

File tree

2 files changed

+150
-6
lines changed

2 files changed

+150
-6
lines changed

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66

77
import io.airbyte.config.Configs;
88
import io.airbyte.config.EnvConfigs;
9-
import io.airbyte.workers.WorkerException;
109
import io.airbyte.workers.temporal.TemporalUtils;
1110
import io.temporal.activity.ActivityCancellationType;
1211
import io.temporal.activity.ActivityOptions;
12+
import io.temporal.client.WorkflowFailedException;
1313
import io.temporal.common.RetryOptions;
1414
import java.time.Duration;
1515

@@ -28,11 +28,11 @@ public class ActivityConfiguration {
2828

2929
// retry infinitely if the worker is killed without exceptions and dies due to timeouts
3030
// but fail for everything thrown by the call itself which is rethrown as runtime exceptions
31-
private static final RetryOptions ORCHESTRATOR_RETRY = RetryOptions.newBuilder()
32-
.setDoNotRetry(RuntimeException.class.getName(), WorkerException.class.getName())
31+
public static final RetryOptions ORCHESTRATOR_RETRY = RetryOptions.newBuilder()
32+
.setDoNotRetry(RuntimeException.class.getName(), WorkflowFailedException.class.getName())
3333
.build();
3434

35-
private static final RetryOptions RETRY_POLICY = new EnvConfigs().getContainerOrchestratorEnabled() ? ORCHESTRATOR_RETRY : TemporalUtils.NO_RETRY;
35+
public static final RetryOptions RETRY_POLICY = new EnvConfigs().getContainerOrchestratorEnabled() ? ORCHESTRATOR_RETRY : TemporalUtils.NO_RETRY;
3636

3737
public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder()
3838
.setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS))

airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java

+146-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
package io.airbyte.workers.temporal;
66

77
import static io.airbyte.workers.temporal.TemporalUtils.getTemporalClientWhenConnected;
8-
import static org.junit.jupiter.api.Assertions.assertEquals;
9-
import static org.junit.jupiter.api.Assertions.assertThrows;
8+
import static org.junit.jupiter.api.Assertions.*;
109
import static org.mockito.ArgumentMatchers.any;
1110
import static org.mockito.Mockito.doAnswer;
1211
import static org.mockito.Mockito.mock;
1312
import static org.mockito.Mockito.when;
1413

1514
import io.airbyte.commons.concurrency.VoidCallable;
15+
import io.airbyte.workers.WorkerException;
16+
import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration;
1617
import io.temporal.activity.ActivityCancellationType;
1718
import io.temporal.activity.ActivityInterface;
1819
import io.temporal.activity.ActivityMethod;
@@ -22,6 +23,7 @@
2223
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
2324
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
2425
import io.temporal.client.WorkflowClient;
26+
import io.temporal.client.WorkflowFailedException;
2527
import io.temporal.client.WorkflowOptions;
2628
import io.temporal.serviceclient.WorkflowServiceStubs;
2729
import io.temporal.testing.TestWorkflowEnvironment;
@@ -34,6 +36,8 @@
3436
import java.util.concurrent.CompletableFuture;
3537
import java.util.concurrent.CountDownLatch;
3638
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicReference;
3741
import java.util.function.Supplier;
3842
import org.apache.commons.lang3.tuple.ImmutablePair;
3943
import org.junit.jupiter.api.Test;
@@ -125,6 +129,70 @@ public void testWaitThatTimesOut() {
125129
});
126130
}
127131

132+
@Test
133+
public void testRuntimeExceptionOnHeartbeatWrapper() {
134+
final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();
135+
final Worker worker = testEnv.newWorker(TASK_QUEUE);
136+
worker.registerWorkflowImplementationTypes(TestFailingWorkflow.WorkflowImpl.class);
137+
final WorkflowClient client = testEnv.getWorkflowClient();
138+
final AtomicInteger timesReachedEnd = new AtomicInteger(0);
139+
worker.registerActivitiesImplementations(new TestFailingWorkflow.Activity1Impl(timesReachedEnd));
140+
testEnv.start();
141+
142+
final TestFailingWorkflow workflowStub =
143+
client.newWorkflowStub(TestFailingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
144+
145+
// test runtime first
146+
assertThrows(RuntimeException.class, () -> {
147+
workflowStub.run("runtime");
148+
});
149+
150+
// we should never retry enough to reach the end
151+
assertEquals(0, timesReachedEnd.get());
152+
}
153+
154+
@Test
155+
public void testWorkerExceptionOnHeartbeatWrapper() {
156+
final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();
157+
final Worker worker = testEnv.newWorker(TASK_QUEUE);
158+
worker.registerWorkflowImplementationTypes(TestFailingWorkflow.WorkflowImpl.class);
159+
final WorkflowClient client = testEnv.getWorkflowClient();
160+
final AtomicInteger timesReachedEnd = new AtomicInteger(0);
161+
worker.registerActivitiesImplementations(new TestFailingWorkflow.Activity1Impl(timesReachedEnd));
162+
testEnv.start();
163+
164+
final TestFailingWorkflow workflowStub =
165+
client.newWorkflowStub(TestFailingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
166+
167+
// throws workerexception wrapped in a WorkflowFailedException
168+
assertThrows(WorkflowFailedException.class, () -> {
169+
workflowStub.run("worker");
170+
});
171+
172+
// we should never retry enough to reach the end
173+
assertEquals(0, timesReachedEnd.get());
174+
}
175+
176+
@Test
177+
public void testHeartbeatFailureOnHeartbeatWrapper() {
178+
final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();
179+
final Worker worker = testEnv.newWorker(TASK_QUEUE);
180+
worker.registerWorkflowImplementationTypes(TestFailingWorkflow.WorkflowImpl.class);
181+
final WorkflowClient client = testEnv.getWorkflowClient();
182+
final AtomicInteger timesReachedEnd = new AtomicInteger(0);
183+
worker.registerActivitiesImplementations(new TestFailingWorkflow.Activity1Impl(timesReachedEnd));
184+
testEnv.start();
185+
186+
final TestFailingWorkflow workflowStub =
187+
client.newWorkflowStub(TestFailingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
188+
189+
// test timeout on first attempt only
190+
workflowStub.run("timeout");
191+
192+
// we should be able to retry in this case because we only timeout in first run
193+
assertEquals(1, timesReachedEnd.get());
194+
}
195+
128196
@WorkflowInterface
129197
public interface TestWorkflow {
130198

@@ -190,4 +258,80 @@ public void activity() {
190258

191259
}
192260

261+
@WorkflowInterface
262+
public interface TestFailingWorkflow {
263+
264+
@WorkflowMethod
265+
String run(String arg);
266+
267+
class WorkflowImpl implements TestFailingWorkflow {
268+
269+
private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowImpl.class);
270+
271+
final ActivityOptions options = ActivityOptions.newBuilder()
272+
.setScheduleToCloseTimeout(Duration.ofMinutes(30))
273+
.setStartToCloseTimeout(Duration.ofMinutes(30))
274+
.setScheduleToStartTimeout(Duration.ofMinutes(30))
275+
.setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
276+
.setRetryOptions(ActivityConfiguration.ORCHESTRATOR_RETRY)
277+
.setHeartbeatTimeout(Duration.ofSeconds(1))
278+
.build();
279+
280+
private final Activity1 activity1 = Workflow.newActivityStub(Activity1.class, options);
281+
282+
@Override
283+
public String run(final String arg) {
284+
285+
LOGGER.info("workflow before activity 1");
286+
activity1.activity(arg);
287+
LOGGER.info("workflow after all activities");
288+
289+
return "completed";
290+
}
291+
292+
}
293+
294+
@ActivityInterface
295+
interface Activity1 {
296+
297+
@ActivityMethod
298+
void activity(String arg);
299+
300+
}
301+
302+
class Activity1Impl implements Activity1 {
303+
304+
private static final Logger LOGGER = LoggerFactory.getLogger(TestWorkflow.Activity1Impl.class);
305+
private static final String ACTIVITY1 = "activity1";
306+
307+
private final AtomicInteger timesReachedEnd;
308+
309+
public Activity1Impl(final AtomicInteger timesReachedEnd) {
310+
this.timesReachedEnd = timesReachedEnd;
311+
}
312+
313+
public void activity(String arg) {
314+
LOGGER.info("before: {}", ACTIVITY1);
315+
TemporalUtils.withBackgroundHeartbeat(new AtomicReference<>(null), () -> {
316+
if (timesReachedEnd.get() == 0) {
317+
if (arg.equals("runtime")) {
318+
throw new RuntimeException("failed");
319+
} else if (arg.equals("timeout")) {
320+
Thread.sleep(10000);
321+
return null;
322+
} else {
323+
throw new WorkerException("failed");
324+
}
325+
} else {
326+
return null;
327+
}
328+
});
329+
timesReachedEnd.incrementAndGet();
330+
LOGGER.info("before: {}", ACTIVITY1);
331+
}
332+
333+
}
334+
335+
}
336+
193337
}

0 commit comments

Comments
 (0)