Skip to content

Commit 5b95bd5

Browse files
gosusnp authored and akashkulk committed
Fix flaky tests (#19459)
* Expose WorkflowState in JobsDebugInfo
* Make attribute required
* Update the tests
* Protect more tests
1 parent 605ee16 commit 5b95bd5

File tree

9 files changed

+117
-6
lines changed

9 files changed

+117
-6
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4019,6 +4019,13 @@ components:
40194019
$ref: "#/components/schemas/SourceDefinitionRead"
40204020
destinationDefinition:
40214021
$ref: "#/components/schemas/DestinationDefinitionRead"
4022+
WorkflowStateRead:
4023+
type: object
4024+
required:
4025+
- running
4026+
properties:
4027+
running:
4028+
type: boolean
40224029
JobWithAttemptsRead:
40234030
type: object
40244031
properties:
@@ -4210,6 +4217,8 @@ components:
42104217
type: array
42114218
items:
42124219
$ref: "#/components/schemas/AttemptInfoRead"
4220+
workflowState:
4221+
$ref: "#/components/schemas/WorkflowStateRead"
42134222
AttemptInfoRead:
42144223
type: object
42154224
required:

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,21 @@ public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClie
213213
return connectionManagerWorkflow;
214214
}
215215

216-
boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
216+
Optional<WorkflowState> getWorkflowState(final WorkflowClient client, final UUID connectionId) {
217217
try {
218218
final ConnectionManagerWorkflow connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class,
219219
getConnectionManagerName(connectionId));
220-
return connectionManagerWorkflow.getState().isRunning();
220+
return Optional.of(connectionManagerWorkflow.getState());
221221
} catch (final Exception e) {
222-
return false;
222+
log.error("Exception thrown while checking workflow state for connection id {}", connectionId, e);
223+
return Optional.empty();
223224
}
224225
}
225226

227+
boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
228+
return getWorkflowState(client, connectionId).map(WorkflowState::isRunning).orElse(false);
229+
}
230+
226231
public WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
227232
final DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest = DescribeWorkflowExecutionRequest.newBuilder()
228233
.setExecution(WorkflowExecution.newBuilder()

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.airbyte.commons.temporal.scheduling.DiscoverCatalogWorkflow;
1717
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
1818
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
19+
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
1920
import io.airbyte.config.ConnectorJobOutput;
2021
import io.airbyte.config.JobCheckConnectionConfig;
2122
import io.airbyte.config.JobDiscoverCatalogConfig;
@@ -191,6 +192,10 @@ public static class ManualOperationResult {
191192

192193
}
193194

195+
public Optional<WorkflowState> getWorkflowState(final UUID connectionId) {
196+
return connectionManagerUtils.getWorkflowState(client, connectionId);
197+
}
198+
194199
public ManualOperationResult startNewManualSync(final UUID connectionId) {
195200
log.info("Manual sync request");
196201

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
315315
sourceDefinitionsHandler,
316316
destinationHandler,
317317
destinationDefinitionsHandler,
318-
configs.getAirbyteVersion());
318+
configs.getAirbyteVersion(),
319+
temporalClient);
319320

320321
final LogsHandler logsHandler = new LogsHandler(configs);
321322

airbyte-server/src/main/java/io/airbyte/server/converters/WorkflowStateConverter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.converters;
6+
7+
import io.airbyte.api.model.generated.WorkflowStateRead;
8+
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
9+
10+
public class WorkflowStateConverter {
11+
12+
public WorkflowStateRead getWorkflowStateRead(final WorkflowState workflowState) {
13+
return new WorkflowStateRead().running(workflowState.isRunning());
14+
}
15+
16+
}

airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.airbyte.api.model.generated.SourceIdRequestBody;
2626
import io.airbyte.api.model.generated.SourceRead;
2727
import io.airbyte.commons.enums.Enums;
28+
import io.airbyte.commons.temporal.TemporalClient;
2829
import io.airbyte.commons.version.AirbyteVersion;
2930
import io.airbyte.config.Configs.WorkerEnvironment;
3031
import io.airbyte.config.JobConfig;
@@ -35,6 +36,7 @@
3536
import io.airbyte.persistence.job.models.Job;
3637
import io.airbyte.persistence.job.models.JobStatus;
3738
import io.airbyte.server.converters.JobConverter;
39+
import io.airbyte.server.converters.WorkflowStateConverter;
3840
import io.airbyte.validation.json.JsonValidationException;
3941
import java.io.IOException;
4042
import java.util.Collections;
@@ -54,7 +56,9 @@ public class JobHistoryHandler {
5456
public static final int DEFAULT_PAGE_SIZE = 200;
5557
private final JobPersistence jobPersistence;
5658
private final JobConverter jobConverter;
59+
private final WorkflowStateConverter workflowStateConverter;
5760
private final AirbyteVersion airbyteVersion;
61+
private final TemporalClient temporalClient;
5862

5963
public JobHistoryHandler(final JobPersistence jobPersistence,
6064
final WorkerEnvironment workerEnvironment,
@@ -64,15 +68,32 @@ public JobHistoryHandler(final JobPersistence jobPersistence,
6468
final SourceDefinitionsHandler sourceDefinitionsHandler,
6569
final DestinationHandler destinationHandler,
6670
final DestinationDefinitionsHandler destinationDefinitionsHandler,
67-
final AirbyteVersion airbyteVersion) {
71+
final AirbyteVersion airbyteVersion,
72+
final TemporalClient temporalClient) {
6873
jobConverter = new JobConverter(workerEnvironment, logConfigs);
74+
workflowStateConverter = new WorkflowStateConverter();
6975
this.jobPersistence = jobPersistence;
7076
this.connectionsHandler = connectionsHandler;
7177
this.sourceHandler = sourceHandler;
7278
this.sourceDefinitionsHandler = sourceDefinitionsHandler;
7379
this.destinationHandler = destinationHandler;
7480
this.destinationDefinitionsHandler = destinationDefinitionsHandler;
7581
this.airbyteVersion = airbyteVersion;
82+
this.temporalClient = temporalClient;
83+
}
84+
85+
@Deprecated(forRemoval = true)
86+
public JobHistoryHandler(final JobPersistence jobPersistence,
87+
final WorkerEnvironment workerEnvironment,
88+
final LogConfigs logConfigs,
89+
final ConnectionsHandler connectionsHandler,
90+
final SourceHandler sourceHandler,
91+
final SourceDefinitionsHandler sourceDefinitionsHandler,
92+
final DestinationHandler destinationHandler,
93+
final DestinationDefinitionsHandler destinationDefinitionsHandler,
94+
final AirbyteVersion airbyteVersion) {
95+
this(jobPersistence, workerEnvironment, logConfigs, connectionsHandler, sourceHandler, sourceDefinitionsHandler, destinationHandler,
96+
destinationDefinitionsHandler, airbyteVersion, null);
7697
}
7798

7899
@SuppressWarnings("UnstableApiUsage")
@@ -122,7 +143,15 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
122143
final Job job = jobPersistence.getJob(jobIdRequestBody.getId());
123144
final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job);
124145

125-
return buildJobDebugInfoRead(jobinfoRead);
146+
final JobDebugInfoRead jobDebugInfoRead = buildJobDebugInfoRead(jobinfoRead);
147+
if (temporalClient != null) {
148+
final UUID connectionId = UUID.fromString(job.getScope());
149+
temporalClient.getWorkflowState(connectionId)
150+
.map(workflowStateConverter::getWorkflowStateRead)
151+
.ifPresent(jobDebugInfoRead::setWorkflowState);
152+
}
153+
154+
return jobDebugInfoRead;
126155
}
127156

