Skip to content

Commit 44d747d

Browse files
authored
Revert "Revert "fix: refresh actor configuration and state between sync attempts"" (#22281)
* Revert "Revert "fix: refresh actor configuration and state between sync attempts (#21629)" (#22211)" This reverts commit 7978862. * fmt
1 parent 531627e commit 44d747d

File tree

38 files changed

+902
-142
lines changed

38 files changed

+902
-142
lines changed

airbyte-api/src/main/openapi/config.yaml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2214,6 +2214,26 @@ paths:
22142214
application/json:
22152215
schema:
22162216
$ref: "#/components/schemas/InternalOperationResult"
2217+
/v1/attempt/save_sync_config:
2218+
post:
2219+
tags:
2220+
- attempt
2221+
- internal
2222+
summary: For worker to save the AttemptSyncConfig for an attempt.
2223+
operationId: saveSyncConfig
2224+
requestBody:
2225+
content:
2226+
application/json:
2227+
schema:
2228+
$ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody"
2229+
required: true
2230+
responses:
2231+
"200":
2232+
description: Successful Operation
2233+
content:
2234+
application/json:
2235+
schema:
2236+
$ref: "#/components/schemas/InternalOperationResult"
22172237

22182238
components:
22192239
securitySchemes:
@@ -5026,6 +5046,31 @@ components:
50265046
type: array
50275047
items:
50285048
$ref: "#/components/schemas/AttemptStreamStats"
5049+
AttemptSyncConfig:
5050+
type: object
5051+
required:
5052+
- sourceConfiguration
5053+
- destinationConfiguration
5054+
properties:
5055+
sourceConfiguration:
5056+
$ref: "#/components/schemas/SourceConfiguration"
5057+
destinationConfiguration:
5058+
$ref: "#/components/schemas/DestinationConfiguration"
5059+
state:
5060+
$ref: "#/components/schemas/ConnectionState"
5061+
SaveAttemptSyncConfigRequestBody:
5062+
type: object
5063+
required:
5064+
- jobId
5065+
- attemptNumber
5066+
- syncConfig
5067+
properties:
5068+
jobId:
5069+
$ref: "#/components/schemas/JobId"
5070+
attemptNumber:
5071+
$ref: "#/components/schemas/AttemptNumber"
5072+
syncConfig:
5073+
$ref: "#/components/schemas/AttemptSyncConfig"
50295074
InternalOperationResult:
50305075
type: object
50315076
required:

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ class BootloaderTest {
8282

8383
// ⚠️ This line should change with every new migration to show that you meant to make a new
8484
// migration to the prod database
85-
private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001";
85+
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.28.001";
86+
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001";
8687

8788
@BeforeEach
8889
void setup() {
@@ -147,10 +148,10 @@ void testBootloaderAppBlankDb() throws Exception {
147148
bootloader.load();
148149

149150
val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
150-
assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
151+
assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion());
151152

152153
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
153-
assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());
154+
assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());
154155

155156
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
156157
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());

airbyte-commons-server/src/main/java/io/airbyte/commons/server/converters/ApiPojoConverters.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
package io.airbyte.commons.server.converters;
66

77
import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
8+
import io.airbyte.api.model.generated.AttemptSyncConfig;
89
import io.airbyte.api.model.generated.ConnectionRead;
910
import io.airbyte.api.model.generated.ConnectionSchedule;
1011
import io.airbyte.api.model.generated.ConnectionScheduleData;
1112
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
1213
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
14+
import io.airbyte.api.model.generated.ConnectionState;
15+
import io.airbyte.api.model.generated.ConnectionStateType;
1316
import io.airbyte.api.model.generated.ConnectionStatus;
1417
import io.airbyte.api.model.generated.Geography;
1518
import io.airbyte.api.model.generated.JobType;
@@ -22,6 +25,12 @@
2225
import io.airbyte.config.BasicSchedule;
2326
import io.airbyte.config.Schedule;
2427
import io.airbyte.config.StandardSync;
28+
import io.airbyte.config.State;
29+
import io.airbyte.config.StateWrapper;
30+
import io.airbyte.config.helpers.StateMessageHelper;
31+
import io.airbyte.workers.helper.StateConverter;
32+
import java.util.Optional;
33+
import java.util.UUID;
2534
import java.util.stream.Collectors;
2635

2736
public class ApiPojoConverters {
@@ -42,6 +51,42 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso
4251
.collect(Collectors.toList()));
4352
}
4453

