Skip to content

Commit 7ba102b

Browse files
authored
Add and persist job failures for Normalization (#14790)
* added TracedException and uncaught exception handler * added trace message capturing * added tests for TRACE messages * pre-json logging * propagating normalization failures * log format json & fix hang * parsing dbt json logs * bump normalization version * tests * Benoit comments * update trace exception user message * review comments * bump version * bump version * review comments * nit comments * add normalization trace failure test * version bump * pmd * formatto * bump version
1 parent f924359 commit 7ba102b

File tree

17 files changed

+477
-17
lines changed

17 files changed

+477
-17
lines changed

airbyte-config/config-models/src/main/resources/types/NormalizationSummary.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ properties:
1313
type: integer
1414
endTime:
1515
type: integer
16+
failures:
17+
type: array
18+
items:
19+
"$ref": FailureReason.yaml

airbyte-integrations/bases/base-normalization/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ WORKDIR /airbyte
2828
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
2929
ENTRYPOINT ["/airbyte/entrypoint.sh"]
3030

31-
LABEL io.airbyte.version=0.2.13
31+
LABEL io.airbyte.version=0.2.14
3232
LABEL io.airbyte.name=airbyte/normalization

airbyte-integrations/bases/base-normalization/entrypoint.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,11 @@ function main() {
123123
set +e # allow script to continue running even if next commands fail to run properly
124124
# We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet)
125125
# So we need to only pass `--event-buffer-size` if it's supported by DBT.
126+
# Same goes for JSON formatted logging.
126127
check_dbt_event_buffer_size
127128
if [ "$ret" -eq 0 ]; then
128129
echo -e "\nDBT >=1.0.0 detected; using 10K event buffer size\n"
129-
dbt_additional_args="--event-buffer-size=10000"
130+
dbt_additional_args="--event-buffer-size=10000 --log-format json"
130131
else
131132
dbt_additional_args=""
132133
fi

airbyte-integrations/bases/base-normalization/main_dev_transform_catalog.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,19 @@
33
#
44

55

6+
import logging
7+
8+
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
9+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
610
from normalization.transform_catalog.transform import main
711

812
if __name__ == "__main__":
9-
main()
13+
init_uncaught_exception_handler(logging.getLogger("airbyte"))
14+
try:
15+
main()
16+
except Exception as e:
17+
msg = (
18+
"Something went wrong while normalizing the data moved in this sync "
19+
+ "(failed to transform catalog into dbt project). See the logs for more details."
20+
)
21+
raise AirbyteTracedException.from_exception(e, message=msg)

airbyte-integrations/bases/base-normalization/main_dev_transform_config.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,19 @@
33
#
44

55

6+
import logging
7+
8+
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
9+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
610
from normalization.transform_config.transform import main
711

812
if __name__ == "__main__":
9-
main()
13+
init_uncaught_exception_handler(logging.getLogger("airbyte"))
14+
try:
15+
main()
16+
except Exception as e:
17+
msg = (
18+
"Something went wrong while normalizing the data moved in this sync "
19+
+ "(failed to transform config for dbt project). See the logs for more details."
20+
)
21+
raise AirbyteTracedException.from_exception(e, message=msg)

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55
package io.airbyte.workers.general;
66

77
import io.airbyte.config.Configs.WorkerEnvironment;
8+
import io.airbyte.config.FailureReason;
89
import io.airbyte.config.NormalizationInput;
910
import io.airbyte.config.NormalizationSummary;
11+
import io.airbyte.protocol.models.AirbyteTraceMessage;
1012
import io.airbyte.workers.exception.WorkerException;
13+
import io.airbyte.workers.helper.FailureHelper;
1114
import io.airbyte.workers.normalization.NormalizationRunner;
1215
import io.airbyte.workers.normalization.NormalizationWorker;
1316
import java.nio.file.Files;
1417
import java.nio.file.Path;
1518
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.List;
1621
import java.util.concurrent.atomic.AtomicBoolean;
1722
import org.apache.commons.lang3.time.DurationFormatUtils;
1823
import org.slf4j.Logger;
@@ -27,6 +32,8 @@ public class DefaultNormalizationWorker implements NormalizationWorker {
2732
private final int attempt;
2833
private final NormalizationRunner normalizationRunner;
2934
private final WorkerEnvironment workerEnvironment;
35+
private final List<FailureReason> traceFailureReasons = new ArrayList<>();
36+
private boolean failed = false;
3037

3138
private final AtomicBoolean cancelled;
3239

@@ -58,10 +65,10 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
5865

5966
if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(),
6067
input.getResourceRequirements())) {
61-
throw new WorkerException("Normalization Failed.");
68+
buildFailureReasonsAndSetFailure();
6269
}
6370
} catch (final Exception e) {
64-
throw new WorkerException("Normalization Failed.", e);
71+
buildFailureReasonsAndSetFailure();
6572
}
6673

