Skip to content

Commit 2a369e8

Browse files
authored
Normalization logs: remove json parse warnings (#34978)
1 parent 95b05a8 commit 2a369e8

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.21.2 | 2024-02-20 | [\#34978](https://github.com/airbytehq/airbyte/pull/34978) | Reduce log noise in NormalizationLogParser. |
169170
| 0.21.1 | 2024-02-20 | [\#35199](https://github.com/airbytehq/airbyte/pull/35199) | Add thread names to the logs. |
170171
| 0.21.0 | 2024-02-16 | [\#35314](https://github.com/airbytehq/airbyte/pull/35314) | Delete S3StreamCopier classes. These have been superseded by the async destinations framework. |
171172
| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. |

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/normalization/NormalizationLogParser.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ Stream<AirbyteMessage> toMessages(final String line) {
5353
if (Strings.isEmpty(line)) {
5454
return Stream.of(logMessage(Level.INFO, ""));
5555
}
56-
final Optional<JsonNode> json = Jsons.tryDeserialize(line);
56+
final Optional<JsonNode> json = Jsons.tryDeserializeWithoutWarn(line);
5757
if (json.isPresent()) {
5858
return jsonToMessage(json.get());
5959
} else {
@@ -96,7 +96,7 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
9696
*/
9797
final String logLevel = (jsonLine.hasNonNull("level")) ? jsonLine.get("level").asText() : "";
9898
String logMsg = jsonLine.hasNonNull("msg") ? jsonLine.get("msg").asText() : "";
99-
Level level;
99+
final Level level;
100100
switch (logLevel) {
101101
case "debug" -> level = Level.DEBUG;
102102
case "info" -> level = Level.INFO;
@@ -117,15 +117,15 @@ private Stream<AirbyteMessage> jsonToMessage(final JsonNode jsonLine) {
117117
}
118118
}
119119

120-
private static AirbyteMessage logMessage(Level level, String message) {
120+
private static AirbyteMessage logMessage(final Level level, final String message) {
121121
return new AirbyteMessage()
122122
.withType(Type.LOG)
123123
.withLog(new AirbyteLogMessage()
124124
.withLevel(level)
125125
.withMessage(message));
126126
}
127127

128-
public static void main(String[] args) {
128+
public static void main(final String[] args) {
129129
final NormalizationLogParser normalizationLogParser = new NormalizationLogParser();
130130
final Stream<AirbyteMessage> airbyteMessageStream =
131131
normalizationLogParser.create(new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8)));
@@ -135,8 +135,8 @@ public static void main(String[] args) {
135135
final String dbtErrorStack = String.join("\n", errors);
136136
if (!"".equals(dbtErrorStack)) {
137137
final Map<ErrorMapKeys, String> errorMap = SentryExceptionHelper.getUsefulErrorMessageAndTypeFromDbtError(dbtErrorStack);
138-
String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
139-
AirbyteMessage traceMessage = new AirbyteMessage()
138+
final String internalMessage = errorMap.get(ErrorMapKeys.ERROR_MAP_MESSAGE_KEY);
139+
final AirbyteMessage traceMessage = new AirbyteMessage()
140140
.withType(Type.TRACE)
141141
.withTrace(new AirbyteTraceMessage()
142142
.withType(AirbyteTraceMessage.Type.ERROR)
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.21.1
1+
version=0.21.2

0 commit comments

Comments
 (0)