54+
public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) {
55+
if (attemptSyncConfig == null) {
56+
return null;
57+
}
58+
59+
final io.airbyte.config.AttemptSyncConfig internalAttemptSyncConfig = new io.airbyte.config.AttemptSyncConfig()
60+
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
61+
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration());
62+
63+
final ConnectionState connectionState = attemptSyncConfig.getState();
64+
if (connectionState != null && connectionState.getStateType() != ConnectionStateType.NOT_SET) {
65+
final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState());
66+
final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper);
67+
internalAttemptSyncConfig.setState(state);
68+
}
69+
70+
return internalAttemptSyncConfig;
71+
}
72+
73+
public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig,
74+
final UUID connectionId,
75+
final boolean useStreamCapableState) {
76+
if (attemptSyncConfig == null) {
77+
return null;
78+
}
79+
80+
final State state = attemptSyncConfig.getState();
81+
final Optional<StateWrapper> optStateWrapper = state != null ? StateMessageHelper.getTypedState(
82+
state.getState(), useStreamCapableState) : Optional.empty();
83+
84+
return new io.airbyte.api.client.model.generated.AttemptSyncConfig()
85+
.sourceConfiguration(attemptSyncConfig.getSourceConfiguration())
86+
.destinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
87+
.state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null)));
88+
}
89+
4590
public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
4691
if (actorDefResourceReqs == null) {
4792
return null;

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
package io.airbyte.commons.server.handlers;
66

77
import io.airbyte.api.model.generated.InternalOperationResult;
8+
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
89
import io.airbyte.api.model.generated.SaveStatsRequestBody;
910
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
11+
import io.airbyte.commons.server.converters.ApiPojoConverters;
1012
import io.airbyte.config.StreamSyncStats;
1113
import io.airbyte.config.SyncStats;
1214
import io.airbyte.persistence.job.JobPersistence;
@@ -63,4 +65,17 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody)
6365
return new InternalOperationResult().succeeded(true);
6466
}
6567

68+
public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) {
69+
try {
70+
jobPersistence.writeAttemptSyncConfig(
71+
requestBody.getJobId(),
72+
requestBody.getAttemptNumber(),
73+
ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig()));
74+
} catch (final IOException ioe) {
75+
LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe);
76+
return new InternalOperationResult().succeeded(false);
77+
}
78+
return new InternalOperationResult().succeeded(true);
79+
}
80+
6681
}

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,18 @@
1212
import static org.mockito.ArgumentMatchers.anyLong;
1313
import static org.mockito.Mockito.doThrow;
1414

15+
import com.fasterxml.jackson.databind.JsonNode;
16+
import io.airbyte.api.model.generated.AttemptSyncConfig;
17+
import io.airbyte.api.model.generated.ConnectionState;
18+
import io.airbyte.api.model.generated.ConnectionStateType;
19+
import io.airbyte.api.model.generated.GlobalState;
20+
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
1521
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
22+
import io.airbyte.commons.json.Jsons;
23+
import io.airbyte.commons.server.converters.ApiPojoConverters;
1624
import io.airbyte.persistence.job.JobPersistence;
1725
import java.io.IOException;
26+
import java.util.Map;
1827
import java.util.UUID;
1928
import org.junit.jupiter.api.BeforeEach;
2029
import org.junit.jupiter.api.Test;
@@ -26,6 +35,7 @@ class AttemptHandlerTest {
2635
JobPersistence jobPersistence;
2736
AttemptHandler handler;
2837

38+
private static final UUID CONNECTION_ID = UUID.randomUUID();
2939
private static final long JOB_ID = 10002L;
3040
private static final int ATTEMPT_NUMBER = 1;
3141

@@ -39,14 +49,14 @@ public void init() {
3949

4050
@Test
4151
void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {
42-
String workflowId = UUID.randomUUID().toString();
52+
final String workflowId = UUID.randomUUID().toString();
4353

4454
final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
4555
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
4656
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
4757
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);
4858

49-
SetWorkflowInAttemptRequestBody requestBody =
59+
final SetWorkflowInAttemptRequestBody requestBody =
5060
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
5161
.processingTaskQueue(PROCESSING_TASK_QUEUE);
5262

@@ -63,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {
6373

6474
@Test
6575
void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
66-
String workflowId = UUID.randomUUID().toString();
76+
final String workflowId = UUID.randomUUID().toString();
6777

6878
doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(),
6979
any(), any());
@@ -73,7 +83,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
7383
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
7484
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);
7585

76-
SetWorkflowInAttemptRequestBody requestBody =
86+
final SetWorkflowInAttemptRequestBody requestBody =
7787
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
7888
.processingTaskQueue(PROCESSING_TASK_QUEUE);
7989

@@ -88,4 +98,38 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
8898
assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue());
8999
}
90100

101+
@Test
102+
void testInternalHandlerSetsAttemptSyncConfig() throws Exception {
103+
final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
104+
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
105+
final ArgumentCaptor<io.airbyte.config.AttemptSyncConfig> attemptSyncConfigCapture =
106+
ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class);
107+
108+
final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val"));
109+
final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val"));
110+
final ConnectionState state = new ConnectionState()
111+
.connectionId(CONNECTION_ID)
112+
.stateType(ConnectionStateType.GLOBAL)
113+
.streamState(null)
114+
.globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val"))));
115+
116+
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
117+
.destinationConfiguration(destinationConfig)
118+
.sourceConfiguration(sourceConfig)
119+
.state(state);
120+
121+
final SaveAttemptSyncConfigRequestBody requestBody =
122+
new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig);
123+
124+
assertTrue(handler.saveSyncConfig(requestBody).getSucceeded());
125+
126+
Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture());
127+
128+
final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig);
129+
130+
assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue());
131+
assertEquals(JOB_ID, jobIdCapture.getValue());
132+
assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue());
133+
}
134+
91135
}

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
157157
}
158158

