Skip to content

Commit 2378b87

Browse files
authored
add streams to reset to job info (#13919)
1 parent 5689483 commit 2378b87

File tree

7 files changed

+160
-71
lines changed

7 files changed

+160
-71
lines changed

airbyte-api/src/main/openapi/config.yaml

+7-1
Original file line numberDiff line numberDiff line change
@@ -3729,7 +3729,13 @@ components:
37293729
format: int64
37303730
status:
37313731
$ref: "#/components/schemas/JobStatus"
3732-
streams:
3732+
resetConfig:
3733+
$ref: "#/components/schemas/ResetConfig"
3734+
ResetConfig:
3735+
type: object
3736+
description: contains information about how a reset was configured. only populated if the job was a reset.
3737+
properties:
3738+
streamsToReset:
37333739
type: array
37343740
items:
37353741
$ref: "#/components/schemas/StreamDescriptor"

airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteC
8080
configuredCatalog.getStreams()
8181
.stream()
8282
.map(ConfiguredAirbyteStream::getStream)
83-
.collect(Collectors.toList()));
83+
.toList());
8484
}
8585

8686
/**
@@ -122,7 +122,7 @@ public static List<StreamDescriptor> extractStreamDescriptors(final ConfiguredAi
122122
public static List<StreamDescriptor> extractStreamDescriptors(final AirbyteCatalog catalog) {
123123
return catalog.getStreams()
124124
.stream()
125-
.map(abStream -> new StreamDescriptor().withName(abStream.getName()).withNamespace(abStream.getNamespace()))
125+
.map(CatalogHelpers::extractDescriptor)
126126
.toList();
127127
}
128128

@@ -138,7 +138,7 @@ public static ConfiguredAirbyteCatalog toDefaultConfiguredCatalog(final AirbyteC
138138
.withStreams(catalog.getStreams()
139139
.stream()
140140
.map(CatalogHelpers::toDefaultConfiguredStream)
141-
.collect(Collectors.toList()));
141+
.toList());
142142
}
143143

144144
public static ConfiguredAirbyteStream toDefaultConfiguredStream(final AirbyteStream stream) {

airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java

+29-8
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
import io.airbyte.api.model.generated.JobStatus;
2222
import io.airbyte.api.model.generated.JobWithAttemptsRead;
2323
import io.airbyte.api.model.generated.LogRead;
24+
import io.airbyte.api.model.generated.ResetConfig;
2425
import io.airbyte.api.model.generated.SourceDefinitionRead;
2526
import io.airbyte.api.model.generated.SynchronousJobRead;
2627
import io.airbyte.commons.enums.Enums;
2728
import io.airbyte.commons.version.AirbyteVersion;
2829
import io.airbyte.config.Configs.WorkerEnvironment;
30+
import io.airbyte.config.JobConfig.ConfigType;
2931
import io.airbyte.config.JobOutput;
3032
import io.airbyte.config.StandardSyncOutput;
3133
import io.airbyte.config.StandardSyncSummary;
@@ -41,12 +43,11 @@
4143
import java.nio.file.Path;
4244
import java.util.Collections;
4345
import java.util.List;
46+
import java.util.Optional;
4447
import java.util.stream.Collectors;
4548

4649
public class JobConverter {
4750

48-
private static final int LOG_TAIL_SIZE = 1000000;
49-
5051
private final WorkerEnvironment workerEnvironment;
5152
private final LogConfigs logConfigs;
5253

@@ -58,13 +59,13 @@ public JobConverter(final WorkerEnvironment workerEnvironment, final LogConfigs
5859
public JobInfoRead getJobInfoRead(final Job job) {
5960
return new JobInfoRead()
6061
.job(getJobWithAttemptsRead(job).getJob())
61-
.attempts(job.getAttempts().stream().map(attempt -> getAttemptInfoRead(attempt)).collect(Collectors.toList()));
62+
.attempts(job.getAttempts().stream().map(this::getAttemptInfoRead).collect(Collectors.toList()));
6263
}
6364

64-
public JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
65-
final SourceDefinitionRead sourceDefinitionRead,
66-
final DestinationDefinitionRead destinationDefinitionRead,
67-
final AirbyteVersion airbyteVersion) {
65+
public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
66+
final SourceDefinitionRead sourceDefinitionRead,
67+
final DestinationDefinitionRead destinationDefinitionRead,
68+
final AirbyteVersion airbyteVersion) {
6869
return new JobDebugRead()
6970
.id(jobInfoRead.getJob().getId())
7071
.configId(jobInfoRead.getJob().getConfigId())
@@ -84,10 +85,30 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
8485
.id(job.getId())
8586
.configId(configId)
8687
.configType(configType)
88+
.resetConfig(extractResetConfigIfReset(job).orElse(null))
8789
.createdAt(job.getCreatedAtInSecond())
8890
.updatedAt(job.getUpdatedAtInSecond())
8991
.status(Enums.convertTo(job.getStatus(), JobStatus.class)))
90-
.attempts(job.getAttempts().stream().map(attempt -> getAttemptRead(attempt)).collect(Collectors.toList()));
92+
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList());
93+
}
94+
95+
/**
96+
* If the job is of type RESET, extracts the part of the reset config that we expose in the API.
97+
* Otherwise, returns empty optional.
98+
*
99+
* @param job - job
100+
* @return api representation of reset config
101+
*/
102+
private static Optional<ResetConfig> extractResetConfigIfReset(final Job job) {
103+
if (job.getConfigType() == ConfigType.RESET_CONNECTION) {
104+
return Optional.ofNullable(
105+
new ResetConfig().streamsToReset(job.getConfig().getResetConnection().getResetSourceConfiguration().getStreamsToReset()
106+
.stream()
107+
.map(ProtocolConverters::streamDescriptorToApi)
108+
.toList()));
109+
} else {
110+
return Optional.empty();
111+
}
91112
}
92113

