Skip to content

Commit f15234b

Browse files
authored
Log sync summary and sync activity seperators (#16314)
* Log activity start and end * pretty-print JSON replication and failure summaries * simplify
1 parent 68ab523 commit f15234b

File tree

5 files changed

+47
-2
lines changed

5 files changed

+47
-2
lines changed

airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77
import io.airbyte.commons.concurrency.VoidCallable;
88
import io.airbyte.commons.logging.MdcScope;
99
import java.io.BufferedReader;
10+
import java.io.ByteArrayInputStream;
1011
import java.io.IOException;
1112
import java.io.InputStream;
13+
import java.nio.charset.StandardCharsets;
1214
import java.util.Map;
1315
import java.util.concurrent.ExecutorService;
1416
import java.util.concurrent.Executors;
@@ -26,6 +28,35 @@ public static void gobble(final InputStream is, final Consumer<String> consumer)
2628
gobble(is, consumer, GENERIC, MdcScope.DEFAULT_BUILDER);
2729
}
2830

31+
public static void gobble(final String message, final Consumer<String> consumer) {
32+
final InputStream stringAsSteam = new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8));
33+
gobble(stringAsSteam, consumer);
34+
}
35+
36+
public static void gobble(final String message) {
37+
gobble(message, LOGGER::info);
38+
}
39+
40+
/**
41+
* Used to emit a visual separator in the user-facing logs indicating a start of a meaningful
42+
* temporal activity
43+
*
44+
* @param message
45+
*/
46+
public static void startSection(final String message) {
47+
gobble("\r\n----- START " + message + " -----\r\n\r\n");
48+
}
49+
50+
/**
51+
* Used to emit a visual separator in the user-facing logs indicating a end of a meaningful temporal
52+
* activity
53+
*
54+
* @param message
55+
*/
56+
public static void endSection(final String message) {
57+
gobble("\r\n----- END " + message + " -----\r\n\r\n");
58+
}
59+
2960
public static void gobble(final InputStream is, final Consumer<String> consumer, final MdcScope.Builder mdcScopeBuilder) {
3061
gobble(is, consumer, GENERIC, mdcScopeBuilder);
3162
}

airbyte-workers/src/main/java/io/airbyte/workers/general/DbtTransformationWorker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.workers.general;
66

7+
import io.airbyte.commons.io.LineGobbler;
78
import io.airbyte.config.OperatorDbtInput;
89
import io.airbyte.config.ResourceRequirements;
910
import io.airbyte.workers.Worker;
@@ -42,6 +43,7 @@ public DbtTransformationWorker(final String jobId,
4243
@Override
4344
public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) throws WorkerException {
4445
final long startTime = System.currentTimeMillis();
46+
LineGobbler.startSection("DBT TRANSFORMATION");
4547

4648
try (dbtTransformationRunner) {
4749
LOGGER.info("Running dbt transformation.");
@@ -65,6 +67,7 @@ public Void run(final OperatorDbtInput operatorDbtInput, final Path jobRoot) thr
6567

6668
final Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
6769
LOGGER.info("Dbt Transformation executed in {}.", duration.toMinutesPart());
70+
LineGobbler.endSection("DBT TRANSFORMATION");
6871

6972
return null;
7073
}

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public DefaultCheckConnectionWorker(final WorkerConfigs workerConfigs, final Int
5858

5959
@Override
6060
public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Path jobRoot) throws WorkerException {
61+
LineGobbler.startSection("CHECK");
6162

6263
try {
6364
process = integrationLauncher.check(
@@ -88,6 +89,7 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
8889

8990
LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode);
9091
LOGGER.debug("Check connection job received output: {}", output);
92+
LineGobbler.endSection("CHECK");
9193
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output);
9294
} else {
9395
final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode);

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.workers.general;
66

7+
import io.airbyte.commons.io.LineGobbler;
78
import io.airbyte.config.Configs.WorkerEnvironment;
89
import io.airbyte.config.FailureReason;
910
import io.airbyte.config.NormalizationInput;
@@ -54,7 +55,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
5455
final long startTime = System.currentTimeMillis();
5556

5657
try (normalizationRunner) {
57-
LOGGER.info("Running normalization.");
58+
LineGobbler.startSection("DEFAULT NORMALIZATION");
5859
normalizationRunner.start();
5960

6061
Path normalizationRoot = null;
@@ -92,6 +93,7 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
9293
}
9394

9495
LOGGER.info("Normalization summary: {}", summary);
96+
LineGobbler.endSection("DEFAULT NORMALIZATION");
9597

9698
return summary;
9799
}

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package io.airbyte.workers.general;
66

7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import io.airbyte.commons.io.LineGobbler;
79
import io.airbyte.config.FailureReason;
810
import io.airbyte.config.ReplicationAttemptSummary;
911
import io.airbyte.config.ReplicationOutput;
@@ -117,6 +119,7 @@ public DefaultReplicationWorker(final String jobId,
117119
@Override
118120
public final ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRoot) throws WorkerException {
119121
LOGGER.info("start sync worker. job id: {} attempt id: {}", jobId, attempt);
122+
LineGobbler.startSection("REPLICATION");
120123

121124
// todo (cgardens) - this should not be happening in the worker. this is configuration information
122125
// that is independent of workflow executions.
@@ -246,7 +249,6 @@ else if (hasFailed.get()) {
246249
.withStartTime(startTime)
247250
.withEndTime(System.currentTimeMillis());
248251

249-
LOGGER.info("sync summary: {}", summary);
250252
final ReplicationOutput output = new ReplicationOutput()
251253
.withReplicationAttemptSummary(summary)
252254
.withOutputCatalog(destinationConfig.getCatalog());
@@ -293,6 +295,11 @@ else if (hasFailed.get()) {
293295
metricReporter.trackStateMetricTrackerError();
294296
}
295297

298+
final ObjectMapper mapper = new ObjectMapper();
299+
LOGGER.info("sync summary: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(summary));
300+
LOGGER.info("failures: {}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(failures));
301+
302+
LineGobbler.endSection("REPLICATION");
296303
return output;
297304
} catch (final Exception e) {
298305
throw new WorkerException("Sync failed", e);

0 commit comments

Comments
 (0)