6774
if (cancelled.get()) {
@@ -77,11 +84,25 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
7784
.withStartTime(startTime)
7885
.withEndTime(endTime);
7986

87+
if (!traceFailureReasons.isEmpty()) {
88+
summary.setFailures(traceFailureReasons);
89+
LOGGER.error("Normalization Failed.");
90+
} else if (failed) {
91+
throw new WorkerException("Normalization Failed.");
92+
}
93+
8094
LOGGER.info("Normalization summary: {}", summary);
8195

8296
return summary;
8397
}
8498

99+
private void buildFailureReasonsAndSetFailure() {
100+
normalizationRunner.getTraceMessages()
101+
.filter(traceMessage -> traceMessage.getType() == AirbyteTraceMessage.Type.ERROR)
102+
.forEach(traceMessage -> traceFailureReasons.add(FailureHelper.normalizationFailure(traceMessage, Long.valueOf(jobId), attempt)));
103+
failed = true;
104+
}
105+
85106
@Override
86107
public void cancel() {
87108
LOGGER.info("Cancelling normalization runner...");

airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ public static FailureReason normalizationFailure(final Throwable t, final Long j
115115
.withExternalMessage("Something went wrong during normalization");
116116
}
117117

118+
public static FailureReason normalizationFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) {
119+
return genericFailure(m, jobId, attemptNumber)
120+
.withFailureOrigin(FailureOrigin.NORMALIZATION)
121+
.withExternalMessage(m.getError().getMessage());
122+
}
123+
118124
public static FailureReason dbtFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
119125
return genericFailure(t, jobId, attemptNumber)
120126
.withFailureOrigin(FailureOrigin.DBT)

airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,34 @@
88
import com.google.common.annotations.VisibleForTesting;
99
import com.google.common.base.Strings;
1010
import com.google.common.collect.ImmutableMap;
11+
import io.airbyte.commons.io.IOs;
1112
import io.airbyte.commons.io.LineGobbler;
1213
import io.airbyte.commons.json.Jsons;
1314
import io.airbyte.commons.logging.LoggingHelper.Color;
1415
import io.airbyte.commons.logging.MdcScope;
1516
import io.airbyte.commons.logging.MdcScope.Builder;
1617
import io.airbyte.config.OperatorDbt;
1718
import io.airbyte.config.ResourceRequirements;
19+
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
20+
import io.airbyte.protocol.models.AirbyteErrorTraceMessage.FailureType;
21+
import io.airbyte.protocol.models.AirbyteMessage;
22+
import io.airbyte.protocol.models.AirbyteMessage.Type;
23+
import io.airbyte.protocol.models.AirbyteTraceMessage;
1824
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1925
import io.airbyte.workers.WorkerConfigs;
2026
import io.airbyte.workers.WorkerConstants;
2127
import io.airbyte.workers.WorkerUtils;
2228
import io.airbyte.workers.exception.WorkerException;
2329
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
2430
import io.airbyte.workers.process.ProcessFactory;
31+
import java.io.InputStream;
2532
import java.nio.file.Path;
2633
import java.util.Collections;
34+
import java.util.List;
2735
import java.util.Map;
2836
import java.util.concurrent.TimeUnit;
37+
import java.util.stream.Collectors;
38+
import java.util.stream.Stream;
2939
import org.slf4j.Logger;
3040
import org.slf4j.LoggerFactory;
3141