128157
public Optional<JobRead> getLatestRunningSyncJob(final UUID connectionId) throws IOException {

airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.airbyte.api.client.model.generated.DestinationRead;
3434
import io.airbyte.api.client.model.generated.DestinationSyncMode;
3535
import io.airbyte.api.client.model.generated.JobConfigType;
36+
import io.airbyte.api.client.model.generated.JobDebugInfoRead;
3637
import io.airbyte.api.client.model.generated.JobIdRequestBody;
3738
import io.airbyte.api.client.model.generated.JobListRequestBody;
3839
import io.airbyte.api.client.model.generated.JobRead;
@@ -794,6 +795,23 @@ public static JobRead waitWhileJobHasStatus(final JobsApi jobsApi,
794795
return job;
795796
}
796797

798+
@SuppressWarnings("BusyWait")
799+
public static void waitWhileJobIsRunning(final JobsApi jobsApi, final JobRead job, final Duration maxWaitTime)
800+
throws ApiException, InterruptedException {
801+
final Instant waitStart = Instant.now();
802+
JobDebugInfoRead jobDebugInfoRead = jobsApi.getJobDebugInfo(new JobIdRequestBody().id(job.getId()));
803+
LOGGER.info("workflow state: {}", jobDebugInfoRead.getWorkflowState());
804+
while (jobDebugInfoRead.getWorkflowState() != null && jobDebugInfoRead.getWorkflowState().getRunning()) {
805+
if (Duration.between(waitStart, Instant.now()).compareTo(maxWaitTime) > 0) {
806+
LOGGER.info("Max wait time of {} has been reached. Stopping wait.", maxWaitTime);
807+
break;
808+
}
809+
LOGGER.info("waiting: job id: {}, workflowState.isRunning is still true", job.getId());
810+
sleep(1000);
811+
jobDebugInfoRead = jobsApi.getJobDebugInfo(new JobIdRequestBody().id(job.getId()));
812+
}
813+
}
814+
797815
@SuppressWarnings("BusyWait")
798816
public static ConnectionState waitForConnectionState(final AirbyteApiClient apiClient, final UUID connectionId)
799817
throws ApiException, InterruptedException {

airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.STREAM_NAME;
1414
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob;
1515
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobHasStatus;
16+
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobIsRunning;
1617
import static java.lang.Thread.sleep;
1718
import static org.junit.jupiter.api.Assertions.assertEquals;
1819
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -641,6 +642,11 @@ void testIncrementalSync() throws Exception {
641642
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
642643
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
643644
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
645+
// This is a band-aid to prevent some race conditions where the job status was updated but we may
646+
// still be cleaning up some data in the reset table. This would be an argument for reworking the
647+
// source of truth of the replication workflow state to be in DB rather than in Memory and
648+
// serialized automagically by temporal
649+
waitWhileJobIsRunning(apiClient.getJobsApi(), jobInfoRead.getJob(), Duration.ofMinutes(1));
644650

645651
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
646652

@@ -933,6 +939,11 @@ void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws Except
933939
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
934940
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
935941
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
942+
// This is a band-aid to prevent some race conditions where the job status was updated but we may
943+
// still be cleaning up some data in the reset table. This would be an argument for reworking the
944+
// source of truth of the replication workflow state to be in DB rather than in Memory and
945+
// serialized automagically by temporal
946+
waitWhileJobIsRunning(apiClient.getJobsApi(), jobInfoRead.getJob(), Duration.ofMinutes(1));
936947

937948
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
938949

@@ -1238,6 +1249,11 @@ void testIncrementalSyncMultipleStreams() throws Exception {
12381249
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
12391250
waitWhileJobHasStatus(apiClient.getJobsApi(), jobInfoRead.getJob(),
12401251
Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED));
1252+
// This is a band-aid to prevent some race conditions where the job status was updated but we may
1253+
// still be cleaning up some data in the reset table. This would be an argument for reworking the
1254+
// source of truth of the replication workflow state to be in DB rather than in Memory and
1255+
// serialized automagically by temporal
1256+
waitWhileJobIsRunning(apiClient.getJobsApi(), jobInfoRead.getJob(), Duration.ofMinutes(1));
12411257

12421258
LOGGER.info("state after reset: {}", apiClient.getStateApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
12431259

docs/reference/api/generated-api-html/index.html

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4612,6 +4612,9 @@ <h3 class="field-label">Return type</h3>
46124612
<h3 class="field-label">Example data</h3>
46134613
<div class="example-data-content-type">Content-Type: application/json</div>
46144614
<pre class="example"><code>{
4615+
"workflowState" : {
4616+
"running" : true
4617+
},
46154618
"job" : {
46164619
"configId" : "configId",
46174620
"sourceDefinition" : {
@@ -10516,6 +10519,7 @@ <h3>Table of Contents</h3>
1051610519
<li><a href="#WebBackendWorkspaceStateResult"><code>WebBackendWorkspaceStateResult</code> - </a></li>
1051710520
<li><a href="#WebhookConfigRead"><code>WebhookConfigRead</code> - </a></li>
1051810521
<li><a href="#WebhookConfigWrite"><code>WebhookConfigWrite</code> - </a></li>
10522+
<li><a href="#WorkflowStateRead"><code>WorkflowStateRead</code> - </a></li>
1051910523
<li><a href="#WorkspaceCreate"><code>WorkspaceCreate</code> - </a></li>
1052010524
<li><a href="#WorkspaceGiveFeedback"><code>WorkspaceGiveFeedback</code> - </a></li>
1052110525
<li><a href="#WorkspaceIdRequestBody"><code>WorkspaceIdRequestBody</code> - </a></li>
@@ -11289,6 +11293,7 @@ <h3><a name="JobDebugInfoRead"><code>JobDebugInfoRead</code> - </a> <a class="up
1128911293
<div class="field-items">
1129011294
<div class="param">job </div><div class="param-desc"><span class="param-type"><a href="#JobDebugRead">JobDebugRead</a></span> </div>
1129111295
<div class="param">attempts </div><div class="param-desc"><span class="param-type"><a href="#AttemptInfoRead">array[AttemptInfoRead]</a></span> </div>
11296+
<div class="param">workflowState (optional)</div><div class="param-desc"><span class="param-type"><a href="#WorkflowStateRead">WorkflowStateRead</a></span> </div>
1129211297
</div> <!-- field-items -->
1129311298
</div>
1129411299
<div class="model">
@@ -12103,6 +12108,13 @@ <h3><a name="WebhookConfigWrite"><code>WebhookConfigWrite</code> - </a> <a class
1210312108
<div class="param">validationUrl (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> if supplied, the webhook config will be validated by checking that this URL returns a 2xx response. </div>
1210412109
</div> <!-- field-items -->
1210512110
</div>
12111+
<div class="model">
12112+
<h3><a name="WorkflowStateRead"><code>WorkflowStateRead</code> - </a> <a class="up" href="#__Models">Up</a></h3>
12113+
<div class='model-description'></div>
12114+
<div class="field-items">
12115+
<div class="param">running </div><div class="param-desc"><span class="param-type"><a href="#boolean">Boolean</a></span> </div>
12116+
</div> <!-- field-items -->
12117+
</div>
1210612118
<div class="model">
1210712119
<h3><a name="WorkspaceCreate"><code>WorkspaceCreate</code> - </a> <a class="up" href="#__Models">Up</a></h3>
1210812120
<div class='model-description'></div>

0 commit comments

Comments
 (0)