93114
public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) {

airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java

+4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
*/
1212
public class ProtocolConverters {
1313

14+
public static StreamDescriptor streamDescriptorToApi(final io.airbyte.config.StreamDescriptor protocolStreamDescriptor) {
15+
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
16+
}
17+
1418
public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
1519
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
1620
}

airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private JobDebugInfoRead buildJobDebugInfoRead(final JobInfoRead jobInfoRead)
137137
final DestinationRead destination = getDestinationRead(connection);
138138
final SourceDefinitionRead sourceDefinitionRead = getSourceDefinitionRead(source);
139139
final DestinationDefinitionRead destinationDefinitionRead = getDestinationDefinitionRead(destination);
140-
final JobDebugRead jobDebugRead = jobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);
140+
final JobDebugRead jobDebugRead = JobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);
141141

142142
return new JobDebugInfoRead()
143143
.attempts(jobInfoRead.getAttempts())

airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java

+53-15
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,38 @@
2525
import io.airbyte.api.model.generated.JobRead;
2626
import io.airbyte.api.model.generated.JobWithAttemptsRead;
2727
import io.airbyte.api.model.generated.LogRead;
28+
import io.airbyte.api.model.generated.ResetConfig;
2829
import io.airbyte.api.model.generated.SourceDefinitionRead;
30+
import io.airbyte.api.model.generated.StreamDescriptor;
2931
import io.airbyte.commons.enums.Enums;
3032
import io.airbyte.commons.version.AirbyteVersion;
3133
import io.airbyte.config.Configs.WorkerEnvironment;
3234
import io.airbyte.config.FailureReason;
3335
import io.airbyte.config.FailureReason.FailureOrigin;
3436
import io.airbyte.config.FailureReason.FailureType;
35-
import io.airbyte.config.JobCheckConnectionConfig;
3637
import io.airbyte.config.JobConfig;
38+
import io.airbyte.config.JobConfig.ConfigType;
3739
import io.airbyte.config.JobOutput;
3840
import io.airbyte.config.JobOutput.OutputType;
41+
import io.airbyte.config.JobResetConnectionConfig;
42+
import io.airbyte.config.JobSyncConfig;
43+
import io.airbyte.config.ResetSourceConfiguration;
3944
import io.airbyte.config.StandardSyncOutput;
4045
import io.airbyte.config.StandardSyncSummary;
4146
import io.airbyte.config.StreamSyncStats;
4247
import io.airbyte.config.SyncStats;
4348
import io.airbyte.config.helpers.LogConfigs;
49+
import io.airbyte.protocol.models.AirbyteStream;
50+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
51+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
4452
import io.airbyte.scheduler.models.Attempt;
4553
import io.airbyte.scheduler.models.AttemptStatus;
4654
import io.airbyte.scheduler.models.Job;
4755
import io.airbyte.scheduler.models.JobStatus;
4856
import java.nio.file.Path;
4957
import java.util.ArrayList;
58+
import java.util.Collections;
59+
import java.util.List;
5060
import java.util.Optional;
5161
import java.util.UUID;
5262
import java.util.stream.Collectors;
@@ -60,10 +70,7 @@ class JobConverterTest {
6070
private static final String JOB_CONFIG_ID = "123";
6171
private static final JobStatus JOB_STATUS = JobStatus.RUNNING;
6272
private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING;
63-
private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE;
64-
private static final JobConfig JOB_CONFIG = new JobConfig()
65-
.withConfigType(CONFIG_TYPE)
66-
.withCheckConnection(new JobCheckConnectionConfig());
73+
private static final JobConfig.ConfigType CONFIG_TYPE = ConfigType.SYNC;
6774
private static final Path LOG_PATH = Path.of("log_path");
6875
private static final long CREATED_AT = System.currentTimeMillis() / 1000;
6976
private static final long RECORDS_EMITTED = 15L;
@@ -76,6 +83,12 @@ class JobConverterTest {
7683
private static final String FAILURE_STACKTRACE = "stacktrace";
7784
private static final boolean PARTIAL_SUCCESS = false;
7885

86+
private static final JobConfig JOB_CONFIG = new JobConfig()
87+
.withConfigType(CONFIG_TYPE)
88+
.withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of(
89+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("users")),
90+
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("accounts"))))));
91+
7992
private static final JobOutput JOB_OUTPUT = new JobOutput()
8093
.withOutputType(OutputType.SYNC)
8194
.withSync(new StandardSyncOutput()
@@ -104,7 +117,7 @@ class JobConverterTest {
104117
.id(JOB_ID)
105118
.configId(JOB_CONFIG_ID)
106119
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
107-
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
120+
.configType(JobConfigType.SYNC)
108121
.createdAt(CREATED_AT)
109122
.updatedAt(CREATED_AT))
110123
.attempts(Lists.newArrayList(new AttemptInfoRead()
@@ -149,7 +162,7 @@ class JobConverterTest {
149162
.id(JOB_ID)
150163
.configId(JOB_CONFIG_ID)
151164
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
152-
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
165+
.configType(JobConfigType.SYNC)
153166
.airbyteVersion(airbyteVersion.serialize())
154167
.sourceDefinition(sourceDefinitionRead)
155168
.destinationDefinition(destinationDefinitionRead);
@@ -192,31 +205,56 @@ public void setUp() {
192205
}
193206

194207
@Test
195-
public void testGetJobInfoRead() {
208+
void testGetJobInfoRead() {
196209
assertEquals(JOB_INFO, jobConverter.getJobInfoRead(job));
197210
}
198211

199212
@Test
200-
public void testGetDebugJobInfoRead() {
201-
assertEquals(JOB_DEBUG_INFO, jobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
213+
void testGetDebugJobInfoRead() {
214+
assertEquals(JOB_DEBUG_INFO, JobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
202215
}
203216

204217
@Test
205-
public void testGetJobWithAttemptsRead() {
206-
assertEquals(JOB_WITH_ATTEMPTS_READ, jobConverter.getJobWithAttemptsRead(job));
218+
void testGetJobWithAttemptsRead() {
219+
assertEquals(JOB_WITH_ATTEMPTS_READ, JobConverter.getJobWithAttemptsRead(job));
207220
}
208221

209222
@Test
210-
public void testGetJobRead() {
211-
final JobWithAttemptsRead jobReadActual = jobConverter.getJobWithAttemptsRead(job);
223+
void testGetJobRead() {
224+
final JobWithAttemptsRead jobReadActual = JobConverter.getJobWithAttemptsRead(job);
212225
assertEquals(JOB_WITH_ATTEMPTS_READ, jobReadActual);
213226
}
214227

215228
@Test
216-
public void testEnumConversion() {
229+
void testEnumConversion() {
217230
assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class));
218231
assertTrue(Enums.isCompatible(JobStatus.class, io.airbyte.api.model.generated.JobStatus.class));
219232
assertTrue(Enums.isCompatible(AttemptStatus.class, io.airbyte.api.model.generated.AttemptStatus.class));
220233
}
221234

235+
// this test intentionally only looks at the reset config as the rest is the same here.
236+
@Test
237+
void testResetJobIncludesResetConfig() {
238+
final JobConfig resetConfig = new JobConfig()
239+
.withConfigType(ConfigType.RESET_CONNECTION)
240+
.withResetConnection(new JobResetConnectionConfig().withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(
241+
new io.airbyte.config.StreamDescriptor().withName("users"),
242+
new io.airbyte.config.StreamDescriptor().withName("accounts")))));
243+
final Job resetJob = new Job(
244+
JOB_ID,
245+
ConfigType.RESET_CONNECTION,
246+
JOB_CONFIG_ID,
247+
resetConfig,
248+
Collections.emptyList(),
249+
JobStatus.SUCCEEDED,
250+
CREATED_AT,
251+
CREATED_AT,
252+
CREATED_AT);
253+
254+
final ResetConfig expectedResetConfig = new ResetConfig().streamsToReset(List.of(
255+
new StreamDescriptor().name("users"),
256+
new StreamDescriptor().name("accounts")));
257+
assertEquals(expectedResetConfig, jobConverter.getJobInfoRead(resetJob).getJob().getResetConfig());
258+
}
259+
222260
}

0 commit comments

Comments
 (0)