@@ -40,6 +50,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
4050
private final DestinationType destinationType;
4151
private final ProcessFactory processFactory;
4252
private final String normalizationImageName;
53+
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
54+
private Map<Type, List<AirbyteMessage>> airbyteMessagesByType;
4355

4456
private Process process = null;
4557

@@ -135,7 +147,30 @@ private boolean runProcess(final String jobId,
135147
Collections.emptyMap(),
136148
args);
137149

138-
LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER);
150+
try (final InputStream stdout = process.getInputStream()) {
151+
// finds and collects any AirbyteMessages from stdout
152+
// also builds a list of raw dbt errors and stores in streamFactory
153+
airbyteMessagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
154+
.collect(Collectors.groupingBy(AirbyteMessage::getType));
155+
156+
// picks up error logs from dbt
157+
String dbtErrorStack = String.join("\n\t", streamFactory.getDbtErrors());
158+
159+
if (!"".equals(dbtErrorStack)) {
160+
AirbyteMessage dbtTraceMessage = new AirbyteMessage()
161+
.withType(Type.TRACE)
162+
.withTrace(new AirbyteTraceMessage()
163+
.withType(AirbyteTraceMessage.Type.ERROR)
164+
.withEmittedAt((double) System.currentTimeMillis())
165+
.withError(new AirbyteErrorTraceMessage()
166+
.withFailureType(FailureType.SYSTEM_ERROR) // TODO: decide on best FailureType for this
167+
.withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself.")
168+
.withInternalMessage(dbtErrorStack)
169+
.withStackTrace(dbtErrorStack)));
170+
171+
airbyteMessagesByType.putIfAbsent(Type.TRACE, List.of(dbtTraceMessage));
172+
}
173+
}
139174
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC_BUILDER);
140175

141176
WorkerUtils.wait(process);
@@ -163,6 +198,14 @@ public void close() throws Exception {
163198
}
164199
}
165200

