Skip to content

Commit 4a29f3e

Browse files
benmoriceauakashkulk
authored andcommitted
Fix MDC scope (#19501)
* Fix MDC scope * format
1 parent 4885afa commit 4a29f3e

File tree

4 files changed

+14
-9
lines changed

4 files changed

+14
-9
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteDestination.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
public class DefaultAirbyteDestination implements AirbyteDestination {
3737

3838
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class);
39-
private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
39+
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
4040
.setLogPrefix("destination")
4141
.setPrefixColor(Color.YELLOW_BACKGROUND);
4242

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
3939
private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES);
4040
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);
4141

42-
private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
42+
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
4343
.setLogPrefix("source")
4444
.setPrefixColor(Color.BLUE_BACKGROUND);
4545

airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import datadog.trace.api.Trace;
1313
import io.airbyte.commons.features.FeatureFlags;
1414
import io.airbyte.commons.json.Jsons;
15+
import io.airbyte.commons.logging.MdcScope;
1516
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
1617
import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory;
1718
import io.airbyte.commons.temporal.TemporalUtils;
@@ -120,7 +121,8 @@ public Optional<String> runJob() throws Exception {
120121
final AirbyteSource airbyteSource =
121122
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource(
122123
featureFlags.useStreamCapableState())
123-
: new DefaultAirbyteSource(sourceLauncher, getStreamFactory(sourceLauncherConfig.getProtocolVersion()));
124+
: new DefaultAirbyteSource(sourceLauncher,
125+
getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER));
124126

125127
MetricClientFactory.initialize(MetricEmittingApps.WORKER);
126128
final MetricClient metricClient = MetricClientFactory.getMetricClient();
@@ -132,7 +134,8 @@ public Optional<String> runJob() throws Exception {
132134
Math.toIntExact(jobRunConfig.getAttemptId()),
133135
airbyteSource,
134136
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
135-
new DefaultAirbyteDestination(destinationLauncher, getStreamFactory(destinationLauncherConfig.getProtocolVersion()),
137+
new DefaultAirbyteDestination(destinationLauncher, getStreamFactory(destinationLauncherConfig.getProtocolVersion(),
138+
DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER),
136139
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
137140
new AirbyteMessageTracker(),
138141
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
@@ -146,10 +149,10 @@ public Optional<String> runJob() throws Exception {
146149
return Optional.of(Jsons.serialize(replicationOutput));
147150
}
148151

149-
private AirbyteStreamFactory getStreamFactory(final Version protocolVersion) {
152+
private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, final MdcScope.Builder mdcScope) {
150153
return protocolVersion != null
151-
? new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, protocolVersion)
152-
: new DefaultAirbyteStreamFactory();
154+
? new VersionedAirbyteStreamFactory(serDeProvider, migratorFactory, protocolVersion, mdcScope)
155+
: new DefaultAirbyteStreamFactory(mdcScope);
153156
}
154157

155158
}

airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
276276
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage())
277277
? new EmptyAirbyteSource(featureFlags.useStreamCapableState())
278278
: new DefaultAirbyteSource(sourceLauncher,
279-
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion()));
279+
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(),
280+
DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER));
280281
MetricClientFactory.initialize(MetricEmittingApps.WORKER);
281282
final MetricClient metricClient = MetricClientFactory.getMetricClient();
282283
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());
@@ -287,7 +288,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
287288
airbyteSource,
288289
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
289290
new DefaultAirbyteDestination(destinationLauncher,
290-
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion()),
291+
new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(),
292+
DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER),
291293
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
292294
new AirbyteMessageTracker(),
293295
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),

0 commit comments

Comments
 (0)