159159
private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) {
160-
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps);
160+
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps);
161161
}
162162

163163
@BeforeEach

airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
1717
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
1818
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
19+
import io.airbyte.config.AttemptSyncConfig;
1920
import io.airbyte.config.ConnectorJobOutput;
2021
import io.airbyte.config.JobCheckConnectionConfig;
2122
import io.airbyte.config.JobDiscoverCatalogConfig;
@@ -374,7 +375,11 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
374375
() -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
375376
}
376377

377-
public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) {
378+
public TemporalResponse<StandardSyncOutput> submitSync(final long jobId,
379+
final int attempt,
380+
final JobSyncConfig config,
381+
final AttemptSyncConfig attemptConfig,
382+
final UUID connectionId) {
378383
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
379384

380385
final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
@@ -395,11 +400,11 @@ public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final i
395400
.withNamespaceDefinition(config.getNamespaceDefinition())
396401
.withNamespaceFormat(config.getNamespaceFormat())
397402
.withPrefix(config.getPrefix())
398-
.withSourceConfiguration(config.getSourceConfiguration())
399-
.withDestinationConfiguration(config.getDestinationConfiguration())
403+
.withSourceConfiguration(attemptConfig.getSourceConfiguration())
404+
.withDestinationConfiguration(attemptConfig.getDestinationConfiguration())
400405
.withOperationSequence(config.getOperationSequence())
401406
.withCatalog(config.getConfiguredAirbyteCatalog())
402-
.withState(config.getState())
407+
.withState(attemptConfig.getState())
403408
.withResourceRequirements(config.getResourceRequirements())
404409
.withSourceResourceRequirements(config.getSourceResourceRequirements())
405410
.withDestinationResourceRequirements(config.getDestinationResourceRequirements())

airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
3232
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
3333
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
34+
import io.airbyte.config.AttemptSyncConfig;
3435
import io.airbyte.config.ConnectorJobOutput;
3536
import io.airbyte.config.FailureReason;
3637
import io.airbyte.config.JobCheckConnectionConfig;
@@ -274,26 +275,27 @@ void testSubmitSync() {
274275
final JobSyncConfig syncConfig = new JobSyncConfig()
275276
.withSourceDockerImage(IMAGE_NAME1)
276277
.withDestinationDockerImage(IMAGE_NAME2)
277-
.withSourceConfiguration(Jsons.emptyObject())
278-
.withDestinationConfiguration(Jsons.emptyObject())
279278
.withOperationSequence(List.of())
280279
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
280+
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
281+
.withSourceConfiguration(Jsons.emptyObject())
282+
.withDestinationConfiguration(Jsons.emptyObject());
281283
final StandardSyncInput input = new StandardSyncInput()
282284
.withNamespaceDefinition(syncConfig.getNamespaceDefinition())
283285
.withNamespaceFormat(syncConfig.getNamespaceFormat())
284286
.withPrefix(syncConfig.getPrefix())
285-
.withSourceConfiguration(syncConfig.getSourceConfiguration())
286-
.withDestinationConfiguration(syncConfig.getDestinationConfiguration())
287+
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
288+
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
287289
.withOperationSequence(syncConfig.getOperationSequence())
288290
.withCatalog(syncConfig.getConfiguredAirbyteCatalog())
289-
.withState(syncConfig.getState());
291+
.withState(attemptSyncConfig.getState());
290292

291293
final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig()
292294
.withJobId(String.valueOf(JOB_ID))
293295
.withAttemptId((long) ATTEMPT_ID)
294296
.withDockerImage(IMAGE_NAME2);
295297

296-
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
298+
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
297299
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID);
298300
verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC));
299301
}
@@ -343,15 +345,17 @@ void testforceCancelConnection() {
343345
doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class));
344346
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow);
345347

348+
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
349+
.withSourceConfiguration(Jsons.emptyObject())
350+
.withDestinationConfiguration(Jsons.emptyObject());
351+
346352
final JobSyncConfig syncConfig = new JobSyncConfig()
347353
.withSourceDockerImage(IMAGE_NAME1)
348354
.withDestinationDockerImage(IMAGE_NAME2)
349-
.withSourceConfiguration(Jsons.emptyObject())
350-
.withDestinationConfiguration(Jsons.emptyObject())
351355
.withOperationSequence(List.of())
352356
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
353357

354-
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
358+
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
355359
temporalClient.forceDeleteWorkflow(CONNECTION_ID);
356360

357361
verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);

0 commit comments

Comments
 (0)