201+
@Override
202+
public Stream<AirbyteTraceMessage> getTraceMessages() {
203+
if (airbyteMessagesByType != null && airbyteMessagesByType.get(Type.TRACE) != null) {
204+
return airbyteMessagesByType.get(Type.TRACE).stream().map(AirbyteMessage::getTrace);
205+
}
206+
return Stream.empty();
207+
}
208+
166209
@VisibleForTesting
167210
DestinationType getDestinationType() {
168211
return destinationType;
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.normalization;
6+
7+
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.fasterxml.jackson.databind.node.JsonNodeType;
9+
import io.airbyte.commons.json.Jsons;
10+
import io.airbyte.commons.logging.MdcScope;
11+
import io.airbyte.protocol.models.AirbyteLogMessage;
12+
import io.airbyte.protocol.models.AirbyteMessage;
13+
import io.airbyte.workers.internal.AirbyteStreamFactory;
14+
import java.io.BufferedReader;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.Optional;
18+
import java.util.stream.Stream;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* Creates a stream from an input stream. The produced stream attempts to parse each line of the
24+
* InputStream into a AirbyteMessage. If the line cannot be parsed into a AirbyteMessage it is
25+
* assumed to be from dbt. dbt [error] messages are also parsed
26+
*
27+
* <p>
28+
* If a line starts with a AirbyteMessage and then has other characters after it, that
29+
* AirbyteMessage will still be parsed. If there are multiple AirbyteMessage records on the same
30+
* line, only the first will be parsed.
31+
*/
32+
public class NormalizationAirbyteStreamFactory implements AirbyteStreamFactory {
33+
34+
private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationAirbyteStreamFactory.class);
35+
36+
private final MdcScope.Builder containerLogMdcBuilder;
37+
private final Logger logger;
38+
private final List<String> dbtErrors = new ArrayList<>();
39+
40+
public NormalizationAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder) {
41+
this(LOGGER, containerLogMdcBuilder);
42+
}
43+
44+
NormalizationAirbyteStreamFactory(final Logger logger, final MdcScope.Builder containerLogMdcBuilder) {
45+
this.logger = logger;
46+
this.containerLogMdcBuilder = containerLogMdcBuilder;
47+
}
48+
49+
@Override
50+
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
51+
return bufferedReader
52+
.lines()
53+
.flatMap(this::filterOutAndHandleNonJsonLines)
54+
.flatMap(this::filterOutAndHandleNonAirbyteMessageLines)
55+
// so now we are just left with AirbyteMessages
56+
.filter(airbyteMessage -> {
57+
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
58+
if (isLog) {
59+
try (final var mdcScope = containerLogMdcBuilder.build()) {
60+
internalLog(airbyteMessage.getLog());
61+
}
62+
}
63+
return !isLog;
64+
});
65+
}
66+
67+
private Stream<JsonNode> filterOutAndHandleNonJsonLines(String line) {
68+
final Optional<JsonNode> jsonLine = Jsons.tryDeserialize(line);
69+
if (jsonLine.isEmpty()) {
70+
// we log as info all the lines that are not valid json.
71+
try (final var mdcScope = containerLogMdcBuilder.build()) {
72+
logger.info(line);
73+
// this is really hacky and vulnerable to picking up lines we don't want,
74+
// however it is only for destinations that are using dbt version < 1.0.
75+
// For v1 + we switch on JSON logging and parse those in the next block.
76+
if (line.contains("[error]")) {
77+
dbtErrors.add(line);
78+
}
79+
}
80+
}
81+
return jsonLine.stream();
82+
}
83+
84+
private Stream<AirbyteMessage> filterOutAndHandleNonAirbyteMessageLines(JsonNode jsonLine) {
85+
final Optional<AirbyteMessage> m = Jsons.tryObject(jsonLine, AirbyteMessage.class);
86+
if (m.isEmpty()) {
87+
// valid JSON but not an AirbyteMessage, so we assume this is a dbt json log
88+
try {
89+
final String logLevel = (jsonLine.getNodeType() == JsonNodeType.NULL || jsonLine.get("level").isNull())
90+
? ""
91+
: jsonLine.get("level").asText();
92+
final String logMsg = jsonLine.get("msg").isNull() ? "" : jsonLine.get("msg").asText();
93+
try (final var mdcScope = containerLogMdcBuilder.build()) {
94+
switch (logLevel) {
95+
case "debug" -> logger.debug(logMsg);
96+
case "info" -> logger.info(logMsg);
97+
case "warn" -> logger.warn(logMsg);
98+
case "error" -> logAndCollectErrorMessage(logMsg);
99+
default -> logger.info(jsonLine.asText()); // this shouldn't happen but logging it to avoid hiding unexpected lines.
100+
}
101+
}
102+
} catch (final Exception e) {
103+
logger.info(jsonLine.asText());
104+
}
105+
}
106+
return m.stream();
107+
}
108+
109+
private void logAndCollectErrorMessage(String logMessage) {
110+
logger.error(logMessage);
111+
dbtErrors.add(logMessage);
112+
}
113+
114+
public List<String> getDbtErrors() {
115+
return dbtErrors;
116+
}
117+
118+
private void internalLog(final AirbyteLogMessage logMessage) {
119+
switch (logMessage.getLevel()) {
120+
case FATAL, ERROR -> logger.error(logMessage.getMessage());
121+
case WARN -> logger.warn(logMessage.getMessage());
122+
case DEBUG -> logger.debug(logMessage.getMessage());
123+
case TRACE -> logger.trace(logMessage.getMessage());
124+
default -> logger.info(logMessage.getMessage());
125+
}
126+
}
127+
128+
}

airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.config.OperatorDbt;
99
import io.airbyte.config.ResourceRequirements;
10+
import io.airbyte.protocol.models.AirbyteTraceMessage;
1011
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
1112
import java.nio.file.Path;
13+
import java.util.stream.Stream;
1214

1315
public interface NormalizationRunner extends AutoCloseable {
1416

@@ -62,4 +64,6 @@ boolean normalize(String jobId,
6264
ResourceRequirements resourceRequirements)
6365
throws Exception;
6466

67+
Stream<AirbyteTraceMessage> getTraceMessages();
68+
6569
}

0 commit comments

Comments
 (0)